[ 
https://issues.apache.org/jira/browse/KAFKA-927?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13671633#comment-13671633
 ] 

Neha Narkhede commented on KAFKA-927:
-------------------------------------

Thanks for the patch, very well thought out! A few comments:
1. KafkaServer
1.1 doControlledShutdown()
- Is there a reason why we cannot just invoke shutdown() on the ReplicaManager 
instead of hacking into the replica fetcher manager and shutting down the 
fetchers ?
- "starting controlled shutdown" -> "Starting controlled shutdown". Though it 
is not introduced in this patch, can we please change the same in the 
shutdown() API as well?
- Typo: shutdownSuceeded -> shutdownSucceeded
- This method is pretty big and slightly hard to read for someone who is new 
to controlled shutdown. Can we move the controller discovery/connection logic 
to a separate API named connectToController()? That is, this part:
        val controllerId = ZkUtils.getController(kafkaZookeeper.getZookeeperClient)
        ZkUtils.getBrokerInfo(kafkaZookeeper.getZookeeperClient, controllerId) match {
          case Some(broker) =>
            if (channel == null || prevController == null || !prevController.equals(broker)) {
              // if this is the first attempt or if the controller has changed, create a channel to the most recent
              // controller
              if (channel != null) {
                channel.disconnect()
              }
              channel = new BlockingChannel(broker.host, broker.port,
                BlockingChannel.UseDefaultBufferSize,
                BlockingChannel.UseDefaultBufferSize,
                config.controllerSocketTimeoutMs)
              channel.connect()
              prevController = broker
            }
          case None=>
            //ignore and try again
        }
- I also think it will be cleaner for the loop to look like this, but it's up 
to you :) 
      while (!shutdownSucceeded && remainingRetries > 0) {
         val controller = connectToController(zkClient)
         // assign to the outer variable; a new val here would shadow it
         shutdownSucceeded = sendControllerShutdownRequest(controller)
         if (!shutdownSucceeded)
            Thread.sleep(...)
         remainingRetries -= 1
      }
- Can we add either a warn or an info message that the broker will retry 
controlled shutdown after n ms ?
        if (!shutdownSuceeded) {
          Thread.sleep(config.controlledShutdownRetryBackoffMs)
        }
- Can we rename doControlledShutdown() to just controlledShutdown()? This would 
follow the naming conventions in the rest of the code, since we don't name 
methods doSomething.
- Let's remove the unused zkClient variable
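
To illustrate the loop shape and the retry message suggested above, here is a minimal, self-contained sketch. It is not the code from the patch: Controller, connect, sendShutdownRequest and log are hypothetical stand-ins for the real channel and logging calls, and the retry count and backoff are passed in as plain parameters.

```scala
object ControlledShutdownSketch {
  // Hypothetical stand-in for a connection to the current controller.
  final case class Controller(id: Int)

  /**
   * Retries controlled shutdown until it succeeds or the retries run out.
   * `connect` returns None when no controller could be found; that attempt
   * then counts as a failure and is retried after the backoff.
   */
  def controlledShutdown(
      maxRetries: Int,
      backoffMs: Long,
      connect: () => Option[Controller],
      sendShutdownRequest: Controller => Boolean,
      log: String => Unit = msg => println(msg)): Boolean = {
    var shutdownSucceeded = false
    var remainingRetries = maxRetries
    while (!shutdownSucceeded && remainingRetries > 0) {
      remainingRetries -= 1
      // Assign to the outer variable so the loop condition sees the result.
      shutdownSucceeded = connect().exists(sendShutdownRequest)
      if (!shutdownSucceeded && remainingRetries > 0) {
        log(s"Retrying controlled shutdown after $backoffMs ms " +
          s"($remainingRetries retries left)")
        Thread.sleep(backoffMs)
      }
    }
    shutdownSucceeded
  }
}
```

Passing the connection and request steps in as functions keeps the loop short and makes it easy to exercise without a running cluster.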

2. KafkaApis
- If the controller is not active, we should send the appropriate error code
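
A rough sketch of what this could look like, under stated assumptions: the error code values, ShutdownResponse and handleControlledShutdown below are hypothetical names made up for illustration, not the existing KafkaApis or error-mapping definitions.

```scala
object ControllerActiveCheckSketch {
  // Hypothetical error codes; the real ones would come from Kafka's error mapping.
  val NoError: Short = 0
  val ControllerNotActive: Short = 1

  // Hypothetical response shape: an error code plus the partitions still led by the broker.
  final case class ShutdownResponse(errorCode: Short, partitionsRemaining: Set[String])

  def handleControlledShutdown(
      controllerIsActive: Boolean,
      shutdownBroker: () => Set[String]): ShutdownResponse =
    if (!controllerIsActive)
      // Report the condition explicitly so the caller can rediscover the controller and retry.
      ShutdownResponse(ControllerNotActive, Set.empty)
    else
      ShutdownResponse(NoError, shutdownBroker())
}
```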
  
3. KafkaController
- getPartitionsAssignedToBroker() does not need to read from zookeeper. The 
controller should already have the latest data available as the controllerLock 
is acquired at this point. 
- The following updates zookeeper, which is not required since the leader 
would've done that long before the controller does it. This is because you 
shut down the replica fetchers at the beginning of controlled shutdown. It will 
be much faster to just send a leader and isr request with the shrunk ISR to the 
existing leader, though I doubt that is required as well.
            else {
              // if the broker is a follower, updates the isr in ZK and notifies the current leader
              replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic,
                topicAndPartition.partition, id)), OfflineReplica)
            }
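
For the first point in 3, a small sketch of answering getPartitionsAssignedToBroker() from data the controller already holds in memory. The TopicAndPartition case class here is a local stand-in, and the cachedAssignment map (partition to assigned replica broker ids) is an assumed shape for the controller's cached state, not its actual field.

```scala
object PartitionLookupSketch {
  // Local stand-in for the controller's topic/partition key type.
  final case class TopicAndPartition(topic: String, partition: Int)

  /** Returns the partitions that have a replica on brokerId, using only the cached assignment. */
  def partitionsAssignedToBroker(
      cachedAssignment: Map[TopicAndPartition, Seq[Int]],
      brokerId: Int): Set[TopicAndPartition] =
    cachedAssignment.collect {
      case (tp, replicas) if replicas.contains(brokerId) => tp
    }.toSet
}
```

The caller would be expected to hold the controller lock while reading the map, as the comment above assumes.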

4. We want the state change log to show that the broker is rejecting the 
become-follower request when the following happens. So it is not enough to just 
surround the addFetcher call with this condition:
          if (!replicaManager.isShuttingDown.get()) {
            // start fetcher thread to current leader if we are not shutting down
            replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
          }

5. New files are not included in the patch
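
The concern in point 4 could be addressed along these lines. This is only a sketch: makeFollower, addFetcher and stateChangeLog are hypothetical parameters standing in for the real replica manager and state change logger, and the log text is made up.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object BecomeFollowerSketch {
  /**
   * Starts the fetcher unless the broker is shutting down. A rejection is
   * written to the state change log instead of being skipped silently.
   * Returns true if the fetcher was started.
   */
  def makeFollower(
      isShuttingDown: AtomicBoolean,
      topic: String,
      partitionId: Int,
      addFetcher: () => Unit,
      stateChangeLog: String => Unit): Boolean =
    if (isShuttingDown.get()) {
      stateChangeLog(s"Rejecting become-follower request for [$topic,$partitionId] " +
        "since the broker is shutting down")
      false
    } else {
      addFetcher()
      true
    }
}
```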

                
> Integrate controlled shutdown into kafka shutdown hook
> ------------------------------------------------------
>
>                 Key: KAFKA-927
>                 URL: https://issues.apache.org/jira/browse/KAFKA-927
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Sriram Subramanian
>            Assignee: Sriram Subramanian
>         Attachments: KAFKA-927.patch
>
>
> The controlled shutdown mechanism should be integrated into the software for 
> better operational benefits. Also, a few optimizations can be done to reduce 
> unnecessary RPC and ZK calls. This patch has been tested on a prod-like 
> environment by doing rolling bounces continuously for a day. The average time 
> of doing a rolling bounce with controlled shutdown for a cluster with 7 nodes 
> without this patch is 340 seconds. With this patch it reduces to 220 seconds. 
> It also ensures correctness in scenarios where the controller shrinks the ISR 
> and the new leader could place the broker being shut down back into the ISR.

