kafka-937; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy
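
The deadlock (tracked originally as KAFKA-914) could occur because LeaderFinderThread called addFetcher() while still holding the ConsumerFetcherManager lock: addFetcher() must take a fetcher thread's own lock, while a fetcher thread handling partition errors must take the manager's lock, so the two threads could end up in a circular wait (and, as the removed comment in ConsumerFetcherManager notes, a swallowed InterruptedException from lockInterruptibly could clear the interrupted flag and leave the thread stuck). The patch restructures LeaderFinderThread.doWork() so that leaders are only collected into leaderForPartitionsMap while the lock is held; addFetcher() and shutdownIdleFetcherThreads() now run after the lock is released. SimpleConsumer additionally tracks an isClosed flag so getOrMakeConnection() never reconnects a consumer that close() has already shut down, AbstractFetcherThread logs fetch errors only while still running, and ConsoleConsumer takes its auto-commit default from ConsumerConfig.AutoCommitInterval instead of a hard-coded 10 seconds. Two illustrative sketches of the locking changes follow the diff.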
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bd33c15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bd33c15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bd33c15

Branch: refs/heads/trunk
Commit: 5bd33c1517bb2e7734166dc3e787ac90a4ef8f86
Parents: 6400264
Author: Jun Rao <jun...@gmail.com>
Authored: Wed Jun 12 20:50:38 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Jun 12 20:50:38 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsoleConsumer.scala  |  2 +-
 .../kafka/consumer/ConsumerFetcherManager.scala | 78 ++++++++------------
 .../scala/kafka/consumer/SimpleConsumer.scala   |  6 +-
 .../kafka/server/AbstractFetcherThread.scala    |  2 +-
 4 files changed, 35 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 89ff382..719beb5 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -110,7 +110,7 @@ object ConsoleConsumer extends Logging {
           .withRequiredArg
           .describedAs("ms")
           .ofType(classOf[java.lang.Integer])
-          .defaultsTo(10*1000)
+          .defaultsTo(ConsumerConfig.AutoCommitInterval)
   val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
           .withRequiredArg
           .describedAs("num_messages")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 3e497b9..71ae640 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -51,6 +51,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
+      val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
       lock.lock()
       try {
         if (noLeaderPartitionSet.isEmpty) {
@@ -58,64 +59,43 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           cond.await()
         }
-        try {
-          trace("Partitions without leader %s".format(noLeaderPartitionSet))
-          val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
-                                                              brokers,
-                                                              config.clientId,
-                                                              config.socketTimeoutMs,
-                                                              correlationId.getAndIncrement).topicsMetadata
-          if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
-          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
-          topicsMetadata.foreach { tmd =>
-            val topic = tmd.topic
-            tmd.partitionsMetadata.foreach { pmd =>
-              val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
-              if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
-                val leaderBroker = pmd.leader.get
-                leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
-              }
+        trace("Partitions without leader %s".format(noLeaderPartitionSet))
+        val brokers = getAllBrokersInCluster(zkClient)
+        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
+                                                            brokers,
+                                                            config.clientId,
+                                                            config.socketTimeoutMs,
+                                                            correlationId.getAndIncrement).topicsMetadata
+        if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
+        topicsMetadata.foreach { tmd =>
+          val topic = tmd.topic
+          tmd.partitionsMetadata.foreach { pmd =>
+            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+            if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
+              val leaderBroker = pmd.leader.get
+              leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+              noLeaderPartitionSet -= topicAndPartition
             }
           }
-
-          leaderForPartitionsMap.foreach {
-            case(topicAndPartition, leaderBroker) =>
-              val pti = partitionMap(topicAndPartition)
-              try {
-                addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
-                noLeaderPartitionSet -= topicAndPartition
-              } catch {
-                case t => {
-                  /*
-                   * If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid
-                   * processing subsequent partitions without leader so the leader-finder-thread can exit.
-                   * It is unfortunate that we depend on the following behavior and we should redesign this: as part of
-                   * processing partitions, we catch the InterruptedException (thrown from addPartition's call to
-                   * lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process
-                   * more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException
-                   * and we can run into the deadlock described in KAFKA-914.
-                   */
-                  if (!isRunning.get())
-                    throw t
-                  else
-                    warn("Failed to add fetcher for %s to broker %s".format(topicAndPartition, leaderBroker), t)
-                }
-              }
-          }
-
-          shutdownIdleFetcherThreads()
-        } catch {
-          case t => {
+        }
+      } catch {
+        case t => {
             if (!isRunning.get())
-              throw t /* See above for why we need to propagate this exception. */
+              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
              warn("Failed to find leader for %s".format(noLeaderPartitionSet), t)
          }
-        }
       } finally {
         lock.unlock()
       }
+
+      leaderForPartitionsMap.foreach {
+        case(topicAndPartition, leaderBroker) =>
+          val pti = partitionMap(topicAndPartition)
+          addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+      }
+
+      shutdownIdleFetcherThreads()
       Thread.sleep(config.refreshLeaderBackoffMs)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private var isClosed = false

   private def connect(): BlockingChannel = {
     close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,

   def close() {
     lock synchronized {
-      disconnect()
+      disconnect()
+      isClosed = true
     }
   }
@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)

   private def getOrMakeConnection() {
-    if(!blockingChannel.isConnected) {
+    if(!isClosed && !blockingChannel.isConnected) {
       connect()
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b7cbb98..7663fac 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -96,8 +96,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t =>
-        warn("Error in fetch %s".format(fetchRequest), t)
         if (isRunning.get) {
+          warn("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
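
A minimal sketch of the two-phase pattern the ConsumerFetcherManager change applies: do only bookkeeping while the lock is held, and defer calls that can block on another thread's lock until after unlock(). This is a simplified illustration, not the real Kafka code; TopicAndPartition, Broker, findLeader and addFetcher here are hypothetical stand-ins for ClientUtils.fetchTopicMetadata and AbstractFetcherManager.addFetcher.

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    case class TopicAndPartition(topic: String, partition: Int)
    case class Broker(id: Int, host: String, port: Int)

    class LeaderFinderSketch {
      private val lock = new ReentrantLock()
      private val noLeaderPartitionSet = mutable.Set.empty[TopicAndPartition]

      // Stand-in for the batched metadata lookup in the real code.
      private def findLeader(tp: TopicAndPartition): Option[Broker] = None
      // Stand-in for addFetcher, which may block on a fetcher thread's lock.
      private def addFetcher(tp: TopicAndPartition, leader: Broker): Unit = ()

      def doWork(): Unit = {
        val leaderForPartitionsMap = mutable.Map.empty[TopicAndPartition, Broker]
        lock.lock()
        try {
          // Phase 1, under the lock: bookkeeping only. Record discovered
          // leaders and shrink noLeaderPartitionSet, but call nothing that
          // can block on another thread's lock.
          for (tp <- noLeaderPartitionSet.toList; leader <- findLeader(tp)) {
            leaderForPartitionsMap.put(tp, leader)
            noLeaderPartitionSet -= tp
          }
        } finally {
          lock.unlock()
        }
        // Phase 2, lock released: the blocking calls run without this lock
        // held, so the two locks are never held together and the circular
        // wait described in KAFKA-914 cannot form.
        leaderForPartitionsMap.foreach { case (tp, leader) => addFetcher(tp, leader) }
      }
    }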
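
The SimpleConsumer change closes a different shutdown hazard: a fetch racing with close() could call getOrMakeConnection() and silently re-open the channel after shutdown had begun. A sketch of the guard, again with simplified stand-ins (connect/disconnect here are placeholders for the real BlockingChannel calls):

    class ClosableConnectionSketch {
      private val lock = new Object
      private var connected = false
      private var isClosed = false

      private def connect(): Unit = { connected = true }     // stand-in for BlockingChannel.connect()
      private def disconnect(): Unit = { connected = false } // stand-in for BlockingChannel.disconnect()

      def close(): Unit = lock.synchronized {
        disconnect()
        isClosed = true // from here on, reconnection attempts are refused
      }

      // Called before each request; the isClosed check ensures a request
      // racing with close() cannot keep a shut-down consumer alive.
      def getOrMakeConnection(): Unit = lock.synchronized {
        if (!isClosed && !connected)
          connect()
      }
    }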