Repository: kafka
Updated Branches:
  refs/heads/0.10.0 afb65688a -> 45259d11a
MINOR: improve Streams application reset tool to make sure application is down

guozhangwang miguno dguy enothereska hjafarpour

See #1764

Author: Matthias J. Sax <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1765 from mjsax/improveResetTool-0.10.0


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45259d11
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45259d11
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45259d11

Branch: refs/heads/0.10.0
Commit: 45259d11aa77bda708a2e9422c3dc3899668965d
Parents: afb6568
Author: Matthias J. Sax <[email protected]>
Authored: Sat Aug 20 12:06:58 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Sat Aug 20 12:06:58 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/tools/StreamsResetter.java | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45259d11/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index 8e463d1..8d9cd5e 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -20,6 +20,7 @@ import joptsimple.OptionException;
 import joptsimple.OptionParser;
 import joptsimple.OptionSet;
 import joptsimple.OptionSpec;
+import kafka.admin.AdminClient;
 import kafka.admin.TopicCommand;
 import kafka.utils.ZkUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -82,10 +83,18 @@ public class StreamsResetter {
 
         int exitCode = EXIT_CODE_SUCCESS;
 
+        AdminClient adminClient = null;
         ZkUtils zkUtils = null;
         try {
             parseArguments(args);
 
+            adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption));
+            final String groupId = this.options.valueOf(applicationIdOption);
+            if (adminClient.describeConsumerGroup(groupId).size() != 0) {
+                throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " +
+                    "Make sure to stop all running application instances before running the reset tool.");
+            }
+
             zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption),
                 30000,
                 30000,
@@ -97,10 +106,13 @@ public class StreamsResetter {
             resetInputAndInternalTopicOffsets();
             seekToEndIntermediateTopics();
             deleteInternalTopics(zkUtils);
-        } catch (final Exception e) {
+        } catch (final Throwable e) {
             exitCode = EXIT_CODE_ERROR;
             System.err.println("ERROR: " + e.getMessage());
         } finally {
+            if (adminClient != null) {
+                adminClient.close();
+            }
             if (zkUtils != null) {
                 zkUtils.close();
             }
