KAFKA-916; Break deadlock between fetcher shutdown and error partition handling; reviewed by Jun Rao.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/517af477
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/517af477
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/517af477

Branch: refs/heads/trunk
Commit: 517af4779aed73efa9889b89d99a9c789f091b06
Parents: 59599cc
Author: Joel Koshy <jjko...@gmail.com>
Authored: Tue May 28 09:55:39 2013 -0700
Committer: Joel Koshy <jjko...@gmail.com>
Committed: Tue May 28 09:55:39 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala | 1 -
 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/517af477/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index db104f1..96bd886 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -167,7 +167,6 @@ class ConsumerFetcherManager(private val consumerIdString: String,
     lock.lock()
     try {
       if (partitionMap != null) {
-        partitionList.foreach(tp => removeFetcher(tp.topic, tp.partition))
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/517af477/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 1270e92..dda0a8f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -66,6 +66,7 @@ class ConsumerFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+    partitions.foreach(tap => removePartition(tap.topic, tap.partition))
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }