[ 
https://issues.apache.org/jira/browse/KAFKA-2029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14378433#comment-14378433
 ] 

Dmitry Bugaychenko commented on KAFKA-2029:
-------------------------------------------

We tried prioritization of controller messages, but it didn't help. 
Communication with a single broker is synchronous, but different brokers might 
handle requests at different speeds - as a result, with a large queue one broker 
can get way behind another while the controller thinks it is doing fine. 
By adding "tracked" requests we ensure that *all* brokers have completed the leadership 
movement, so no broker can fall more than one partition behind the others. 
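
For context, here is a minimal sketch of what "tracked" means here, assuming the 0.8.x 
ControllerChannelManager.sendRequest(brokerId, request, callback) API quoted below; it is an 
illustration, not the actual ODKL patch: send a batch and block until every target broker 
has acknowledged it before moving on.
{code}
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Sketch only: block until all target brokers have responded to the request,
// so no broker can fall behind the others by more than one batch.
def sendTracked(channelManager: ControllerChannelManager,
                brokerIds: Seq[Int],
                request: RequestOrResponse): Boolean = {
  val acks = new CountDownLatch(brokerIds.size)
  // The callback is invoked by the per-broker send thread once the broker's response arrives.
  brokerIds.foreach(id => channelManager.sendRequest(id, request, _ => acks.countDown()))
  // Hypothetical timeout; a real patch would make this configurable.
  acks.await(30, TimeUnit.SECONDS)
}
{code}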

The fix for controller message prioritization was in RequestChannel.scala:
{code}
...
class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
...
  private val requestQueue = new LinkedBlockingDeque[RequestChannel.Request](queueSize)
...
  /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
  def sendRequest(request: RequestChannel.Request) {
    if (request.requestId == RequestKeys.LeaderAndIsrKey || request.requestId == RequestKeys.StopReplicaKey) {
      // ODKL Patch: prioritize controller requests over data requests.
      requestQueue.putFirst(request)
      info("Escalated controller request: " + request.requestObj.describe(details = true))
    } else {
      requestQueue.putLast(request)
    }
  }
...
  /** Get the next request or block until there is one */
  def receiveRequest(): RequestChannel.Request =
    requestQueue.takeFirst()
...
{code}

It increased GC overhead but didn't improve the speed of partition movement - 
it looks like network request processing is not the bottleneck.

> Improving controlled shutdown for rolling updates
> -------------------------------------------------
>
>                 Key: KAFKA-2029
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2029
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller
>    Affects Versions: 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Neha Narkhede
>            Priority: Critical
>
> Controlled shutdown as implemented currently can cause numerous problems: 
> deadlocks, local and global data loss, partitions without a leader, etc. In 
> some cases the only way to restore the cluster is to stop it completely using 
> kill -9 and start it again.
> Note 1: I'm aware of KAFKA-1305, but the proposed workaround of increasing the 
> queue size makes things much worse (see the discussion there).
> Note 2: The problems described here can occur in any setup, but they are 
> extremely painful in a setup with large brokers (36 disks, 1000+ assigned 
> partitions per broker in our case).
> Note 3: These improvements are actually workarounds, and it is worth 
> considering a global refactoring of the controller (make it single-threaded, or 
> even get rid of it in favour of ZK leader elections for partitions).
> The problems and improvements are:
> # Controlled shutdown takes a long time (10+ minutes): the broker sends multiple 
> shutdown requests, finally considers the shutdown failed and proceeds to an unclean 
> shutdown, while the controller gets stuck for a while (holding a lock waiting for free 
> space in the controller-to-broker queue). After the broker starts back up, it receives 
> a followers request and erases its highwatermarks (with a "replica 
> does not exist" message - the controller hadn't yet sent a request with the replica 
> assignment); then, when the controller starts replicas on the broker, the broker deletes 
> all its local data (due to the missing highwatermarks). Furthermore, the controller starts 
> processing the pending shutdown request and stops replicas on the broker, leaving 
> it in a non-functional state. A solution to the problem might be to increase the 
> time the broker waits for the controller's response to the shutdown request, but this 
> timeout is taken from controller.socket.timeout.ms, which is global for all 
> broker-controller communication, and setting it to 30 minutes is dangerous. 
> *Proposed solution: introduce a dedicated config parameter for this timeout 
> with a high default*.
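> A minimal sketch of what such a parameter could look like in the 0.8.1-era KafkaConfig; the 
> property name and default are hypothetical and not part of the original proposal. The broker 
> would use this value instead of controller.socket.timeout.ms while waiting for the 
> controller's response to the controlled shutdown request:
> {code}
>   // Sketch only: dedicated timeout for waiting on the controller's controlled-shutdown response.
>   val controlledShutdownResponseTimeoutMs = props.getInt("controlled.shutdown.response.timeout.ms", 10 * 60 * 1000)
> {code}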
> # If a broker goes down during controlled shutdown and does not come back, the 
> controller gets stuck in a deadlock (one thread owns the lock and tries to add a 
> message to the dead broker's queue, the send thread is in an infinite loop trying to 
> retry the message to the dead broker, and the broker failure handler is waiting 
> for the lock). There are numerous partitions without a leader and the only way 
> out is to kill -9 the controller. *Proposed solution: add a timeout for adding a 
> message to the broker's queue*. ControllerChannelManager.sendRequest:
> {code}
>   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
>     brokerLock synchronized {
>       val stateInfoOpt = brokerStateInfo.get(brokerId)
>       stateInfoOpt match {
>         case Some(stateInfo) =>
>           // ODKL Patch: prevent infinite hang on trying to send message to a dead broker.
>           // TODO: Move timeout to config
>           if (!stateInfo.messageQueue.offer((request, callback), 10, TimeUnit.SECONDS)) {
>             error("Timed out trying to send message to broker " + brokerId.toString)
>             // Do not throw, as it brings controller into completely non-functional state
>             // "Controller to broker state change requests batch is not empty while creating a new one"
>             //throw new IllegalStateException("Timed out trying to send message to broker " + brokerId.toString)
>           }
>         case None =>
>           warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
>       }
>     }
>   }
> {code}
> # When the broker which is the controller starts shutting down while auto leader 
> rebalance is running, it eventually deadlocks (the shutdown thread owns the lock and waits 
> for the rebalance thread to exit, while the rebalance thread waits for the lock). *Proposed 
> solution: use a bounded wait in the rebalance thread*. KafkaController.scala:
> {code}
>   // ODKL Patch to prevent deadlocks in shutdown.
>   /**
>    * Execute the given function inside the lock
>    */
>   def inLockIfRunning[T](lock: ReentrantLock)(fun: => T): T = {
>     if (isRunning || lock.isHeldByCurrentThread) {
>       // TODO: Configure timeout.
>       if (!lock.tryLock(10, TimeUnit.SECONDS)) {
>         throw new IllegalStateException("Failed to acquire controller lock in 10 seconds.")
>       }
>       try {
>         return fun
>       } finally {
>         lock.unlock()
>       }
>     } else {
>       throw new IllegalStateException("Controller is not running, not allowed to lock.")
>     }
>   }
>   private def checkAndTriggerPartitionRebalance(): Unit = {
>     // Use inLockIfRunning here instead of inLock
>   }
> {code}
> # Both OfflinePartitionLeaderSelector and ControlledShutdownLeaderSelector 
> prefer the oldest replica in the ISR (the one that joined 
> the ISR first). In case of a rolling update this means moving partitions towards the 
> tail, which increases the overall amount of movement and eventually significantly 
> overloads the last broker (with 4 brokers and RF 3 the last one gets 3/4 of the 
> leadership). In case of multiple failures this logic can cause a significant 
> imbalance in the leadership. *Proposed solution: move leadership to the preferred 
> replica if possible, or to the youngest replica (in controlled shutdown) or the 
> second preferred replica (in offline partition)*:
> {code}
> class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
> ...
>             // ODKL Patch: Try to select a replica from the ISR not depending on ISR join order, but following the
>             // assignment order. The preferred replica is the first one, thus if possible it'll be chosen, but most
>             // probably it is the dead one, thus we fall back to the second preferred replica. Here we do not care about
>             // overloading the second preferred replica as we do not expect rolling crashes.
>             val newLeader = liveBrokersInIsr.sortBy(x => assignedReplicas.indexOf(x)).head
> ...
> }
> class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
>         extends PartitionLeaderSelector
>         with Logging {
> ...
>     // ODKL Patch: Try to select a replica from the ISR not depending on ISR join order. If the preferred replica is in the ISR,
>     // choose it, otherwise choose the last replica from the ISR - it is expected to be the youngest (most probably it has
>     // already survived the rolling update).
>     val newLeaderOpt = if (newIsr.contains(assignedReplicas.head)) assignedReplicas.headOption else newIsr.lastOption
> ...
> }
> {code}
> # Auto leader rebalance started simultaneously with controlled shutdown 
> competes with it for space in the queue and can slow down the process. If the 
> queue size is large it could also create significant data loss (for a few 
> minutes there might be multiple brokers considering themselves the leader and 
> accepting produce requests). *Proposed solution: add throttling to the auto 
> rebalance*:
> {code}
> private def checkAndTriggerPartitionRebalance(): Unit = {
> ...
>           if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
>             info("Balancing broker " + leaderBroker + " with imbalance rate " + imbalanceRatio)
>             topicsNotInPreferredReplica.foreach {
>               case (topicPartition, replicas) => {
>                 inLockIfRunning(controllerContext.controllerLock) {
>                   // do this check only if the broker is live and there are no partitions being reassigned currently
>                   // and preferred replica election is not in progress
>                   if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
>                     controllerContext.partitionsBeingReassigned.size == 0 &&
>                     controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
>                     !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
>                     !deleteTopicManager.isTopicDeletionInProgress(topicPartition.topic) &&
>                     controllerContext.allTopics.contains(topicPartition.topic)) {
>                     onPreferredReplicaElection(Set(topicPartition), true)
>                   }
>                 }
>                 // ODKL patch: prevent too fast preferred replica elections.
>                 // TODO: Make configurable/use true throttling
>                 Utils.swallow(Thread.sleep(2000))
>               }
>             }
>             info("Balancing broker " + leaderBroker + " done")
>           }
> ...
> }
> {code}


