This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new fe0fe68  KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
fe0fe68 is described below

commit fe0fe686e92d019ac2b5c8407ab2cbb55ae069e1
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Wed Jul 28 17:27:26 2021 +0100

    KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)

    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 23 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 949ff47..a9c99a9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1769,7 +1769,8 @@ class ReplicaManager(val config: KafkaConfig,
    * records in fetch response. Log start/end offset and high watermark may change not only due to
    * this fetch request, e.g., rolling new log segment and removing old log segment may move log
    * start offset further than the last offset in the fetched records. The followers will get the
-   * updated leader's state in the next fetch response.
+   * updated leader's state in the next fetch response. If follower has a diverging epoch or if read
+   * fails with any error, follower fetch state is not updated.
    */
   private def updateFollowerFetchState(followerId: Int,
                                        readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1778,6 +1779,10 @@ class ReplicaManager(val config: KafkaConfig,
         debug(s"Skipping update of fetch state for follower $followerId since the " +
           s"log read returned error ${readResult.error}")
         readResult
+      } else if (readResult.divergingEpoch.nonEmpty) {
+        debug(s"Skipping update of fetch state for follower $followerId since the " +
+          s"log read returned diverging epoch ${readResult.divergingEpoch}")
+        readResult
       } else {
         onlinePartition(topicPartition) match {
           case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1d875ae..d50aa7b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -703,6 +703,29 @@ class ReplicaManagerTest {
       assertEquals(0L, followerReplica.logStartOffset)
       assertEquals(0L, followerReplica.logEndOffset)
 
+      // Next we receive an invalid request with a higher fetch offset, but a diverging epoch.
+      // We expect that the replica state does not get updated.
+      val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, maxFetchBytes,
+        Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+      replicaManager.fetchMessages(
+        timeout = 0L,
+        replicaId = 1,
+        fetchMinBytes = 1,
+        fetchMaxBytes = maxFetchBytes,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(tp -> divergingFetchPartitionData),
+        topicIds = topicIds.asJava,
+        quota = UnboundedQuota,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        responseCallback = callback,
+        clientMetadata = None
+      )
+
+      assertTrue(successfulFetch.isDefined)
+      assertEquals(0L, followerReplica.logStartOffset)
+      assertEquals(0L, followerReplica.logEndOffset)
+
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }