Repository: kafka
Updated Branches:
  refs/heads/trunk 972b75453 -> 9fa0d52ca
KAFKA-5210: Application Reset Tool does not need to seek for internal topics

mjsax dguy guozhangwang Could you please review the changes.

Author: Bharat Viswanadham <[email protected]>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #3073 from bharatviswa504/KAFKA-5210

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fa0d52c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fa0d52c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fa0d52c

Branch: refs/heads/trunk
Commit: 9fa0d52cac24c69dbc907208ccb3e603cab3503b
Parents: 972b754
Author: Bharat Viswanadham <[email protected]>
Authored: Wed May 17 23:15:23 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed May 17 23:15:23 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 32 ++++++--------------
 1 file changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9fa0d52c/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index a218125..3a778ee 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -118,7 +118,7 @@ public class StreamsResetter {
             if (dryRun) {
                 System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
             }
-            maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+            maybeResetInputAndSeekToEndIntermediateTopicOffsets();
             maybeDeleteInternalTopics(zkUtils);
 
         } catch (final Throwable e) {
@@ -173,11 +173,10 @@ public class StreamsResetter {
         }
     }
 
-    private void maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
-        final List<String> internalTopics = new ArrayList<>();
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
 
@@ -191,7 +190,7 @@ public class StreamsResetter {
 
         if (!dryRun) {
             if (inputTopics.size() != 0) {
-                System.out.println("Seek-to-beginning for input topics " + inputTopics + " and all internal topics.");
+                System.out.println("Seek-to-beginning for input topics " + inputTopics);
             }
             if (intermediateTopics.size() != 0) {
                 System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
@@ -214,12 +213,6 @@ public class StreamsResetter {
                 topicsToSubscribe.add(topic);
             }
         }
-        for (final String topic : allTopics) {
-            if (isInternalTopic(topic)) {
-                topicsToSubscribe.add(topic);
-                internalTopics.add(topic);
-            }
-        }
 
         final Properties config = new Properties();
         config.putAll(consumerConfig);
@@ -232,13 +225,13 @@ public class StreamsResetter {
             client.poll(1);
 
             final Set<TopicPartition> partitions = client.assignment();
-            final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+            final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
             final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
 
             for (final TopicPartition p : partitions) {
                 final String topic = p.topic();
-                if (isInputTopic(topic) || isInternalTopic(topic)) {
-                    inputAndInternalTopicPartitions.add(p);
+                if (isInputTopic(topic)) {
+                    inputTopicPartitions.add(p);
                 } else if (isIntermediateTopic(topic)) {
                     intermediateTopicPartitions.add(p);
                 } else {
@@ -246,7 +239,7 @@ public class StreamsResetter {
                 }
             }
 
-            maybeSeekToBeginning(client, inputAndInternalTopicPartitions, internalTopics);
+            maybeSeekToBeginning(client, inputTopicPartitions);
 
             maybeSeekToEnd(client, intermediateTopicPartitions);
 
@@ -299,15 +292,14 @@ public class StreamsResetter {
     }
 
     private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
-                                      final Set<TopicPartition> inputAndInternalTopicPartitions,
-                                      final List<String> internalTopics) {
+                                      final Set<TopicPartition> inputTopicPartitions) {
 
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final String groupId = options.valueOf(applicationIdOption);
 
-        if (inputAndInternalTopicPartitions.size() > 0) {
+        if (inputTopicPartitions.size() > 0) {
             if (!dryRun) {
-                client.seekToBeginning(inputAndInternalTopicPartitions);
+                client.seekToBeginning(inputTopicPartitions);
             } else {
                 System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
                 for (final String topic : inputTopics) {
@@ -315,10 +307,6 @@ public class StreamsResetter {
                         System.out.println("Topic: " + topic);
                     }
                 }
-                System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")");
-                for (final String topic : internalTopics) {
-                    System.out.println("Topic: " + topic);
-                }
             }
         }
     }
