kafka-937; delta patch; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/20953b52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/20953b52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/20953b52

Branch: refs/heads/trunk
Commit: 20953b52558935ba210eaee18e9504bf16bfec27
Parents: f89ddce
Author: Jun Rao <[email protected]>
Authored: Tue Sep 3 20:50:45 2013 -0700
Committer: Jun Rao <[email protected]>
Committed: Tue Sep 3 20:50:45 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/20953b52/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index b286312..fa6b213 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -92,7 +92,20 @@ class ConsumerFetcherManager(private val consumerIdString: String,
       leaderForPartitionsMap.foreach {
         case(topicAndPartition, leaderBroker) =>
           val pti = partitionMap(topicAndPartition)
-          addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+          try {
+            addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+          } catch {
+            case t => {
+              if (!isRunning.get())
+                throw t /* If this thread is stopped, propagate this exception to kill the thread. */
+              else {
+                warn("Failed to add leader for partition %s; will retry".format(topicAndPartition), t)
+                lock.lock()
+                noLeaderPartitionSet += topicAndPartition
+                lock.unlock()
+              }
+            }
+          }
       }
 
       shutdownIdleFetcherThreads()
