This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e4c00346793 KAFKA-18019: Make INVALID_PRODUCER_ID_MAPPING a fatal 
error (#17822)
e4c00346793 is described below

commit e4c00346793f93bf358acbf85da4bc18c916eb2a
Author: Ritika Reddy <[email protected]>
AuthorDate: Sun Nov 17 18:43:04 2024 -0800

    KAFKA-18019: Make INVALID_PRODUCER_ID_MAPPING a fatal error (#17822)
    
    This patch contains changes to the handling of the 
INVALID_PRODUCER_ID_MAPPING error.
    Quoted from KIP-890
    Since we bump epoch on abort, we no longer need to call InitProducerId to 
fence requests. InitProducerId will only be called when the producer starts up 
to fence a previous instance.
    
    With this change, some other calls to InitProducerId were inspected 
including the call after receiving an InvalidPidMappingException. This 
exception was changed to abortable as part of KIP-360: Improve reliability of 
idempotent/transactional producer. However, this change means that we can 
violate EOS guarantees. As an example:
    
    Consider an application that is copying data from one partition to another
    
    Application instance A processes to offset 4
    Application instance B comes up and fences application instance A
    Application instance B processes to offset 5
    Application instances A and B are idle for transaction.id.expiration.ms, 
so the transactional ID expires on the server
    Application instance A attempts to process offset 5 (since, in its view, 
that is the next offset) -- if we recover from the invalid PID mapping, we can 
duplicate this processing
    Thus, INVALID_PRODUCER_ID_MAPPING should be fatal to the producer.
    
    This is consistent with KIP-1050: Consistent error handling for 
Transactions, where errors that are fatal to the producer are in the 
"application recoverable" category. This is a grouping that indicates to the 
client that the producer needs to restart and that recovery on the application 
side is necessary. KIP-1050 is approved, so we are consistent with that decision.
    
    This PR also fixes the flakiness of TransactionsExpirationTest.
    
    Reviewers: Artem Livshits <[email protected]>, Justine Olshan 
<[email protected]>, Calvin Liu <[email protected]>
---
 .../producer/internals/TransactionManager.java     | 46 ++++++++++------------
 .../producer/internals/TransactionManagerTest.java |  4 +-
 .../kafka/api/ProducerIdExpirationTest.scala       | 16 +++++---
 .../kafka/api/TransactionsExpirationTest.scala     | 37 +++++++++++------
 .../integration/kafka/api/TransactionsTest.scala   |  9 +++--
 5 files changed, 66 insertions(+), 46 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 8057a82c568..3c916de9c0f 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
-import org.apache.kafka.common.errors.InvalidPidMappingException;
 import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.ProducerFencedException;
@@ -339,27 +338,24 @@ public class TransactionManager {
         if (!newPartitionsInTransaction.isEmpty())
             enqueueRequest(addPartitionsToTransactionHandler());
 
-        // If the error is an INVALID_PRODUCER_ID_MAPPING error, the server 
will not accept an EndTxnRequest, so skip
-        // directly to InitProducerId. Otherwise, we must first abort the 
transaction, because the producer will be
-        // fenced if we directly call InitProducerId.
-        if (!(lastError instanceof InvalidPidMappingException)) {
-            EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
-                    new EndTxnRequestData()
-                            .setTransactionalId(transactionalId)
-                            .setProducerId(producerIdAndEpoch.producerId)
-                            .setProducerEpoch(producerIdAndEpoch.epoch)
-                            .setCommitted(transactionResult.id),
-                    isTransactionV2Enabled
-            );
+        EndTxnRequest.Builder builder = new EndTxnRequest.Builder(
+            new EndTxnRequestData()
+                .setTransactionalId(transactionalId)
+                .setProducerId(producerIdAndEpoch.producerId)
+                .setProducerEpoch(producerIdAndEpoch.epoch)
+                .setCommitted(transactionResult.id),
+            isTransactionV2Enabled
+        );
 
-            EndTxnHandler handler = new EndTxnHandler(builder);
-            enqueueRequest(handler);
-            if (!epochBumpRequired) {
-                return handler.result;
-            }
+        EndTxnHandler handler = new EndTxnHandler(builder);
+        enqueueRequest(handler);
+
+        // If an epoch bump is required for recovery, initialize the 
transaction after completing the EndTxn request.
+        if (epochBumpRequired) {
+            return initializeTransactions(this.producerIdAndEpoch);
         }
 
-        return initializeTransactions(this.producerIdAndEpoch);
+        return handler.result;
     }
 
     public synchronized TransactionalRequestResult 
sendOffsetsToTransaction(final Map<TopicPartition, OffsetAndMetadata> offsets,
@@ -1410,7 +1406,7 @@ public class TransactionManager {
                     fatalError(Errors.PRODUCER_FENCED.exception());
                     return;
                 } else if (error == 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                        error == Errors.INVALID_TXN_STATE) {
+                        error == Errors.INVALID_TXN_STATE || error == 
Errors.INVALID_PRODUCER_ID_MAPPING) {
                     fatalError(error.exception());
                     return;
                 } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@@ -1419,7 +1415,7 @@ public class TransactionManager {
                     log.debug("Did not attempt to add partition {} to 
transaction because other partitions in the " +
                             "batch had errors.", topicPartition);
                     hasPartitionErrors = true;
-                } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == 
Errors.INVALID_PRODUCER_ID_MAPPING) {
+                } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
                     abortableErrorIfPossible(error.exception());
                     return;
                 } else if (error == Errors.TRANSACTION_ABORTABLE) {
@@ -1595,9 +1591,9 @@ public class TransactionManager {
                 // just treat it the same as PRODUCE_FENCED.
                 fatalError(Errors.PRODUCER_FENCED.exception());
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.INVALID_TXN_STATE) {
+                    error == Errors.INVALID_TXN_STATE || error == 
Errors.INVALID_PRODUCER_ID_MAPPING) {
                 fatalError(error.exception());
-            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == 
Errors.INVALID_PRODUCER_ID_MAPPING) {
+            } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
                 abortableErrorIfPossible(error.exception());
             } else if (error == Errors.TRANSACTION_ABORTABLE) {
                 abortableError(error.exception());
@@ -1648,14 +1644,14 @@ public class TransactionManager {
                 reenqueue();
             } else if (error.exception() instanceof RetriableException) {
                 reenqueue();
-            } else if (error == Errors.UNKNOWN_PRODUCER_ID || error == 
Errors.INVALID_PRODUCER_ID_MAPPING) {
+            } else if (error == Errors.UNKNOWN_PRODUCER_ID) {
                 abortableErrorIfPossible(error.exception());
             } else if (error == Errors.INVALID_PRODUCER_EPOCH || error == 
Errors.PRODUCER_FENCED) {
                 // We could still receive INVALID_PRODUCER_EPOCH from old 
versioned transaction coordinator,
                 // just treat it the same as PRODUCE_FENCED.
                 fatalError(Errors.PRODUCER_FENCED.exception());
             } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED ||
-                    error == Errors.INVALID_TXN_STATE) {
+                    error == Errors.INVALID_TXN_STATE || error == 
Errors.INVALID_PRODUCER_ID_MAPPING) {
                 fatalError(error.exception());
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                 
abortableError(GroupAuthorizationException.forGroupId(builder.data.groupId()));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 76e5717024a..02570b083ec 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -3346,7 +3346,7 @@ public class TransactionManagerTest {
 
         transactionManager.beginTransaction();
         transactionManager.maybeAddPartition(tp0);
-        prepareAddPartitionsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, 
tp0, initialEpoch, producerId);
+        prepareAddPartitionsToTxnResponse(Errors.UNKNOWN_PRODUCER_ID, tp0, 
initialEpoch, producerId);
         runUntil(transactionManager::hasAbortableError);
         TransactionalRequestResult abortResult = 
transactionManager.beginAbort();
 
@@ -3378,7 +3378,7 @@ public class TransactionManagerTest {
         offsets.put(tp0, new OffsetAndMetadata(1));
         transactionManager.sendOffsetsToTransaction(offsets, new 
ConsumerGroupMetadata(consumerGroupId));
         assertFalse(transactionManager.hasPendingOffsetCommits());
-        prepareAddOffsetsToTxnResponse(Errors.INVALID_PRODUCER_ID_MAPPING, 
consumerGroupId, producerId, initialEpoch);
+        prepareAddOffsetsToTxnResponse(Errors.UNKNOWN_PRODUCER_ID, 
consumerGroupId, producerId, initialEpoch);
         runUntil(transactionManager::hasAbortableError);  // Send 
AddOffsetsRequest
         TransactionalRequestResult abortResult = 
transactionManager.beginAbort();
 
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
index 89e9e8dd6f6..918d79b436e 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerIdExpirationTest.scala
@@ -26,13 +26,12 @@ import kafka.utils.TestUtils.{consumeRecords, 
createAdminClient}
 import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, 
ProducerState}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
TransactionalIdNotFoundException}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
-import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
@@ -126,13 +125,20 @@ class ProducerIdExpirationTest extends 
KafkaServerTestHarness {
     // Producer IDs should be retained.
     assertEquals(1, producerState.size)
 
-    // Start a new transaction and attempt to send, which will trigger an 
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
+    // Start a new transaction and attempt to send, triggering an 
AddPartitionsToTxnRequest that will fail
+    // due to the expired transactional ID, resulting in a fatal error.
     producer.beginTransaction()
     val failedFuture = 
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"1", "1", willBeCommitted = false))
     TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never 
