ASF GitHub Bot commented on KAFKA-6481:

junrao closed pull request #4472: KAFKA-6481: Improving performance of the 
function ControllerChannelManager.addUpd…
URL: https://github.com/apache/kafka/pull/4472

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
index e389821ad3b..547da2759cf 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -375,20 +375,14 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
-    val filteredPartitions = {
-      val givenPartitions = if (partitions.isEmpty)
-        controllerContext.partitionLeadershipInfo.keySet
-      else
-        partitions
-      if (controller.topicDeletionManager.partitionsToBeDeleted.isEmpty)
-        givenPartitions
-      else
-        givenPartitions -- 
-    }
+    val givenPartitions = if (partitions.isEmpty)
+      controllerContext.partitionLeadershipInfo.keySet
+    else
+      partitions
     updateMetadataRequestBrokerSet ++= brokerIds.filter(_ >= 0)
-    filteredPartitions.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = false))
-    controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
+    givenPartitions.foreach(partition => 
+      beingDeleted = 
   def sendRequestsToBrokers(controllerEpoch: Int) {
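
For illustration only, the behavioural change in this diff can be sketched in self-contained Scala. This is not the Kafka code: `TopicPartition` is a local stand-in for Kafka's class, and `oldEntries`/`newEntries` are hypothetical helpers that return the `(partition, beingDeleted)` pairs that would be handed to `updateMetadataRequestPartitionInfo`. The sketch assumes a non-empty `partitions` argument (so the `partitionLeadershipInfo.keySet` fallback is left out), and it assumes the truncated `beingDeleted = ` line computes a per-partition membership check against `partitionsToBeDeleted`.

```scala
// Local stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
final case class TopicPartition(topic: String, partition: Int)

object UpdateMetadataSketch {
  // Before the patch: the requested partitions minus the deletion set are added
  // with beingDeleted = false, then EVERY partition pending deletion is added
  // with beingDeleted = true, regardless of what the caller asked for.
  def oldEntries(requested: Set[TopicPartition],
                 toBeDeleted: Set[TopicPartition]): Seq[(TopicPartition, Boolean)] = {
    val filtered = if (toBeDeleted.isEmpty) requested else requested -- toBeDeleted
    filtered.toSeq.map(p => (p, false)) ++ toBeDeleted.toSeq.map(p => (p, true))
  }

  // After the patch (as assumed above): only the requested partitions are
  // visited, and the deletion set is consulted with a membership check.
  def newEntries(requested: Set[TopicPartition],
                 toBeDeleted: Set[TopicPartition]): Seq[(TopicPartition, Boolean)] =
    requested.toSeq.map(p => (p, toBeDeleted.contains(p)))
}
```

With ten `a0` partitions pending deletion, `oldEntries(Set(TopicPartition("a1", 0)), a0)` yields 11 pairs (the requested partition plus all ten `a0` partitions), while `newEntries` yields only the single requested pair.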


This is an automated message from the Apache Git Service.

> Improving performance of the function 
> ControllerChannelManager.addUpdateMetadataRequestForBrokers
> -------------------------------------------------------------------------------------------------
>                 Key: KAFKA-6481
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6481
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Lucas Wang
>            Assignee: Lucas Wang
>            Priority: Minor
> The function ControllerChannelManager.addUpdateMetadataRequestForBrokers 
> should only process the partitions specified in the partitions parameter, 
> i.e. the 2nd parameter, and avoid iterating through the set of partitions in 
> TopicDeletionManager.partitionsToBeDeleted.
> Here is why the current code can be a problem:
> The number of partitions-to-be-deleted stored in the field
> TopicDeletionManager.partitionsToBeDeleted can become quite large under
> certain scenarios. For instance, if a topic a0 has dead replicas, the topic
> a0 is marked as ineligible for deletion, and its partitions are
> retained in the field TopicDeletionManager.partitionsToBeDeleted for future
> retries.
> With a large set of partitions in
> TopicDeletionManager.partitionsToBeDeleted, if some replicas in another topic
> a1 need to be transitioned to the OfflineReplica state, possibly because a
> broker goes offline, the following call stack executes on the
> controller, causing an iteration over the whole partitions-to-be-deleted set
> for every single affected partition:
>     controller.topicDeletionManager.partitionsToBeDeleted.foreach(partition => updateMetadataRequestPartitionInfo(partition, beingDeleted = true))
>     ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers
>     ControllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers
>     _inside a for-loop for each partition_
>     ReplicaStateMachine.doHandleStateChanges
>     ReplicaStateMachine.handleStateChanges
>     KafkaController.onReplicasBecomeOffline
>     KafkaController.onBrokerFailure
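
The amplification caused by this call stack can be modelled with a hypothetical Scala sketch that only counts work. `affected` stands for the partitions processed one at a time inside the for-loop, and `pendingDeletion` for the size of `partitionsToBeDeleted`. The model assumes each call receives exactly one partition and that, after the patch, looking a partition up in `partitionsToBeDeleted` does not require iterating over the set.

```scala
object CallStackCostSketch {
  // Models one addUpdateMetadataRequestForBrokers call per affected partition
  // (the for-loop in the stack above) and counts the partition entries visited.
  // `patched` selects the behaviour after KAFKA-6481.
  def entriesVisited(affected: Int, pendingDeletion: Int, patched: Boolean): Long = {
    var visited = 0L
    for (_ <- 0 until affected) {
      visited += 1 // the single partition passed in by the caller
      // Before the patch, every call also makes a full pass over partitionsToBeDeleted.
      if (!patched) visited += pendingDeletion
      // After the patch, partitionsToBeDeleted is only probed for this partition.
    }
    visited
  }
}
```

For ten affected partitions and ten partitions pending deletion, this model gives 110 visited entries before the patch and 10 after; the extra 100 are the pending-deletion entries revisited on every call, consistent with the 10 x 10 log lines reported in the reproduction below.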
> How to reproduce the problem:
>  1. Create a cluster with 2 brokers, with ids 1 and 2.
>  2. Create a topic with 10 partitions and deliberately assign the replicas
> to non-existent brokers, i.e.
>  ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a0 --replica-assignment `echo -n 3:4; for i in \`seq 9\`; do echo -n ,3:4; done`
> 3. Delete the topic. Since the topic has dead replicas and is therefore
> ineligible for deletion, all of its partitions are retained in the field
> TopicDeletionManager.partitionsToBeDeleted.
> ./bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic a0
> 4. _Verify that the following log message shows up 10 times in the
> controller.log file, one line for each partition in topic a0: "Leader not yet
> assigned for partition [a0,..]. Skip sending UpdateMetadataRequest."_
> 5. Create another topic a1, also with 10 partitions, i.e.
>  ./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic a1 --replica-assignment `echo -n 1:2; for i in \`seq 9\`; do echo -n ,1:2; done`
> 6. Verify that the log message in step 4 appears *100 more* times. This is
> because of the following stack trace:
>     addUpdateMetadataRequestForBrokers
>     addLeaderAndIsrRequestForBrokers
>     _inside a for-loop for each create response_
>     initializeLeaderAndIsrForPartitions
> In general, if n partitions have already accumulated in the
> partitionsToBeDeleted variable and a new topic is created with m partitions,
> m * n of the log messages above are generated.
>  7. Kill broker 2, causing the replicas on broker 2 to be transitioned
> to the OfflineReplica state on the controller.
>  8. Verify that the log message from step 4 appears another *210*
> times. This is because:
>     a. During controlled shutdown, the function
> KafkaController.doControlledShutdown calls
> replicaStateMachine.handleStateChanges to transition all the replicas on
> broker 2 to the OfflineReplica state. That in turn generates 100 (10 x 10)
> of the log entries above.
>     b. When the broker znode is gone in ZooKeeper, the function
> KafkaController.onBrokerFailure calls KafkaController.onReplicasBecomeOffline
> to transition all the replicas on broker 2 to the OfflineReplica state. This
> again generates 100 (10 x 10) of the log entries above.
>     c. At the end of the function onReplicasBecomeOffline, the controller calls
> sendUpdateMetadataRequest (given that partitionsWithoutLeader is empty), which
> generates 10 log entries, one for each partition in topic a0.
> In general, when n partitions have accumulated in the variable
> partitionsToBeDeleted and a broker hosting m partitions goes offline, up to
> 2 * m * n + n log entries can be generated.
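
As a worked check, the two general formulas reproduce the counts in steps 6 and 8, taking m = 10 (the a1 partitions, each of which has a replica on broker 2) and n = 10 (the a0 partitions pending deletion):

```latex
\begin{aligned}
\text{step 6 (create a1):}\quad
  & m \cdot n = 10 \cdot 10 = 100 \\
\text{step 8 (broker 2 offline):}\quad
  & \underbrace{m n}_{\text{8a}} + \underbrace{m n}_{\text{8b}} + \underbrace{n}_{\text{8c}}
    = 2 m n + n = 2 \cdot 10 \cdot 10 + 10 = 210
\end{aligned}
```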
> Performance improvement benchmark: if we perform the steps above with topic
> a0 having 5000 partitions and topic a1 having 5000 partitions, then when
> broker 2 goes down it takes the controller ~4 minutes to go through controlled
> shutdown, detect the broker failure through ZooKeeper, and transition all
> replicas to the OfflineReplica state. After applying the patch, the same
> process takes 23 seconds.
> Testing done:
> After applying the patch in this RB, I verified that, when going through the
> steps above, broker 2 going offline NO LONGER generates log entries for the
> a0 partitions.
> I also verified that topic deletion for topic a1 still works fine.

This message was sent by Atlassian JIRA
