This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 45226543e05 KAFKA-20550 Rename OffsetsUtils to GroupOffsetsResetter (#22207)
45226543e05 is described below

commit 45226543e05c6891823f94eadea8238cd3fbc29f
Author: majialong <[email protected]>
AuthorDate: Mon May 11 01:34:21 2026 +0800

    KAFKA-20550 Rename OffsetsUtils to GroupOffsetsResetter (#22207)
    
    `OffsetsUtils` holds instance state (an `Admin` client, an `OptionParser`,
    and an options object), so it is not a traditional stateless utility
    class. Rename it to `GroupOffsetsResetter` to better reflect its actual
    role.
    
    Reviewers: Ken Huang <[email protected]>, PoAn Yang
    <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 ...OffsetsUtils.java => GroupOffsetsResetter.java} | 26 ++++++------
 .../tools/consumer/group/ConsumerGroupCommand.java | 48 +++++++++++-----------
 .../tools/consumer/group/ShareGroupCommand.java    | 24 +++++------
 .../kafka/tools/streams/StreamsGroupCommand.java   | 36 ++++++++--------
 4 files changed, 67 insertions(+), 67 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java 
b/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
similarity index 96%
rename from tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
rename to tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
index bec80b6fc25..c5f983fb0c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
@@ -58,14 +58,14 @@ import java.util.stream.Stream;
 import joptsimple.OptionParser;
 
 
-public class OffsetsUtils {
-    public static final Logger LOGGER = 
LoggerFactory.getLogger(OffsetsUtils.class);
+public class GroupOffsetsResetter {
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(GroupOffsetsResetter.class);
     private static final String TOPIC_PARTITION_SEPARATOR = ":";
     private final Admin adminClient;
-    private final OffsetsUtilsOptions opts;
+    private final GroupOffsetsResetterOptions opts;
     private final OptionParser parser;
 
-    public OffsetsUtils(Admin adminClient, OptionParser parser, 
OffsetsUtilsOptions opts) {
+    public GroupOffsetsResetter(Admin adminClient, OptionParser parser, 
GroupOffsetsResetterOptions opts) {
         this.adminClient = adminClient;
         this.opts = opts;
         this.parser = parser;
@@ -372,16 +372,16 @@ public class OffsetsUtils {
         Instant now = Instant.now();
         durationParsed.negated().addTo(now);
         long timestamp = now.minus(durationParsed).toEpochMilli();
-        Map<TopicPartition, OffsetsUtils.LogOffsetResult> logTimestampOffsets =
+        Map<TopicPartition, GroupOffsetsResetter.LogOffsetResult> 
logTimestampOffsets =
             getLogTimestampOffsets(partitionsToReset, timestamp);
         return 
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(), 
topicPartition -> {
-            OffsetsUtils.LogOffsetResult logTimestampOffset = 
logTimestampOffsets.get(topicPartition);
+            GroupOffsetsResetter.LogOffsetResult logTimestampOffset = 
logTimestampOffsets.get(topicPartition);
 
-            if (!(logTimestampOffset instanceof OffsetsUtils.LogOffset)) {
+            if (!(logTimestampOffset instanceof 
GroupOffsetsResetter.LogOffset)) {
                 CommandLineUtils.printUsageAndExit(parser, "Error getting 
offset by timestamp of topic partition: " + topicPartition);
             }
 
-            return new OffsetAndMetadata(((OffsetsUtils.LogOffset) 
logTimestampOffset).value);
+            return new OffsetAndMetadata(((GroupOffsetsResetter.LogOffset) 
logTimestampOffset).value);
         }));
     }
 
@@ -428,10 +428,10 @@ public class OffsetsUtils {
         Map<TopicPartition, OffsetAndMetadata> 
preparedOffsetsForPartitionsWithoutCommittedOffset =
             getLogEndOffsets(partitionsToResetWithoutCommittedOffset)
                 
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
-                    if (!(e.getValue() instanceof OffsetsUtils.LogOffset)) {
+                    if (!(e.getValue() instanceof 
GroupOffsetsResetter.LogOffset)) {
                         CommandLineUtils.printUsageAndExit(parser, "Error 
getting ending offset of topic partition: " + e.getKey());
                     }
-                    return new OffsetAndMetadata(((OffsetsUtils.LogOffset) 
e.getValue()).value);
+                    return new 
OffsetAndMetadata(((GroupOffsetsResetter.LogOffset) e.getValue()).value);
                 }));
 
         
preparedOffsetsForPartitionsWithCommittedOffset.putAll(preparedOffsetsForPartitionsWithoutCommittedOffset);
@@ -508,7 +508,7 @@ public class OffsetsUtils {
     public static class Ignore implements LogOffsetResult { }
 
 
-    public static class OffsetsUtilsOptions {
+    public static class GroupOffsetsResetterOptions {
         List<String> groupOpt;
         List<Long> resetToOffsetOpt;
         List<String> resetFromFileOpt;
@@ -517,7 +517,7 @@ public class OffsetsUtils {
         Long resetShiftByOpt;
         long timeoutMsOpt;
 
-        public OffsetsUtilsOptions(
+        public GroupOffsetsResetterOptions(
             List<String> groupOpt,
             List<Long> resetToOffsetOpt,
             List<String> resetFromFileOpt,
@@ -535,7 +535,7 @@ public class OffsetsUtils {
             this.timeoutMsOpt = timeoutMsOpt;
         }
 
-        public OffsetsUtilsOptions(
+        public GroupOffsetsResetterOptions(
             List<String> groupOpt,
             List<String> resetToDatetimeOpt,
             long timeoutMsOpt) {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index cfdef8f7518..895ce1eaba1 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -44,7 +44,7 @@ import 
org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
-import org.apache.kafka.tools.OffsetsUtils;
+import org.apache.kafka.tools.GroupOffsetsResetter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectWriter;
@@ -131,7 +131,7 @@ public class ConsumerGroupCommand {
                     String exported = 
consumerGroupService.exportOffsetsToCsv(offsetsToReset);
                     System.out.println(exported);
                 } else
-                    OffsetsUtils.printOffsetsToReset(offsetsToReset);
+                    GroupOffsetsResetter.printOffsetsToReset(offsetsToReset);
             } else if (opts.options.has(opts.deleteOffsetsOpt)) {
                 consumerGroupService.deleteOffsets();
             }
@@ -182,7 +182,7 @@ public class ConsumerGroupCommand {
         final ConsumerGroupCommandOptions opts;
         final Map<String, String> configOverrides;
         private final Admin adminClient;
-        private final OffsetsUtils offsetsUtils;
+        private final GroupOffsetsResetter groupOffsetsResetter;
 
         ConsumerGroupService(ConsumerGroupCommandOptions opts, Map<String, 
String> configOverrides) {
             this.opts = opts;
@@ -192,12 +192,12 @@ public class ConsumerGroupCommand {
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
+            this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient, 
opts.parser, getGroupOffsetsResetterOptions(opts));
         }
 
-        private OffsetsUtils.OffsetsUtilsOptions 
getOffsetsUtilsOptions(ConsumerGroupCommandOptions opts) {
+        private GroupOffsetsResetter.GroupOffsetsResetterOptions 
getGroupOffsetsResetterOptions(ConsumerGroupCommandOptions opts) {
             return
-                new 
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+                new 
GroupOffsetsResetter.GroupOffsetsResetterOptions(opts.options.valuesOf(opts.groupOpt),
                     opts.options.valuesOf(opts.resetToOffsetOpt),
                     opts.options.valuesOf(opts.resetFromFileOpt),
                     opts.options.valuesOf(opts.resetToDatetimeOpt),
@@ -598,19 +598,19 @@ public class ConsumerGroupCommand {
                     consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt, 
leaderEpoch);
             };
 
-            List<TopicPartition> topicPartitionsWithoutLeader = 
offsetsUtils.filterNoneLeaderPartitions(topicPartitions);
+            List<TopicPartition> topicPartitionsWithoutLeader = 
groupOffsetsResetter.filterNoneLeaderPartitions(topicPartitions);
             List<TopicPartition> topicPartitionsWithLeader = 
topicPartitions.stream().filter(tp -> 
!topicPartitionsWithoutLeader.contains(tp)).toList();
 
             // prepare data for partitions with leaders
-            List<PartitionAssignmentState> existLeaderAssignments = 
offsetsUtils.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult
 -> {
-                if (logEndOffsetResult.getValue() instanceof 
OffsetsUtils.LogOffset)
+            List<PartitionAssignmentState> existLeaderAssignments = 
groupOffsetsResetter.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult
 -> {
+                if (logEndOffsetResult.getValue() instanceof 
GroupOffsetsResetter.LogOffset)
                     return getDescribePartitionResult.apply(
                         logEndOffsetResult.getKey(),
-                        Optional.of(((OffsetsUtils.LogOffset) 
logEndOffsetResult.getValue()).value())
+                        Optional.of(((GroupOffsetsResetter.LogOffset) 
logEndOffsetResult.getValue()).value())
                     );
-                else if (logEndOffsetResult.getValue() instanceof 
OffsetsUtils.Unknown)
+                else if (logEndOffsetResult.getValue() instanceof 
GroupOffsetsResetter.Unknown)
                     return 
getDescribePartitionResult.apply(logEndOffsetResult.getKey(), Optional.empty());
-                else if (logEndOffsetResult.getValue() instanceof 
OffsetsUtils.Ignore)
+                else if (logEndOffsetResult.getValue() instanceof 
GroupOffsetsResetter.Ignore)
                     return null;
 
                 throw new IllegalStateException("Unknown LogOffset subclass: " 
+ logEndOffsetResult.getValue());
@@ -706,7 +706,7 @@ public class ConsumerGroupCommand {
                     topicWithoutPartitions.add(topic);
             }
 
-            List<TopicPartition> knownPartitions = 
topicWithPartitions.stream().flatMap(offsetsUtils::parseTopicsWithPartitions).toList();
+            List<TopicPartition> knownPartitions = 
topicWithPartitions.stream().flatMap(groupOffsetsResetter::parseTopicsWithPartitions).toList();
 
             // Get the partitions of topics that the user did not explicitly 
specify the partitions
             DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(
@@ -947,7 +947,7 @@ public class ConsumerGroupCommand {
                 return getCommittedOffsets(groupId).keySet();
             } else if (opts.options.has(opts.topicOpt)) {
                 List<String> topics = opts.options.valuesOf(opts.topicOpt);
-                return offsetsUtils.parseTopicPartitionsToReset(topics);
+                return 
groupOffsetsResetter.parseTopicPartitionsToReset(topics);
             } else {
                 if (!opts.options.has(opts.resetFromFileOpt))
                     CommandLineUtils.printUsageAndExit(opts.parser, "One of 
the reset scopes should be defined: --all-topics, --topic.");
@@ -969,26 +969,26 @@ public class ConsumerGroupCommand {
 
         private Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId, Collection<TopicPartition> 
partitionsToReset) {
             // ensure all partitions are valid, otherwise throw a runtime 
exception
-            offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
+            
groupOffsetsResetter.checkAllTopicPartitionsValid(partitionsToReset);
 
             if (opts.options.has(opts.resetToOffsetOpt)) {
-                return offsetsUtils.resetToOffset(partitionsToReset);
+                return groupOffsetsResetter.resetToOffset(partitionsToReset);
             } else if (opts.options.has(opts.resetToEarliestOpt)) {
-                return offsetsUtils.resetToEarliest(partitionsToReset);
+                return groupOffsetsResetter.resetToEarliest(partitionsToReset);
             } else if (opts.options.has(opts.resetToLatestOpt)) {
-                return offsetsUtils.resetToLatest(partitionsToReset);
+                return groupOffsetsResetter.resetToLatest(partitionsToReset);
             } else if (opts.options.has(opts.resetShiftByOpt)) {
                 Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets 
= getCommittedOffsets(groupId);
-                return offsetsUtils.resetByShiftBy(partitionsToReset, 
currentCommittedOffsets);
+                return groupOffsetsResetter.resetByShiftBy(partitionsToReset, 
currentCommittedOffsets);
             } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-                return offsetsUtils.resetToDateTime(partitionsToReset);
+                return groupOffsetsResetter.resetToDateTime(partitionsToReset);
             } else if (opts.options.has(opts.resetByDurationOpt)) {
-                return offsetsUtils.resetByDuration(partitionsToReset);
-            } else if (offsetsUtils.resetPlanFromFile().isPresent()) {
-                return offsetsUtils.resetFromFile(groupId);
+                return groupOffsetsResetter.resetByDuration(partitionsToReset);
+            } else if (groupOffsetsResetter.resetPlanFromFile().isPresent()) {
+                return groupOffsetsResetter.resetFromFile(groupId);
             } else if (opts.options.has(opts.resetToCurrentOpt)) {
                 Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets 
= getCommittedOffsets(groupId);
-                return offsetsUtils.resetToCurrent(partitionsToReset, 
currentCommittedOffsets);
+                return groupOffsetsResetter.resetToCurrent(partitionsToReset, 
currentCommittedOffsets);
             }
 
             CommandLineUtils.printUsageAndExit(opts.parser, 
String.format("Option '%s' requires one of the following scenarios: %s", 
opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts));
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index d39eb9396e5..aa02dd22d3b 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -44,7 +44,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
-import org.apache.kafka.tools.OffsetsUtils;
+import org.apache.kafka.tools.GroupOffsetsResetter;
 
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
@@ -129,7 +129,7 @@ public class ShareGroupCommand {
     static class ShareGroupService implements AutoCloseable {
         final ShareGroupCommandOptions opts;
         private final Admin adminClient;
-        private final OffsetsUtils offsetsUtils;
+        private final GroupOffsetsResetter groupOffsetsResetter;
 
         public ShareGroupService(ShareGroupCommandOptions opts, Map<String, 
String> configOverrides) {
             this.opts = opts;
@@ -138,18 +138,18 @@ public class ShareGroupCommand {
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
+            this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient, 
opts.parser, getGroupOffsetsResetterOptions(opts));
         }
 
         public ShareGroupService(ShareGroupCommandOptions opts, Admin 
adminClient) {
             this.opts = opts;
             this.adminClient = adminClient;
-            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
+            this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient, 
opts.parser, getGroupOffsetsResetterOptions(opts));
         }
 
-        private OffsetsUtils.OffsetsUtilsOptions 
getOffsetsUtilsOptions(ShareGroupCommandOptions opts) {
+        private GroupOffsetsResetter.GroupOffsetsResetterOptions 
getGroupOffsetsResetterOptions(ShareGroupCommandOptions opts) {
             return
-                new 
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+                new 
GroupOffsetsResetter.GroupOffsetsResetterOptions(opts.options.valuesOf(opts.groupOpt),
                     opts.options.valuesOf(opts.resetToDatetimeOpt),
                     opts.options.valueOf(opts.timeoutMsOpt));
         }
@@ -418,7 +418,7 @@ public class ShareGroupCommand {
                         withTimeoutMs(new AlterShareGroupOffsetsOptions())
                     ).all().get();
                 }
-                OffsetsUtils.printOffsetsToReset(Map.of(groupId, 
offsetsToReset));
+                GroupOffsetsResetter.printOffsetsToReset(Map.of(groupId, 
offsetsToReset));
             } catch (InterruptedException ie) {
                 throw new RuntimeException(ie);
             } catch (ExecutionException ee) {
@@ -435,7 +435,7 @@ public class ShareGroupCommand {
             Collection<TopicPartition> partitionsToReset;
 
             if (opts.options.has(opts.topicOpt)) {
-                partitionsToReset = 
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
+                partitionsToReset = 
groupOffsetsResetter.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
             } else {
                 Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
                 Map<TopicPartition, SharePartitionOffsetInfo> 
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
@@ -449,13 +449,13 @@ public class ShareGroupCommand {
         }
 
         private Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(Collection<TopicPartition> partitionsToReset) {
-            offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
+            
groupOffsetsResetter.checkAllTopicPartitionsValid(partitionsToReset);
             if (opts.options.has(opts.resetToEarliestOpt)) {
-                return offsetsUtils.resetToEarliest(partitionsToReset);
+                return groupOffsetsResetter.resetToEarliest(partitionsToReset);
             } else if (opts.options.has(opts.resetToLatestOpt)) {
-                return offsetsUtils.resetToLatest(partitionsToReset);
+                return groupOffsetsResetter.resetToLatest(partitionsToReset);
             } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-                return offsetsUtils.resetToDateTime(partitionsToReset);
+                return groupOffsetsResetter.resetToDateTime(partitionsToReset);
             }
             CommandLineUtils
                 .printUsageAndExit(opts.parser, String.format("Option '%s' 
requires one of the following scenarios: %s", opts.resetOffsetsOpt, 
opts.allResetOffsetScenarioOpts));
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index c9d6597a52b..8e40d81228d 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -56,7 +56,7 @@ import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.utils.internals.Exit;
 import org.apache.kafka.server.util.CommandLineUtils;
-import org.apache.kafka.tools.OffsetsUtils;
+import org.apache.kafka.tools.GroupOffsetsResetter;
 import org.apache.kafka.tools.consumer.group.CsvUtils;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
@@ -187,7 +187,7 @@ public class StreamsGroupCommand {
     static class StreamsGroupService implements AutoCloseable {
         final StreamsGroupCommandOptions opts;
         private final Admin adminClient;
-        private final OffsetsUtils offsetsUtils;
+        private final GroupOffsetsResetter groupOffsetsResetter;
 
         public StreamsGroupService(StreamsGroupCommandOptions opts, 
Map<String, String> configOverrides) {
             this.opts = opts;
@@ -196,18 +196,18 @@ public class StreamsGroupCommand {
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
+            this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient, 
opts.parser, getGroupOffsetsResetterOptions(opts));
         }
 
         public StreamsGroupService(StreamsGroupCommandOptions opts, Admin 
adminClient) {
             this.opts = opts;
             this.adminClient = adminClient;
-            this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser, 
getOffsetsUtilsOptions(opts));
+            this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient, 
opts.parser, getGroupOffsetsResetterOptions(opts));
         }
 
-        private OffsetsUtils.OffsetsUtilsOptions 
getOffsetsUtilsOptions(StreamsGroupCommandOptions opts) {
+        private GroupOffsetsResetter.GroupOffsetsResetterOptions 
getGroupOffsetsResetterOptions(StreamsGroupCommandOptions opts) {
             return
-                new 
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+                new 
GroupOffsetsResetter.GroupOffsetsResetterOptions(opts.options.valuesOf(opts.groupOpt),
                     opts.options.valuesOf(opts.resetToOffsetOpt),
                     opts.options.valuesOf(opts.resetFromFileOpt),
                     opts.options.valuesOf(opts.resetToDatetimeOpt),
@@ -604,7 +604,7 @@ public class StreamsGroupCommand {
                     topicWithoutPartitions.add(topic);
             }
 
-            List<TopicPartition> specifiedPartitions = 
topicWithPartitions.stream().flatMap(offsetsUtils::parseTopicsWithPartitions).toList();
+            List<TopicPartition> specifiedPartitions = 
topicWithPartitions.stream().flatMap(groupOffsetsResetter::parseTopicsWithPartitions).toList();
 
             // Get the partitions of topics that the user did not explicitly 
specify the partitions
             DescribeTopicsResult describeTopicsResult = 
adminClient.describeTopics(
@@ -951,8 +951,8 @@ public class StreamsGroupCommand {
             } else if (opts.options.has(opts.inputTopicOpt)) {
                 List<String> topics = 
opts.options.valuesOf(opts.inputTopicOpt);
 
-                List<TopicPartition> partitions = 
offsetsUtils.parseTopicPartitionsToReset(topics);
-                offsetsUtils.checkAllTopicPartitionsValid(partitions);
+                List<TopicPartition> partitions = 
groupOffsetsResetter.parseTopicPartitionsToReset(topics);
+                groupOffsetsResetter.checkAllTopicPartitionsValid(partitions);
                 // if the user specified topics that do not belong to this 
group, we filter them out
                 partitions = filterExistingGroupTopics(groupId, partitions);
                 return partitions;
@@ -966,23 +966,23 @@ public class StreamsGroupCommand {
 
         private Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId, Collection<TopicPartition> 
partitionsToReset) {
             if (opts.options.has(opts.resetToOffsetOpt)) {
-                return offsetsUtils.resetToOffset(partitionsToReset);
+                return groupOffsetsResetter.resetToOffset(partitionsToReset);
             } else if (opts.options.has(opts.resetToEarliestOpt)) {
-                return offsetsUtils.resetToEarliest(partitionsToReset);
+                return groupOffsetsResetter.resetToEarliest(partitionsToReset);
             } else if (opts.options.has(opts.resetToLatestOpt)) {
-                return offsetsUtils.resetToLatest(partitionsToReset);
+                return groupOffsetsResetter.resetToLatest(partitionsToReset);
             } else if (opts.options.has(opts.resetShiftByOpt)) {
                 Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets 
= getCommittedOffsets(groupId);
-                return offsetsUtils.resetByShiftBy(partitionsToReset, 
currentCommittedOffsets);
+                return groupOffsetsResetter.resetByShiftBy(partitionsToReset, 
currentCommittedOffsets);
             } else if (opts.options.has(opts.resetToDatetimeOpt)) {
-                return offsetsUtils.resetToDateTime(partitionsToReset);
+                return groupOffsetsResetter.resetToDateTime(partitionsToReset);
             } else if (opts.options.has(opts.resetByDurationOpt)) {
-                return offsetsUtils.resetByDuration(partitionsToReset);
-            } else if (offsetsUtils.resetPlanFromFile().isPresent()) {
-                return offsetsUtils.resetFromFile(groupId);
+                return groupOffsetsResetter.resetByDuration(partitionsToReset);
+            } else if (groupOffsetsResetter.resetPlanFromFile().isPresent()) {
+                return groupOffsetsResetter.resetFromFile(groupId);
             } else if (opts.options.has(opts.resetToCurrentOpt)) {
                 Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets 
= getCommittedOffsets(groupId);
-                return offsetsUtils.resetToCurrent(partitionsToReset, 
currentCommittedOffsets);
+                return groupOffsetsResetter.resetToCurrent(partitionsToReset, 
currentCommittedOffsets);
             }
 
             CommandLineUtils

Reply via email to