This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a4a3d70 KAFKA-10371; Partition reassignments can result in crashed
ReplicaFetcherThreads (#9140)
a4a3d70 is described below
commit a4a3d7064e16d4964a5b64a114579512c22ae6d2
Author: David Jacot <[email protected]>
AuthorDate: Sat Aug 8 00:30:00 2020 +0200
KAFKA-10371; Partition reassignments can result in crashed
ReplicaFetcherThreads (#9140)
The patch https://github.com/apache/kafka/pull/8672 introduced a bug
that crashes the replica fetcher threads. The issue is that
https://github.com/apache/kafka/pull/8672 deletes the Partitions before
stopping the replica fetchers. Because the replica fetchers rely on accessing
the Partition through the ReplicaManager, they crash with a
NotLeaderOrFollowerException that is not handled.
This PR reverts the code to the original ordering to avoid this issue.
The regression was caught and validated by our system test:
`kafkatest.tests.core.reassign_partitions_test`.
Reviewers: Vikas Singh <[email protected]>, Jason Gustafson
<[email protected]>
---
.../main/scala/kafka/server/ReplicaManager.scala | 60 ++++++++++++----------
1 file changed, 34 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index be73bf2..32c9aa4 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -366,8 +366,7 @@ class ReplicaManager(val config: KafkaConfig,
} else {
this.controllerEpoch = controllerEpoch
- val stoppedPartitions = mutable.Set.empty[TopicPartition]
- val deletedPartitions = mutable.Set.empty[TopicPartition]
+ val stoppedPartitions = mutable.Map.empty[TopicPartition,
StopReplicaPartitionState]
partitionStates.foreach { case (topicPartition, partitionState) =>
val deletePartition = partitionState.deletePartition
@@ -379,7 +378,7 @@ class ReplicaManager(val config: KafkaConfig,
"partition is in an offline log directory")
responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
- case hostedPartition @ HostedPartition.Online(partition) =>
+ case HostedPartition.Online(partition) =>
val currentLeaderEpoch = partition.getLeaderEpoch
val requestLeaderEpoch = partitionState.leaderEpoch
// When a topic is deleted, the leader epoch is not incremented.
To circumvent this,
@@ -389,22 +388,7 @@ class ReplicaManager(val config: KafkaConfig,
if (requestLeaderEpoch == LeaderAndIsr.EpochDuringDelete ||
requestLeaderEpoch == LeaderAndIsr.NoEpoch ||
requestLeaderEpoch > currentLeaderEpoch) {
- stoppedPartitions += topicPartition
-
- if (deletePartition) {
- if (allPartitions.remove(topicPartition, hostedPartition)) {
- maybeRemoveTopicMetrics(topicPartition.topic)
- // Logs are not deleted here. They are deleted in a single
batch later on.
- // This is done to avoid having to checkpoint for every
deletions.
- partition.delete()
- deletedPartitions += topicPartition
- }
- }
-
- // If we were the leader, we may have some operations still
waiting for completion.
- // We force completion to prevent them from timing out.
- completeDelayedFetchOrProduceRequests(topicPartition)
-
+ stoppedPartitions += topicPartition -> partitionState
// Assume that everything will go right. It is overwritten in
case of an error.
responseMap.put(topicPartition, Errors.NONE)
} else if (requestLeaderEpoch < currentLeaderEpoch) {
@@ -425,18 +409,42 @@ class ReplicaManager(val config: KafkaConfig,
case HostedPartition.None =>
// Delete log and corresponding folders in case replica manager
doesn't hold them anymore.
// This could happen when topic is being deleted while broker is
down and recovers.
- stoppedPartitions += topicPartition
- if (deletePartition)
- deletedPartitions += topicPartition
+ stoppedPartitions += topicPartition -> partitionState
responseMap.put(topicPartition, Errors.NONE)
}
}
- // First stop fetchers for all partitions, then stop the corresponding
replicas
- replicaFetcherManager.removeFetcherForPartitions(stoppedPartitions)
-
replicaAlterLogDirsManager.removeFetcherForPartitions(stoppedPartitions)
+ // First stop fetchers for all partitions.
+ val partitions = stoppedPartitions.keySet
+ replicaFetcherManager.removeFetcherForPartitions(partitions)
+ replicaAlterLogDirsManager.removeFetcherForPartitions(partitions)
+
+ // Second remove deleted partitions from the partition map. Fetchers
rely on the
+ // ReplicaManager to get Partition's information so they must be
stopped first.
+ val deletedPartitions = mutable.Set.empty[TopicPartition]
+ stoppedPartitions.foreach { case (topicPartition, partitionState) =>
+ if (partitionState.deletePartition) {
+ getPartition(topicPartition) match {
+ case hostedPartition @ HostedPartition.Online(partition) =>
+ if (allPartitions.remove(topicPartition, hostedPartition)) {
+ maybeRemoveTopicMetrics(topicPartition.topic)
+ // Logs are not deleted here. They are deleted in a single
batch later on.
+ // This is done to avoid having to checkpoint for every
deletions.
+ partition.delete()
+ }
+
+ case _ =>
+ }
+
+ deletedPartitions += topicPartition
+ }
+
+ // If we were the leader, we may have some operations still waiting
for completion.
+ // We force completion to prevent them from timing out.
+ completeDelayedFetchOrProduceRequests(topicPartition)
+ }
- // Delete the logs and checkpoint
+ // Third delete the logs and checkpoint.
logManager.asyncDelete(deletedPartitions, (topicPartition, exception)
=> {
exception match {
case e: KafkaStorageException =>