This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0a483618b9c KAFKA-19690-Add epoch check before verification guard
check to prevent unexpected fatal error (#20534)
0a483618b9c is described below
commit 0a483618b9cc169a0f923478812141630baf2a4c
Author: Ritika Reddy <[email protected]>
AuthorDate: Tue Sep 23 13:45:42 2025 -0700
KAFKA-19690-Add epoch check before verification guard check to prevent
unexpected fatal error (#20534)
We are seeing cases where a Kafka Streams (KS) thread stalls for ~20
seconds. During this stall, the broker correctly aborts the open
transaction (triggered by the 10-second transaction timeout). However,
when the KS thread resumes, the client receives an InvalidTxnStateException
instead of the expected InvalidProducerEpochException (which KS already
handles gracefully as part of transaction abort). KS currently treats
InvalidTxnStateException as a fatal error,
causing the application to fail.
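To fix this, we've added an epoch check before the verification-guard check.
When a transactional batch arrives and no transaction is ongoing for its
producer epoch, the broker first compares the batch's producer epoch with the
last epoch it has recorded for that producer. If the batch's epoch is smaller,
the broker returns the recoverable InvalidProducerEpochException instead of
the fatal InvalidTxnStateException. This safeguards both transaction
version 1 (TV1) and transaction version 2 (TV2) clients.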
Reviewers: Justine Olshan <[email protected]>
---
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 90 ++++++++++++++++++++++
.../kafka/storage/internals/log/UnifiedLog.java | 17 +++-
2 files changed, 103 insertions(+), 4 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index c4cb7a9f7b4..e6fdf09331b 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -5065,6 +5065,96 @@ class UnifiedLogTest {
}
case class TimestampAndEpoch(timestamp: Long, leaderEpoch: Int)
+
+  /**
+   * A TV1 client (supportsEpochBump = false) whose cached producer epoch is older than the epoch
+   * the log has already recorded for that producer must receive the recoverable
+   * InvalidProducerEpochException, not the fatal InvalidTxnStateException raised by the
+   * verification-guard check.
+   *
+   * The producer epoch can be incremented (coordinator failover, completed transaction, etc.)
+   * without the client learning about it, leaving the client with a stale cached epoch.
+   */
+  @Test
+  def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
+    // 86400000 ms producer-id expiration. The second argument is assumed to enable transaction
+    // verification, which this test relies on (confirm against ProducerStateManagerConfig).
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+
+    val producerId = 123L
+    val oldEpoch = 5.toShort
+    val newEpoch = 6.toShort
+
+    // Step 1: The log records a transaction for this producer at newEpoch, as it would after the
+    // epoch was incremented without the client finding out.
+    val previousRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, newEpoch, 0,
+      new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
+    )
+    val previousGuard = log.maybeStartTransactionVerification(producerId, 0, newEpoch, false) // TV1 = supportsEpochBump = false
+    log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, previousGuard)
+
+    // Complete the transaction with a commit marker at newEpoch, so that no transaction is
+    // ongoing for this producer when the stale write arrives.
+    val commitMarker = MemoryRecords.withEndTransactionMarker(
+      producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 0)
+    )
+    log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)
+
+    // Step 2: The TV1 client writes with its stale cached epoch.
+    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, oldEpoch, 0,
+      new SimpleRecord("stale-epoch-key".getBytes, "stale-epoch-value".getBytes)
+    )
+    // Create the guard outside assertThrows, so the expected exception can only come from the
+    // append path under test.
+    val staleGuard = log.maybeStartTransactionVerification(producerId, 0, oldEpoch, false)
+
+    // Step 3: Expect the recoverable InvalidProducerEpochException, not the fatal
+    // InvalidTxnStateException.
+    val exception = assertThrows(classOf[InvalidProducerEpochException], () =>
+      log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard))
+
+    // Pin the whole "<stale epoch> ... <last seen epoch>" fragment. Bare single-digit checks could
+    // match unrelated parts of the message.
+    val expectedFragment = s"is $oldEpoch, which is smaller than the last seen epoch $newEpoch"
+    assertTrue(exception.getMessage.contains(expectedFragment),
+      s"Expected message to contain '$expectedFragment' but was: ${exception.getMessage}")
+  }
+
+  /**
+   * TV2 variant of the stale-epoch scenario. When the coordinator times out and aborts a
+   * transaction, it bumps the producer epoch (KIP-890). A stalled client that resumes with its
+   * pre-abort epoch must receive the recoverable InvalidProducerEpochException rather than the
+   * fatal InvalidTxnStateException: the producer epoch is checked before the verification guard.
+   */
+  @Test
+  def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
+    // 86400000 ms producer-id expiration. The second argument is assumed to enable transaction
+    // verification, which this test relies on (confirm against ProducerStateManagerConfig).
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, true)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = producerStateManagerConfig)
+
+    val producerId = 456L
+    val originalEpoch = 3.toShort
+    val bumpedEpoch = 4.toShort
+
+    // Step 1: Start a transaction at the original epoch (before the timeout).
+    val initialRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, originalEpoch, 0,
+      new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
+    )
+    val initialGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
+    log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, initialGuard)
+
+    // Step 2: The coordinator times out and aborts the transaction. Under TV2 (KIP-890) it bumps
+    // the epoch from 3 to 4 and writes the abort marker with epoch 4.
+    val abortMarker = MemoryRecords.withEndTransactionMarker(
+      producerId, bumpedEpoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
+    )
+    log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR, RequestLocal.noCaching, VerificationGuard.SENTINEL)
+
+    // Step 3: The resumed TV2 producer appends with its stale epoch (timeout-recovery scenario).
+    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, originalEpoch, 0,
+      new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
+    )
+    // Create the guard outside assertThrows, so the expected exception can only come from the
+    // append path under test.
+    val staleGuard = log.maybeStartTransactionVerification(producerId, 0, originalEpoch, true) // TV2 = supportsEpochBump = true
+
+    // Step 4: Expect the recoverable InvalidProducerEpochException, not the fatal
+    // InvalidTxnStateException.
+    val exception = assertThrows(classOf[InvalidProducerEpochException], () =>
+      log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT, RequestLocal.noCaching, staleGuard))
+
+    // Pin the whole "<stale epoch> ... <last seen epoch>" fragment (3 < 4). Bare single-digit
+    // checks could match unrelated parts of the message.
+    val expectedFragment = s"is $originalEpoch, which is smaller than the last seen epoch $bumpedEpoch"
+    assertTrue(exception.getMessage.contains(expectedFragment),
+      s"Expected message to contain '$expectedFragment' but was: ${exception.getMessage}")
+  }
}
object UnifiedLogTest {
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
index 61b4b9d0edb..d190c861998 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
@@ -1385,10 +1385,19 @@ public class UnifiedLog implements AutoCloseable {
// transaction is completed or aborted. We can guarantee
the transaction coordinator knows about the transaction given step 1 and that
the transaction is still
// ongoing. If the transaction is expected to be ongoing,
we will not set a VerificationGuard. If the transaction is aborted,
hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will
throw an error. A subsequent produce request (retry) should create verification
state and return to phase 1.
- if (batch.isTransactional()
- && !hasOngoingTransaction(batch.producerId(),
batch.producerEpoch())
- && batchMissingRequiredVerification(batch,
requestVerificationGuard)) {
- throw new InvalidTxnStateException("Record was not
part of an ongoing transaction");
+ if (batch.isTransactional() &&
!hasOngoingTransaction(batch.producerId(), batch.producerEpoch())) {
+ // Check epoch first: if producer epoch is stale,
throw recoverable InvalidProducerEpochException.
+ ProducerStateEntry entry =
producerStateManager.activeProducers().get(batch.producerId());
+ if (entry != null && batch.producerEpoch() <
entry.producerEpoch()) {
+ String message = "Epoch of producer " +
batch.producerId() + " is " + batch.producerEpoch() +
+ ", which is smaller than the last seen epoch "
+ entry.producerEpoch();
+ throw new InvalidProducerEpochException(message);
+ }
+
+ // Only check verification if epoch is current
+ if (batchMissingRequiredVerification(batch,
requestVerificationGuard)) {
+ throw new InvalidTxnStateException("Record was not
part of an ongoing transaction");
+ }
}
}