This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ccab9eb8b47 KAFKA-18660: Transactions Version 2 doesn't handle epoch 
overflow correctly (#18730)
ccab9eb8b47 is described below

commit ccab9eb8b4781eb4b8edbd03b505ae31e1abdff7
Author: Justine Olshan <[email protected]>
AuthorDate: Thu Jan 30 13:42:10 2025 -0800

    KAFKA-18660: Transactions Version 2 doesn't handle epoch overflow correctly 
(#18730)
    
    Fixed the typo that used the wrong producer ID and epoch when returning so 
that we handle epoch overflow correctly.
    
    We also had to rearrange the concurrent transaction handling so that we 
don't self-fence when we start the new transaction with the new producer ID.
    
    I also tested this with a modified version of the code where epoch overflow 
happens on the first epoch bump (every request has a new producer ID).
    
    Reviewers: Artem Livshits <[email protected]>, Jeff Kim 
<[email protected]>
---
 .../transaction/TransactionCoordinator.scala       | 22 ++++---
 .../integration/kafka/api/TransactionsTest.scala   | 25 ++++----
 .../transaction/TransactionCoordinatorTest.scala   | 69 +++++++++++++++++++++-
 3 files changed, 97 insertions(+), 19 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index d6ca11add86..aec0f3a67ba 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -408,13 +408,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
           // generate the new transaction metadata with added partitions
           txnMetadata.inLock {
-            if (txnMetadata.producerId != producerId) {
+            if (txnMetadata.pendingTransitionInProgress) {
+              // return a retriable exception to let the client backoff and 
retry
+              // This check is performed first so that the pending transition 
can complete before subsequent checks.
+              // With TV2, we may be transitioning over a producer epoch 
overflow, and the producer may be using the
+              // new producer ID that is still only in pending state.
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.producerId != producerId) {
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             } else if (txnMetadata.producerEpoch != producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            } else if (txnMetadata.pendingTransitionInProgress) {
-              // return a retriable exception to let the client backoff and 
retry
-              Left(Errors.CONCURRENT_TRANSACTIONS)
             } else if (txnMetadata.state == PrepareCommit || txnMetadata.state 
== PrepareAbort) {
               Left(Errors.CONCURRENT_TRANSACTIONS)
             } else if (txnMetadata.state == Ongoing && 
partitions.subsetOf(txnMetadata.topicPartitions)) {
@@ -812,10 +815,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
               }
             }
 
-            if (txnMetadata.producerId != producerId && !retryOnOverflow)
-              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-            else if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence)
+            if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence) {
+              // This check is performed first so that the pending transition 
can complete before the next checks.
+              // With TV2, we may be transitioning over a producer epoch 
overflow, and the producer may be using the
+              // new producer ID that is still only in pending state.
               Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.producerId != producerId && 
!retryOnOverflow)
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             else if (!isValidEpoch)
               Left(Errors.PRODUCER_FENCED)
             else txnMetadata.state match {
@@ -940,7 +946,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 case Right((txnMetadata, newPreSendMetadata)) =>
                   // we can respond to the client immediately and continue to 
write the txn markers if
                   // the log append was successful
-                  responseCallback(Errors.NONE, txnMetadata.producerId, 
txnMetadata.producerEpoch)
+                  responseCallback(Errors.NONE, newPreSendMetadata.producerId, 
newPreSendMetadata.producerEpoch)
 
                   
txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, 
txnMetadata, newPreSendMetadata)
               }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 2733167b469..fed2b313d06 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidProducerEpochException, 
ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{ConcurrentTransactionsException, 
InvalidProducerEpochException, ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.test.api.Flaky
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
@@ -617,15 +617,20 @@ class TransactionsTest extends IntegrationTestHarness {
     // Wait for the expiration cycle to kick in.
     Thread.sleep(600)
 
-    try {
-      // Now that the transaction has expired, the second send should fail 
with a InvalidProducerEpochException.
-      
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 
null, "2", "2", willBeCommitted = false)).get()
-      fail("should have raised a InvalidProducerEpochException since the 
transaction has expired")
-    } catch {
-      case _: InvalidProducerEpochException =>
-      case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
-    }
+    TestUtils.waitUntilTrue(() => {
+      var foundException = false
+      try {
+        // Now that the transaction has expired, the second send should fail 
with a InvalidProducerEpochException. We may see some 
concurrentTransactionsExceptions.
+        
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 
null, "2", "2", willBeCommitted = false)).get()
+        fail("should have raised an error due to concurrent transactions or 
invalid producer epoch")
+      } catch {
+        case _: ConcurrentTransactionsException =>
+        case _: InvalidProducerEpochException =>
+        case e: ExecutionException =>
+          foundException = 
e.getCause.isInstanceOf[InvalidProducerEpochException]
+      }
+      foundException
+    }, "Never returned the expected InvalidProducerEpochException")
 
     // Verify that the first message was aborted and the second one was never 
