kafka-937; ConsumerFetcherThread can deadlock; patched by Jun Rao; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bd33c15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bd33c15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bd33c15

Branch: refs/heads/trunk
Commit: 5bd33c1517bb2e7734166dc3e787ac90a4ef8f86
Parents: 6400264
Author: Jun Rao <jun...@gmail.com>
Authored: Wed Jun 12 20:50:38 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Jun 12 20:50:38 2013 -0700

----------------------------------------------------------------------
 .../scala/kafka/consumer/ConsoleConsumer.scala  |  2 +-
 .../kafka/consumer/ConsumerFetcherManager.scala | 78 ++++++++------------
 .../scala/kafka/consumer/SimpleConsumer.scala   |  6 +-
 .../kafka/server/AbstractFetcherThread.scala    |  2 +-
 4 files changed, 35 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index 89ff382..719beb5 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -110,7 +110,7 @@ object ConsoleConsumer extends Logging {
             .withRequiredArg
             .describedAs("ms")
             .ofType(classOf[java.lang.Integer])
-            .defaultsTo(10*1000)
+            .defaultsTo(ConsumerConfig.AutoCommitInterval)
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
             .withRequiredArg
             .describedAs("num_messages")

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 3e497b9..71ae640 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -51,6 +51,7 @@ class ConsumerFetcherManager(private val consumerIdString: String,
   private class LeaderFinderThread(name: String) extends ShutdownableThread(name) {
     // thread responsible for adding the fetcher to the right broker when leader is available
     override def doWork() {
+      val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
       lock.lock()
       try {
         if (noLeaderPartitionSet.isEmpty) {
@@ -58,64 +59,43 @@ class ConsumerFetcherManager(private val consumerIdString: String,
           cond.await()
         }
 
-        try {
-          trace("Partitions without leader %s".format(noLeaderPartitionSet))
-          val brokers = getAllBrokersInCluster(zkClient)
-          val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
-                                                              brokers,
-                                                              config.clientId,
-                                                              config.socketTimeoutMs,
-                                                              correlationId.getAndIncrement).topicsMetadata
-          if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
-          val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker]
-          topicsMetadata.foreach { tmd =>
-            val topic = tmd.topic
-            tmd.partitionsMetadata.foreach { pmd =>
-              val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
-              if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
-                val leaderBroker = pmd.leader.get
-                leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
-              }
+        trace("Partitions without leader %s".format(noLeaderPartitionSet))
+        val brokers = getAllBrokersInCluster(zkClient)
+        val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet,
+                                                            brokers,
+                                                            config.clientId,
+                                                            config.socketTimeoutMs,
+                                                            correlationId.getAndIncrement).topicsMetadata
+        if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString()))
+        topicsMetadata.foreach { tmd =>
+          val topic = tmd.topic
+          tmd.partitionsMetadata.foreach { pmd =>
+            val topicAndPartition = TopicAndPartition(topic, pmd.partitionId)
+            if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) {
+              val leaderBroker = pmd.leader.get
+              leaderForPartitionsMap.put(topicAndPartition, leaderBroker)
+              noLeaderPartitionSet -= topicAndPartition
             }
           }
-
-          leaderForPartitionsMap.foreach {
-            case(topicAndPartition, leaderBroker) =>
-              val pti = partitionMap(topicAndPartition)
-              try {
-                  addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
-                  noLeaderPartitionSet -= topicAndPartition
-              } catch {
-                case t => {
-                  /*
-                   * If we are shutting down (e.g., due to a rebalance) propagate this exception upward to avoid
-                   * processing subsequent partitions without leader so the leader-finder-thread can exit.
-                   * It is unfortunate that we depend on the following behavior and we should redesign this: as part of
-                   * processing partitions, we catch the InterruptedException (thrown from addPartition's call to
-                   * lockInterruptibly) when adding partitions, thereby clearing the interrupted flag. If we process
-                   * more partitions, then the lockInterruptibly in addPartition will not throw an InterruptedException
-                   * and we can run into the deadlock described in KAFKA-914.
-                   */
-                  if (!isRunning.get())
-                    throw t
-                  else
-                    warn("Failed to add fetcher for %s to broker 
%s".format(topicAndPartition, leaderBroker), t)
-                }
-              }
-          }
-
-          shutdownIdleFetcherThreads()
-        } catch {
-          case t => {
+        }
+      } catch {
+        case t => {
             if (!isRunning.get())
-              throw t /* See above for why we need to propagate this exception. */
+              throw t /* If this thread is stopped, propagate this exception to kill the thread. */
             else
               warn("Failed to find leader for 
%s".format(noLeaderPartitionSet), t)
           }
-        }
       } finally {
         lock.unlock()
       }
+
+      leaderForPartitionsMap.foreach {
+        case(topicAndPartition, leaderBroker) =>
+          val pti = partitionMap(topicAndPartition)
+          addFetcher(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset(), leaderBroker)
+      }
+
+      shutdownIdleFetcherThreads()
       Thread.sleep(config.refreshLeaderBackoffMs)
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index bdeee91..1c28328 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -37,6 +37,7 @@ class SimpleConsumer(val host: String,
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   val brokerInfo = "host_%s-port_%s".format(host, port)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
+  private var isClosed = false
 
   private def connect(): BlockingChannel = {
     close
@@ -58,7 +59,8 @@ class SimpleConsumer(val host: String,
 
   def close() {
     lock synchronized {
-        disconnect()
+      disconnect()
+      isClosed = true
     }
   }
   
@@ -123,7 +125,7 @@ class SimpleConsumer(val host: String,
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
   private def getOrMakeConnection() {
-    if(!blockingChannel.isConnected) {
+    if(!isClosed && !blockingChannel.isConnected) {
       connect()
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd33c15/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index b7cbb98..7663fac 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -96,8 +96,8 @@ abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroke
       response = simpleConsumer.fetch(fetchRequest)
     } catch {
       case t =>
-        warn("Error in fetch %s".format(fetchRequest), t)
         if (isRunning.get) {
+          warn("Error in fetch %s".format(fetchRequest), t)
           partitionMapLock synchronized {
             partitionsWithError ++= partitionMap.keys
           }
