This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c6b44b5d66f Cherry Pick KAFKA-19367 to 4.0 (#19958)
c6b44b5d66f is described below

commit c6b44b5d66f05c5a1ff792e9bcb43b7d9cfff267
Author: Ritika Reddy <[email protected]>
AuthorDate: Sat Jun 14 11:40:00 2025 -0700

    Cherry Pick KAFKA-19367 to 4.0 (#19958)
    
    
    https://github.com/apache/kafka/commit/0b2e410d61970e66c6f73a18c75028df0a871777
    Bug fix in 4.0
    **Conflicts:**
    
    - The Transaction Coordinator had some conflicts, mainly with the
    transaction states. For example, `Ongoing` in 4.0 is
    `TransactionState.ONGOING` in 4.1.
    - The TransactionCoordinatorTest file had conflicts with respect to the
    2PC changes from KIP-939 in 4.1 and the above-mentioned state changes.
    
    Reviewers: Justine Olshan <[email protected]>, Artem Livshits
    <[email protected]>
---
 .../transaction/TransactionCoordinator.scala       |   6 +-
 .../transaction/TransactionCoordinatorTest.scala   | 134 +++++++++++++++++++++
 2 files changed, 136 insertions(+), 4 deletions(-)

diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index e0019f0d773..064de12a4a9 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -802,11 +802,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
                 }
 
               if (nextState == PrepareAbort && isEpochFence) {
-                // We should clear the pending state to make way for the 
transition to PrepareAbort and also bump
-                // the epoch in the transaction metadata we are about to 
append.
+                // We should clear the pending state to make way for the 
transition to PrepareAbort
                 txnMetadata.pendingState = None
-                txnMetadata.producerEpoch = producerEpoch
-                txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+                // For TV2+, don't manually set the epoch - let 
prepareAbortOrCommit handle it naturally.
               }
 
               nextProducerIdOrErrors.flatMap {
diff --git 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index e5b48d92466..bbca105fd44 100644
--- 
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ 
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -1165,6 +1165,140 @@ class TransactionCoordinatorTest {
       any())
   }
 
+  @Test
+  def shouldNotCauseEpochOverflowWhenInitPidDuringOngoingTxnV2(): Unit = {
+    // When InitProducerId is called with an ongoing transaction at epoch 
32766 (Short.MaxValue - 1),
+    // it should not cause an epoch overflow by incrementing twice.
+    // The only true increment happens in prepareAbortOrCommit
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, 
producerId, RecordBatch.NO_PRODUCER_ID,
+      (Short.MaxValue - 1).toShort, (Short.MaxValue - 2).toShort, 
txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds(), 
TV_2)
+
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // Capture the transition metadata to verify epoch increments
+    val capturedTxnTransitMetadata: ArgumentCaptor[TxnTransitMetadata] = 
ArgumentCaptor.forClass(classOf[TxnTransitMetadata])
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      capturedTxnTransitMetadata.capture(),
+      capturedErrorsCallback.capture(),
+      any(),
+      any())
+    ).thenAnswer(invocation => {
+      val transitMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      // Simulate the metadata update that would happen in the real 
appendTransactionToLog
+      txnMetadata.completeTransitionTo(transitMetadata)
+      capturedErrorsCallback.getValue.apply(Errors.NONE)
+    })
+
+    // Handle InitProducerId with ongoing transaction at epoch 32766
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // Verify that the epoch did not overflow (should be Short.MaxValue = 
32767, not negative)
+    assertEquals(Short.MaxValue, txnMetadata.producerEpoch)
+    assertEquals(PrepareAbort, txnMetadata.state)
+    
+    verify(transactionManager).validateTransactionTimeoutMs(anyInt())
+    verify(transactionManager, 
times(3)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+    verify(transactionManager).appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      any(),
+      any(),
+      any())
+  }
+
+  @Test
+  def shouldHandleTimeoutAtEpochOverflowBoundaryCorrectlyTV2(): Unit = {
+    // Test the scenario where we have an ongoing transaction at epoch 32766 
(Short.MaxValue - 1)
+    // and the producer crashes/times out. This test verifies that the timeout 
handling
+    // correctly manages the epoch overflow scenario without causing failures.
+
+    val epochAtMaxBoundary = (Short.MaxValue - 1).toShort // 32766
+    val now = time.milliseconds()
+
+    // Create transaction metadata at the epoch boundary that would cause 
overflow IFF double-incremented
+    val txnMetadata = new TransactionMetadata(
+      transactionalId = transactionalId,
+      producerId = producerId,
+      previousProducerId = RecordBatch.NO_PRODUCER_ID,
+      nextProducerId = RecordBatch.NO_PRODUCER_ID,
+      producerEpoch = epochAtMaxBoundary,
+      lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+      txnTimeoutMs = txnTimeoutMs,
+      state = Ongoing,
+      topicPartitions = partitions,
+      txnStartTimestamp = now,
+      txnLastUpdateTimestamp = now,
+      clientTransactionVersion = TV_2
+    )
+    assertTrue(txnMetadata.isProducerEpochExhausted)
+
+    // Mock the transaction manager to return our test transaction as timed out
+    when(transactionManager.timedOutTransactions())
+      .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId, 
producerId, epochAtMaxBoundary)))
+    
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, 
txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // Mock the append operation to simulate successful write and update the 
metadata
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any[TxnTransitMetadata],
+      capturedErrorsCallback.capture(),
+      any(),
+      any())
+    ).thenAnswer(invocation => {
+      val transitMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      // Simulate the metadata update that would happen in the real 
appendTransactionToLog
+      txnMetadata.completeTransitionTo(transitMetadata)
+      capturedErrorsCallback.getValue.apply(Errors.NONE)
+    })
+
+    // Track the actual behavior
+    var callbackInvoked = false
+    var resultError: Errors = null
+    var resultProducerId: Long = -1
+    var resultEpoch: Short = -1
+
+    def checkOnEndTransactionComplete(txnIdAndPidEpoch: 
TransactionalIdAndProducerIdEpoch)
+      (error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = {
+        callbackInvoked = true
+        resultError = error
+        resultProducerId = newProducerId
+        resultEpoch = newProducerEpoch
+      }
+
+    // Execute the timeout abort process
+    coordinator.abortTimedOutTransactions(checkOnEndTransactionComplete)
+
+    assertTrue(callbackInvoked, "Callback should have been invoked")
+    assertEquals(Errors.NONE, resultError, "Expected no errors in the 
callback")
+    assertEquals(producerId, resultProducerId, "Expected producer ID to match")
+    assertEquals(Short.MaxValue, resultEpoch, "Expected producer epoch to be 
Short.MaxValue (32767) single epoch bump")
+    
+    // Verify the transaction metadata was correctly updated to the final epoch
+    assertEquals(Short.MaxValue, txnMetadata.producerEpoch, 
+      s"Expected transaction metadata producer epoch to be ${Short.MaxValue} " 
+
+        s"after timeout handling, but was ${txnMetadata.producerEpoch}"
+    )
+
+    // Verify the basic flow was attempted
+    verify(transactionManager).timedOutTransactions()
+    verify(transactionManager, 
atLeast(1)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+  }
+
   @Test
   def testInitProducerIdWithNoLastProducerData(): Unit = {
     // If the metadata doesn't include the previous producer data (for 
example, if it was written to the log by a broker

Reply via email to