This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
     new c8b8adf3c1d KAFKA-19367: Follow up bug fix (#19991)
c8b8adf3c1d is described below

commit c8b8adf3c1d5fc7ea6256846c4a82c0482ebf0e5
Author:     Ritika Reddy <98577846+rreddy...@users.noreply.github.com>
AuthorDate: Mon Jun 23 15:15:36 2025 -0700

    KAFKA-19367: Follow up bug fix (#19991)

    This is a follow up to [https://github.com/apache/kafka/pull/19910](https://github.com/apache/kafka/pull/19910)

    The coordinator failed to write an epoch fence transition for producer
    jt142 to the transaction log with error COORDINATOR_NOT_AVAILABLE. The
    epoch was increased to 2 but not returned to the client
    (kafka.coordinator.transaction.TransactionCoordinator)

    Since we don't complete the epoch bump with this change, we also update
    the message so it no longer says "increased", and we remove the
    epochAndMetadata.transactionMetadata.hasFailedEpochFence = true line.

    In the test, the expected behavior is (a condensed sketch of this flow
    follows the diff below):
    - The first append of the transaction to the log fails with
      COORDINATOR_NOT_AVAILABLE (epoch 1)
    - We try init_pid again; this time the SINGLE epoch bump succeeds, and
      the following things happen simultaneously (epoch 2):
      -> Transition to COMPLETE_ABORT
      -> Return a CONCURRENT_TRANSACTIONS error to the client
    - The client retries, and there is another epoch bump; the state
      transitions to EMPTY (epoch 3)

    Reviewers: Justine Olshan <jols...@confluent.io>
---
 .../transaction/TransactionCoordinator.scala       |   6 +-
 .../transaction/TransactionCoordinatorTest.scala   | 144 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 064de12a4a9..fdccf5a0209 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -960,10 +960,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
       case Some(epochAndMetadata) =>
         if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
-          // This was attempted epoch fence that failed, so mark this state on the metadata
-          epochAndMetadata.transactionMetadata.hasFailedEpochFence = true
+          // For TV2, we allow re-bumping the epoch on retry, since we don't complete the epoch bump.
+          // Therefore, we don't set hasFailedEpochFence = true.
           warn(s"The coordinator failed to write an epoch fence transition for producer $transactionalId to the transaction log " +
-            s"with error $error. The epoch was increased to ${newMetadata.producerEpoch} but not returned to the client")
+            s"with error $error")
         }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index bbca105fd44..3adec5c029a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers.{any, anyInt}
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.doAnswer
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -1805,4 +1806,147 @@ class TransactionCoordinatorTest {
     else
       producerEpoch
   }
+
+  @Test
+  def testTV2AllowsEpochReBumpingAfterFailedWrite(): Unit = {
+    // Test the complete TV2 flow: failed write → epoch fence → abort → retry with epoch bump
+    // This demonstrates that TV2 allows epoch re-bumping after failed writes (unlike TV1)
+    val producerEpoch = 1.toShort
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID,
+      producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds(), TV_2)
+
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // First attempt fails with COORDINATOR_NOT_AVAILABLE
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Simulate the real TransactionStateManager behavior: reset pendingState on failure
+      // since handleInitProducerId doesn't provide a custom retryOnError function
+      txnMetadata.pendingState = None
+
+      // For TV2, hasFailedEpochFence is NOT set to true, allowing epoch bumps on retry
+      // The epoch remains at its original value (1) since completeTransitionTo was never called
+
+      callback.apply(Errors.COORDINATOR_NOT_AVAILABLE)
+    })
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+    assertEquals(InitProducerIdResult(-1, -1, Errors.COORDINATOR_NOT_AVAILABLE), result)
+
+    // After the first failed attempt, the state should be:
+    // - hasFailedEpochFence = false (NOT set for TV2)
+    // - pendingState = None (reset by TransactionStateManager)
+    // - producerEpoch = 1 (unchanged since completeTransitionTo was never called)
+    // - transaction still ONGOING
+
+    // Second attempt: Should abort the ongoing transaction
+    reset(transactionManager)
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // Mock the appendTransactionToLog to succeed for the endTransaction call
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Complete the transition and call the callback with success
+      txnMetadata.completeTransitionTo(newMetadata)
+      callback.apply(Errors.NONE)
+    })
+
+    // Mock the transactionMarkerChannelManager to simulate the second write (PREPARE_ABORT -> COMPLETE_ABORT)
+    doAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](3)
+      // Simulate the completion of transaction markers and the second write
+      // This would normally happen asynchronously after markers are sent
+      txnMetadata.completeTransitionTo(newMetadata) // This transitions to COMPLETE_ABORT
+      txnMetadata.pendingState = None
+
+      null
+    }).when(transactionMarkerChannelManager).addTxnMarkersToSend(
+      ArgumentMatchers.eq(coordinatorEpoch),
+      ArgumentMatchers.eq(TransactionResult.ABORT),
+      ArgumentMatchers.eq(txnMetadata),
+      any()
+    )
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // The second attempt should return CONCURRENT_TRANSACTIONS (this is intentional)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+
+    // The transactionMarkerChannelManager mock should have completed the transition to COMPLETE_ABORT
+    // Verify that hasFailedEpochFence was never set to true for TV2, allowing future epoch bumps
+    assertFalse(txnMetadata.hasFailedEpochFence)
+
+    // Third attempt: Client retries after CONCURRENT_TRANSACTIONS
+    reset(transactionManager)
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Complete the transition and call the callback with success
+      txnMetadata.completeTransitionTo(newMetadata)
+      callback.apply(Errors.NONE)
+    })
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // The third attempt should succeed with epoch 3 (2 + 1)
+    // This demonstrates that TV2 allows epoch re-bumping after failed writes
+    assertEquals(InitProducerIdResult(producerId, 3.toShort, Errors.NONE), result)
+
+    // Final verification that hasFailedEpochFence was never set to true for TV2
+    assertFalse(txnMetadata.hasFailedEpochFence)
+  }
 }
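
The three init_pid attempts described in the commit message can be condensed
into a small state sketch. This is a minimal, hypothetical model, not Kafka
code: MiniCoordinator, State, and Result are illustrative stand-ins, and only
the epoch arithmetic and error sequence from the commit message are taken as
given.

// Minimal sketch of the TV2 init_pid retry flow described in the commit
// message. NOT Kafka code: MiniCoordinator, State, and Result are
// hypothetical stand-ins used only to illustrate the epoch sequence.
object Tv2EpochFlowSketch {
  sealed trait State
  case object Ongoing extends State        // an open transaction exists
  case object CompleteAbort extends State  // abort markers written
  case object Empty extends State          // producer ready for a new txn

  final case class Result(error: String, epoch: Short)

  final class MiniCoordinator {
    // Epoch 1 with an ongoing transaction, matching the test's setup.
    var epoch: Short = 1
    var state: State = Ongoing
    // TV2 never sets a hasFailedEpochFence flag, so each retry may
    // attempt a fresh single epoch bump.

    def handleInitPid(logWriteFails: Boolean): Result = state match {
      case Ongoing if logWriteFails =>
        // Attempt 1: the epoch-fence write fails before the bump is
        // completed, so the epoch stays at 1.
        Result("COORDINATOR_NOT_AVAILABLE", epoch)
      case Ongoing =>
        // Attempt 2: a SINGLE bump succeeds (1 -> 2); the coordinator
        // aborts the open transaction and asks the client to retry.
        epoch = (epoch + 1).toShort
        state = CompleteAbort
        Result("CONCURRENT_TRANSACTIONS", epoch)
      case CompleteAbort | Empty =>
        // Attempt 3: the retry bumps the epoch again (2 -> 3) and the
        // producer ends up EMPTY.
        epoch = (epoch + 1).toShort
        state = Empty
        Result("NONE", epoch)
    }
  }

  def main(args: Array[String]): Unit = {
    val c = new MiniCoordinator
    println(c.handleInitPid(logWriteFails = true))   // Result(COORDINATOR_NOT_AVAILABLE,1)
    println(c.handleInitPid(logWriteFails = false))  // Result(CONCURRENT_TRANSACTIONS,2)
    println(c.handleInitPid(logWriteFails = false))  // Result(NONE,3)
  }
}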