KAFKA-914; Break deadlock between initial rebalance and watcher-triggered rebalances; reviewed by Jun Rao and Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffd84eb2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffd84eb2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffd84eb2

Branch: refs/heads/trunk
Commit: ffd84eb23fcbb279a63ba5d4cb72077a0c079cff
Parents: 32cd899
Author: Joel Koshy <jjko...@gmail.com>
Authored: Wed May 22 16:26:04 2013 -0700
Committer: Joel Koshy <jjko...@gmail.com>
Committed: Wed May 22 16:26:04 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala | 28 +++++++++++++++--
 .../main/scala/kafka/tools/MirrorMaker.scala    | 32 ++++++++++++++------
 2 files changed, 49 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 658b5c1..db104f1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -85,13 +85,32 @@ class ConsumerFetcherManager(private val consumerIdString: String,
                  addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
                  noLeaderPartitionSet -= topicAndPartition
              } catch {
-                case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+                case t => {
+                  /*
+                   * If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid
+                   * processing subsequent partitions without leader so the leader-finder-thread can exit.
+                   * It is unfortunate that we depend on the following behavior and we should redesign this: as part of
+                   * processing partitions, we catch the InterruptedException (thrown from addPartition's call to
+                   * lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process
+                   * more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException
+                   * and we can run into the deadlock described in KAFKA-914.
+                   */
+                  if (!isRunning.get())
+                    throw t
+                  else
+                    warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
+                }
               }
           }
 
           shutdownIdleFetcherThreads()
        } catch {
-          case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+          case t => {
+            if (!isRunning.get())
+              throw t /* See above for why we need to propagate this exception. */
+            else
+              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
+          }
         }
       } finally {
         lock.unlock()
@@ -122,6 +141,11 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   }
 
   def stopConnections() {
+    /*
+     * Stop the leader finder thread first before stopping fetchers. Otherwise, if there are more partitions without
+     * leader, then the leader finder thread will process these partitions (before shutting down) and add fetchers for
+     * these partitions.
+     */
+     */
     info("Stopping leader finder thread")
     if (leaderFinderThread != null) {
       leaderFinderThread.shutdown()
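
The comment added above explains the root of the KAFKA-914 deadlock: when a blocking call such as lockInterruptibly throws InterruptedException, the thread's interrupted status is cleared, so later interruptible calls on the same thread proceed as if no shutdown had been requested. The following standalone sketch (illustrative only, not the Kafka code) demonstrates that behavior:

    import java.util.concurrent.locks.ReentrantLock

    // Illustration of the pitfall described in the comment above: catching an
    // InterruptedException leaves the thread looking "uninterrupted" afterwards.
    object InterruptFlagSketch extends App {
      val lock = new ReentrantLock()

      // Simulate a pending interrupt, e.g. one delivered by a shutdown request.
      Thread.currentThread().interrupt()

      try {
        lock.lockInterruptibly()   // throws: the interrupted status is set on entry
        lock.unlock()
      } catch {
        case _: InterruptedException =>
          // Throwing InterruptedException cleared the interrupted status ...
          println("interrupted after catch: " + Thread.currentThread().isInterrupted) // false
          // ... so this second call does NOT throw; a loop that keeps catching and
          // continuing would keep taking locks during shutdown, which is why the
          // patch rethrows once isRunning is false.
          lock.lockInterruptibly()
          lock.unlock()
      }
    }

The stopConnections() change applies the same reasoning from the other direction: the leader finder thread is stopped before the fetchers so it cannot re-add fetchers for leaderless partitions while a rebalance is tearing them down.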

http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/tools/MirrorMaker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 3d22dc7..2d93947 100644
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -28,7 +28,6 @@ import collection.mutable.ListBuffer
 import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel}
 import kafka.javaapi
 
-
 object MirrorMaker extends Logging {
 
   def main(args: Array[String]) {
@@ -114,23 +113,33 @@ object MirrorMaker extends Logging {
     else
       new Blacklist(options.valueOf(blacklistOpt))
 
-    val streams =
-      connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder()))
+    var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil
+    try {
+      streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten
+    } catch {
+      case t =>
+        fatal("Unable to create stream - shutting down mirror maker.")
+        connectors.foreach(_.shutdown)
+    }
 
     val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize);
 
     val consumerThreads =
-      streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
+      streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2))
 
     val producerThreads = new ListBuffer[ProducerThread]()
 
+    def cleanShutdown() {
+      connectors.foreach(_.shutdown)
+      consumerThreads.foreach(_.awaitShutdown)
+      producerThreads.foreach(_.shutdown)
+      producerThreads.foreach(_.awaitShutdown)
+      info("Kafka mirror maker shutdown successfully")
+    }
+
     Runtime.getRuntime.addShutdownHook(new Thread() {
       override def run() {
-        connectors.foreach(_.shutdown)
-        consumerThreads.foreach(_.awaitShutdown)
-        producerThreads.foreach(_.shutdown)
-        producerThreads.foreach(_.awaitShutdown)
-        logger.info("Kafka migration tool shutdown successfully");
+        cleanShutdown()
       }
     })
 
@@ -145,6 +154,10 @@ object MirrorMaker extends Logging {
 
     consumerThreads.foreach(_.start)
     producerThreads.foreach(_.start)
+
+    // in case the consumer threads hit a timeout/other exception
+    consumerThreads.foreach(_.awaitShutdown)
+    cleanShutdown()
   }
 
   class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]],
@@ -158,6 +171,7 @@ object MirrorMaker extends Logging {
     this.setName(threadName)
 
     override def run() {
+      info("Starting mirror maker thread " + threadName)
       try {
         for (msgAndMetadata <- stream) {
         val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)
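
The MirrorMaker change centralizes teardown in cleanShutdown() and calls it from two places: the JVM shutdown hook (external termination) and the end of main() after blocking on the consumer threads, so a consumer thread that dies from a timeout or any other exception still brings the producers down. A simplified sketch of that pattern, using stand-in types rather than the actual MirrorMaker classes:

    // Simplified sketch of the shutdown pattern; Worker is a stand-in trait,
    // not a Kafka class.
    object ShutdownPatternSketch {
      trait Worker {
        def shutdown(): Unit
        def awaitShutdown(): Unit
      }

      def run(connectors: Seq[Worker], consumers: Seq[Worker], producers: Seq[Worker]): Unit = {
        def cleanShutdown(): Unit = {
          connectors.foreach(_.shutdown())      // stop consuming first
          consumers.foreach(_.awaitShutdown())  // let consumer threads drain and exit
          producers.foreach(_.shutdown())
          producers.foreach(_.awaitShutdown())
          println("mirroring shut down")
        }

        // External termination (SIGTERM / Ctrl-C) goes through the same path.
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = cleanShutdown()
        })

        // Internal failure: if any consumer thread exits on its own (e.g. a consumer
        // timeout), awaitShutdown returns and the pipeline is torn down instead of
        // leaving producer threads running indefinitely.
        consumers.foreach(_.awaitShutdown())
        cleanShutdown()
      }
    }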
