This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 685ce7b3543 KAFKA-18660: Transactions Version 2 doesn't handle epoch 
overflow correctly (#18730) (#18758)
685ce7b3543 is described below

commit 685ce7b3543b6b9471e74b0963d39de1a7426ea8
Author: Justine Olshan <[email protected]>
AuthorDate: Fri Jan 31 09:23:31 2025 -0800

    KAFKA-18660: Transactions Version 2 doesn't handle epoch overflow correctly 
(#18730) (#18758)
    
    Fixed the typo that returned the old producer ID and epoch in the response 
instead of the new ones, so that epoch overflow is handled correctly.
    
    We also had to rearrange the concurrent transaction handling so that we 
don't self-fence when we start the new transaction with the new producer ID.
    
    I also tested this with a modified version of the code where epoch overflow 
happens on the first epoch bump (i.e., every request gets a new producer ID).
    
    Reviewers: Artem Livshits <[email protected]>, Jeff Kim 
<[email protected]>
---
 .../transaction/TransactionCoordinator.scala       | 22 ++++---
 .../integration/kafka/api/TransactionsTest.scala   | 26 +++-----
 .../transaction/TransactionCoordinatorTest.scala   | 69 +++++++++++++++++++++-
 3 files changed, 91 insertions(+), 26 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index e1edd4e4ddd..e0019f0d773 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -408,13 +408,16 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
           // generate the new transaction metadata with added partitions
           txnMetadata.inLock {
-            if (txnMetadata.producerId != producerId) {
+            if (txnMetadata.pendingTransitionInProgress) {
+              // return a retriable exception to let the client backoff and 
retry
+              // This check is performed first so that the pending transition 
can complete before subsequent checks.
+              // With TV2, we may be transitioning over a producer epoch 
overflow, and the producer may be using the
+              // new producer ID that is still only in pending state.
+              Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.producerId != producerId) {
               Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             } else if (txnMetadata.producerEpoch != producerEpoch) {
               Left(Errors.PRODUCER_FENCED)
-            } else if (txnMetadata.pendingTransitionInProgress) {
-              // return a retriable exception to let the client backoff and 
retry
-              Left(Errors.CONCURRENT_TRANSACTIONS)
             } else if (txnMetadata.state == PrepareCommit || txnMetadata.state 
== PrepareAbort) {
               Left(Errors.CONCURRENT_TRANSACTIONS)
             } else if (txnMetadata.state == Ongoing && 
partitions.subsetOf(txnMetadata.topicPartitions)) {
@@ -812,10 +815,13 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
               }
             }
 
-            if (txnMetadata.producerId != producerId && !retryOnOverflow)
-              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
-            else if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence)
+            if (txnMetadata.pendingTransitionInProgress && 
txnMetadata.pendingState.get != PrepareEpochFence) {
+              // This check is performed first so that the pending transition 
can complete before the next checks.
+              // With TV2, we may be transitioning over a producer epoch 
overflow, and the producer may be using the
+              // new producer ID that is still only in pending state.
               Left(Errors.CONCURRENT_TRANSACTIONS)
+            } else if (txnMetadata.producerId != producerId && 
!retryOnOverflow)
+              Left(Errors.INVALID_PRODUCER_ID_MAPPING)
             else if (!isValidEpoch)
               Left(Errors.PRODUCER_FENCED)
             else txnMetadata.state match {
@@ -940,7 +946,7 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 case Right((txnMetadata, newPreSendMetadata)) =>
                   // we can respond to the client immediately and continue to 
write the txn markers if
                   // the log append was successful
-                  responseCallback(Errors.NONE, txnMetadata.producerId, 
txnMetadata.producerEpoch)
+                  responseCallback(Errors.NONE, newPreSendMetadata.producerId, 
newPreSendMetadata.producerEpoch)
 
                   
txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, 
txnMetadata, newPreSendMetadata)
               }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala 
b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index b32fea75ca6..78eccde0365 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -22,7 +22,7 @@ import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{InvalidProducerEpochException, 
ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{ConcurrentTransactionsException, 
InvalidProducerEpochException, ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.test.api.Flaky
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
@@ -623,28 +623,20 @@ class TransactionsTest extends IntegrationTestHarness {
     // Wait for the expiration cycle to kick in.
     Thread.sleep(600)
 
-    if (quorum == "zk") {
-      // In zk mode, transaction v1 is used.
+    TestUtils.waitUntilTrue(() => {
+      var foundException = false
       try {
-        // Now that the transaction has expired, the second send should fail 
with a ProducerFencedException.
+        // Now that the transaction has expired, the second send should fail 
with an InvalidProducerEpochException. We may see some 
ConcurrentTransactionsExceptions before that.
         
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 
null, "2", "2", willBeCommitted = false)).get()
-        fail("should have raised a ProducerFencedException since the 
transaction has expired")
-      } catch {
-        case _: ProducerFencedException =>
-        case e: ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-      }
-    } else {
-      try {
-        // Now that the transaction has expired, the second send should fail 
with a InvalidProducerEpochException.
-        
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 
null, "2", "2", willBeCommitted = false)).get()
-        fail("should have raised a InvalidProducerEpochException since the 
transaction has expired")
+        fail("should have raised an error due to concurrent transactions or 
invalid producer epoch")
       } catch {
+        case _: ConcurrentTransactionsException =>
         case _: InvalidProducerEpochException =>
         case e: ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
+          foundException = 
e.getCause.isInstanceOf[InvalidProducerEpochException]
       }
-    }
+      foundException
+    }, "Never returned the expected InvalidProducerEpochException")
 
     // Verify that the first message was aborted and the second one was never 
written at all.
     val nonTransactionalConsumer = nonTransactionalConsumers.head
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index ab5ff72cd98..e5b48d92466 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -182,6 +182,40 @@ class TransactionCoordinatorTest {
     assertEquals(Errors.NONE, result.error)
   }
 
+  @Test
+  def shouldGenerateNewProducerIdIfEpochsExhaustedV2(): Unit = {
+    initPidGenericMocks(transactionalId)
+
+    val txnMetadata1 = new TransactionMetadata(transactionalId, producerId, 
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+      (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, 
time.milliseconds(), time.milliseconds(), TV_2)
+    // We start with txnMetadata1 so we can transform the metadata to 
PrepareCommit.
+    val txnMetadata2 = new TransactionMetadata(transactionalId, producerId, 
producerId, RecordBatch.NO_PRODUCER_ID, (Short.MaxValue - 1).toShort,
+      (Short.MaxValue - 2).toShort, txnTimeoutMs, Ongoing, mutable.Set.empty, 
time.milliseconds(), time.milliseconds(), TV_2)
+    val transitMetadata = txnMetadata2.prepareAbortOrCommit(PrepareCommit, 
TV_2, producerId2, time.milliseconds(), false)
+    txnMetadata2.completeTransitionTo(transitMetadata)
+
+    assertEquals(producerId, txnMetadata2.producerId)
+    assertEquals(Short.MaxValue, txnMetadata2.producerEpoch)
+
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata1))))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata2))))
+
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any()
+    )).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    coordinator.handleEndTransaction(transactionalId, producerId, 
(Short.MaxValue - 1).toShort, TransactionResult.COMMIT, TV_2, endTxnCallback)
+    assertEquals(producerId2, newProducerId)
+    assertEquals(0, newEpoch)
+    assertEquals(Errors.NONE, error)
+  }
+
   @Test
   def shouldRespondWithNotCoordinatorOnInitPidWhenNotCoordinator(): Unit = {
     when(transactionManager.validateTransactionTimeoutMs(anyInt()))
@@ -519,7 +553,7 @@ class TransactionCoordinatorTest {
       .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
 
     val nextProducerEpoch = if (isRetry) producerEpoch - 1 else producerEpoch
-    coordinator.handleEndTransaction(transactionalId, producerId, 
nextProducerEpoch.toShort , TransactionResult.ABORT, clientTransactionVersion, 
endTxnCallback)
+    coordinator.handleEndTransaction(transactionalId, producerId, 
nextProducerEpoch.toShort, TransactionResult.ABORT, clientTransactionVersion, 
endTxnCallback)
     if (isRetry) {
       assertEquals(Errors.PRODUCER_FENCED, error)
     } else {
@@ -770,6 +804,39 @@ class TransactionCoordinatorTest {
     verify(transactionManager, 
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
   }
 
+  @Test
+  def 
shouldReturnConcurrentTxnOnAddPartitionsIfEndTxnV2EpochOverflowAndNotComplete():
 Unit = {
+    val prepareWithPending = new TransactionMetadata(transactionalId, 
producerId, producerId,
+      producerId2, Short.MaxValue, (Short.MaxValue - 1).toShort, 1, 
PrepareCommit, collection.mutable.Set.empty[TopicPartition], 0, 
time.milliseconds(), TV_2)
+    val txnTransitMetadata = 
prepareWithPending.prepareComplete(time.milliseconds())
+
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
prepareWithPending))))
+
+    // Return CONCURRENT_TRANSACTIONS while transaction is still completing
+    coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 
0, partitions, errorsCallback, TV_2)
+    assertEquals(Errors.CONCURRENT_TRANSACTIONS, error)
+    
verify(transactionManager).getTransactionState(ArgumentMatchers.eq(transactionalId))
+
+    prepareWithPending.completeTransitionTo(txnTransitMetadata)
+    assertEquals(CompleteCommit, prepareWithPending.state)
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
prepareWithPending))))
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any())
+    ).thenAnswer(_ => capturedErrorsCallback.getValue.apply(Errors.NONE))
+
+    coordinator.handleAddPartitionsToTransaction(transactionalId, producerId2, 
0, partitions, errorsCallback, TV_2)
+
+    assertEquals(Errors.NONE, error)
+    verify(transactionManager, 
times(2)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+  }
+
   @ParameterizedTest
   @ValueSource(shorts = Array(0, 2))
   def 
shouldAppendPrepareCommitToLogOnEndTxnWhenStatusIsOngoingAndResultIsCommit(transactionVersion:
 Short): Unit = {

Reply via email to