This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 952c8a5e940 KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state (#19223)
952c8a5e940 is described below

commit 952c8a5e940a0d604c2c9b86cbc2c29c420c1318
Author: TengYao Chi <kiting...@gmail.com>
AuthorDate: Wed Mar 26 00:14:01 2025 +0800

    KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state (#19223)
    
    This PR fixes a potential issue where the `FetchResponse` returns
    `divergingEndOffsets` with an older leader epoch. This can lead to
    committed records being removed from the follower's log, potentially
    causing data loss.
    
    In detail:
    `processFetchRequest` gets the requested leader epoch of partition data
    by `topicPartition` and compares it with the leader epoch of the current
    fetch state. If they don't match, the response is ignored.
    
    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 12 ++++++----
 .../kafka/server/AbstractFetcherThreadTest.scala   | 26 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 7a98c83e7f4..0fd9c9333d3 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -307,7 +307,8 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, 
FetchRequest.PartitionData],
+  // visible for testing
+  private[server] def processFetchRequest(sessionPartitions: 
util.Map[TopicPartition, FetchRequest.PartitionData],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
     val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
@@ -333,11 +334,14 @@ abstract class AbstractFetcherThread(name: String,
         responseData.foreachEntry { (topicPartition, partitionData) =>
           Option(partitionStates.stateValue(topicPartition)).foreach { 
currentFetchState =>
             // It's possible that a partition is removed and re-added or 
truncated when there is a pending fetch request.
-            // In this case, we only want to process the fetch response if the 
partition state is ready for fetch and
-            // the current offset is the same as the offset requested.
+            // In this case, we only want to process the fetch response if:
+            // - the partition state is ready for fetch
+            // - the current offset is the same as the offset requested
+            // - the current leader epoch is the same as the leader epoch 
requested
             val fetchPartitionData = sessionPartitions.get(topicPartition)
             if (fetchPartitionData != null &&
                 fetchPartitionData.fetchOffset == 
currentFetchState.fetchOffset &&
+                fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == 
currentFetchState.currentLeaderEpoch).orElse(true) &&
                 currentFetchState.isReadyForFetch) {
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
@@ -362,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
                       val logAppendInfoOpt = processPartitionData(
                         topicPartition,
                         currentFetchState.fetchOffset,
-                        
fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch),
+                        currentFetchState.currentLeaderEpoch,
                         partitionData
                       )
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index aa25cad89c8..174dbaafaaa 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -1162,4 +1162,28 @@ class AbstractFetcherThreadTest {
     assertTrue(fetcher.fetchState(unknownPartition).isEmpty)
   }
 
-}
+  @Test
+  def testIgnoreFetchResponseWhenLeaderEpochChanged(): Unit = {
+    val newEpoch = 1
+    val initEpoch = 0
+
+    val partition = new TopicPartition("topic", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, 
mockTierStateMachine)
+    val replicaState = PartitionState(leaderEpoch = newEpoch)
+    fetcher.setReplicaState(partition, replicaState)
+    val initFetchState = initialFetchState(topicIds.get(partition.topic), 0L, 
leaderEpoch = newEpoch)
+    fetcher.addPartitions(Map(partition -> initFetchState))
+
+    val batch = mkBatch(baseOffset = 0L, leaderEpoch = initEpoch, new 
SimpleRecord("a".getBytes))
+    val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, 
highWatermark = 1L)
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+
+    val partitionData = Map(partition -> new 
FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, 
Optional.of(initEpoch), Optional.of(initEpoch))).asJava
+    val fetchRequestOpt = FetchRequest.Builder.forReplica(0, 0, initEpoch, 0, 
Int.MaxValue, partitionData)
+
+    fetcher.processFetchRequest(partitionData, fetchRequestOpt)
+    assertEquals(0, replicaState.logEndOffset, "FetchResponse should be 
ignored when leader epoch does not match")
+  }
+}
\ No newline at end of file

Reply via email to