KAFKA-914; Break deadlock between initial rebalance and watcher-triggered rebalances; reviewed by Jun Rao and Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ffd84eb2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ffd84eb2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ffd84eb2 Branch: refs/heads/trunk Commit: ffd84eb23fcbb279a63ba5d4cb72077a0c079cff Parents: 32cd899 Author: Joel Koshy <jjko...@gmail.com> Authored: Wed May 22 16:26:04 2013 -0700 Committer: Joel Koshy <jjko...@gmail.com> Committed: Wed May 22 16:26:04 2013 -0700 ---------------------------------------------------------------------- .../kafka/consumer/ConsumerFetcherManager.scala | 28 +++++++++++++++-- .../main/scala/kafka/tools/MirrorMaker.scala | 32 ++++++++++++++------ 2 files changed, 49 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 658b5c1..db104f1 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -85,13 +85,32 @@ class ConsumerFetcherManager(private val consumerIdString: String, addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker) noLeaderPartitionSet -= topicAndPartition } catch { - case t => warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t) + case t => { + /* + * If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid + * processing subsequent partitions without leader so the leader-finder-thread can exit. + * It is unfortunate that we depend on the following behavior and we should redesign this: as part of + * processing partitions, we catch the InterruptedException (thrown from addPartition's call to + * lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process + * more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException + * and we can run into the deadlock described in KAFKA-914. + */ + if (!isRunning.get()) + throw t + else + warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t) + } } } shutdownIdleFetcherThreads() } catch { - case t => warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) + case t => { + if (!isRunning.get()) + throw t /* See above for why we need to propagate this exception. */ + else + warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) + } } } finally { lock.unlock() @@ -122,6 +141,11 @@ class ConsumerFetcherManager(private val consumerIdString: String, } def stopConnections() { + /* + * Stop the leader finder thread first before stopping fetchers. Otherwise, if there are more partitions without + * leader, then the leader finder thread will process these partitions (before shutting down) and add fetchers for + * these partitions. + */ info("Stopping leader finder thread") if (leaderFinderThread != null) { leaderFinderThread.shutdown() http://git-wip-us.apache.org/repos/asf/kafka/blob/ffd84eb2/core/src/main/scala/kafka/tools/MirrorMaker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 3d22dc7..2d93947 100644 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -28,7 +28,6 @@ import collection.mutable.ListBuffer import kafka.tools.KafkaMigrationTool.{ProducerThread, ProducerDataChannel} import kafka.javaapi - object MirrorMaker extends Logging { def main(args: Array[String]) { @@ -114,23 +113,33 @@ object MirrorMaker extends Logging { else new Blacklist(options.valueOf(blacklistOpt)) - val streams = - connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())) + var streams: Seq[KafkaStream[Array[Byte], Array[Byte]]] = Nil + try { + streams = connectors.map(_.createMessageStreamsByFilter(filterSpec, numStreams.intValue(), new DefaultDecoder(), new DefaultDecoder())).flatten + } catch { + case t => + fatal("Unable to create stream - shutting down mirror maker.") + connectors.foreach(_.shutdown) + } val producerDataChannel = new ProducerDataChannel[KeyedMessage[Array[Byte], Array[Byte]]](bufferSize); val consumerThreads = - streams.flatten.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2)) + streams.zipWithIndex.map(streamAndIndex => new MirrorMakerThread(streamAndIndex._1, producerDataChannel, streamAndIndex._2)) val producerThreads = new ListBuffer[ProducerThread]() + def cleanShutdown() { + connectors.foreach(_.shutdown) + consumerThreads.foreach(_.awaitShutdown) + producerThreads.foreach(_.shutdown) + producerThreads.foreach(_.awaitShutdown) + info("Kafka mirror maker shutdown successfully") + } + Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { - connectors.foreach(_.shutdown) - consumerThreads.foreach(_.awaitShutdown) - producerThreads.foreach(_.shutdown) - producerThreads.foreach(_.awaitShutdown) - logger.info("Kafka migration tool shutdown successfully"); + cleanShutdown() } }) @@ -145,6 +154,10 @@ object MirrorMaker extends Logging { consumerThreads.foreach(_.start) producerThreads.foreach(_.start) + + // in case the consumer threads hit a timeout/other exception + consumerThreads.foreach(_.awaitShutdown) + cleanShutdown() } class MirrorMakerThread(stream: KafkaStream[Array[Byte], Array[Byte]], @@ -158,6 +171,7 @@ object MirrorMaker extends Logging { this.setName(threadName) override def run() { + info("Starting mirror maker thread " + threadName) try { for (msgAndMetadata <- stream) { val pd = new KeyedMessage[Array[Byte], Array[Byte]](msgAndMetadata.topic, msgAndMetadata.message)