This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 4b0ba424837 KAFKA-19690 Add epoch check before verification guard
check to prevent unexpected fatal error (#20607)
4b0ba424837 is described below
commit 4b0ba424837a76c94b2432f6e2ac4237f92e1ff7
Author: Ritika Reddy <[email protected]>
AuthorDate: Mon Oct 6 07:20:57 2025 -0700
KAFKA-19690 Add epoch check before verification guard check to prevent
unexpected fatal error (#20607)
Cherry-pick changes (https://github.com/apache/kafka/pull/20534) to 4.0
Conflicts:
-> storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
- left this file unchanged; on the 4.0 branch the corresponding logic lives
in core/src/main/scala/kafka/log/UnifiedLog.scala, so the fix was applied there
-> core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala - added only the
required tests and kept everything else the same
Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
core/src/main/scala/kafka/log/UnifiedLog.scala | 13 +++-
.../test/scala/unit/kafka/log/UnifiedLogTest.scala | 91 ++++++++++++++++++++++
2 files changed, 102 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index e9985f619f5..9bd32c6b049 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1057,8 +1057,17 @@ class UnifiedLog(@volatile var logStartOffset: Long,
// transaction is completed or aborted. We can guarantee the
transaction coordinator knows about the transaction given step 1 and that the
transaction is still
// ongoing. If the transaction is expected to be ongoing, we will
not set a VerificationGuard. If the transaction is aborted,
hasOngoingTransaction is false and
// requestVerificationGuard is the sentinel, so we will throw an
error. A subsequent produce request (retry) should create verification state
and return to phase 1.
- if (batch.isTransactional &&
!hasOngoingTransaction(batch.producerId, batch.producerEpoch()) &&
batchMissingRequiredVerification(batch, requestVerificationGuard))
- throw new InvalidTxnStateException("Record was not part of an
ongoing transaction")
+ if (batch.isTransactional &&
!hasOngoingTransaction(batch.producerId, batch.producerEpoch)) {
+ // Check epoch first: if producer epoch is stale, throw
recoverable InvalidProducerEpochException.
+ val entry =
producerStateManager.activeProducers.get(batch.producerId)
+ if (entry != null && batch.producerEpoch < entry.producerEpoch) {
+ val message = "Epoch of producer " + batch.producerId + " is " +
batch.producerEpoch + ", which is smaller than the last seen epoch " +
entry.producerEpoch
+ throw new InvalidProducerEpochException(message)
+ }
+ // Only check verification if epoch is current
+ if (batchMissingRequiredVerification(batch,
requestVerificationGuard))
+ throw new InvalidTxnStateException("Record was not part of an
ongoing transaction")
+ }
}
// We cache offset metadata for the start of each transaction. This
allows us to
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 2b9d2d04375..44aefff62c9 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -57,6 +57,7 @@ import org.mockito.Mockito.{doAnswer, doThrow, spy}
import net.jqwik.api.AfterFailureMode
import net.jqwik.api.ForAll
import net.jqwik.api.Property
+import org.apache.kafka.server.common.RequestLocal
import java.io._
import java.nio.ByteBuffer
@@ -4660,6 +4661,96 @@ class UnifiedLogTest {
assertEquals(new OffsetResultHolder(Optional.empty(), Optional.empty()),
result)
}
+ @Test
+ def testStaleProducerEpochReturnsRecoverableErrorForTV1Clients(): Unit = {
+ // Producer epoch gets incremented (coordinator fail over, completed
transaction, etc.)
+ // and client has stale cached epoch. Fix prevents fatal
InvalidTxnStateException.
+
+ val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+ val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
+
+ val producerId = 123L
+ val oldEpoch = 5.toShort
+ val newEpoch = 6.toShort
+
+ // Step 1: Simulate a scenario where producer epoch was incremented to
fence the producer
+ val previousRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, newEpoch, 0,
+ new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
+ )
+ val previousGuard = log.maybeStartTransactionVerification(producerId, 0,
newEpoch, false) // TV1 = supportsEpochBump = false
+ log.appendAsLeader(previousRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, previousGuard)
+
+ // Complete the transaction normally (commits do update producer state
with current epoch)
+ val commitMarker = MemoryRecords.withEndTransactionMarker(
+ producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT,
0)
+ )
+ log.appendAsLeader(commitMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching, VerificationGuard.SENTINEL)
+
+ // Step 2: TV1 client tries to write with stale cached epoch (before
learning about epoch increment)
+ val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, oldEpoch, 0,
+ new SimpleRecord("stale-epoch-key".getBytes,
"stale-epoch-value".getBytes)
+ )
+
+ // Step 3: Verify our fix - should get InvalidProducerEpochException
(recoverable), not InvalidTxnStateException (fatal)
+ val exception = assertThrows(classOf[InvalidProducerEpochException], () =>
{
+ val staleGuard = log.maybeStartTransactionVerification(producerId, 0,
oldEpoch, false)
+ log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, staleGuard)
+ })
+
+ // Verify the error message indicates epoch mismatch
+ assertTrue(exception.getMessage.contains("smaller than the last seen
epoch"))
+ assertTrue(exception.getMessage.contains(s"$oldEpoch"))
+ assertTrue(exception.getMessage.contains(s"$newEpoch"))
+ }
+
+ @Test
+ def testStaleProducerEpochReturnsRecoverableErrorForTV2Clients(): Unit = {
+ // Check producer epoch FIRST - if stale, return recoverable error before
verification checks.
+
+ val producerStateManagerConfig = new ProducerStateManagerConfig(86400000,
true)
+ val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+ val log = createLog(logDir, logConfig, producerStateManagerConfig =
producerStateManagerConfig)
+
+ val producerId = 456L
+ val originalEpoch = 3.toShort
+ val bumpedEpoch = 4.toShort
+
+ // Step 1: Start transaction with epoch 3 (before timeout)
+ val initialRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, originalEpoch, 0,
+ new SimpleRecord("ks-initial-key".getBytes, "ks-initial-value".getBytes)
+ )
+ val initialGuard = log.maybeStartTransactionVerification(producerId, 0,
originalEpoch, true) // TV2 = supportsEpochBump = true
+ log.appendAsLeader(initialRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, initialGuard)
+
+ // Step 2: Coordinator times out and aborts transaction
+ // TV2 (KIP-890): Coordinator bumps epoch from 3 → 4 and sends abort
marker with epoch 4
+ val abortMarker = MemoryRecords.withEndTransactionMarker(
+ producerId, bumpedEpoch, new
EndTransactionMarker(ControlRecordType.ABORT, 0)
+ )
+ log.appendAsLeader(abortMarker, 0, AppendOrigin.COORDINATOR,
RequestLocal.noCaching, VerificationGuard.SENTINEL)
+
+ // Step 3: TV2 transactional producer tries to append with stale epoch
(timeout recovery scenario)
+ val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+ Compression.NONE, producerId, originalEpoch, 0,
+ new SimpleRecord("ks-resume-key".getBytes, "ks-resume-value".getBytes)
+ )
+
+ // Step 4: Verify our fix works for TV2 - should get
InvalidProducerEpochException (recoverable), not InvalidTxnStateException
(fatal)
+ val exception = assertThrows(classOf[InvalidProducerEpochException], () =>
{
+ val staleGuard = log.maybeStartTransactionVerification(producerId, 0,
originalEpoch, true) // TV2 = supportsEpochBump = true
+ log.appendAsLeader(staleEpochRecords, 0, AppendOrigin.CLIENT,
RequestLocal.noCaching, staleGuard)
+ })
+
+ // Verify the error message indicates epoch mismatch (3 < 4)
+ assertTrue(exception.getMessage.contains("smaller than the last seen
epoch"))
+ assertTrue(exception.getMessage.contains(s"$originalEpoch"))
+ assertTrue(exception.getMessage.contains(s"$bumpedEpoch"))
+ }
+
private def appendTransactionalToBuffer(buffer: ByteBuffer,
producerId: Long,
producerEpoch: Short,