KAFKA-916; Break deadlock between fetcher shutdown and error partition 
handling; reviewed by Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/517af477
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/517af477
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/517af477

Branch: refs/heads/trunk
Commit: 517af4779aed73efa9889b89d99a9c789f091b06
Parents: 59599cc
Author: Joel Koshy <jjko...@gmail.com>
Authored: Tue May 28 09:55:39 2013 -0700
Committer: Joel Koshy <jjko...@gmail.com>
Committed: Tue May 28 09:55:39 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala | 1 -
 core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala  | 1 +
 2 files changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/517af477/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index db104f1..96bd886 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -167,7 +167,6 @@ class ConsumerFetcherManager(private val consumerIdString: 
String,
     lock.lock()
     try {
       if (partitionMap != null) {
-        partitionList.foreach(tp => removeFetcher(tp.topic, tp.partition))
         noLeaderPartitionSet ++= partitionList
         cond.signalAll()
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/517af477/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala 
b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index 1270e92..dda0a8f 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -66,6 +66,7 @@ class ConsumerFetcherThread(name: String,
 
   // any logic for partitions whose leader has changed
   def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
+    partitions.foreach(tap => removePartition(tap.topic, tap.partition))
     consumerFetcherManager.addPartitionsWithError(partitions)
   }
 }

Reply via email to