kafka-937; fix bug exposed in ConsumerOffsetChecker; patched by Jun Rao; reviewed by Alexey Ozeritskiy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86e314aa Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86e314aa Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86e314aa Branch: refs/heads/trunk Commit: 86e314aa7301cc2802bb4910c07f0957e391ed18 Parents: 5f14a69 Author: Jun Rao <jun...@gmail.com> Authored: Tue Jun 25 21:12:19 2013 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Tue Jun 25 21:12:19 2013 -0700 ---------------------------------------------------------------------- config/server.properties | 2 +- core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/86e314aa/config/server.properties ---------------------------------------------------------------------- diff --git a/config/server.properties b/config/server.properties index bc6a521..0589c71 100644 --- a/config/server.properties +++ b/config/server.properties @@ -52,7 +52,7 @@ log.dir=/tmp/kafka-logs # The number of logical partitions per topic per server. More partitions allow greater parallelism # for consumption, but also mean more files. -num.partitions=1 +num.partitions=2 ############################# Log Flush Policy ############################# http://git-wip-us.apache.org/repos/asf/kafka/blob/86e314aa/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala index 0e6d9b8..33d7c2c 100644 --- a/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala +++ b/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala @@ -74,7 +74,6 @@ object ConsumerOffsetChecker extends Logging { val lag = logSize - offset println("%-15s %-30s %-3s %-15s %-15s %-15s %s".format(group, topic, pid, offset, logSize, lag, owner match {case Some(ownerStr) => ownerStr case None => "none"})) - consumer.close() case None => // ignore } case None => @@ -157,6 +156,11 @@ object ConsumerOffsetChecker extends Logging { if (options.has("broker-info")) printBrokerInfo(); + for ((_, consumerOpt) <- consumerMap) + consumerOpt match { + case Some(consumer) => consumer.close() + case None => // ignore + } } finally { for (consumerOpt <- consumerMap.values) {