This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 952c8a5e940 KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state (#19223)
952c8a5e940 is described below

commit 952c8a5e940a0d604c2c9b86cbc2c29c420c1318
Author: TengYao Chi <kiting...@gmail.com>
AuthorDate: Wed Mar 26 00:14:01 2025 +0800

    KAFKA-18991: FetcherThread should match leader epochs between fetch request and fetch state (#19223)

    This PR fixes a potential issue where the `FetchResponse` returns
    `divergingEndOffsets` with an older leader epoch. This can lead to
    committed records being removed from the follower's log, potentially
    causing data loss.

    In detail:
    `processFetchRequest` gets the requested leader epoch of partition data
    by `topicPartition` and compares it with the leader epoch of the current
    fetch state. If they don't match, the response is ignored.

    Reviewers: Jun Rao <jun...@gmail.com>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 12 ++++++----
 .../kafka/server/AbstractFetcherThreadTest.scala   | 26 +++++++++++++++++++++-
 2 files changed, 33 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 7a98c83e7f4..0fd9c9333d3 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -307,7 +307,8 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  private def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
+  // visible for testing
+  private[server] def processFetchRequest(sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
                                   fetchRequest: FetchRequest.Builder): Unit = {
     val partitionsWithError = mutable.Set[TopicPartition]()
     val divergingEndOffsets = mutable.Map.empty[TopicPartition, EpochEndOffset]
@@ -333,11 +334,14 @@ abstract class AbstractFetcherThread(name: String,
         responseData.foreachEntry { (topicPartition, partitionData) =>
           Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
             // It's possible that a partition is removed and re-added or truncated when there is a pending fetch request.
-            // In this case, we only want to process the fetch response if the partition state is ready for fetch and
-            // the current offset is the same as the offset requested.
+            // In this case, we only want to process the fetch response if:
+            // - the partition state is ready for fetch
+            // - the current offset is the same as the offset requested
+            // - the current leader epoch is the same as the leader epoch requested
             val fetchPartitionData = sessionPartitions.get(topicPartition)
             if (fetchPartitionData != null &&
                 fetchPartitionData.fetchOffset == currentFetchState.fetchOffset &&
+                fetchPartitionData.currentLeaderEpoch.map[Boolean](_ == currentFetchState.currentLeaderEpoch).orElse(true) &&
                 currentFetchState.isReadyForFetch) {
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
@@ -362,7 +366,7 @@ abstract class AbstractFetcherThread(name: String,
                       val logAppendInfoOpt = processPartitionData(
                         topicPartition,
                         currentFetchState.fetchOffset,
-                        fetchPartitionData.currentLeaderEpoch.orElse(currentFetchState.currentLeaderEpoch),
+                        currentFetchState.currentLeaderEpoch,
                         partitionData
                       )
 
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index aa25cad89c8..174dbaafaaa 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -1162,4 +1162,28 @@ class AbstractFetcherThreadTest {
     assertTrue(fetcher.fetchState(unknownPartition).isEmpty)
   }
 
-}
+  @Test
+  def testIgnoreFetchResponseWhenLeaderEpochChanged(): Unit = {
+    val newEpoch = 1
+    val initEpoch = 0
+
+    val partition = new TopicPartition("topic", 0)
+    val mockLeaderEndpoint = new MockLeaderEndPoint(version = version)
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint)
+    val fetcher = new MockFetcherThread(mockLeaderEndpoint, mockTierStateMachine)
+    val replicaState = PartitionState(leaderEpoch = newEpoch)
+    fetcher.setReplicaState(partition, replicaState)
+    val initFetchState = initialFetchState(topicIds.get(partition.topic), 0L, leaderEpoch = newEpoch)
+    fetcher.addPartitions(Map(partition -> initFetchState))
+
+    val batch = mkBatch(baseOffset = 0L, leaderEpoch = initEpoch, new SimpleRecord("a".getBytes))
+    val leaderState = PartitionState(Seq(batch), leaderEpoch = initEpoch, highWatermark = 1L)
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+
+    val partitionData = Map(partition -> new FetchRequest.PartitionData(Uuid.randomUuid(), 0, 0, 1048576, Optional.of(initEpoch), Optional.of(initEpoch))).asJava
+    val fetchRequestOpt = FetchRequest.Builder.forReplica(0, 0, initEpoch, 0, Int.MaxValue, partitionData)
+
+    fetcher.processFetchRequest(partitionData, fetchRequestOpt)
+    assertEquals(0, replicaState.logEndOffset, "FetchResponse should be ignored when leader epoch does not match")
+  }
+}
\ No newline at end of file