This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bc883cd MINOR: Code cleanup in StreamsResetter (#9126)
bc883cd is described below
commit bc883cdbcfa6309116a48eb4ba1531c49e1a5d5e
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Aug 5 13:38:34 2020 -0700
MINOR: Code cleanup in StreamsResetter (#9126)
Reviewers: Guozhang Wang <[email protected]>
---
core/src/main/scala/kafka/tools/StreamsResetter.java | 18 +++++++-----------
1 file changed, 7 insertions(+), 11 deletions(-)
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index e2d3944..eee52fa 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -103,13 +103,13 @@ public class StreamsResetter {
private static OptionSpec<String> fromFileOption;
private static OptionSpec<Long> shiftByOption;
private static OptionSpecBuilder dryRunOption;
- private static OptionSpec helpOption;
- private static OptionSpec versionOption;
+ private static OptionSpec<Void> helpOption;
+ private static OptionSpec<Void> versionOption;
private static OptionSpecBuilder executeOption;
private static OptionSpec<String> commandConfigOption;
- private static OptionSpec forceOption;
+ private static OptionSpecBuilder forceOption;
- private static String usage = "This tool helps to quickly reset an
application in order to reprocess "
+ private final static String USAGE = "This tool helps to quickly reset an
application in order to reprocess "
+ "its data from scratch.\n"
+ "* This tool resets offsets of input topics to the earliest
available offset and it skips to the end of "
+ "intermediate topics (topics that are input and output topics,
e.g., used by deprecated through() method).\n"
@@ -193,11 +193,7 @@ public class StreamsResetter {
if (!members.isEmpty()) {
if (options.has(forceOption)) {
System.out.println("Force deleting all active members in the
group: " + groupId);
- try {
- adminClient.removeMembersFromConsumerGroup(groupId, new
RemoveMembersFromConsumerGroupOptions()).all().get();
- } catch (Exception e) {
- throw e;
- }
+ adminClient.removeMembersFromConsumerGroup(groupId, new
RemoveMembersFromConsumerGroupOptions()).all().get();
} else {
throw new IllegalStateException("Consumer group '" + groupId +
"' is still active "
+ "and has following members: " + members + ". "
@@ -264,7 +260,7 @@ public class StreamsResetter {
try {
options = optionParser.parse(args);
if (args.length == 0 || options.has(helpOption)) {
- CommandLineUtils.printUsageAndDie(optionParser, usage);
+ CommandLineUtils.printUsageAndDie(optionParser, USAGE);
}
if (options.has(versionOption)) {
CommandLineUtils.printVersionAndDie();
@@ -308,7 +304,7 @@ public class StreamsResetter {
JavaConverters.asScalaSetConverter(invalidOptions).asScala());
}
- private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map
consumerConfig,
+ private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final
Map<Object, Object> consumerConfig,
final
boolean dryRun)
throws IOException, ParseException {