This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push:
new 649d5497442 KAFKA-14704; Follower should truncate before incrementing
high watermark (#13230)
649d5497442 is described below
commit 649d5497442ff815fa5af0ebb142069a2343eeb8
Author: David Jacot <[email protected]>
AuthorDate: Tue Feb 14 09:54:32 2023 +0100
KAFKA-14704; Follower should truncate before incrementing high watermark
(#13230)
When a leader becomes a follower, it is likely that it has uncommitted
records in its log. When it reaches out to the leader, the leader will detect
that they have diverged and it will return the diverging epoch and offset. The
follower truncates its log based on this.
There is a small caveat in this process. When the leader returns the
diverging epoch and offset, it also includes its high watermark, low watermark,
start offset and end offset. The current code in the `AbstractFetcherThread`
works as follows. First it processes the partition data and then it checks whether
there is a diverging epoch/offset. The former may accidentally expose
uncommitted records as this step updates the local watermark to whatever is
received from the leader. As the follo [...]
When this happens, the follower logs the following messages:
* `Truncating XXX to offset 21434 below high watermark 21437`
* `Non-monotonic update of high watermark from (offset=21437
segment=[20998:98390]) to (offset=21434 segment=[20998:97843])`.
This patch proposes to mitigate the issue by first checking
whether a diverging epoch/offset is provided by the leader and skipping
the partition data if it is. This basically means that the first fetch request
will result in truncating the log and a subsequent fetch request will update
the low/high watermarks.
Reviewers: Ritika Reddy <[email protected]>, Justine Olshan
<[email protected]>, Jason Gustafson <[email protected]>
---
.../scala/kafka/server/AbstractFetcherThread.scala | 60 ++++++++--------
.../kafka/server/AbstractFetcherThreadTest.scala | 80 +++++++++++++++++++--
.../kafka/server/ReplicaFetcherThreadTest.scala | 81 ++++++++++++++++++++++
3 files changed, 187 insertions(+), 34 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 9701f552647..4d014ade8ed 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -331,33 +331,39 @@ abstract class AbstractFetcherThread(name: String,
Errors.forCode(partitionData.errorCode) match {
case Errors.NONE =>
try {
- // Once we hand off the partition data to the subclass, we
can't mess with it any more in this thread
- val logAppendInfoOpt =
processPartitionData(topicPartition, currentFetchState.fetchOffset,
- partitionData)
-
- logAppendInfoOpt.foreach { logAppendInfo =>
- val validBytes = logAppendInfo.validBytes
- val nextOffset = if (validBytes > 0)
logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
- val lag = Math.max(0L, partitionData.highWatermark -
nextOffset)
- fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
-
- // ReplicaDirAlterThread may have removed topicPartition
from the partitionStates after processing the partition data
- if (validBytes > 0 &&
partitionStates.contains(topicPartition)) {
- // Update partitionStates only if there is no
exception during processPartitionData
- val newFetchState =
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
- currentFetchState.currentLeaderEpoch, state =
Fetching,
- logAppendInfo.lastLeaderEpoch)
- partitionStates.updateAndMoveToEnd(topicPartition,
newFetchState)
- fetcherStats.byteRate.mark(validBytes)
- }
- }
- if (leader.isTruncationOnFetchSupported) {
- FetchResponse.divergingEpoch(partitionData).ifPresent {
divergingEpoch =>
- divergingEndOffsets += topicPartition -> new
EpochEndOffset()
- .setPartition(topicPartition.partition)
- .setErrorCode(Errors.NONE.code)
- .setLeaderEpoch(divergingEpoch.epoch)
- .setEndOffset(divergingEpoch.endOffset)
+ if (leader.isTruncationOnFetchSupported &&
FetchResponse.isDivergingEpoch(partitionData)) {
+ // If a diverging epoch is present, we truncate the log
of the replica
+ // but we don't process the partition data in order to
not update the
+ // low/high watermarks until the truncation is actually
done. Those will
+ // be updated by the next fetch.
+ divergingEndOffsets += topicPartition -> new
EpochEndOffset()
+ .setPartition(topicPartition.partition)
+ .setErrorCode(Errors.NONE.code)
+ .setLeaderEpoch(partitionData.divergingEpoch.epoch)
+ .setEndOffset(partitionData.divergingEpoch.endOffset)
+ } else {
+ // Once we hand off the partition data to the subclass,
we can't mess with it any more in this thread
+ val logAppendInfoOpt = processPartitionData(
+ topicPartition,
+ currentFetchState.fetchOffset,
+ partitionData
+ )
+
+ logAppendInfoOpt.foreach { logAppendInfo =>
+ val validBytes = logAppendInfo.validBytes
+ val nextOffset = if (validBytes > 0)
logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
+ val lag = Math.max(0L, partitionData.highWatermark -
nextOffset)
+ fetcherLagStats.getAndMaybePut(topicPartition).lag =
lag
+
+ // ReplicaDirAlterThread may have removed
topicPartition from the partitionStates after processing the partition data
+ if (validBytes > 0 &&
partitionStates.contains(topicPartition)) {
+ // Update partitionStates only if there is no
exception during processPartitionData
+ val newFetchState =
PartitionFetchState(currentFetchState.topicId, nextOffset, Some(lag),
+ currentFetchState.currentLeaderEpoch, state =
Fetching,
+ logAppendInfo.lastLeaderEpoch)
+ partitionStates.updateAndMoveToEnd(topicPartition,
newFetchState)
+ fetcherStats.byteRate.mark(validBytes)
+ }
}
}
} catch {
diff --git
a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index cdd17b1af2c..9a64de06085 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -999,6 +999,77 @@ class AbstractFetcherThreadTest {
fetcher.verifyLastFetchedEpoch(partition, Some(5))
}
+ @Test
+ def testTruncateOnFetchDoesNotProcessPartitionData(): Unit = {
+ assumeTrue(truncateOnFetch)
+
+ val partition = new TopicPartition("topic", 0)
+
+ var truncateCalls = 0
+ var processPartitionDataCalls = 0
+ val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+ override def processPartitionData(topicPartition: TopicPartition,
fetchOffset: Long, partitionData: FetchData): Option[LogAppendInfo] = {
+ processPartitionDataCalls += 1
+ super.processPartitionData(topicPartition, fetchOffset, partitionData)
+ }
+
+ override def truncate(topicPartition: TopicPartition, truncationState:
OffsetTruncationState): Unit = {
+ truncateCalls += 1
+ super.truncate(topicPartition, truncationState)
+ }
+ }
+
+ val replicaLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 3, leaderEpoch = 4, new SimpleRecord("d".getBytes)),
+ mkBatch(baseOffset = 4, leaderEpoch = 4, new SimpleRecord("e".getBytes)),
+ mkBatch(baseOffset = 5, leaderEpoch = 4, new SimpleRecord("f".getBytes)),
+ )
+
+ val replicaState = PartitionState(replicaLog, leaderEpoch = 5,
highWatermark = 1L)
+ fetcher.setReplicaState(partition, replicaState)
+ fetcher.addPartitions(Map(partition ->
initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+ assertEquals(6L, replicaState.logEndOffset)
+ fetcher.verifyLastFetchedEpoch(partition, expectedEpoch = Some(4))
+
+ val leaderLog = Seq(
+ mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+ mkBatch(baseOffset = 1, leaderEpoch = 0, new SimpleRecord("b".getBytes)),
+ mkBatch(baseOffset = 2, leaderEpoch = 2, new SimpleRecord("c".getBytes)),
+ mkBatch(baseOffset = 3, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+ mkBatch(baseOffset = 4, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+ )
+
+ val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark
= 4L)
+ fetcher.mockLeader.setLeaderState(partition, leaderState)
+
fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+ // The first fetch should result in truncating the follower's log and
+ // it should not process the data hence not update the high watermarks.
+ fetcher.doWork()
+
+ assertEquals(1, truncateCalls)
+ assertEquals(0, processPartitionDataCalls)
+ assertEquals(3L, replicaState.logEndOffset)
+ assertEquals(1L, replicaState.highWatermark)
+
+ // Truncate should have been called only once and process partition data
+ // should have been called at least once. The log end offset and the high
+ // watermark are updated.
+ TestUtils.waitUntilTrue(() => {
+ fetcher.doWork()
+ fetcher.replicaPartitionState(partition).log ==
fetcher.mockLeader.leaderPartitionState(partition).log
+ }, "Failed to reconcile leader and follower logs")
+ fetcher.verifyLastFetchedEpoch(partition, Some(5))
+
+ assertEquals(1, truncateCalls)
+ assertTrue(processPartitionDataCalls >= 1)
+ assertEquals(5L, replicaState.logEndOffset)
+ assertEquals(4L, replicaState.highWatermark)
+ }
+
@Test
def testMaybeUpdateTopicIds(): Unit = {
val partition = new TopicPartition("topic1", 0)
@@ -1287,13 +1358,8 @@ class AbstractFetcherThreadTest {
val state = replicaPartitionState(topicPartition)
if (leader.isTruncationOnFetchSupported &&
FetchResponse.isDivergingEpoch(partitionData)) {
- val divergingEpoch = partitionData.divergingEpoch
- truncateOnFetchResponse(Map(topicPartition -> new EpochEndOffset()
- .setPartition(topicPartition.partition)
- .setErrorCode(Errors.NONE.code)
- .setLeaderEpoch(divergingEpoch.epoch)
- .setEndOffset(divergingEpoch.endOffset)))
- return None
+ throw new IllegalStateException("processPartitionData should not be
called for a partition with " +
+ "a diverging epoch.")
}
// Throw exception if the fetchOffset does not match the fetcherThread
partition state
diff --git
a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 8eeb22046e6..84f5d84926e 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -659,6 +659,87 @@ class ReplicaFetcherThreadTest {
partitions.foreach { tp => assertEquals(Fetching,
thread.fetchState(tp).get.state) }
}
+ @Test
+ def testTruncateOnFetchDoesNotUpdateHighWatermark(): Unit = {
+ val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1,
"localhost:1234"))
+ val quota: ReplicationQuotaManager = mock(classOf[ReplicationQuotaManager])
+ val logManager: LogManager = mock(classOf[LogManager])
+ val log: UnifiedLog = mock(classOf[UnifiedLog])
+ val partition: Partition = mock(classOf[Partition])
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+
+ val logEndOffset = 150
+ val highWatermark = 130
+
+ when(log.highWatermark).thenReturn(highWatermark)
+ when(log.latestEpoch).thenReturn(Some(5))
+ when(log.endOffsetForEpoch(4)).thenReturn(Some(OffsetAndEpoch(149, 4)))
+ when(log.logEndOffset).thenReturn(logEndOffset)
+
+ when(replicaManager.metadataCache).thenReturn(metadataCache)
+ when(replicaManager.logManager).thenReturn(logManager)
+
+ when(replicaManager.localLogOrException(t1p0)).thenReturn(log)
+ when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
+
+ when(partition.localLogOrException).thenReturn(log)
+ when(partition.appendRecordsToFollowerOrFutureReplica(any(),
any())).thenReturn(None)
+
+ val logContext = new LogContext(s"[ReplicaFetcher
replicaId=${config.brokerId}, leaderId=${brokerEndPoint.id}, fetcherId=0] ")
+
+ val mockNetwork = new MockBlockingSender(
+ Collections.emptyMap(),
+ brokerEndPoint,
+ new SystemTime()
+ )
+
+ val leader = new RemoteLeaderEndPoint(
+ logContext.logPrefix,
+ mockNetwork,
+ new FetchSessionHandler(logContext, brokerEndPoint.id),
+ config,
+ replicaManager,
+ quota,
+ () => config.interBrokerProtocolVersion
+ )
+
+ val thread = new ReplicaFetcherThread(
+ "fetcher-thread",
+ leader,
+ config,
+ failedPartitions,
+ replicaManager,
+ quota,
+ logContext.logPrefix,
+ () => config.interBrokerProtocolVersion
+ )
+
+ thread.addPartitions(Map(
+ t1p0 -> initialFetchState(Some(topicId1), logEndOffset))
+ )
+
+ // Prepare the fetch response data.
+ mockNetwork.setFetchPartitionDataForNextResponse(Map(
+ t1p0 -> new FetchResponseData.PartitionData()
+ .setPartitionIndex(t1p0.partition)
+ .setLastStableOffset(0)
+ .setLogStartOffset(0)
+ .setHighWatermark(160) // HWM is higher on the leader.
+ .setDivergingEpoch(new FetchResponseData.EpochEndOffset()
+ .setEpoch(4)
+ .setEndOffset(140))
+ ))
+ mockNetwork.setIdsForNextResponse(topicIds)
+
+ // Sends the fetch request and processes the response. This should
truncate the
+ // log but it should not update the high watermark.
+ thread.doWork()
+
+ assertEquals(1, mockNetwork.fetchCount)
+ verify(partition, times(1)).truncateTo(140, false)
+ verify(log, times(0)).maybeUpdateHighWatermark(anyLong())
+ }
+
@Test
def shouldUseLeaderEndOffsetIfInterBrokerVersionBelow20(): Unit = {