This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e4c00346793 KAFKA-18019: Make INVALID_PRODUCER_ID_MAPPING a fatal
error (#17822)
e4c00346793 is described below
commit e4c00346793f93bf358acbf85da4bc18c916eb2a
Author: Ritika Reddy <[email protected]>
AuthorDate: Sun Nov 17 18:43:04 2024 -0800
KAFKA-18019: Make INVALID_PRODUCER_ID_MAPPING a fatal error (#17822)
This patch contains changes to the handling of the
INVALID_PRODUCER_ID_MAPPING error.
Quoted from KIP-890:
Since we bump epoch on abort, we no longer need to call InitProducerId to
fence requests. InitProducerId will only be called when the producer starts up
to fence a previous instance.
With this change, other calls to InitProducerId were also inspected,
including the call made after receiving an InvalidPidMappingException. This
exception was made abortable as part of KIP-360 (Improve reliability of
idempotent/transactional producer). However, that change means we can
violate EOS guarantees. For example:
Consider an application that is copying data from one partition to another:
1. Application instance A processes up to offset 4.
2. Application instance B comes up and fences application instance A.
3. Application instance B processes up to offset 5.
4. Application instances A and B are idle for transactional.id.expiration.ms,
   and the transactional ID expires on the server.
5. Application instance A attempts to process offset 5 (since, in its view,
   that is next). If we recover from the invalid PID mapping, this processing
   is duplicated.
Thus, INVALID_PRODUCER_ID_MAPPING should be fatal to the producer.
This is consistent with KIP-1050 (Consistent error handling for
Transactions), under which errors that are fatal to the producer fall into the
"application recoverable" category. This grouping indicates to the client that
the producer must be restarted and that recovery on the application side is
necessary. KIP-1050 has been approved, so this change is consistent with that decision.
This PR also fixes the flakiness of TransactionsExpirationTest.
Reviewers: Artem Livshits <[email protected]>, Justine Olshan
<[email protected]>, Calvin Liu <[email protected]>
---
.../producer/internals/TransactionManager.java | 46 ++++++++++------------
.../producer/internals/TransactionManagerTest.java | 4 +-
.../kafka/api/ProducerIdExpirationTest.scala | 16 +++++---
.../kafka/api/TransactionsExpirationTest.scala | 37 +++++++++++------
.../integration/kafka/api/TransactionsTest.scala | 9 +++--
5 files changed, 66 insertions(+), 46 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 8057a82c568..3c916de9c0f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.InvalidPidMappingException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -339,27 +338,24 @@ public class TransactionManager {
if (!newPartitionsInTransaction.isEmpty())
enqueueRequest(addPartitionsToTransactionHandler());
- // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server
will not accept an EndTxnRequest, so skip
- // directly to InitProducerId. Otherwise, we must first abort the
transaction, because the producer will be
- // fenced if we directly call InitProducerId.
- if (!(lastError instanceof InvalidPidMappingException)) {
- EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
- new EndTxnRequestData()
- .setTransactionalId(transactionalId)
- .setProducerId(producerIdAndEpoch.producerId)
- .setProducerEpoch(producerIdAndEpoch.epoch)
- .setCommitted(transactionResult.id),
- isTransactionV2Enabled
- );
+ EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
+ new EndTxnRequestData()
+ .setTransactionalId(transactionalId)
+ .setProducerId(producerIdAndEpoch.producerId)
+ .setProducerEpoch(producerIdAndEpoch.epoch)
+ .setCommitted(transactionResult.id),
+ isTransactionV2Enabled
+ );
- EndTxnHandler handler = new EndTxnHandler(builder);
- enqueueRequest(handler);
- if (!epochBumpRequired) {
- return handler.result;
- }
+ EndTxnHandler handler = new EndTxnHandler(builder);
+ enqueueRequest(handler);
+
+ // If an epoch bump is required for recovery, initialize the
transaction after completing the EndTxn request.
+ if (epochBumpRequired) {
+ return initializeTransactions(this.producerIdAndEpoch);
}
- return initializeTransactions(this.producerIdAndEpoch);
+ return handler.result;
}
public synchronized TransactionalRequestResult
sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
@@ -1410,7 +1406,7 @@ public class TransactionManager {
fatalError(Errors.PRODUCER_FENCED.exception());
return;
} else if (error ==
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
- error == Errors.INVALID_TXN_STATE) {
+ error == Errors.INVALID_TXN_STATE || error ==
Errors.INVALID_PRODUCER_ID_MAPPING) {
fatalError(error.exception());
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@@ -1419,7 +1415,7 @@ public class TransactionManager {
log.debug("Did not attempt to add partition {} to
transaction because other partitions in the " +
"batch had errors.", topicPartition);
hasPartitionErrors = true;
- } else if (error == Errors.UNKNOWN_PRODUCER_ID || error ==
Errors.INVALID_PRODUCER_ID_MAPPING) {
+ } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
abortableErrorIfPossible(error.exception());
return;
} else if (error == Errors.TRANSACTION_ABORTABLE) {
@@ -1595,9 +1591,9 @@ public class TransactionManager {
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
- error == Errors.INVALID_TXN_STATE) {
+ error == Errors.INVALID_TXN_STATE || error ==
Errors.INVALID_PRODUCER_ID_MAPPING) {
fatalError(error.exception());
- } else if (error == Errors.UNKNOWN_PRODUCER_ID || error ==
Errors.INVALID_PRODUCER_ID_MAPPING) {
+ } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
abortableErrorIfPossible(error.exception());
} else if (error == Errors.TRANSACTION_ABORTABLE) {
abortableError(error.exception());
@@ -1648,14 +1644,14 @@ public class TransactionManager {
reenqueue();
} else if (error.exception() instanceof RetriableException) {
reenqueue();
- } else if (error == Errors.UNKNOWN_PRODUCER_ID || error ==
Errors.INVALID_PRODUCER_ID_MAPPING) {
+ } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
abortableErrorIfPossible(error.exception());
} else if (error == Errors.INVALID_PRODUCER_EPOCH || error ==
Errors.PRODUCER_FENCED) {
// We could still receive INVALID_PRODUCER_EPOCH from old
versioned transaction coordinator,
// just treat it the same as PRODUCE_FENCED.
fatalError(Errors.PRODUCER_FENCED.exception());
} else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
- error == Errors.INVALID_TXN_STATE) {
+ error == Errors.INVALID_TXN_STATE || error ==
Errors.INVALID_PRODUCER_ID_MAPPING) {
fatalError(error.exception());
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 76e5717024a..02570b083ec 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -3346,7 +3346,7 @@ public class TransactionManagerTest {
transactionManager.beginTransaction();
transactionManager.maybeAddPartition(tp0);
- prepareAddPartitionsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING,
tp0, initialEpoch, producerId);
+ prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_PRODUCER_ID, tp0,
initialEpoch, producerId);
runUntil(transactionManager::hasAbortableError);
TransactionalRequestResult abortResult =
transactionManager.beginAbort();
@@ -3378,7 +3378,7 @@ public class TransactionManagerTest {
offsets.put(tp0, new OffsetAndMetadata(1));
transactionManager.sendOffsetsToTransaction(offsets, new
ConsumerGroupMetadata(consumerGroupId));
assertFalse(transactionManager.hasPendingOffsetCommits());
- prepareAddOffsetsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING,
consumerGroupId, producerId, initialEpoch);
+ prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_PRODUCER_ID,
consumerGroupId, producerId, initialEpoch);
runUntil(transactionManager::hasAbortableError); // Send
AddOffsetsRequest
TransactionalRequestResult abortResult =
transactionManager.beginAbort();
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index 89e9e8dd6f6..918d79b436e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -26,13 +26,12 @@ import kafka.utils.TestUtils.{consumeRecords,
createAdminClient}
import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry,
ProducerState}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidPidMappingException,
TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
-import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
@@ -126,13 +125,20 @@ class ProducerIdExpirationTest extends
KafkaServerTestHarness {
// Producer IDs should be retained.
assertEquals(1, producerState.size)
- // Start a new transaction and attempt to send, which will trigger an
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
+ // Start a new transaction and attempt to send, triggering an
AddPartitionsToTxnRequest that will fail
+ // due to the expired transactional ID, resulting in a fatal error.
producer.beginTransaction()
val failedFuture =
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"1", "1", willBeCommitted = false))
TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never
completed.")
+ org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture,
classOf[InvalidPidMappingException])
- JTestUtils.assertFutureThrows(failedFuture,
classOf[InvalidPidMappingException])
- producer.abortTransaction()
+ // Assert that aborting the transaction throws a KafkaException due to the
fatal error.
+ assertThrows(classOf[KafkaException], () => producer.abortTransaction())
+
+ // Close the producer and reinitialize to recover from the fatal error.
+ producer.close()
+ producer = TestUtils.createTransactionalProducer("transactionalProducer",
brokers)
+ producer.initTransactions()
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0,
"4", "4", willBeCommitted = true))
diff --git
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index c718f71a5ba..b0e291c5efc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -20,20 +20,20 @@ package kafka.api
import java.util.{Collections, Properties}
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.TestUtils
import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
import org.apache.kafka.clients.admin.{Admin, ProducerState}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.{InvalidPidMappingException,
TransactionalIdNotFoundException}
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs,
ServerLogConfigs}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows,
assertTrue}
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.CsvSource
import scala.jdk.CollectionConverters._
import scala.collection.Seq
@@ -81,9 +81,14 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
super.tearDown()
}
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
- def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String,
groupProtocol: String): Unit = {
+ @ParameterizedTest
+ @CsvSource(Array(
+ "kraft,classic,false",
+ "kraft,consumer,false",
+ "kraft,classic,true",
+ "kraft,consumer,true",
+ ))
+ def testFatalErrorAfterInvalidProducerIdMapping(quorum: String,
groupProtocol: String, isTV2Enabled: Boolean): Unit = {
producer.initTransactions()
// Start and then abort a transaction to allow the transactional ID to
expire.
@@ -96,14 +101,22 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
waitUntilTransactionalStateExists()
waitUntilTransactionalStateExpires()
- // Start a new transaction and attempt to send, which will trigger an
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
+ // Start a new transaction and attempt to send, triggering an
AddPartitionsToTxnRequest that will fail
+ // due to the expired transactional ID, resulting in a fatal error.
producer.beginTransaction()
val failedFuture =
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3,
"1", "1", willBeCommitted = false))
TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never
completed.")
-
org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture,
classOf[InvalidPidMappingException])
- producer.abortTransaction()
+ // Assert that aborting the transaction throws a KafkaException due to the
fatal error.
+ assertThrows(classOf[KafkaException], () => producer.abortTransaction())
+
+ // Close the producer and reinitialize to recover from the fatal error.
+ producer.close()
+ producer = TestUtils.createTransactionalProducer("transactionalProducer",
brokers)
+ producer.initTransactions()
+
+ // Proceed with a new transaction after reinitializing.
producer.beginTransaction()
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2,
null, "2", "2", willBeCommitted = true))
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2,
"4", "4", willBeCommitted = true))
@@ -170,7 +183,9 @@ class TransactionsExpirationTest extends
KafkaServerTestHarness {
// soon after the first will re-use the same producerId, while bumping the
epoch to indicate that they are distinct.
assertEquals(oldProducerId, newProducerId)
if (isTV2Enabled) {
- assertEquals(oldProducerEpoch + 3, newProducerEpoch)
+ // TV2 bumps epoch on EndTxn, and the final commit may or may not have
bumped the epoch in the producer state.
+ // The epoch should be at least oldProducerEpoch + 2 for the first
commit and the restarted producer.
+ assertTrue(oldProducerEpoch + 2 <= newProducerEpoch)
} else {
assertEquals(oldProducerEpoch + 1, newProducerEpoch)
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index fba101e1f76..0c9c1f2cabc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -697,7 +697,7 @@ class TransactionsTest extends IntegrationTestHarness {
assertThrows(classOf[IllegalStateException], () =>
producer.initTransactions())
}
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+ @ParameterizedTest
@CsvSource(Array(
"kraft,classic,false",
"kraft,consumer,false",
@@ -762,8 +762,11 @@ class TransactionsTest extends IntegrationTestHarness {
}
}
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
- @CsvSource(Array("kraft, classic, true", "kraft, consumer, true"))
+ @ParameterizedTest
+ @CsvSource(Array(
+ "kraft, classic, true",
+ "kraft, consumer, true"
+ ))
def testBumpTransactionalEpochWithTV2Enabled(quorum: String, groupProtocol:
String, isTV2Enabled: Boolean): Unit = {
val producer = createTransactionalProducer("transactionalProducer",
deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)