This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4dd35126656 KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20577)
4dd35126656 is described below

commit 4dd35126656e2af2ff3cd67705d1dd495f68c0ec
Author: Ritika Reddy <[email protected]>
AuthorDate: Sun Sep 28 09:07:58 2025 -0700

    KAFKA-19690 Add epoch check before verification guard check to prevent unexpected fatal error (#20577)
    
    Cherry-pick changes (#20534) to 3.9
    
    Conflicts:
    -> storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java
       - in 3.9 this file is core/src/main/scala/kafka/log/UnifiedLog.scala
    -> core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
       - the original commit's version had more changes than the 3.9 version;
         only the new test was added and everything else was kept the same
    
    Reviewers: Justine Olshan <[email protected]>, Chia-Ping Tsai
    <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 15 ++++++-
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 47 +++++++++++++++++++++-
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala 
b/core/src/main/scala/kafka/log/UnifiedLog.scala
index d648fff6194..e414124f56e 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -1067,8 +1067,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           // transaction is completed or aborted. We can guarantee the 
transaction coordinator knows about the transaction given step 1 and that the 
transaction is still
           // ongoing. If the transaction is expected to be ongoing, we will 
not set a VerificationGuard. If the transaction is aborted, 
hasOngoingTransaction is false and
           // requestVerificationGuard is the sentinel, so we will throw an 
error. A subsequent produce request (retry) should create verification state 
and return to phase 1.
-          if (batch.isTransactional && 
!hasOngoingTransaction(batch.producerId) && 
batchMissingRequiredVerification(batch, requestVerificationGuard))
-            throw new InvalidTxnStateException("Record was not part of an 
ongoing transaction")
+          if (batch.isTransactional && 
!hasOngoingTransaction(batch.producerId)) {
+            // Check epoch first: if producer epoch is stale, throw 
recoverable InvalidProducerEpochException.
+            val entry = 
producerStateManager.activeProducers.get(batch.producerId)
+            if (entry != null && batch.producerEpoch < entry.producerEpoch) {
+              val message = s"Epoch of producer ${batch.producerId} is 
${batch.producerEpoch}, which is smaller than the last seen epoch 
${entry.producerEpoch}"
+              throw new InvalidProducerEpochException(message)
+            }
+            
+            // Only check verification if epoch is current
+            if (batchMissingRequiredVerification(batch, 
requestVerificationGuard)) {
+              throw new InvalidTxnStateException("Record was not part of an 
ongoing transaction")
+            }
+          }
         }
 
         // We cache offset metadata for the start of each transaction. This 
allows us to
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala 
b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 5bc8c44ec5c..80b812ab2c6 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import kafka.common.{OffsetsOutOfOrderException, 
UnexpectedAppendOffsetException}
 import kafka.log.remote.RemoteLogManager
-import kafka.server.{BrokerTopicStats, KafkaConfig}
+import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
@@ -4666,6 +4666,51 @@ class UnifiedLogTest {
 
     (log, segmentWithOverflow)
   }
+
+  @Test
+  def testStaleProducerEpochReturnsRecoverableError(): Unit = {
+    // Producer epoch gets incremented (coordinator fail over, completed 
transaction, etc.)
+    // and client has stale cached epoch. Fix prevents fatal 
InvalidTxnStateException.
+
+    val producerStateManagerConfig = new ProducerStateManagerConfig(86400000, 
true)
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 2048 * 5)
+    val log = createLog(logDir, logConfig, producerStateManagerConfig = 
producerStateManagerConfig)
+
+    val producerId = 123L
+    val oldEpoch = 5.toShort
+    val newEpoch = 6.toShort
+
+    // Step 1: Simulate a scenario where producer epoch was incremented to 
fence the producer
+    val previousRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, newEpoch, 0,
+      new SimpleRecord("previous-key".getBytes, "previous-value".getBytes)
+    )
+    val previousGuard = log.maybeStartTransactionVerification(producerId, 0, 
newEpoch)
+    log.appendAsLeader(previousRecords, leaderEpoch = 0, origin = 
AppendOrigin.CLIENT, requestLocal = RequestLocal.NoCaching, verificationGuard = 
previousGuard)
+
+    // Complete the transaction normally (commits do update producer state 
with current epoch)
+    val commitMarker = MemoryRecords.withEndTransactionMarker(
+      producerId, newEpoch, new EndTransactionMarker(ControlRecordType.COMMIT, 
0)
+    )
+    log.appendAsLeader(commitMarker, leaderEpoch = 0, origin = 
AppendOrigin.COORDINATOR, requestLocal = RequestLocal.NoCaching, 
verificationGuard = VerificationGuard.SENTINEL)
+
+    // Step 2: TV1 client tries to write with stale cached epoch (before 
learning about epoch increment)
+    val staleEpochRecords = MemoryRecords.withTransactionalRecords(
+      Compression.NONE, producerId, oldEpoch, 0,
+      new SimpleRecord("stale-epoch-key".getBytes, 
"stale-epoch-value".getBytes)
+    )
+
+    // Step 3: Verify our fix - should get InvalidProducerEpochException 
(recoverable), not InvalidTxnStateException (fatal)
+    val exception = assertThrows(classOf[InvalidProducerEpochException], () => 
{
+      val staleGuard = log.maybeStartTransactionVerification(producerId, 0, 
oldEpoch)
+      log.appendAsLeader(staleEpochRecords, leaderEpoch = 0, origin = 
AppendOrigin.CLIENT, requestLocal = RequestLocal.NoCaching, verificationGuard = 
staleGuard)
+    })
+
+    // Verify the error message indicates epoch mismatch
+    assertTrue(exception.getMessage.contains("smaller than the last seen 
epoch"))
+    assertTrue(exception.getMessage.contains(s"$oldEpoch"))
+    assertTrue(exception.getMessage.contains(s"$newEpoch"))
+  }
 }
 
 object UnifiedLogTest {

Reply via email to