kafka-937; fix bug exposed in ConsumerOffsetChecker; patched by Jun Rao; 
reviewed by Alexey Ozeritskiy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86e314aa
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86e314aa
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86e314aa

Branch: refs/heads/trunk
Commit: 86e314aa7301cc2802bb4910c07f0957e391ed18
Parents: 5f14a69
Author: Jun Rao <jun...@gmail.com>
Authored: Tue Jun 25 21:12:19 2013 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Tue Jun 25 21:12:19 2013 -0700

----------------------------------------------------------------------
 config/server.properties                                    | 2 +-
 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala | 6 +++++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/86e314aa/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index bc6a521..0589c71 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -52,7 +52,7 @@ log.dir=/tmp/kafka-logs
 
 # The number of logical partitions per topic per server. More partitions allow 
greater parallelism
 # for consumption, but also mean more files.
-num.partitions=1
+num.partitions=2
 
 ############################# Log Flush Policy #############################
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/86e314aa/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala 
b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
index 0e6d9b8..33d7c2c 100644
--- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
+++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
@@ -74,7 +74,6 @@ object ConsumerOffsetChecker extends Logging {
             val lag = logSize - offset
             println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, 
topic, pid, offset, logSize, lag,
               owner match {case Some(ownerStr) => ownerStr case None => 
"none"}))
-            consumer.close()
           case None => // ignore
         }
       case None =>
@@ -157,6 +156,11 @@ object ConsumerOffsetChecker extends Logging {
       if (options.has("broker-info"))
         printBrokerInfo();
 
+      for ((_, consumerOpt) <- consumerMap)
+        consumerOpt match {
+          case Some(consumer) => consumer.close()
+          case None => // ignore
+        }
     }
     finally {
       for (consumerOpt <- consumerMap.values) {

Reply via email to