completed.")
+    org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, 
classOf[InvalidPidMappingException])
 
-    JTestUtils.assertFutureThrows(failedFuture, 
classOf[InvalidPidMappingException])
-    producer.abortTransaction()
+    // Assert that aborting the transaction throws a KafkaException due to the 
fatal error.
+    assertThrows(classOf[KafkaException], () => producer.abortTransaction())
+
+    // Close the producer and reinitialize to recover from the fatal error.
+    producer.close()
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", 
brokers)
+    producer.initTransactions()
 
     producer.beginTransaction()
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 0, 
"4", "4", willBeCommitted = true))
diff --git 
a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
index c718f71a5ba..b0e291c5efc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsExpirationTest.scala
@@ -20,20 +20,20 @@ package kafka.api
 import java.util.{Collections, Properties}
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.{TestInfoUtils, TestUtils}
+import kafka.utils.TestUtils
 import kafka.utils.TestUtils.{consumeRecords, createAdminClient}
 import org.apache.kafka.clients.admin.{Admin, ProducerState}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.{InvalidPidMappingException, 
TransactionalIdNotFoundException}
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs, 
ServerLogConfigs}
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, 
assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.CsvSource
 
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
@@ -81,9 +81,14 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     super.tearDown()
   }
 
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
-  def testBumpTransactionalEpochAfterInvalidProducerIdMapping(quorum: String, 
groupProtocol: String): Unit = {
+  @ParameterizedTest
+  @CsvSource(Array(
+    "kraft,classic,false",
+    "kraft,consumer,false",
+    "kraft,classic,true",
+    "kraft,consumer,true",
+  ))
+  def testFatalErrorAfterInvalidProducerIdMapping(quorum: String, 
groupProtocol: String, isTV2Enabled: Boolean): Unit = {
     producer.initTransactions()
 
     // Start and then abort a transaction to allow the transactional ID to 
expire.
@@ -96,14 +101,22 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     waitUntilTransactionalStateExists()
     waitUntilTransactionalStateExpires()
 
-    // Start a new transaction and attempt to send, which will trigger an 
AddPartitionsToTxnRequest, which will fail due to the expired transactional ID.
+    // Start a new transaction and attempt to send, triggering an 
AddPartitionsToTxnRequest that will fail
+    // due to the expired transactional ID, resulting in a fatal error.
     producer.beginTransaction()
     val failedFuture = 
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 3, 
"1", "1", willBeCommitted = false))
     TestUtils.waitUntilTrue(() => failedFuture.isDone, "Producer future never 
completed.")
-
     org.apache.kafka.test.TestUtils.assertFutureThrows(failedFuture, 
classOf[InvalidPidMappingException])
-    producer.abortTransaction()
 
+    // Assert that aborting the transaction throws a KafkaException due to the 
fatal error.
+    assertThrows(classOf[KafkaException], () => producer.abortTransaction())
+
+    // Close the producer and reinitialize to recover from the fatal error.
+    producer.close()
+    producer = TestUtils.createTransactionalProducer("transactionalProducer", 
brokers)
+    producer.initTransactions()
+
+    // Proceed with a new transaction after reinitializing.
     producer.beginTransaction()
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 
null, "2", "2", willBeCommitted = true))
     
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 2, 
"4", "4", willBeCommitted = true))
@@ -170,7 +183,9 @@ class TransactionsExpirationTest extends 
KafkaServerTestHarness {
     // soon after the first will re-use the same producerId, while bumping the 
epoch to indicate that they are distinct.
     assertEquals(oldProducerId, newProducerId)
     if (isTV2Enabled) {
-      assertEquals(oldProducerEpoch + 3, newProducerEpoch)
+      // TV2 bumps epoch on EndTxn, and the final commit may or may not have 
bumped the epoch in the producer state.
+      // The epoch should be at least oldProducerEpoch + 2 for the first 
commit and the restarted producer.
+      assertTrue(oldProducerEpoch + 2 <= newProducerEpoch)
     } else {
       assertEquals(oldProducerEpoch + 1, newProducerEpoch)
     }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index fba101e1f76..0c9c1f2cabc 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -697,7 +697,7 @@ class TransactionsTest extends IntegrationTestHarness {
     assertThrows(classOf[IllegalStateException], () => 
producer.initTransactions())
   }
 
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @ParameterizedTest
   @CsvSource(Array(
     "kraft,classic,false",
     "kraft,consumer,false",
@@ -762,8 +762,11 @@ class TransactionsTest extends IntegrationTestHarness {
     }
   }
 
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  @CsvSource(Array("kraft, classic, true", "kraft, consumer, true"))
+  @ParameterizedTest
+  @CsvSource(Array(
+    "kraft, classic, true",
+    "kraft, consumer, true"
+  ))
   def testBumpTransactionalEpochWithTV2Enabled(quorum: String, groupProtocol: 
String, isTV2Enabled: Boolean): Unit = {
     val producer = createTransactionalProducer("transactionalProducer",
       deliveryTimeoutMs = 5000, requestTimeoutMs = 5000)

Reply via email to