This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 94a1bfb1281 KAFKA-18575: Transaction Version 2 doesn't correctly
handle race condition with completing and new transaction (#18604)
94a1bfb1281 is described below
commit 94a1bfb1281f06263976b1ba8bba8c5ac5d7f2ce
Author: Justine Olshan <[email protected]>
AuthorDate: Wed Jan 22 13:44:08 2025 -0800
KAFKA-18575: Transaction Version 2 doesn't correctly handle race condition
with completing and new transaction (#18604)
With transactions V2 there is a subtle race condition: a transaction may still
be completing when the partition leader checks whether the partition needs to be
added, but may have completed by the time the request reaches the coordinator.
One approach was to remove verification for TV2 and only check the epoch on
write, but a simpler one is to have the partition leader return a
ConcurrentTransactions error (before attempting to add the partition). I've done
this and added a test for this behavior.
Locally, I reproduced the race by adding a 1-second sleep when handling
the WriteTxnMarkersRequest and a 2-second delay before adding the partition to
the AddPartitionsToTxnManager. Without this change, the race occurred on every
second transaction as the first one completed. With this change, the error went
away.
As a follow-up, we may want to clean up some of the code and comments related
to verification, since that code is used both by TV0 with verification enabled
and by TV2. That cleanup doesn't need to be done for 4.0, but this fix does :)
Reviewers: Jeff Kim <[email protected]>, Artem Livshits
<[email protected]>, Calvin Liu <[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 9 ++++--
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 36 ++++++++++++++++++++++
2 files changed, 43 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index a3267e5ec8c..3253de11dfe 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -600,6 +600,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
*/
def hasOngoingTransaction(producerId: Long, producerEpoch: Short): Boolean =
lock synchronized {
val entry = producerStateManager.activeProducers.get(producerId)
+ // With transactions V2, if we see a future epoch, we are likely in the
process of completing the previous transaction.
+ // Return early with ConcurrentTransactionsException until the transaction
completes.
+ if (entry != null && entry.currentTxnFirstOffset.isPresent &&
entry.producerEpoch() < producerEpoch)
+ throw new ConcurrentTransactionsException("The producer attempted to
update a transaction " +
+ "while another concurrent operation on the same transaction was
ongoing.")
entry != null && entry.currentTxnFirstOffset.isPresent &&
entry.producerEpoch() == producerEpoch
}
@@ -1030,7 +1035,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// transaction is completed or aborted. We can guarantee the
transaction coordinator knows about the transaction given step 1 and that the
transaction is still
// ongoing. If the transaction is expected to be ongoing, we will
not set a VerificationGuard. If the transaction is aborted,
hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an
error. A subsequent produce request (retry) should create verification state
and return to phase 1.
- if (batch.isTransactional &&
!hasOngoingTransaction(batch.producerId, batch.producerEpoch()) &&
batchMissingRequiredVerification(batch, requestVerificationGuard))
+ if (batch.isTransactional && !batch.isControlBatch &&
!hasOngoingTransaction(batch.producerId, batch.producerEpoch()) &&
batchMissingRequiredVerification(batch, requestVerificationGuard))
throw new InvalidTxnStateException("Record was not part of an
ongoing transaction")
}
@@ -1051,7 +1056,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
}
private def batchMissingRequiredVerification(batch: MutableRecordBatch,
requestVerificationGuard: VerificationGuard): Boolean = {
-
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()
&& !batch.isControlBatch &&
+
producerStateManager.producerStateManagerConfig().transactionVerificationEnabled()
&&
!verificationGuard(batch.producerId).verify(requestVerificationGuard)
}
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index edbc8db0fb2..242c939370f 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -4020,6 +4020,42 @@ class UnifiedLogTest {
log.appendAsLeader(transactionalRecords, leaderEpoch = 0,
verificationGuard = verificationGuard)
}
+ @Test
+ def testPreviousTransactionOngoing(): Unit = {
+ val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
+
+ val producerId = 23L
+ val producerEpoch = 1.toShort
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+ val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
+
+ val verificationGuard = log.maybeStartTransactionVerification(producerId,
0, producerEpoch)
+ assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
+
+ val transactionalRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE,
+ producerId,
+ producerEpoch,
+ 0,
+ new SimpleRecord("1".getBytes),
+ new SimpleRecord("2".getBytes)
+ )
+ log.appendAsLeader(transactionalRecords, origin = AppendOrigin.CLIENT,
leaderEpoch = 0, verificationGuard = verificationGuard)
+
+ assertThrows(classOf[ConcurrentTransactionsException], () =>
log.maybeStartTransactionVerification(producerId, 0, (producerEpoch +
1).toShort))
+ assertNotEquals(VerificationGuard.SENTINEL, verificationGuard)
+
+ val endTransactionMarkerRecord = MemoryRecords.withEndTransactionMarker(
+ producerId,
+ producerEpoch,
+ new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+ )
+
+ log.appendAsLeader(endTransactionMarkerRecord, origin =
AppendOrigin.COORDINATOR, leaderEpoch = 0)
+ val verificationGuard2 = log.maybeStartTransactionVerification(producerId,
0, (producerEpoch + 1).toShort)
+ assertNotEquals(VerificationGuard.SENTINEL, verificationGuard2)
+ }
+
@Test
def testRecoveryPointNotIncrementedOnProducerStateSnapshotFlushFailure():
Unit = {
val logConfig = LogTestUtils.createLogConfig()