This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 6a2fbea KAFKA-13266; `InitialFetchState` should be created after
partition is removed from the fetchers (#11294)
6a2fbea is described below
commit 6a2fbeaf8c50606996c635c192ad948697b3699a
Author: David Jacot <[email protected]>
AuthorDate: Wed Sep 8 18:23:37 2021 +0200
KAFKA-13266; `InitialFetchState` should be created after partition is
removed from the fetchers (#11294)
`ReplicationTest.test_replication_with_broker_failure` in KRaft mode
sometimes fails with the following error in the log:
```
[2021-08-31 11:31:25,092] ERROR [ReplicaFetcher replicaId=1, leaderId=2,
fetcherId=0] Unexpected error occurred while processing data for partition
__consumer_offsets-1 at offset 31727
(kafka.server.ReplicaFetcherThread)java.lang.IllegalStateException: Offset
mismatch for partition __consumer_offsets-1: fetched offset = 31727, log end
offset = 31728. at
kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:194)
at kafka.server.AbstractFetcherThread.$anonfun [...]
```
The issue is due to a race condition in
`ReplicaManager#applyLocalFollowersDelta`. The `InitialFetchState` is created
and populated before the partition is removed from the fetcher threads. This
means that the fetch offset of the `InitialFetchState` could be outdated when
the fetcher threads are re-started because the fetcher threads could have
incremented the log end offset in between.
The patch fixes the issue by removing the partitions from the replica
fetcher threads before creating the `InitialFetchState` for them.
(cherry picked from commit 06e53afbefaf3ad1f64a49607775bbcc85edb81e)
Reviewers: Jason Gustafson <[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 79 ++++++++-------
.../unit/kafka/server/ReplicaManagerTest.scala | 108 ++++++++++++++++++++-
2 files changed, 150 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 683e1a8..95491ed 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1687,6 +1687,8 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ // Stopping the fetchers must be done first in order to initialize the
fetch
+ // position correctly.
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
stateChangeLogger.info(s"Stopped fetchers as part of become-follower
request from controller $controllerId " +
s"epoch $controllerEpoch with correlation id $correlationId for
${partitionsToMakeFollower.size} partitions")
@@ -2154,44 +2156,34 @@ class ReplicaManager(val config: KafkaConfig,
stateChangeLogger.info(s"Transitioning ${newLocalFollowers.size}
partition(s) to " +
"local followers.")
val shuttingDown = isShuttingDown.get()
- val partitionsToMakeFollower = new mutable.HashMap[TopicPartition,
InitialFetchState]
+ val partitionsToMakeFollower = new mutable.HashMap[TopicPartition,
Partition]
val newFollowerTopicSet = new mutable.HashSet[String]
newLocalFollowers.forKeyValue { case (tp, info) =>
getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition,
isNew) =>
try {
- newFollowerTopicSet.add(tp.topic())
+ newFollowerTopicSet.add(tp.topic)
if (shuttingDown) {
stateChangeLogger.trace(s"Unable to start fetching ${tp} with
topic " +
s"ID ${info.topicId} because the replica manager is shutting
down.")
} else {
- val listenerName = config.interBrokerListenerName.value()
val leader = info.partition.leader
-
Option(newImage.cluster().broker(leader)).flatMap(_.node(listenerName).asScala)
match {
- case None =>
- stateChangeLogger.trace(
- s"Unable to start fetching ${tp} with topic ID
${info.topicId} from leader " +
- s"${leader} because it is not alive."
- )
-
- // Create the local replica even if the leader is unavailable.
This is required
- // to ensure that we include the partition's high watermark in
the checkpoint
- // file (see KAFKA-1647)
- partition.createLogIfNotExists(isNew, false,
offsetCheckpoints, Some(info.topicId))
- case Some(node) =>
- val state = info.partition.toLeaderAndIsrPartitionState(tp,
isNew)
- if (partition.makeFollower(state, offsetCheckpoints,
Some(info.topicId))) {
- val leaderEndPoint = new BrokerEndPoint(node.id(),
node.host(), node.port())
- val log = partition.localLogOrException
- val fetchOffset = initialFetchOffset(log)
- partitionsToMakeFollower.put(tp,
- InitialFetchState(leaderEndPoint,
partition.getLeaderEpoch, fetchOffset))
- } else {
- stateChangeLogger.info(
- s"Skipped the become-follower state change after marking
its partition as " +
- s"follower for partition $tp with id ${info.topicId} and
partition state $state."
- )
- }
+ if (newImage.cluster.broker(leader) == null) {
+ stateChangeLogger.trace(s"Unable to start fetching $tp with
topic ID ${info.topicId} " +
+ s"from leader $leader because it is not alive.")
+
+ // Create the local replica even if the leader is unavailable.
This is required
+ // to ensure that we include the partition's high watermark in
the checkpoint
+ // file (see KAFKA-1647).
+ partition.createLogIfNotExists(isNew, false, offsetCheckpoints,
Some(info.topicId))
+ } else {
+ val state = info.partition.toLeaderAndIsrPartitionState(tp,
isNew)
+ if (partition.makeFollower(state, offsetCheckpoints,
Some(info.topicId))) {
+ partitionsToMakeFollower.put(tp, partition)
+ } else {
+ stateChangeLogger.info("Skipped the become-follower state
change after marking its " +
+ s"partition as follower for partition $tp with id
${info.topicId} and partition state $state.")
+ }
}
}
changedPartitions.add(partition)
@@ -2203,10 +2195,27 @@ class ReplicaManager(val config: KafkaConfig,
}
}
+ // Stopping the fetchers must be done first in order to initialize the
fetch
+ // position correctly.
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.keySet)
stateChangeLogger.info(s"Stopped fetchers as part of become-follower for
${partitionsToMakeFollower.size} partitions")
- replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollower)
+ val listenerName = config.interBrokerListenerName.value
+ val partitionAndOffsets = new mutable.HashMap[TopicPartition,
InitialFetchState]
+ partitionsToMakeFollower.forKeyValue { (topicPartition, partition) =>
+ val node = partition.leaderReplicaIdOpt
+ .flatMap(leaderId => Option(newImage.cluster.broker(leaderId)))
+ .flatMap(_.node(listenerName).asScala)
+ .getOrElse(Node.noNode)
+ val log = partition.localLogOrException
+ partitionAndOffsets.put(topicPartition, InitialFetchState(
+ new BrokerEndPoint(node.id, node.host, node.port),
+ partition.getLeaderEpoch,
+ initialFetchOffset(log)
+ ))
+ }
+
+ replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
stateChangeLogger.info(s"Started fetchers as part of become-follower for
${partitionsToMakeFollower.size} partitions")
partitionsToMakeFollower.keySet.foreach(completeDelayedFetchOrProduceRequests)
@@ -2215,15 +2224,15 @@ class ReplicaManager(val config: KafkaConfig,
}
def deleteStrayReplicas(topicPartitions: Iterable[TopicPartition]): Unit = {
- stopPartitions(topicPartitions.map { tp => tp -> true }.toMap).foreach {
- case (topicPartition, e) =>
- if (e.isInstanceOf[KafkaStorageException]) {
+ stopPartitions(topicPartitions.map(tp => tp -> true).toMap).forKeyValue {
(topicPartition, exception) =>
+ exception match {
+ case e: KafkaStorageException =>
stateChangeLogger.error(s"Unable to delete stray replica
$topicPartition because " +
- "the local replica for the partition is in an offline log
directory")
- } else {
+ s"the local replica for the partition is in an offline log
directory: ${e.getMessage}.")
+ case e: Throwable =>
stateChangeLogger.error(s"Unable to delete stray replica
$topicPartition because " +
s"we got an unexpected ${e.getClass.getName} exception:
${e.getMessage}", e)
- }
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 71a1760..243f9ce 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1812,7 +1812,8 @@ class ReplicaManagerTest {
timer: MockTimer,
brokerId: Int = 0,
aliveBrokerIds: Seq[Int] = Seq(0, 1),
- propsModifier: Properties => Unit = _ => {}
+ propsModifier: Properties => Unit = _ => {},
+ mockReplicaFetcherManager: Option[ReplicaFetcherManager] = None
): ReplicaManager = {
val props = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
props.put("log.dirs", TestUtils.tempRelativeDir("data").getAbsolutePath +
"," + TestUtils.tempRelativeDir("data2").getAbsolutePath)
@@ -1837,7 +1838,24 @@ class ReplicaManagerTest {
new AtomicBoolean(false), quotaManager, new BrokerTopicStats,
metadataCache, new LogDirFailureChannel(config.logDirs.size),
mockProducePurgatory, mockFetchPurgatory,
mockDeleteRecordsPurgatory, mockDelayedElectLeaderPurgatory,
Option(this.getClass.getName),
- alterIsrManager)
+ alterIsrManager) {
+
+ override protected def createReplicaFetcherManager(
+ metrics: Metrics,
+ time: Time,
+ threadNamePrefix: Option[String],
+ quotaManager: ReplicationQuotaManager
+ ): ReplicaFetcherManager = {
+ mockReplicaFetcherManager.getOrElse {
+ super.createReplicaFetcherManager(
+ metrics,
+ time,
+ threadNamePrefix,
+ quotaManager
+ )
+ }
+ }
+ }
}
@Test
@@ -3044,6 +3062,92 @@ class ReplicaManagerTest {
}
+ @Test
+ def testDeltaFollowerStopFetcherBeforeCreatingInitialFetchOffset(): Unit = {
+ val localId = 1
+ val otherId = localId + 1
+ val topicPartition = new TopicPartition("foo", 0)
+
+ val mockReplicaFetcherManager =
Mockito.mock(classOf[ReplicaFetcherManager])
+ val replicaManager = setupReplicaManagerWithMockedPurgatories(
+ timer = new MockTimer(time),
+ brokerId = localId,
+ mockReplicaFetcherManager = Some(mockReplicaFetcherManager)
+ )
+
+ try {
+ // The first call to removeFetcherForPartitions should be ignored.
+ Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+ Set(topicPartition))
+ ).thenReturn(Map.empty[TopicPartition, PartitionFetchState])
+
+ // Make the local replica the follower
+ var followerTopicsDelta = topicsCreateDelta(localId, false)
+ var followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+ // Check the state of that partition
+ val HostedPartition.Online(followerPartition) =
replicaManager.getPartition(topicPartition)
+ assertFalse(followerPartition.isLeader)
+ assertEquals(0, followerPartition.getLeaderEpoch)
+ assertEquals(0, followerPartition.localLogOrException.logEndOffset)
+
+ // Verify that addFetcherForPartitions was called with the correct
+ // init offset.
+ Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+ .addFetcherForPartitions(
+ Map(topicPartition -> InitialFetchState(
+ leader = BrokerEndPoint(otherId, "localhost", 9093),
+ currentLeaderEpoch = 0,
+ initOffset = 0
+ ))
+ )
+
+ // The second call to removeFetcherForPartitions simulate the case
+ // where the fetcher write to the log before being shutdown.
+ Mockito.when(mockReplicaFetcherManager.removeFetcherForPartitions(
+ Set(topicPartition))
+ ).thenAnswer { _ =>
+ replicaManager.getPartition(topicPartition) match {
+ case HostedPartition.Online(partition) =>
+ partition.appendRecordsToFollowerOrFutureReplica(
+ records = MemoryRecords.withRecords(CompressionType.NONE, 0,
+ new SimpleRecord("first message".getBytes)),
+ isFuture = false
+ )
+
+ case _ =>
+ }
+
+ Map.empty[TopicPartition, PartitionFetchState]
+ }
+
+ // Apply changes that bumps the leader epoch.
+ followerTopicsDelta = topicsChangeDelta(followerMetadataImage.topics(),
localId, false)
+ followerMetadataImage = imageFromTopics(followerTopicsDelta.apply())
+ replicaManager.applyDelta(followerMetadataImage, followerTopicsDelta)
+
+ assertFalse(followerPartition.isLeader)
+ assertEquals(1, followerPartition.getLeaderEpoch)
+ assertEquals(1, followerPartition.localLogOrException.logEndOffset)
+
+ // Verify that addFetcherForPartitions was called with the correct
+ // init offset.
+ Mockito.verify(mockReplicaFetcherManager, Mockito.times(1))
+ .addFetcherForPartitions(
+ Map(topicPartition -> InitialFetchState(
+ leader = BrokerEndPoint(otherId, "localhost", 9093),
+ currentLeaderEpoch = 1,
+ initOffset = 1
+ ))
+ )
+ } finally {
+ replicaManager.shutdown()
+ }
+
+ TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
+ }
+
private def topicsCreateDelta(startId: Int, isStartIdLeader: Boolean):
TopicsDelta = {
val leader = if (isStartIdLeader) startId else startId + 1
val delta = new TopicsDelta(TopicsImage.EMPTY)