[ https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671633#comment-13671633 ]
Neha Narkhede commented on KAFKA-927:
-------------------------------------

Thanks for the patch, very well thought out! A few comments:

1. KafkaServer

1.1 doControlledShutdown()
- Is there a reason why we cannot just invoke shutdown() on the ReplicaManager instead of hacking into the replica fetcher manager and shutting down the fetchers?
- "starting controlled shutdown" -> "Starting controlled shutdown". Though it is not introduced in this patch, can we please make the same change in the shutdown() API as well?
- Typo -> shutdownSuceeded
- This method is pretty big and slightly hard to read for someone who is new to controlled shutdown. Can we move the controller discovery/connection logic into a separate API named connectToController()?

{code}
val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match {
  case Some(broker) =>
    if (channel == null || prevController == null || !prevController.equals(broker)) {
      // if this is the first attempt or if the controller has changed,
      // create a channel to the most recent controller
      if (channel != null)
        channel.disconnect()
      channel = new BlockingChannel(broker.host, broker.port,
                                    BlockingChannel.UseDefaultBufferSize,
                                    BlockingChannel.UseDefaultBufferSize,
                                    config.controllerSocketTimeoutMs)
      channel.connect()
      prevController = broker
    }
  case None => // ignore and try again
}
{code}

- I also think it would be cleaner for the loop to look like the following, but it's up to you :)

{code}
while (!shutdownSucceeded && remainingRetries > 0) {
  val controller = connectToController(zkClient)
  shutdownSucceeded = sendControllerShutdownRequest(controller)
  if (!shutdownSucceeded)
    Thread.sleep(...)
  remainingRetries -= 1
}
{code}

- Can we add either a warn or an info message that the broker will retry controlled shutdown after n ms?

{code}
if (!shutdownSuceeded) {
  Thread.sleep(config.controlledShutdownRetryBackoffMs)
}
{code}

- Can we rename doControlledShutdown() to just controlledShutdown()?
This will follow the naming conventions in the rest of the code, since we don't name methods doSomething.
- Let's remove the unused zkClient variable.

2. KafkaApis
- If the controller is not active, we should send back the appropriate error code.

3. KafkaController
- getPartitionsAssignedToBroker() does not need to read from zookeeper. The controller should already have the latest data available, since the controllerLock is acquired at this point.
- The following updates zookeeper, which is not required since the leader would have done that long before the controller does it. This is because you shut down the replica fetchers at the beginning of controlled shutdown. It would be much faster to just send a leader and isr request with the shrunk ISR to the existing leader, though I doubt that is required either.

{code}
else {
  // if the broker is a follower, updates the isr in ZK and notifies the current leader
  replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
    topicAndPartition.partition, id)), OfflineReplica)
}
{code}

4. When the following condition triggers, we want the state change log to record that the broker is rejecting the become-follower request. So it is not enough to just surround the addFetcher call with this condition:

{code}
if (!replicaManager.isShuttingDown.get()) {
  // start fetcher thread to current leader if we are not shutting down
  replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
}
{code}

5. New files are not included in the patch.

> Integrate controlled shutdown into kafka shutdown hook
> ------------------------------------------------------
>
>                 Key: KAFKA-927
>                 URL: https://issues.apache.org/jira/browse/KAFKA-927
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Sriram Subramanian
>            Assignee: Sriram Subramanian
>       Attachments: KAFKA-927.patch
>
>
> The controlled shutdown mechanism should be integrated into the software for
> better operational benefits. Also, a few optimizations can be done to reduce
> unnecessary RPC and ZK calls. This patch has been tested in a prod-like
> environment by doing rolling bounces continuously for a day. The average time
> of a rolling bounce with controlled shutdown on a cluster with 7 nodes is 340
> seconds without this patch; with this patch it drops to 220 seconds. It also
> ensures correctness in scenarios where the controller shrinks the ISR and the
> new leader could place the broker being shut down back into the ISR.
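Taken together, the suggestions in comment 1.1 above (a connectToController() helper, a bounded retry loop, and a backoff with a logged retry) amount to a small retry-with-backoff structure. Below is a minimal, self-contained sketch of that shape, written in Java purely for illustration: the controller lookup and the shutdown RPC are abstracted into hypothetical stand-in parameters (`connectToController`, `sendShutdownRequest`), so this is not the actual Kafka code from the patch.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Standalone sketch of the retry-with-backoff loop suggested in the review.
// All names here are hypothetical stand-ins, not real Kafka APIs.
public class ControlledShutdownSketch {

    static <C> boolean controlledShutdown(int maxRetries,
                                          long backoffMs,
                                          Supplier<C> connectToController,
                                          Function<C, Boolean> sendShutdownRequest) {
        int remainingRetries = maxRetries;
        boolean shutdownSucceeded = false;
        while (!shutdownSucceeded && remainingRetries > 0) {
            // (re)discover the current controller on every attempt, in case it moved
            C controller = connectToController.get();
            shutdownSucceeded = sendShutdownRequest.apply(controller);
            if (!shutdownSucceeded) {
                // this is where the warn/info "retrying after n ms" message would go
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            remainingRetries--;
        }
        return shutdownSucceeded;
    }

    public static void main(String[] args) {
        // Simulate a shutdown request that succeeds on the second attempt.
        int[] attempts = {0};
        boolean ok = controlledShutdown(3, 0L,
                () -> "controller-1",
                controller -> ++attempts[0] == 2);
        System.out.println(ok + " after " + attempts[0] + " attempts");
    }
}
```

The key property of this shape is that each attempt re-resolves the controller before sending the request, so a controller move between attempts is handled by the loop itself rather than by ad hoc reconnection logic inside one big method.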