This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new c8b8adf3c1d KAFKA-19367: Follow up bug fix (#19991)
c8b8adf3c1d is described below

commit c8b8adf3c1d5fc7ea6256846c4a82c0482ebf0e5
Author: Ritika Reddy <98577846+rreddy...@users.noreply.github.com>
AuthorDate: Mon Jun 23 15:15:36 2025 -0700

    KAFKA-19367: Follow up bug fix (#19991)
    
    This is a follow-up to
    https://github.com/apache/kafka/pull/19910.
    
    The existing warning reads, for example: "The coordinator failed to
    write an epoch fence transition for producer jt142 to the transaction
    log with error COORDINATOR_NOT_AVAILABLE. The epoch was increased to 2
    but not returned to the client"
    (kafka.coordinator.transaction.TransactionCoordinator). Since we no
    longer bump the epoch with this change, the message should not say
    "increased", and the
    epochAndMetadata.transactionMetadata.hasFailedEpochFence = true line
    should be removed.
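
    For reference, after this change the branch in
    TransactionCoordinator.scala looks roughly like the simplified sketch
    below (surrounding match arms and error handling omitted; see the diff
    further down for the exact change):
    
        case Some(epochAndMetadata) =>
          if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
            // For TV2 we allow re-bumping the epoch on retry, since we never
            // complete the epoch bump, so hasFailedEpochFence is left unchanged.
            warn(s"The coordinator failed to write an epoch fence transition for " +
              s"producer $transactionalId to the transaction log with error $error")
          }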
    
    In the test, the expected behavior is:
    
    1. The first append of the transaction to the log fails with
       COORDINATOR_NOT_AVAILABLE (epoch 1).
    2. We try init_pid again; this time the single epoch bump succeeds,
       and the following happen together (epoch 2):
       -> Transition to COMPLETE_ABORT
       -> Return a CONCURRENT_TRANSACTIONS error to the client
    3. The client retries, and there is another epoch bump; the state
       transitions to EMPTY (epoch 3). A condensed view of these outcomes
       is sketched below.
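
    Condensed from the new test added below, the three init_pid results
    asserted are (simplified excerpt, not a standalone test):
    
        // Attempt 1: the log append fails and the error is surfaced (epoch stays at 1)
        assertEquals(InitProducerIdResult(-1, -1, Errors.COORDINATOR_NOT_AVAILABLE), result)
        // Attempt 2: the ongoing transaction is aborted; the client is told to retry (epoch 2)
        assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
        // Attempt 3: the retry succeeds with a fresh epoch bump (epoch 3)
        assertEquals(InitProducerIdResult(producerId, 3.toShort, Errors.NONE), result)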
    
    Reviewers: Justine Olshan <jols...@confluent.io>
---
 .../transaction/TransactionCoordinator.scala       |   6 +-
 .../transaction/TransactionCoordinatorTest.scala   | 144 +++++++++++++++++++++
 2 files changed, 147 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 064de12a4a9..fdccf5a0209 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -960,10 +960,10 @@ class TransactionCoordinator(txnConfig: TransactionConfig,
 
                   case Some(epochAndMetadata) =>
                    if (epochAndMetadata.coordinatorEpoch == coordinatorEpoch) {
-                      // This was attempted epoch fence that failed, so mark this state on the metadata
-                      epochAndMetadata.transactionMetadata.hasFailedEpochFence = true
+                      // For TV2, we allow re-bumping the epoch on retry, since we don't complete the epoch bump.
+                      // Therefore, we don't set hasFailedEpochFence = true.
                       warn(s"The coordinator failed to write an epoch fence transition for producer $transactionalId to the transaction log " +
-                        s"with error $error. The epoch was increased to ${newMetadata.producerEpoch} but not returned to the client")
+                        s"with error $error")
                     }
                 }
               }
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
index bbca105fd44..3adec5c029a 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorTest.scala
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers.{any, anyInt}
 import org.mockito.Mockito._
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.Mockito.doAnswer
 
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
@@ -1805,4 +1806,147 @@ class TransactionCoordinatorTest {
     else
       producerEpoch
   }
+
+  @Test
+  def testTV2AllowsEpochReBumpingAfterFailedWrite(): Unit = {
+    // Test the complete TV2 flow: failed write → epoch fence → abort → retry with epoch bump
+    // This demonstrates that TV2 allows epoch re-bumping after failed writes (unlike TV1)
+    val producerEpoch = 1.toShort
+    val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerId, RecordBatch.NO_PRODUCER_ID,
+      producerEpoch, RecordBatch.NO_PRODUCER_EPOCH, txnTimeoutMs, Ongoing, partitions, time.milliseconds(), time.milliseconds(), TV_2)
+
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // First attempt fails with COORDINATOR_NOT_AVAILABLE
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val callback = invocation.getArgument[Errors => Unit](3)
+
+      // Simulate the real TransactionStateManager behavior: reset pendingState on failure
+      // since handleInitProducerId doesn't provide a custom retryOnError function
+      txnMetadata.pendingState = None
+
+      // For TV2, hasFailedEpochFence is NOT set to true, allowing epoch bumps on retry
+      // The epoch remains at its original value (1) since completeTransitionTo was never called
+
+      callback.apply(Errors.COORDINATOR_NOT_AVAILABLE)
+    })
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+    assertEquals(InitProducerIdResult(-1, -1, Errors.COORDINATOR_NOT_AVAILABLE), result)
+
+    // After the first failed attempt, the state should be:
+    // - hasFailedEpochFence = false (NOT set for TV2)
+    // - pendingState = None (reset by TransactionStateManager)
+    // - producerEpoch = 1 (unchanged since completeTransitionTo was never called)
+    // - transaction still ONGOING
+
+    // Second attempt: Should abort the ongoing transaction
+    reset(transactionManager)
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    // Mock the appendTransactionToLog to succeed for the endTransaction call
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      val callback = invocation.getArgument[Errors => Unit](3)
+      
+      // Complete the transition and call the callback with success
+      txnMetadata.completeTransitionTo(newMetadata)
+      callback.apply(Errors.NONE)
+    })
+
+    // Mock the transactionMarkerChannelManager to simulate the second write (PREPARE_ABORT -> COMPLETE_ABORT)
+    doAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](3)
+      // Simulate the completion of transaction markers and the second write
+      // This would normally happen asynchronously after markers are sent
+      txnMetadata.completeTransitionTo(newMetadata) // This transitions to COMPLETE_ABORT
+      txnMetadata.pendingState = None
+      
+      null
+    }).when(transactionMarkerChannelManager).addTxnMarkersToSend(
+      ArgumentMatchers.eq(coordinatorEpoch),
+      ArgumentMatchers.eq(TransactionResult.ABORT),
+      ArgumentMatchers.eq(txnMetadata),
+      any()
+    )
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // The second attempt should return CONCURRENT_TRANSACTIONS (this is intentional)
+    assertEquals(InitProducerIdResult(-1, -1, Errors.CONCURRENT_TRANSACTIONS), result)
+
+    // The transactionMarkerChannelManager mock should have completed the transition to COMPLETE_ABORT
+    // Verify that hasFailedEpochFence was never set to true for TV2, allowing future epoch bumps
+    assertFalse(txnMetadata.hasFailedEpochFence)
+
+    // Third attempt: Client retries after CONCURRENT_TRANSACTIONS
+    reset(transactionManager)
+    when(transactionManager.validateTransactionTimeoutMs(anyInt()))
+      .thenReturn(true)
+    when(transactionManager.getTransactionState(ArgumentMatchers.eq(transactionalId)))
+      .thenReturn(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata))))
+    when(transactionManager.transactionVersionLevel()).thenReturn(TV_2)
+
+    when(transactionManager.appendTransactionToLog(
+      ArgumentMatchers.eq(transactionalId),
+      ArgumentMatchers.eq(coordinatorEpoch),
+      any(),
+      any(),
+      any(),
+      any()
+    )).thenAnswer(invocation => {
+      val newMetadata = invocation.getArgument[TxnTransitMetadata](2)
+      val callback = invocation.getArgument[Errors => Unit](3)
+      
+      // Complete the transition and call the callback with success
+      txnMetadata.completeTransitionTo(newMetadata)
+      callback.apply(Errors.NONE)
+    })
+
+    coordinator.handleInitProducerId(
+      transactionalId,
+      txnTimeoutMs,
+      None,
+      initProducerIdMockCallback
+    )
+
+    // The third attempt should succeed with epoch 3 (2 + 1)
+    // This demonstrates that TV2 allows epoch re-bumping after failed writes
+    assertEquals(InitProducerIdResult(producerId, 3.toShort, Errors.NONE), result)
+
+    // Final verification that hasFailedEpochFence was never set to true for TV2
+    assertFalse(txnMetadata.hasFailedEpochFence)
+  }
 }
