KAFKA-832; 0.8 consumer prevents rebalance if a consumer thread is blocked or slow; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6e05d7da Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6e05d7da Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6e05d7da Branch: refs/heads/trunk Commit: 6e05d7da865898c1c44f552c735484dfe2603b60 Parents: 1d5e95f Author: Jun Rao <[email protected]> Authored: Thu Mar 28 16:34:47 2013 -0700 Committer: Jun Rao <[email protected]> Committed: Thu Mar 28 16:34:47 2013 -0700 ---------------------------------------------------------------------- .../kafka/consumer/ConsumerFetcherManager.scala | 39 +++++++++----------- .../consumer/ZookeeperConsumerConnector.scala | 4 +- .../kafka/server/AbstractFetcherThread.scala | 15 ++------ .../unit/kafka/integration/FetcherTest.scala | 4 +- 4 files changed, 25 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala index 3aa7b08..d1373c9 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala @@ -43,13 +43,17 @@ class ConsumerFetcherManager(private val consumerIdString: String, private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition] private val lock = new ReentrantLock private val cond = lock.newCondition() - private val leaderFinderThread = new ShutdownableThread(consumerIdString + "-leader-finder-thread"){ + private var leaderFinderThread: ShutdownableThread = null + + private class LeaderFinderThread(name: String) extends ShutdownableThread(name) { // thread responsible for adding the fetcher to the right broker 
when leader is available override def doWork() { lock.lock() try { - if (noLeaderPartitionSet.isEmpty) + if (noLeaderPartitionSet.isEmpty) { + trace("No partition for leader election.") cond.await() + } try { trace("Partitions without leader %s".format(noLeaderPartitionSet)) @@ -93,8 +97,6 @@ class ConsumerFetcherManager(private val consumerIdString: String, Thread.sleep(config.refreshLeaderBackoffMs) } } - leaderFinderThread.start() - override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { new ConsumerFetcherThread( @@ -103,8 +105,9 @@ class ConsumerFetcherManager(private val consumerIdString: String, } def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { - if (!leaderFinderThread.isRunning.get()) - throw new RuntimeException("%s already shutdown".format(name)) + leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") + leaderFinderThread.start() + lock.lock() try { partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap @@ -116,16 +119,17 @@ class ConsumerFetcherManager(private val consumerIdString: String, } } - def stopAllConnections() { - lock.lock() - // first, clear noLeaderPartitionSet so that no more fetchers can be added to leader_finder_thread - noLeaderPartitionSet.clear() - // second, clear partitionMap - partitionMap = null - lock.unlock() + def stopConnections() { + info("Stopping leader finder thread") + if (leaderFinderThread != null) { + leaderFinderThread.shutdown() + leaderFinderThread = null + } - // third, stop all existing fetchers + info("Stopping all fetchers") closeAllFetchers() + + info("All connections stopped") } def addPartitionsWithError(partitionList: Iterable[TopicAndPartition]) { @@ -141,11 +145,4 @@ class ConsumerFetcherManager(private val consumerIdString: String, lock.unlock() } } - - def shutdown() { - info("shutting down") - leaderFinderThread.shutdown() - stopAllConnections() - 
info("shutdown completed") - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index dcbcf21..9a5fbfe 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -162,7 +162,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, if (config.autoCommitEnable) scheduler.shutdownNow() fetcher match { - case Some(f) => f.shutdown + case Some(f) => f.stopConnections case None => } sendShutdownToAllQueues() @@ -483,7 +483,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, val allPartitionInfos = topicRegistry.values.map(p => p.values).flatten fetcher match { case Some(f) => - f.stopAllConnections + f.stopConnections clearFetcherQueues(allPartitionInfos, cluster, queuesToBeCleared, messageStreams) info("Committing all offsets after clearing the fetcher queues") /** http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index 4ee23cd..087979f 100644 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -159,7 +159,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } def addPartition(topic: String, partitionId: Int, initialOffset: Long) { - partitionMapLock.lock() + partitionMapLock.lockInterruptibly() try { val 
topicPartition = TopicAndPartition(topic, partitionId) partitionMap.put( @@ -172,7 +172,7 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } def removePartition(topic: String, partitionId: Int) { - partitionMapLock.lock() + partitionMapLock.lockInterruptibly() try { partitionMap.remove(TopicAndPartition(topic, partitionId)) } finally { @@ -180,17 +180,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke } } - def hasPartition(topic: String, partitionId: Int): Boolean = { - partitionMapLock.lock() - try { - partitionMap.get(TopicAndPartition(topic, partitionId)).isDefined - } finally { - partitionMapLock.unlock() - } - } - def partitionCount() = { - partitionMapLock.lock() + partitionMapLock.lockInterruptibly() try { partitionMap.size } finally { http://git-wip-us.apache.org/repos/asf/kafka/blob/6e05d7da/core/src/test/scala/unit/kafka/integration/FetcherTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala index 5a57bd1..c5cddea 100644 --- a/core/src/test/scala/unit/kafka/integration/FetcherTest.scala +++ b/core/src/test/scala/unit/kafka/integration/FetcherTest.scala @@ -58,12 +58,12 @@ class FetcherTest extends JUnit3Suite with KafkaServerTestHarness { CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString) waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500) fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient) - fetcher.stopAllConnections() + fetcher.stopConnections() fetcher.startConnections(topicInfos, cluster) } override def tearDown() { - fetcher.shutdown() + fetcher.stopConnections() super.tearDown }
