This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 45226543e05 KAFKA-20550 Rename OffsetsUtils to GroupOffsetsResetter
(#22207)
45226543e05 is described below
commit 45226543e05c6891823f94eadea8238cd3fbc29f
Author: majialong <[email protected]>
AuthorDate: Mon May 11 01:34:21 2026 +0800
KAFKA-20550 Rename OffsetsUtils to GroupOffsetsResetter (#22207)
`OffsetsUtils` has state and objects, so it's not a traditional utility
class. Rename it to `GroupOffsetsResetter` to better reflect its actual
role.
Reviewers: Ken Huang <[email protected]>, PoAn Yang
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
...OffsetsUtils.java => GroupOffsetsResetter.java} | 26 ++++++------
.../tools/consumer/group/ConsumerGroupCommand.java | 48 +++++++++++-----------
.../tools/consumer/group/ShareGroupCommand.java | 24 +++++------
.../kafka/tools/streams/StreamsGroupCommand.java | 36 ++++++++--------
4 files changed, 67 insertions(+), 67 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
b/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
similarity index 96%
rename from tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
rename to tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
index bec80b6fc25..c5f983fb0c0 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/GroupOffsetsResetter.java
@@ -58,14 +58,14 @@ import java.util.stream.Stream;
import joptsimple.OptionParser;
-public class OffsetsUtils {
- public static final Logger LOGGER =
LoggerFactory.getLogger(OffsetsUtils.class);
+public class GroupOffsetsResetter {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(GroupOffsetsResetter.class);
private static final String TOPIC_PARTITION_SEPARATOR = ":";
private final Admin adminClient;
- private final OffsetsUtilsOptions opts;
+ private final GroupOffsetsResetterOptions opts;
private final OptionParser parser;
- public OffsetsUtils(Admin adminClient, OptionParser parser,
OffsetsUtilsOptions opts) {
+ public GroupOffsetsResetter(Admin adminClient, OptionParser parser,
GroupOffsetsResetterOptions opts) {
this.adminClient = adminClient;
this.opts = opts;
this.parser = parser;
@@ -372,16 +372,16 @@ public class OffsetsUtils {
Instant now = Instant.now();
durationParsed.negated().addTo(now);
long timestamp = now.minus(durationParsed).toEpochMilli();
- Map<TopicPartition, OffsetsUtils.LogOffsetResult> logTimestampOffsets =
+ Map<TopicPartition, GroupOffsetsResetter.LogOffsetResult>
logTimestampOffsets =
getLogTimestampOffsets(partitionsToReset, timestamp);
return
partitionsToReset.stream().collect(Collectors.toMap(Function.identity(),
topicPartition -> {
- OffsetsUtils.LogOffsetResult logTimestampOffset =
logTimestampOffsets.get(topicPartition);
+ GroupOffsetsResetter.LogOffsetResult logTimestampOffset =
logTimestampOffsets.get(topicPartition);
- if (!(logTimestampOffset instanceof OffsetsUtils.LogOffset)) {
+ if (!(logTimestampOffset instanceof
GroupOffsetsResetter.LogOffset)) {
CommandLineUtils.printUsageAndExit(parser, "Error getting
offset by timestamp of topic partition: " + topicPartition);
}
- return new OffsetAndMetadata(((OffsetsUtils.LogOffset)
logTimestampOffset).value);
+ return new OffsetAndMetadata(((GroupOffsetsResetter.LogOffset)
logTimestampOffset).value);
}));
}
@@ -428,10 +428,10 @@ public class OffsetsUtils {
Map<TopicPartition, OffsetAndMetadata>
preparedOffsetsForPartitionsWithoutCommittedOffset =
getLogEndOffsets(partitionsToResetWithoutCommittedOffset)
.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> {
- if (!(e.getValue() instanceof OffsetsUtils.LogOffset)) {
+ if (!(e.getValue() instanceof
GroupOffsetsResetter.LogOffset)) {
CommandLineUtils.printUsageAndExit(parser, "Error
getting ending offset of topic partition: " + e.getKey());
}
- return new OffsetAndMetadata(((OffsetsUtils.LogOffset)
e.getValue()).value);
+ return new
OffsetAndMetadata(((GroupOffsetsResetter.LogOffset) e.getValue()).value);
}));
preparedOffsetsForPartitionsWithCommittedOffset.putAll(preparedOffsetsForPartitionsWithoutCommittedOffset);
@@ -508,7 +508,7 @@ public class OffsetsUtils {
public static class Ignore implements LogOffsetResult { }
- public static class OffsetsUtilsOptions {
+ public static class GroupOffsetsResetterOptions {
List<String> groupOpt;
List<Long> resetToOffsetOpt;
List<String> resetFromFileOpt;
@@ -517,7 +517,7 @@ public class OffsetsUtils {
Long resetShiftByOpt;
long timeoutMsOpt;
- public OffsetsUtilsOptions(
+ public GroupOffsetsResetterOptions(
List<String> groupOpt,
List<Long> resetToOffsetOpt,
List<String> resetFromFileOpt,
@@ -535,7 +535,7 @@ public class OffsetsUtils {
this.timeoutMsOpt = timeoutMsOpt;
}
- public OffsetsUtilsOptions(
+ public GroupOffsetsResetterOptions(
List<String> groupOpt,
List<String> resetToDatetimeOpt,
long timeoutMsOpt) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index cfdef8f7518..895ce1eaba1 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -44,7 +44,7 @@ import
org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
-import org.apache.kafka.tools.OffsetsUtils;
+import org.apache.kafka.tools.GroupOffsetsResetter;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectWriter;
@@ -131,7 +131,7 @@ public class ConsumerGroupCommand {
String exported =
consumerGroupService.exportOffsetsToCsv(offsetsToReset);
System.out.println(exported);
} else
- OffsetsUtils.printOffsetsToReset(offsetsToReset);
+ GroupOffsetsResetter.printOffsetsToReset(offsetsToReset);
} else if (opts.options.has(opts.deleteOffsetsOpt)) {
consumerGroupService.deleteOffsets();
}
@@ -182,7 +182,7 @@ public class ConsumerGroupCommand {
final ConsumerGroupCommandOptions opts;
final Map<String, String> configOverrides;
private final Admin adminClient;
- private final OffsetsUtils offsetsUtils;
+ private final GroupOffsetsResetter groupOffsetsResetter;
ConsumerGroupService(ConsumerGroupCommandOptions opts, Map<String,
String> configOverrides) {
this.opts = opts;
@@ -192,12 +192,12 @@ public class ConsumerGroupCommand {
} catch (IOException e) {
throw new RuntimeException(e);
}
- this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
+ this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient,
opts.parser, getGroupOffsetsResetterOptions(opts));
}
- private OffsetsUtils.OffsetsUtilsOptions
getOffsetsUtilsOptions(ConsumerGroupCommandOptions opts) {
+ private GroupOffsetsResetter.GroupOffsetsResetterOptions
getGroupOffsetsResetterOptions(ConsumerGroupCommandOptions opts) {
return
- new
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+ new
GroupOffsetsResetter.GroupOffsetsResetterOptions(opts.options.valuesOf(opts.groupOpt),
opts.options.valuesOf(opts.resetToOffsetOpt),
opts.options.valuesOf(opts.resetFromFileOpt),
opts.options.valuesOf(opts.resetToDatetimeOpt),
@@ -598,19 +598,19 @@ public class ConsumerGroupCommand {
consumerIdOpt, hostOpt, clientIdOpt, logEndOffsetOpt,
leaderEpoch);
};
- List<TopicPartition> topicPartitionsWithoutLeader =
offsetsUtils.filterNoneLeaderPartitions(topicPartitions);
+ List<TopicPartition> topicPartitionsWithoutLeader =
groupOffsetsResetter.filterNoneLeaderPartitions(topicPartitions);
List<TopicPartition> topicPartitionsWithLeader =
topicPartitions.stream().filter(tp ->
!topicPartitionsWithoutLeader.contains(tp)).toList();
// prepare data for partitions with leaders
- List<PartitionAssignmentState> existLeaderAssignments =
offsetsUtils.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult
-> {
- if (logEndOffsetResult.getValue() instanceof
OffsetsUtils.LogOffset)
+ List<PartitionAssignmentState> existLeaderAssignments =
groupOffsetsResetter.getLogEndOffsets(topicPartitionsWithLeader).entrySet().stream().map(logEndOffsetResult
-> {
+ if (logEndOffsetResult.getValue() instanceof
GroupOffsetsResetter.LogOffset)
return getDescribePartitionResult.apply(
logEndOffsetResult.getKey(),
- Optional.of(((OffsetsUtils.LogOffset)
logEndOffsetResult.getValue()).value())
+ Optional.of(((GroupOffsetsResetter.LogOffset)
logEndOffsetResult.getValue()).value())
);
- else if (logEndOffsetResult.getValue() instanceof
OffsetsUtils.Unknown)
+ else if (logEndOffsetResult.getValue() instanceof
GroupOffsetsResetter.Unknown)
return
getDescribePartitionResult.apply(logEndOffsetResult.getKey(), Optional.empty());
- else if (logEndOffsetResult.getValue() instanceof
OffsetsUtils.Ignore)
+ else if (logEndOffsetResult.getValue() instanceof
GroupOffsetsResetter.Ignore)
return null;
throw new IllegalStateException("Unknown LogOffset subclass: "
+ logEndOffsetResult.getValue());
@@ -706,7 +706,7 @@ public class ConsumerGroupCommand {
topicWithoutPartitions.add(topic);
}
- List<TopicPartition> knownPartitions =
topicWithPartitions.stream().flatMap(offsetsUtils::parseTopicsWithPartitions).toList();
+ List<TopicPartition> knownPartitions =
topicWithPartitions.stream().flatMap(groupOffsetsResetter::parseTopicsWithPartitions).toList();
// Get the partitions of topics that the user did not explicitly
specify the partitions
DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(
@@ -947,7 +947,7 @@ public class ConsumerGroupCommand {
return getCommittedOffsets(groupId).keySet();
} else if (opts.options.has(opts.topicOpt)) {
List<String> topics = opts.options.valuesOf(opts.topicOpt);
- return offsetsUtils.parseTopicPartitionsToReset(topics);
+ return
groupOffsetsResetter.parseTopicPartitionsToReset(topics);
} else {
if (!opts.options.has(opts.resetFromFileOpt))
CommandLineUtils.printUsageAndExit(opts.parser, "One of
the reset scopes should be defined: --all-topics, --topic.");
@@ -969,26 +969,26 @@ public class ConsumerGroupCommand {
private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId, Collection<TopicPartition>
partitionsToReset) {
// ensure all partitions are valid, otherwise throw a runtime
exception
- offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
+
groupOffsetsResetter.checkAllTopicPartitionsValid(partitionsToReset);
if (opts.options.has(opts.resetToOffsetOpt)) {
- return offsetsUtils.resetToOffset(partitionsToReset);
+ return groupOffsetsResetter.resetToOffset(partitionsToReset);
} else if (opts.options.has(opts.resetToEarliestOpt)) {
- return offsetsUtils.resetToEarliest(partitionsToReset);
+ return groupOffsetsResetter.resetToEarliest(partitionsToReset);
} else if (opts.options.has(opts.resetToLatestOpt)) {
- return offsetsUtils.resetToLatest(partitionsToReset);
+ return groupOffsetsResetter.resetToLatest(partitionsToReset);
} else if (opts.options.has(opts.resetShiftByOpt)) {
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets
= getCommittedOffsets(groupId);
- return offsetsUtils.resetByShiftBy(partitionsToReset,
currentCommittedOffsets);
+ return groupOffsetsResetter.resetByShiftBy(partitionsToReset,
currentCommittedOffsets);
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
- return offsetsUtils.resetToDateTime(partitionsToReset);
+ return groupOffsetsResetter.resetToDateTime(partitionsToReset);
} else if (opts.options.has(opts.resetByDurationOpt)) {
- return offsetsUtils.resetByDuration(partitionsToReset);
- } else if (offsetsUtils.resetPlanFromFile().isPresent()) {
- return offsetsUtils.resetFromFile(groupId);
+ return groupOffsetsResetter.resetByDuration(partitionsToReset);
+ } else if (groupOffsetsResetter.resetPlanFromFile().isPresent()) {
+ return groupOffsetsResetter.resetFromFile(groupId);
} else if (opts.options.has(opts.resetToCurrentOpt)) {
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets
= getCommittedOffsets(groupId);
- return offsetsUtils.resetToCurrent(partitionsToReset,
currentCommittedOffsets);
+ return groupOffsetsResetter.resetToCurrent(partitionsToReset,
currentCommittedOffsets);
}
CommandLineUtils.printUsageAndExit(opts.parser,
String.format("Option '%s' requires one of the following scenarios: %s",
opts.resetOffsetsOpt, opts.allResetOffsetScenarioOpts));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index d39eb9396e5..aa02dd22d3b 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -44,7 +44,7 @@ import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
-import org.apache.kafka.tools.OffsetsUtils;
+import org.apache.kafka.tools.GroupOffsetsResetter;
import java.io.IOException;
import java.util.AbstractMap.SimpleImmutableEntry;
@@ -129,7 +129,7 @@ public class ShareGroupCommand {
static class ShareGroupService implements AutoCloseable {
final ShareGroupCommandOptions opts;
private final Admin adminClient;
- private final OffsetsUtils offsetsUtils;
+ private final GroupOffsetsResetter groupOffsetsResetter;
public ShareGroupService(ShareGroupCommandOptions opts, Map<String,
String> configOverrides) {
this.opts = opts;
@@ -138,18 +138,18 @@ public class ShareGroupCommand {
} catch (IOException e) {
throw new RuntimeException(e);
}
- this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
+ this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient,
opts.parser, getGroupOffsetsResetterOptions(opts));
}
public ShareGroupService(ShareGroupCommandOptions opts, Admin
adminClient) {
this.opts = opts;
this.adminClient = adminClient;
- this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
+ this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient,
opts.parser, getGroupOffsetsResetterOptions(opts));
}
- private OffsetsUtils.OffsetsUtilsOptions
getOffsetsUtilsOptions(ShareGroupCommandOptions opts) {
+ private GroupOffsetsResetter.GroupOffsetsResetterOptions
getGroupOffsetsResetterOptions(ShareGroupCommandOptions opts) {
return
- new
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+ new
GroupOffsetsResetter.GroupOffsetsResetterOptions(opts.options.valuesOf(opts.groupOpt),
opts.options.valuesOf(opts.resetToDatetimeOpt),
opts.options.valueOf(opts.timeoutMsOpt));
}
@@ -418,7 +418,7 @@ public class ShareGroupCommand {
withTimeoutMs(new AlterShareGroupOffsetsOptions())
).all().get();
}
- OffsetsUtils.printOffsetsToReset(Map.of(groupId,
offsetsToReset));
+ GroupOffsetsResetter.printOffsetsToReset(Map.of(groupId,
offsetsToReset));
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
} catch (ExecutionException ee) {
@@ -435,7 +435,7 @@ public class ShareGroupCommand {
Collection<TopicPartition> partitionsToReset;
if (opts.options.has(opts.topicOpt)) {
- partitionsToReset =
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
+ partitionsToReset =
groupOffsetsResetter.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
} else {
Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
Map<TopicPartition, SharePartitionOffsetInfo>
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
@@ -449,13 +449,13 @@ public class ShareGroupCommand {
}
private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(Collection<TopicPartition> partitionsToReset) {
- offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
+
groupOffsetsResetter.checkAllTopicPartitionsValid(partitionsToReset);
if (opts.options.has(opts.resetToEarliestOpt)) {
- return offsetsUtils.resetToEarliest(partitionsToReset);
+ return groupOffsetsResetter.resetToEarliest(partitionsToReset);
} else if (opts.options.has(opts.resetToLatestOpt)) {
- return offsetsUtils.resetToLatest(partitionsToReset);
+ return groupOffsetsResetter.resetToLatest(partitionsToReset);
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
- return offsetsUtils.resetToDateTime(partitionsToReset);
+ return groupOffsetsResetter.resetToDateTime(partitionsToReset);
}
CommandLineUtils
.printUsageAndExit(opts.parser, String.format("Option '%s'
requires one of the following scenarios: %s", opts.resetOffsetsOpt,
opts.allResetOffsetScenarioOpts));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index c9d6597a52b..8e40d81228d 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -56,7 +56,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.internals.Exit;
import org.apache.kafka.server.util.CommandLineUtils;
-import org.apache.kafka.tools.OffsetsUtils;
+import org.apache.kafka.tools.GroupOffsetsResetter;
import org.apache.kafka.tools.consumer.group.CsvUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -187,7 +187,7 @@ public class StreamsGroupCommand {
static class StreamsGroupService implements AutoCloseable {
final StreamsGroupCommandOptions opts;
private final Admin adminClient;
- private final OffsetsUtils offsetsUtils;
+ private final GroupOffsetsResetter groupOffsetsResetter;
public StreamsGroupService(StreamsGroupCommandOptions opts,
Map<String, String> configOverrides) {
this.opts = opts;
@@ -196,18 +196,18 @@ public class StreamsGroupCommand {
} catch (IOException e) {
throw new RuntimeException(e);
}
- this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
+ this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient,
opts.parser, getGroupOffsetsResetterOptions(opts));
}
public StreamsGroupService(StreamsGroupCommandOptions opts, Admin
adminClient) {
this.opts = opts;
this.adminClient = adminClient;
- this.offsetsUtils = new OffsetsUtils(adminClient, opts.parser,
getOffsetsUtilsOptions(opts));
+ this.groupOffsetsResetter = new GroupOffsetsResetter(adminClient,
opts.parser, getGroupOffsetsResetterOptions(opts));
}
- private OffsetsUtils.OffsetsUtilsOptions
getOffsetsUtilsOptions(StreamsGroupCommandOptions opts) {
+ private GroupOffsetsResetter.GroupOffsetsResetterOptions
getGroupOffsetsResetterOptions(StreamsGroupCommandOptions opts) {
return
- new
OffsetsUtils.OffsetsUtilsOptions(opts.options.valuesOf(opts.groupOpt),
+ new
GroupOffsetsResetter.GroupOffsetsResetterOptions(opts.options.valuesOf(opts.groupOpt),
opts.options.valuesOf(opts.resetToOffsetOpt),
opts.options.valuesOf(opts.resetFromFileOpt),
opts.options.valuesOf(opts.resetToDatetimeOpt),
@@ -604,7 +604,7 @@ public class StreamsGroupCommand {
topicWithoutPartitions.add(topic);
}
- List<TopicPartition> specifiedPartitions =
topicWithPartitions.stream().flatMap(offsetsUtils::parseTopicsWithPartitions).toList();
+ List<TopicPartition> specifiedPartitions =
topicWithPartitions.stream().flatMap(groupOffsetsResetter::parseTopicsWithPartitions).toList();
// Get the partitions of topics that the user did not explicitly
specify the partitions
DescribeTopicsResult describeTopicsResult =
adminClient.describeTopics(
@@ -951,8 +951,8 @@ public class StreamsGroupCommand {
} else if (opts.options.has(opts.inputTopicOpt)) {
List<String> topics =
opts.options.valuesOf(opts.inputTopicOpt);
- List<TopicPartition> partitions =
offsetsUtils.parseTopicPartitionsToReset(topics);
- offsetsUtils.checkAllTopicPartitionsValid(partitions);
+ List<TopicPartition> partitions =
groupOffsetsResetter.parseTopicPartitionsToReset(topics);
+ groupOffsetsResetter.checkAllTopicPartitionsValid(partitions);
// if the user specified topics that do not belong to this
group, we filter them out
partitions = filterExistingGroupTopics(groupId, partitions);
return partitions;
@@ -966,23 +966,23 @@ public class StreamsGroupCommand {
private Map<TopicPartition, OffsetAndMetadata>
prepareOffsetsToReset(String groupId, Collection<TopicPartition>
partitionsToReset) {
if (opts.options.has(opts.resetToOffsetOpt)) {
- return offsetsUtils.resetToOffset(partitionsToReset);
+ return groupOffsetsResetter.resetToOffset(partitionsToReset);
} else if (opts.options.has(opts.resetToEarliestOpt)) {
- return offsetsUtils.resetToEarliest(partitionsToReset);
+ return groupOffsetsResetter.resetToEarliest(partitionsToReset);
} else if (opts.options.has(opts.resetToLatestOpt)) {
- return offsetsUtils.resetToLatest(partitionsToReset);
+ return groupOffsetsResetter.resetToLatest(partitionsToReset);
} else if (opts.options.has(opts.resetShiftByOpt)) {
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets
= getCommittedOffsets(groupId);
- return offsetsUtils.resetByShiftBy(partitionsToReset,
currentCommittedOffsets);
+ return groupOffsetsResetter.resetByShiftBy(partitionsToReset,
currentCommittedOffsets);
} else if (opts.options.has(opts.resetToDatetimeOpt)) {
- return offsetsUtils.resetToDateTime(partitionsToReset);
+ return groupOffsetsResetter.resetToDateTime(partitionsToReset);
} else if (opts.options.has(opts.resetByDurationOpt)) {
- return offsetsUtils.resetByDuration(partitionsToReset);
- } else if (offsetsUtils.resetPlanFromFile().isPresent()) {
- return offsetsUtils.resetFromFile(groupId);
+ return groupOffsetsResetter.resetByDuration(partitionsToReset);
+ } else if (groupOffsetsResetter.resetPlanFromFile().isPresent()) {
+ return groupOffsetsResetter.resetFromFile(groupId);
} else if (opts.options.has(opts.resetToCurrentOpt)) {
Map<TopicPartition, OffsetAndMetadata> currentCommittedOffsets
= getCommittedOffsets(groupId);
- return offsetsUtils.resetToCurrent(partitionsToReset,
currentCommittedOffsets);
+ return groupOffsetsResetter.resetToCurrent(partitionsToReset,
currentCommittedOffsets);
}
CommandLineUtils