This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 94a1bfb1281  KAFKA-18575: Transaction Version 2 doesn't correctly 
handle race condition with completing and new transaction(#18604)
94a1bfb1281 is described below

commit 94a1bfb1281f06263976b1ba8bba8c5ac5d7f2ce
Author: Justine Olshan <[email protected]>
AuthorDate: Wed Jan 22 13:44:08 2025 -0800

     KAFKA-18575: Transaction Version 2 doesn't correctly handle race condition 
with completing and new transaction(#18604)
    
    There is a subtle race condition with transactions V2 if a transaction is
    still completing when we check whether the partition needs to be added to
    the transaction, but the transaction has completed by the time the request
    reaches the coordinator.
    
    One approach was to remove the verification for TV2 and just check the
    epoch on write, but a simpler approach is to return a concurrent
    transactions error (ConcurrentTransactionsException) from the partition
    leader before attempting to add the partition. I've done this and added a
    test for this behavior.
    
    Locally, I reproduced the race by adding a 1-second sleep when handling
    the WriteTxnMarkersRequest and a 2-second delay before adding the partition
    to the AddPartitionsToTxnManager. Without this change, the race occurred on
    every second transaction, as the previous one was completing. With this
    change, the error went away.
    
    As a follow-up, we may want to clean up some of the code and comments
    related to verification, since the code is used by both TV0 with
    verification and TV2. That cleanup doesn't need to be completed for 4.0,
    but this fix does. :)
    
    Reviewers: Jeff Kim <[email protected]>, Artem Livshits 
<[email protected]>, Calvin Liu <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  9 ++++--
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 36 ++++++++++++++++++++++
 2 files changed, 43 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index a3267e5ec8c..3253de11dfe 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -600,6 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean = 
lock synchronized {
     val entry = producerStateManager.activeProducers.get(producerId)
+    // With transactions V2, if we see a future epoch, we are likely in the 
process of completing the previous transaction.
+    // Return early with ConcurrentTransactionsException until the transaction 
completes.
+    if (entry != null && entry.currentTxnFirstOffset.isPresent && 
entry.producerEpoch() < producerEpoch)
+      throw new ConcurrentTransactionsException("The producer attempted to 
update a transaction " +
+        "while another concurrent operation on the same transaction was 
ongoing.")
     entry != null && entry.currentTxnFirstOffset.isPresent && 
entry.producerEpoch() == producerEpoch
   }
 
@@ -1030,7 +1035,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // transaction is completed or aborted. We can guarantee the 
transaction coordinator knows about the transaction given step 1 and that the 
transaction is still
           // ongoing. If the transaction is expected to be ongoing, we will 
not set a VerificationGuard. If the transaction is aborted, 
hasOngoingTransaction is false and
           // requestVerificationGuard is the sentinel, so we will throw an 
error. A subsequent produce request (retry) should create verification state 
and return to phase 1.
-          if (batch.isTransactional && 
!hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && 
batchMissingRequiredVerification(batch, requestVerificationGuard))
+          if (batch.isTransactional && !batch.isControlBatch && 
!hasOngoingTransaction(batch.producerId, batch.producerEpoch()) && 
batchMissingRequiredVerification(batch, requestVerificationGuard))
             throw new InvalidTxnStateException("Record was not part of an 
ongoing transaction")
         }
 
@@ -1051,7 +1056,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   private def batchMissingRequiredVerification(batch: MutableRecordBatch, 
requestVerificationGuard: VerificationGuard): Boolean = {
-    
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()
 && !batch.isControlBatch &&
+    
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()
 &&
       !verificationGuard(batch.producerId).verify(requestVerificationGuard)
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index edbc8db0fb2..242c939370f 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4020,6 +4020,42 @@ class UnifiedLogTest {
     log.appendAsLeader(transactionalRecords, leaderEpoch = 0, 
verificationGuard = verificationGuard)
   }
 
+  @Test
+  def testPreviousTransactionOngoing(): Unit = {
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
+
+    val producerId = 23L
+    val producerEpoch = 1.toShort
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
+
+    val verificationGuard = log.maybeStartTransactionVerification(producerId, 
0, producerEpoch)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
+
+    val transactionalRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE,
+      producerId,
+      producerEpoch,
+      0,
+      new SimpleRecord("1".getBytes),
+      new SimpleRecord("2".getBytes)
+    )
+    log.appendAsLeader(transactionalRecords, origin = AppendOrigin.CLIENT, 
leaderEpoch = 0, verificationGuard = verificationGuard)
+
+    assertThrows(classOf[ConcurrentTransactionsException], () => 
log.maybeStartTransactionVerification(producerId, 0, (producerEpoch + 
1).toShort))
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
+
+    val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+      producerId,
+      producerEpoch,
+      new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+    )
+
+    log.appendAsLeader(endTransactionMarkerRecord, origin = 
AppendOrigin.COORDINATOR, leaderEpoch = 0)
+    val verificationGuard2 = log.maybeStartTransactionVerification(producerId, 
0, (producerEpoch + 1).toShort)
+    assertNotEquals(VerificationGuard.SENTINEL, verificationGuard2)
+  }
+
   @Test
   def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure(): 
Unit = {
     val logConfig = LogTestUtils.createLogConfig()

Reply via email to