written at all.
     val nonTransactionalConsumer = nonTransactionalConsumers.head
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index c8e9a47df1a..7cfe5acb728 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
     assertEquals(Errors.NONE, result.error)
   }
 
+  @Test
+  def shouldGenerateNewProducerIdIfEpochsExhaustedV2(): Unit = {
+    initPidGenericMocks(transactionalId)
+
+    val txnMetadata1 = new TransactionMetadata(transactionalId, producerId, 
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+      (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, 
time.milliseconds(), time.milliseconds(), TV_2)
+    // We start with txnMetadata1 so we can transform the metadata to 
PrepareCommit.
+    val txnMetadata2 = new TransactionMetadata(transactionalId, producerId, 
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+      (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, 
time.milliseconds(), time.milliseconds(), TV_2)
+    val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit, 
TV_2, producerId2, time.milliseconds(), false)
+    txnMetadata2.completeTransitionTo(transitMetadata)
+
+    assertEquals(producerId, txnMetadata2.producerId)
+    assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
+
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata2))))
+
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any()
+    )).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    coordinator.handleEndTransaction(transactionalId, producerId, 
(Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
+    assertEquals(producerId2, newProducerId)
+    assertEquals(0, newEpoch)
+    assertEquals(Errors.NONE, error)
+  }
+
   @Test
   def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator(): Unit = {
     when(transactionManager.validateTransactionTimeoutMs(anyInt()))
@@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
       .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
 
     val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
-    coordinator.handleEndTransaction(transactionalId, producerId, 
nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion, 
endTxnCallback)
+    coordinator.handleEndTransaction(transactionalId, producerId, 
nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion, 
endTxnCallback)
     if (isRetry) {
       assertEquals(Errors.PRODUCER_FENCED, error)
     } else {
@@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
     verify(transactionManager, 
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
   }
 
+  @Test
+  def 
shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete():
 Unit = {
+    val prepareWithPending = new TransactionMetadata(transactionalId, 
producerId, producerId,
+      producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1, 
PrepareCommit, collection.mutable.Set.empty[TopicPartition], 0, 
time.milliseconds(), TV_2)
+    val txnTransitMetadata = 
prepareWithPending.prepareComplete(time.milliseconds())
+
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
prepareWithPending))))
+
+    // Return CONCURRENT_TRANSACTIONS while transaction is still completing
+    coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 
0, partitions, errorsCallback, TV_2)
+    assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
+    
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
+
+    prepareWithPending.completeTransitionTo(txnTransitMetadata)
+    assertEquals(CompleteCommit, prepareWithPending.state)
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
prepareWithPending))))
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any())
+    ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 
0, partitions, errorsCallback, TV_2)
+
+    assertEquals(Errors.NONE, error)
+    verify(transactionManager, 
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+  }
+
   @ParameterizedTest
   @ValueSource(shorts = Array(0, 2))
   def 
shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit(transactionVersion:
 Short): Unit = {

Reply via email to the commits mailing list.