This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new c6b44b5d66f Cherry Pick KAFKA-19367 to 4.0 (#19958)
c6b44b5d66f is described below
commit c6b44b5d66f05c5a1ff792e9bcb43b7d9cfff267
Author: Ritika Reddy <[email protected]>
AuthorDate: Sat Jun 14 11:40:00 2025 -0700
Cherry Pick KAFKA-19367 to 4.0 (#19958)
https://github.com/apache/kafka/commit/0b2e410d61970e66c6f73a18c75028df0a871777
Bug fix in 4.0
**Conflicts:**
- The Transaction Coordinator had some conflicts, mainly with the
transaction states. Ex: ongoing in 4.0 is TransactionState.ONGOING in
4.1.
- The TransactionCoordinatorTest file had conflicts w.r.t the 2PC
changes from KIP-939 in 4.1 and the above mentioned state changes
Reviewers: Justine Olshan <[email protected]>, Artem Livshits
<[email protected]>
---
.../transaction/TransactionCoordinator.scala | 6 +-
.../transaction/TransactionCoordinatorTest.scala | 134 +++++++++++++++++++++
2 files changed, 136 insertions(+), 4 deletions(-)
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index e0019f0d773..064de12a4a9 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -802,11 +802,9 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
}
if (nextState == PrepareAbort && isEpochFence) {
- // We should clear the pending state to make way for the
transition to PrepareAbort and also bump
- // the epoch in the transaction metadata we are about to
append.
+ // We should clear the pending state to make way for the
transition to PrepareAbort
txnMetadata.pendingState = None
- txnMetadata.producerEpoch = producerEpoch
- txnMetadata.lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH
+ // For TV2+, don't manually set the epoch - let
prepareAbortOrCommit handle it naturally.
}
nextProducerIdOrErrors.flatMap {
diff --git
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index e5b48d92466..bbca105fd44 100644
---
a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++
b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -1165,6 +1165,140 @@ class TransactionCoordinatorTest {
any())
}
+ @Test
+ def shouldNotCauseEpochOverflowWhenInitPidDuringOngoingTxnV2(): Unit = {
+ // When InitProducerId is called with an ongoing transaction at epoch
32766 (Short.MaxValue - 1),
+ // it should not cause an epoch overflow by incrementing twice.
+ // The only true increment happens in prepareAbortOrCommit
+ val txnMetadata = new TransactionMetadata(transactionalId, producerId,
producerId, RecordBatch.NO_PRODUCER_ID,
+ (Short.MaxValue - 1).toShort, (Short.MaxValue - 2).toShort,
txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds(),
TV_2)
+
+ when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+ .thenReturn(true)
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+ // Capture the transition metadata to verify epoch increments
+ val capturedTxnTransitMetadata: ArgumentCaptor[TxnTransitMetadata] =
ArgumentCaptor.forClass(classOf[TxnTransitMetadata])
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ capturedTxnTransitMetadata.capture(),
+ capturedErrorsCallback.capture(),
+ any(),
+ any())
+ ).thenAnswer(invocation => {
+ val transitMetadata = invocation.getArgument[TxnTransitMetadata](2)
+ // Simulate the metadata update that would happen in the real
appendTransactionToLog
+ txnMetadata.completeTransitionTo(transitMetadata)
+ capturedErrorsCallback.getValue.apply(Errors.NONE)
+ })
+
+ // Handle InitProducerId with ongoing transaction at epoch 32766
+ coordinator.handleInitProducerId(
+ transactionalId,
+ txnTimeoutMs,
+ None,
+ initProducerIdMockCallback
+ )
+
+ // Verify that the epoch did not overflow (should be Short.MaxValue =
32767, not negative)
+ assertEquals(Short.MaxValue, txnMetadata.producerEpoch)
+ assertEquals(PrepareAbort, txnMetadata.state)
+
+ verify(transactionManager).validateTransactionTimeoutMs(anyInt())
+ verify(transactionManager,
times(3)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+ verify(transactionManager).appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any[TxnTransitMetadata],
+ any(),
+ any(),
+ any())
+ }
+
+ @Test
+ def shouldHandleTimeoutAtEpochOverflowBoundaryCorrectlyTV2(): Unit = {
+ // Test the scenario where we have an ongoing transaction at epoch 32766
(Short.MaxValue - 1)
+ // and the producer crashes/times out. This test verifies that the timeout
handling
+ // correctly manages the epoch overflow scenario without causing failures.
+
+ val epochAtMaxBoundary = (Short.MaxValue - 1).toShort // 32766
+ val now = time.milliseconds()
+
+ // Create transaction metadata at the epoch boundary that would cause
overflow IFF double-incremented
+ val txnMetadata = new TransactionMetadata(
+ transactionalId = transactionalId,
+ producerId = producerId,
+ previousProducerId = RecordBatch.NO_PRODUCER_ID,
+ nextProducerId = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch = epochAtMaxBoundary,
+ lastProducerEpoch = RecordBatch.NO_PRODUCER_EPOCH,
+ txnTimeoutMs = txnTimeoutMs,
+ state = Ongoing,
+ topicPartitions = partitions,
+ txnStartTimestamp = now,
+ txnLastUpdateTimestamp = now,
+ clientTransactionVersion = TV_2
+ )
+ assertTrue(txnMetadata.isProducerEpochExhausted)
+
+ // Mock the transaction manager to return our test transaction as timed out
+ when(transactionManager.timedOutTransactions())
+ .thenReturn(List(TransactionalIdAndProducerIdEpoch(transactionalId,
producerId, epochAtMaxBoundary)))
+
when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+ .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch,
txnMetadata))))
+ when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+ // Mock the append operation to simulate successful write and update the
metadata
+ when(transactionManager.appendTransactionToLog(
+ ArgumentMatchers.eq(transactionalId),
+ ArgumentMatchers.eq(coordinatorEpoch),
+ any[TxnTransitMetadata],
+ capturedErrorsCallback.capture(),
+ any(),
+ any())
+ ).thenAnswer(invocation => {
+ val transitMetadata = invocation.getArgument[TxnTransitMetadata](2)
+ // Simulate the metadata update that would happen in the real
appendTransactionToLog
+ txnMetadata.completeTransitionTo(transitMetadata)
+ capturedErrorsCallback.getValue.apply(Errors.NONE)
+ })
+
+ // Track the actual behavior
+ var callbackInvoked = false
+ var resultError: Errors = null
+ var resultProducerId: Long = -1
+ var resultEpoch: Short = -1
+
+ def checkOnEndTransactionComplete(txnIdAndPidEpoch:
TransactionalIdAndProducerIdEpoch)
+ (error: Errors, newProducerId: Long, newProducerEpoch: Short): Unit = {
+ callbackInvoked = true
+ resultError = error
+ resultProducerId = newProducerId
+ resultEpoch = newProducerEpoch
+ }
+
+ // Execute the timeout abort process
+ coordinator.abortTimedOutTransactions(checkOnEndTransactionComplete)
+
+ assertTrue(callbackInvoked, "Callback should have been invoked")
+ assertEquals(Errors.NONE, resultError, "Expected no errors in the
callback")
+ assertEquals(producerId, resultProducerId, "Expected producer ID to match")
+ assertEquals(Short.MaxValue, resultEpoch, "Expected producer epoch to be
Short.MaxValue (32767) single epoch bump")
+
+ // Verify the transaction metadata was correctly updated to the final epoch
+ assertEquals(Short.MaxValue, txnMetadata.producerEpoch,
+ s"Expected transaction metadata producer epoch to be ${Short.MaxValue} "
+
+ s"after timeout handling, but was ${txnMetadata.producerEpoch}"
+ )
+
+ // Verify the basic flow was attempted
+ verify(transactionManager).timedOutTransactions()
+ verify(transactionManager,
atLeast(1)).getTransactionState(ArgumentMatchers.eq(transactionalId))
+ }
+
@Test
def testInitProducerIdWithNoLastProducerData(): Unit = {
// If the metadata doesn't include the previous producer data (for
example, if it was written to the log by a broker