This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bbf1ee7  KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher (#11221)
bbf1ee7 is described below

commit bbf1ee74d719494ed5a5cac9dc54b7093171707c
Author: Rajini Sivaram <[email protected]>
AuthorDate: Tue Aug 17 17:12:51 2021 +0100

    KAFKA-13207: Skip truncation on fetch response with diverging epoch if partition removed from fetcher (#11221)
    
    AbstractFetcherThread#truncateOnFetchResponse is used with IBP
    (inter-broker protocol version) 2.7 and above to truncate partitions
    based on the diverging epoch returned in fetch responses. Truncation
    should only be performed for partitions that are still owned by the
    fetcher, and this check should be done while holding `partitionMapLock`
    to ensure that any partitions removed from the fetcher thread are not
    truncated. Truncation will instead be performed by whichever new
    fetcher owns the partition when it restarts fetching.
    
    Reviewers: David Jacot <[email protected]>, Jason Gustafson <[email protected]>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 40 ++++++++++++--------
 .../kafka/server/AbstractFetcherThreadTest.scala   | 43 ++++++++++++++++++++++
 2 files changed, 67 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 1de1835..6de1588 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -229,7 +229,8 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
-  protected def truncateOnFetchResponse(epochEndOffsets: Map[TopicPartition, 
EpochEndOffset]): Unit = {
+  // Visibility for unit tests
+  protected[server] def truncateOnFetchResponse(epochEndOffsets: 
Map[TopicPartition, EpochEndOffset]): Unit = {
     inLock(partitionMapLock) {
       val ResultWithPartitions(fetchOffsets, partitionsWithError) = 
maybeTruncateToEpochEndOffsets(epochEndOffsets, Map.empty)
       handlePartitionsWithErrors(partitionsWithError, 
"truncateOnFetchResponse")
@@ -262,22 +263,29 @@ abstract class AbstractFetcherThread(name: String,
     val partitionsWithError = mutable.HashSet.empty[TopicPartition]
 
     fetchedEpochs.forKeyValue { (tp, leaderEpochOffset) =>
-      Errors.forCode(leaderEpochOffset.errorCode) match {
-        case Errors.NONE =>
-          val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
-          info(s"Truncating partition $tp with $offsetTruncationState due to 
leader epoch and offset $leaderEpochOffset")
-          if (doTruncate(tp, offsetTruncationState))
-            fetchOffsets.put(tp, offsetTruncationState)
-
-        case Errors.FENCED_LEADER_EPOCH =>
-          val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
-            .map(epochEndOffset => 
Int.box(epochEndOffset.currentLeaderEpoch)).asJava
-          if (onPartitionFenced(tp, currentLeaderEpoch))
+      if (partitionStates.contains(tp)) {
+        Errors.forCode(leaderEpochOffset.errorCode) match {
+          case Errors.NONE =>
+            val offsetTruncationState = getOffsetTruncationState(tp, 
leaderEpochOffset)
+            info(s"Truncating partition $tp with $offsetTruncationState due to 
leader epoch and offset $leaderEpochOffset")
+            if (doTruncate(tp, offsetTruncationState))
+              fetchOffsets.put(tp, offsetTruncationState)
+
+          case Errors.FENCED_LEADER_EPOCH =>
+            val currentLeaderEpoch = latestEpochsForPartitions.get(tp)
+              .map(epochEndOffset => 
Int.box(epochEndOffset.currentLeaderEpoch)).asJava
+            if (onPartitionFenced(tp, currentLeaderEpoch))
+              partitionsWithError += tp
+
+          case error =>
+            info(s"Retrying leaderEpoch request for partition $tp as the 
leader reported an error: $error")
             partitionsWithError += tp
-
-        case error =>
-          info(s"Retrying leaderEpoch request for partition $tp as the leader 
reported an error: $error")
-          partitionsWithError += tp
+        }
+      } else {
+        // Partitions may have been removed from the fetcher while the thread 
was waiting for fetch
+        // response. Removed partitions are filtered out while holding 
`partitionMapLock` to ensure that we
+        // don't update state for any partition that may have already been 
migrated to another thread.
+        trace(s"Ignoring epoch offsets for partition $tp since it has been 
removed from this fetcher thread.")
       }
     }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index abe7e45..f5e7390 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -40,6 +40,7 @@ import 
org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.Time
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.{BeforeEach, Test}
 
 import scala.jdk.CollectionConverters._
@@ -480,6 +481,48 @@ class AbstractFetcherThreadTest {
   }
 
   @Test
+  def testTruncationOnFetchSkippedIfPartitionRemoved(): Unit = {
+    assumeTrue(truncateOnFetch)
+    val partition = new TopicPartition("topic", 0)
+    var truncations = 0
+    val fetcher = new MockFetcherThread {
+      override def truncate(topicPartition: TopicPartition, truncationState: 
OffsetTruncationState): Unit = {
+        truncations += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = MockFetcherThread.PartitionState(replicaLog, 
leaderEpoch = 5, highWatermark = 2L)
+    fetcher.setReplicaState(partition, replicaState)
+
+    // Verify that truncation based on fetch response is performed if 
partition is owned by fetcher thread
+    fetcher.addPartitions(Map(partition -> initialFetchState(6L, leaderEpoch = 
4)))
+    val endOffset = new EpochEndOffset()
+      .setPartition(partition.partition)
+      .setErrorCode(Errors.NONE.code)
+      .setLeaderEpoch(4)
+      .setEndOffset(3L)
+    fetcher.truncateOnFetchResponse(Map(partition -> endOffset))
+    assertEquals(1, truncations)
+
+    // Verify that truncation based on fetch response is not performed if 
partition is removed from fetcher thread
+    val offsets = fetcher.removePartitions(Set(partition))
+    assertEquals(Set(partition), offsets.keySet)
+    assertEquals(3L, offsets(partition).fetchOffset)
+    val newEndOffset = new EpochEndOffset()
+      .setPartition(partition.partition)
+      .setErrorCode(Errors.NONE.code)
+      .setLeaderEpoch(4)
+      .setEndOffset(2L)
+    fetcher.truncateOnFetchResponse(Map(partition -> newEndOffset))
+    assertEquals(1, truncations)
+  }
+
+  @Test
   def testFollowerFetchOutOfRangeHigh(): Unit = {
     val partition = new TopicPartition("topic", 0)
     val fetcher = new MockFetcherThread()

Reply via email to