Repository: kafka Updated Branches: refs/heads/trunk f153407c4 -> d851ce76f
MINOR: improve Streams application reset tool to make sure application is down Author: Matthias J. Sax <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1764 from mjsax/improveResetTool Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d851ce76 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d851ce76 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d851ce76 Branch: refs/heads/trunk Commit: d851ce76fdaa469dcaed7f66340574529d79a3cd Parents: f153407 Author: Matthias J. Sax <[email protected]> Authored: Mon Aug 22 15:45:29 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Mon Aug 22 15:45:29 2016 -0700 ---------------------------------------------------------------------- core/src/main/scala/kafka/tools/StreamsResetter.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d851ce76/core/src/main/scala/kafka/tools/StreamsResetter.java ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java index 8e463d1..22b8bd6 100644 --- a/core/src/main/scala/kafka/tools/StreamsResetter.java +++ b/core/src/main/scala/kafka/tools/StreamsResetter.java @@ -20,6 +20,7 @@ import joptsimple.OptionException; import joptsimple.OptionParser; import joptsimple.OptionSet; import joptsimple.OptionSpec; +import kafka.admin.AdminClient; import kafka.admin.TopicCommand; import kafka.utils.ZkUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -82,10 +83,18 @@ public class StreamsResetter { int exitCode = EXIT_CODE_SUCCESS; + AdminClient adminClient = null; ZkUtils zkUtils = null; try { parseArguments(args); + adminClient = AdminClient.createSimplePlaintext(this.options.valueOf(bootstrapServerOption)); + final String groupId = this.options.valueOf(applicationIdOption); + if (adminClient.describeConsumerGroup(groupId).get().size() != 0) { + throw new IllegalStateException("Consumer group '" + groupId + "' is still active. " + + "Make sure to stop all running application instances before running the reset tool."); + } + zkUtils = ZkUtils.apply(this.options.valueOf(zookeeperOption), 30000, 30000, @@ -97,10 +106,13 @@ public class StreamsResetter { resetInputAndInternalTopicOffsets(); seekToEndIntermediateTopics(); deleteInternalTopics(zkUtils); - } catch (final Exception e) { + } catch (final Throwable e) { exitCode = EXIT_CODE_ERROR; System.err.println("ERROR: " + e.getMessage()); } finally { + if (adminClient != null) { + adminClient.close(); + } if (zkUtils != null) { zkUtils.close(); }
