This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fe0fe68  KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
fe0fe68 is described below

commit fe0fe686e92d019ac2b5c8407ab2cbb55ae069e1
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Wed Jul 28 17:27:26 2021 +0100

    KAFKA-13141; Skip follower fetch offset update in leader if diverging epoch is present (#11136)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ReplicaManager.scala   |  7 ++++++-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 23 ++++++++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 949ff47..a9c99a9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1769,7 +1769,8 @@ class ReplicaManager(val config: KafkaConfig,
    * records in fetch response. Log start/end offset and high watermark may 
change not only due to
    * this fetch request, e.g., rolling new log segment and removing old log 
segment may move log
    * start offset further than the last offset in the fetched records. The 
followers will get the
-   * updated leader's state in the next fetch response.
+   * updated leader's state in the next fetch response. If follower has a 
diverging epoch or if read
+   * fails with any error, follower fetch state is not updated.
    */
   private def updateFollowerFetchState(followerId: Int,
                                        readResults: Seq[(TopicPartition, 
LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1778,6 +1779,10 @@ class ReplicaManager(val config: KafkaConfig,
         debug(s"Skipping update of fetch state for follower $followerId since 
the " +
           s"log read returned error ${readResult.error}")
         readResult
+      } else if (readResult.divergingEpoch.nonEmpty) {
+        debug(s"Skipping update of fetch state for follower $followerId since 
the " +
+          s"log read returned diverging epoch ${readResult.divergingEpoch}")
+        readResult
       } else {
         onlinePartition(topicPartition) match {
           case Some(partition) =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 1d875ae..d50aa7b 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -703,6 +703,29 @@ class ReplicaManagerTest {
       assertEquals(0L, followerReplica.logStartOffset)
       assertEquals(0L, followerReplica.logEndOffset)
 
+      // Next we receive an invalid request with a higher fetch offset, but a 
diverging epoch.
+      // We expect that the replica state does not get updated.
+      val divergingFetchPartitionData = new FetchRequest.PartitionData(3L, 0L, 
maxFetchBytes,
+        Optional.of(leaderEpoch), Optional.of(leaderEpoch - 1))
+
+      replicaManager.fetchMessages(
+        timeout = 0L,
+        replicaId = 1,
+        fetchMinBytes = 1,
+        fetchMaxBytes = maxFetchBytes,
+        hardMaxBytesLimit = false,
+        fetchInfos = Seq(tp -> divergingFetchPartitionData),
+        topicIds = topicIds.asJava,
+        quota = UnboundedQuota,
+        isolationLevel = IsolationLevel.READ_UNCOMMITTED,
+        responseCallback = callback,
+        clientMetadata = None
+      )
+
+      assertTrue(successfulFetch.isDefined)
+      assertEquals(0L, followerReplica.logStartOffset)
+      assertEquals(0L, followerReplica.logEndOffset)
+
     } finally {
       replicaManager.shutdown(checkpointHW = false)
     }

Reply via email to