Repository: kafka
Updated Branches:
  refs/heads/trunk 972b75453 -> 9fa0d52ca


KAFKA-5210: Application Reset Tool does not need to seek for internal topics

mjsax, dguy, guozhangwang: could you please review the changes?

Author: Bharat Viswanadham <[email protected]>

Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang

Closes #3073 from bharatviswa504/KAFKA-5210


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9fa0d52c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9fa0d52c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9fa0d52c

Branch: refs/heads/trunk
Commit: 9fa0d52cac24c69dbc907208ccb3e603cab3503b
Parents: 972b754
Author: Bharat Viswanadham <[email protected]>
Authored: Wed May 17 23:15:23 2017 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Wed May 17 23:15:23 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/tools/StreamsResetter.java | 32 ++++++--------------
 1 file changed, 10 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9fa0d52c/core/src/main/scala/kafka/tools/StreamsResetter.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java b/core/src/main/scala/kafka/tools/StreamsResetter.java
index a218125..3a778ee 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -118,7 +118,7 @@ public class StreamsResetter {
             if (dryRun) {
                 System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
             }
-            maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets();
+            maybeResetInputAndSeekToEndIntermediateTopicOffsets();
             maybeDeleteInternalTopics(zkUtils);
 
         } catch (final Throwable e) {
@@ -173,11 +173,10 @@ public class StreamsResetter {
         }
     }
 
-    private void maybeResetInputAndInternalAndSeekToEndIntermediateTopicOffsets() {
+    private void maybeResetInputAndSeekToEndIntermediateTopicOffsets() {
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
 
-        final List<String> internalTopics = new ArrayList<>();
 
         final List<String> notFoundInputTopics = new ArrayList<>();
         final List<String> notFoundIntermediateTopics = new ArrayList<>();
@@ -191,7 +190,7 @@ public class StreamsResetter {
 
         if (!dryRun) {
             if (inputTopics.size() != 0) {
-                System.out.println("Seek-to-beginning for input topics " + inputTopics + " and all internal topics.");
+                System.out.println("Seek-to-beginning for input topics " + inputTopics);
             }
             if (intermediateTopics.size() != 0) {
                 System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
@@ -214,12 +213,6 @@ public class StreamsResetter {
                 topicsToSubscribe.add(topic);
             }
         }
-        for (final String topic : allTopics) {
-            if (isInternalTopic(topic)) {
-                topicsToSubscribe.add(topic);
-                internalTopics.add(topic);
-            }
-        }
 
         final Properties config = new Properties();
         config.putAll(consumerConfig);
@@ -232,13 +225,13 @@ public class StreamsResetter {
             client.poll(1);
 
             final Set<TopicPartition> partitions = client.assignment();
-            final Set<TopicPartition> inputAndInternalTopicPartitions = new HashSet<>();
+            final Set<TopicPartition> inputTopicPartitions = new HashSet<>();
             final Set<TopicPartition> intermediateTopicPartitions = new HashSet<>();
 
             for (final TopicPartition p : partitions) {
                 final String topic = p.topic();
-                if (isInputTopic(topic) || isInternalTopic(topic)) {
-                    inputAndInternalTopicPartitions.add(p);
+                if (isInputTopic(topic)) {
+                    inputTopicPartitions.add(p);
                 } else if (isIntermediateTopic(topic)) {
                     intermediateTopicPartitions.add(p);
                 } else {
@@ -246,7 +239,7 @@ public class StreamsResetter {
                 }
             }
 
-            maybeSeekToBeginning(client, inputAndInternalTopicPartitions, internalTopics);
+            maybeSeekToBeginning(client, inputTopicPartitions);
 
             maybeSeekToEnd(client, intermediateTopicPartitions);
 
@@ -299,15 +292,14 @@ public class StreamsResetter {
     }
 
     private void maybeSeekToBeginning(final KafkaConsumer<byte[], byte[]> client,
-                                      final Set<TopicPartition> inputAndInternalTopicPartitions,
-                                      final List<String> internalTopics) {
+                                      final Set<TopicPartition> inputTopicPartitions) {
 
         final List<String> inputTopics = options.valuesOf(inputTopicsOption);
         final String groupId = options.valueOf(applicationIdOption);
 
-        if (inputAndInternalTopicPartitions.size() > 0) {
+        if (inputTopicPartitions.size() > 0) {
             if (!dryRun) {
-                client.seekToBeginning(inputAndInternalTopicPartitions);
+                client.seekToBeginning(inputTopicPartitions);
             } else {
                 System.out.println("Following input topics offsets will be reset to beginning (for consumer group " + groupId + ")");
                 for (final String topic : inputTopics) {
@@ -315,10 +307,6 @@ public class StreamsResetter {
                         System.out.println("Topic: " + topic);
                     }
                 }
-                System.out.println("Following internal topics offsets will be reset to beginning (for consumer group " + groupId + ")");
-                for (final String topic : internalTopics) {
-                    System.out.println("Topic: " + topic);
-                }
             }
         }
     }

Reply via email to