This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bd32d98bd7c KAFKA-14704; Follower should truncate before incrementing 
high watermark (#13230)
bd32d98bd7c is described below

commit bd32d98bd7cbd8cff35d6786bc8b00061e0fedf7
Author: David Jacot <[email protected]>
AuthorDate: Tue Feb 14 09:54:32 2023 +0100

    KAFKA-14704; Follower should truncate before incrementing high watermark 
(#13230)
    
    When a leader becomes a follower, it is likely that it has uncommitted 
records in its log. When it reaches out to the leader, the leader will detect 
that they have diverged and it will return the diverging epoch and offset. The 
follower truncates it log based on this.
    
    There is a small caveat in this process. When the leader return the 
diverging epoch and offset, it also includes its high watermark, low watermark, 
start offset and end offset. The current code in the `AbstractFetcherThread` 
works as follow. First it process the partition data and then it checks whether 
there is a diverging epoch/offset. The former may accidentally expose 
uncommitted records as this step updates the local watermark to whatever is 
received from the leader. As the follo [...]
    
    When this happens, the follower logs the following messages:
    * `Truncating XXX to offset 21434 below high watermark 21437`
    * `Non-monotonic update of high watermark from (offset=21437 
segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.
    
    This patch proposes to mitigate the issue by starting by checking on 
whether a diverging epoch/offset is provided by the leader and skip processing 
the partition data if it is. This basically means that the first fetch request 
will result in truncating the log and a subsequent fetch request will update 
the low/high watermarks.
    
    Reviewers: Ritika Reddy <[email protected]>, Justine Olshan 
<[email protected]>, Jason Gustafson <[email protected]>
---
 .../scala/kafka/server/AbstractFetcherThread.scala | 60 ++++++++--------
 .../kafka/server/AbstractFetcherThreadTest.scala   | 80 +++++++++++++++++++--
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 81 ++++++++++++++++++++++
 3 files changed, 187 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index dd8c6a83e58..4a879d453b8 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -348,33 +348,39 @@ abstract class AbstractFetcherThread(name: String,
               Errors.forCode(partitionData.errorCode) match {
                 case Errors.NONE =>
                   try {
-                    // Once we hand off the partition data to the subclass, we 
can't mess with it any more in this thread
-                    val logAppendInfoOpt = 
processPartitionData(topicPartition, currentFetchState.fetchOffset,
-                      partitionData)
-
-                    logAppendInfoOpt.foreach { logAppendInfo =>
-                      val validBytes = logAppendInfo.validBytes
-                      val nextOffset = if (validBytes > 0) 
logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
-                      val lag = Math.max(0L, partitionData.highWatermark - 
nextOffset)
-                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
-                      // ReplicaDirAlterThread may have removed topicPartition 
from the partitionStates after processing the partition data
-                      if (validBytes > 0 && 
partitionStates.contains(topicPartition)) {
-                        // Update partitionStates only if there is no 
exception during processPartitionData
-                        val newFetchState = 
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
-                          currentFetchState.currentLeaderEpoch, state = 
Fetching,
-                          logAppendInfo.lastLeaderEpoch)
-                        partitionStates.updateAndMoveToEnd(topicPartition, 
newFetchState)
-                        fetcherStats.byteRate.mark(validBytes)
-                      }
-                    }
-                    if (leader.isTruncationOnFetchSupported) {
-                      FetchResponse.divergingEpoch(partitionData).ifPresent { 
divergingEpoch =>
-                        divergingEndOffsets += topicPartition -> new 
EpochEndOffset()
-                          .setPartition(topicPartition.partition)
-                          .setErrorCode(Errors.NONE.code)
-                          .setLeaderEpoch(divergingEpoch.epoch)
-                          .setEndOffset(divergingEpoch.endOffset)
+                    if (leader.isTruncationOnFetchSupported && 
FetchResponse.isDivergingEpoch(partitionData)) {
+                      // If a diverging epoch is present, we truncate the log 
of the replica
+                      // but we don't process the partition data in order to 
not update the
+                      // low/high watermarks until the truncation is actually 
done. Those will
+                      // be updated by the next fetch.
+                      divergingEndOffsets += topicPartition -> new 
EpochEndOffset()
+                        .setPartition(topicPartition.partition)
+                        .setErrorCode(Errors.NONE.code)
+                        .setLeaderEpoch(partitionData.divergingEpoch.epoch)
+                        .setEndOffset(partitionData.divergingEpoch.endOffset)
+                    } else {
+                      // Once we hand off the partition data to the subclass, 
we can't mess with it any more in this thread
+                      val logAppendInfoOpt = processPartitionData(
+                        topicPartition,
+                        currentFetchState.fetchOffset,
+                        partitionData
+                      )
+
+                      logAppendInfoOpt.foreach { logAppendInfo =>
+                        val validBytes = logAppendInfo.validBytes
+                        val nextOffset = if (validBytes > 0) 
logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
+                        val lag = Math.max(0L, partitionData.highWatermark - 
nextOffset)
+                        fetcherLagStats.getAndMaybePut(topicPartition).lag = 
lag
+
+                        // ReplicaDirAlterThread may have removed 
topicPartition from the partitionStates after processing the partition data
+                        if (validBytes > 0 && 
partitionStates.contains(topicPartition)) {
+                          // Update partitionStates only if there is no 
exception during processPartitionData
+                          val newFetchState = 
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
+                            currentFetchState.currentLeaderEpoch, state = 
Fetching,
+                            logAppendInfo.lastLeaderEpoch)
+                          partitionStates.updateAndMoveToEnd(topicPartition, 
newFetchState)
+                          fetcherStats.byteRate.mark(validBytes)
+                        }
                       }
                     }
                   } catch {
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index f816064d029..51ef19424af 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -1089,6 +1089,77 @@ class AbstractFetcherThreadTest {
     fetcher.verifyLastFetchedEpoch(partition, Some(5))
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = {
+    assumeTrue(truncateOnFetch)
+
+    val partition = new TopicPartition("topic", 0)
+
+    var truncateCalls = 0
+    var processPartitionDataCalls = 0
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+      override def processPartitionData(topicPartition: TopicPartition, 
fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+        processPartitionDataCalls += 1
+        super.processPartitionData(topicPartition, fetchOffset, partitionData)
+      }
+
+      override def truncate(topicPartition: TopicPartition, truncationState: 
OffsetTruncationState): Unit = {
+        truncateCalls += 1
+        super.truncate(topicPartition, truncationState)
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
+      mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
+    )
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, 
highWatermark = 1L)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> 
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+    assertEquals(6L, replicaState.logEndOffset)
+    fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+      mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+    )
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark 
= 4L)
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    // The first fetch should result in truncating the follower's log and
+    // it should not process the data hence not update the high watermarks.
+    fetcher.doWork()
+
+    assertEquals(1, truncateCalls)
+    assertEquals(0, processPartitionDataCalls)
+    assertEquals(3L, replicaState.logEndOffset)
+    assertEquals(1L, replicaState.highWatermark)
+
+    // Truncate should have been called only once and process partition data
+    // should have been called at least once. The log end offset and the high
+    // watermark are updated.
+    TestUtils.waitUntilTrue(() => {
+      fetcher.doWork()
+      fetcher.replicaPartitionState(partition).log == 
fetcher.mockLeader.leaderPartitionState(partition).log
+    }, "Failed to reconcile leader and follower logs")
+    fetcher.verifyLastFetchedEpoch(partition, Some(5))
+
+    assertEquals(1, truncateCalls)
+    assertTrue(processPartitionDataCalls >= 1)
+    assertEquals(5L, replicaState.logEndOffset)
+    assertEquals(4L, replicaState.highWatermark)
+  }
+
   @Test
   def testMaybeUpdateTopicIds(): Unit = {
     val partition = new TopicPartition("topic1", 0)
@@ -1387,13 +1458,8 @@ class AbstractFetcherThreadTest {
       val state = replicaPartitionState(topicPartition)
 
       if (leader.isTruncationOnFetchSupported && 
FetchResponse.isDivergingEpoch(partitionData)) {
-        val divergingEpoch = partitionData.divergingEpoch
-        truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
-          .setPartition(topicPartition.partition)
-          .setErrorCode(Errors.NONE.code)
-          .setLeaderEpoch(divergingEpoch.epoch)
-          .setEndOffset(divergingEpoch.endOffset)))
-        return None
+        throw new IllegalStateException("processPartitionData should not be 
called for a partition with " +
+          "a diverging epoch.")
       }
 
       // Throw exception if the fetchOffset does not match the fetcherThread 
partition state
diff --git 
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 8eeb22046e6..84f5d84926e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -659,6 +659,87 @@ class ReplicaFetcherThreadTest {
     partitions.foreach { tp => assertEquals(Fetching, 
thread.fetchState(tp).get.state) }
   }
 
+  @Test
+  def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
+    val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
"localhost:1234"))
+    val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = mock(classOf[LogManager])
+    val log: UnifiedLog = mock(classOf[UnifiedLog])
+    val partition: Partition = mock(classOf[Partition])
+    val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
+    val logEndOffset = 150
+    val highWatermark = 130
+
+    when(log.highWatermark).thenReturn(highWatermark)
+    when(log.latestEpoch).thenReturn(Some(5))
+    when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
+    when(log.logEndOffset).thenReturn(logEndOffset)
+
+    when(replicaManager.metadataCache).thenReturn(metadataCache)
+    when(replicaManager.logManager).thenReturn(logManager)
+
+    when(replicaManager.localLogOrException(t1p0)).thenReturn(log)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+
+    when(partition.localLogOrException).thenReturn(log)
+    when(partition.appendRecordsToFollowerOrFutureReplica(any(), 
any())).thenReturn(None)
+
+    val logContext = new LogContext(s"[ReplicaFetcher 
replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
+
+    val mockNetwork = new MockBlockingSender(
+      Collections.emptyMap(),
+      brokerEndPoint,
+      new SystemTime()
+    )
+
+    val leader = new RemoteLeaderEndPoint(
+      logContext.logPrefix,
+      mockNetwork,
+      new FetchSessionHandler(logContext, brokerEndPoint.id),
+      config,
+      replicaManager,
+      quota,
+      () => config.interBrokerProtocolVersion
+    )
+
+    val thread = new ReplicaFetcherThread(
+      "fetcher-thread",
+      leader,
+      config,
+      failedPartitions,
+      replicaManager,
+      quota,
+      logContext.logPrefix,
+      () => config.interBrokerProtocolVersion
+    )
+
+    thread.addPartitions(Map(
+      t1p0 -> initialFetchState(Some(topicId1), logEndOffset))
+    )
+
+    // Prepare the fetch response data.
+    mockNetwork.setFetchPartitionDataForNextResponse(Map(
+      t1p0 -> new FetchResponseData.PartitionData()
+        .setPartitionIndex(t1p0.partition)
+        .setLastStableOffset(0)
+        .setLogStartOffset(0)
+        .setHighWatermark(160) // HWM is higher on the leader.
+        .setDivergingEpoch(new FetchResponseData.EpochEndOffset()
+          .setEpoch(4)
+          .setEndOffset(140))
+    ))
+    mockNetwork.setIdsForNextResponse(topicIds)
+
+    // Sends the fetch request and processes the response. This should 
truncate the
+    // log but it should not update the high watermark.
+    thread.doWork()
+
+    assertEquals(1, mockNetwork.fetchCount)
+    verify(partition, times(1)).truncateTo(140, false)
+    verify(log, times(0)).maybeUpdateHighWatermark(anyLong())
+  }
+
   @Test
   def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {
 

Reply via email to