This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new dbe0fdadb8c MINOR: Consistently apply timeout in group commands
(#20764)
dbe0fdadb8c is described below
commit dbe0fdadb8c78d055d3b77bbd0f5fea6e587a657
Author: Lucas Brutschy <[email protected]>
AuthorDate: Sun Oct 26 11:52:31 2025 +0100
MINOR: Consistently apply timeout in group commands (#20764)
Group commands sometimes apply the timeout from --timeout, and sometimes
don't. This change applies the timeout in every call to adminClient.
Reviewers: Shivsundar R <[email protected]>, Andrew Schofield
<[email protected]>, Alieh Saeedi <[email protected]>
---
.../java/org/apache/kafka/tools/OffsetsUtils.java | 4 +-
.../group/ConsumerGroupCommandOptions.java | 7 --
.../tools/consumer/group/ShareGroupCommand.java | 15 ++++-
.../kafka/tools/streams/StreamsGroupCommand.java | 74 +++++++++++++---------
.../tools/streams/StreamsGroupCommandOptions.java | 10 +--
.../consumer/group/ConsumerGroupServiceTest.java | 6 +-
.../consumer/group/ShareGroupCommandTest.java | 57 ++++++++---------
.../tools/streams/StreamsGroupCommandTest.java | 60 +++++++++---------
8 files changed, 119 insertions(+), 114 deletions(-)
diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index 269cd53875d..bec80b6fc25 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -460,7 +460,7 @@ public class OffsetsUtils {
Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
try {
- return
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+ return adminClient.describeTopics(topics, withTimeoutMs(new
DescribeTopicsOptions())).allTopicNames().get().entrySet()
.stream()
.flatMap(entry -> entry.getValue().partitions().stream()
.filter(partitionInfo -> partitionInfo.leader() == null)
@@ -476,7 +476,7 @@ public class OffsetsUtils {
// collect all topics
Set<String> topics =
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
try {
- List<TopicPartition> existPartitions =
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+ List<TopicPartition> existPartitions =
adminClient.describeTopics(topics, withTimeoutMs(new
DescribeTopicsOptions())).allTopicNames().get().entrySet()
.stream()
.flatMap(entry -> entry.getValue().partitions().stream()
.map(partitionInfo -> new TopicPartition(entry.getKey(),
partitionInfo.partition())))
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 30ba137cd8d..2d51d0525ac 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -19,9 +19,6 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -31,7 +28,6 @@ import joptsimple.OptionSpec;
import static org.apache.kafka.tools.ToolsUtils.minus;
public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to
connect to. REQUIRED for all options except for --validate-regex.";
private static final String GROUP_DOC = "The consumer group we wish to act
on.";
@@ -230,9 +226,6 @@ public class ConsumerGroupCommandOptions extends
CommandDefaultOptions {
if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
CommandLineUtils.printUsageAndExit(parser,
"Option " + describeOpt + " does not take a value for " +
stateOpt);
- } else {
- if (options.has(timeoutMsOpt))
- LOGGER.debug("Option " + timeoutMsOpt + " is applicable only
when " + describeOpt + " is used.");
}
if (options.has(deleteOpt)) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 87cf0f1e837..2c3e21fedb1 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
@@ -26,6 +27,7 @@ import
org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ShareGroupDescription;
import org.apache.kafka.clients.admin.ShareMemberAssignment;
@@ -411,7 +413,8 @@ public class ShareGroupCommand {
offsetsToReset.entrySet().stream()
.collect(Collectors.toMap(
Entry::getKey, entry ->
entry.getValue().offset()
- ))
+ )),
+ withTimeoutMs(new AlterShareGroupOffsetsOptions())
).all().get();
}
OffsetsUtils.printOffsetsToReset(Map.of(groupId,
offsetsToReset));
@@ -434,7 +437,10 @@ public class ShareGroupCommand {
partitionsToReset =
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
} else {
Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
- Map<TopicPartition, OffsetAndMetadata>
offsetsByTopicPartitions =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+ Map<TopicPartition, OffsetAndMetadata>
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
+ groupSpecs,
+ withTimeoutMs(new ListShareGroupOffsetsOptions())
+ ).all().get().get(groupId);
partitionsToReset = offsetsByTopicPartitions.keySet();
}
@@ -488,7 +494,10 @@ public class ShareGroupCommand {
Map<String, ListShareGroupOffsetsSpec> groupSpecs =
Map.of(groupId, new ListShareGroupOffsetsSpec());
try {
- Map<TopicPartition, OffsetAndMetadata> startOffsets =
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+ Map<TopicPartition, OffsetAndMetadata> startOffsets =
adminClient.listShareGroupOffsets(
+ groupSpecs,
+ withTimeoutMs(new ListShareGroupOffsetsOptions())
+ ).all().get().get(groupId);
Set<SharePartitionOffsetInformation> partitionOffsets =
mapOffsetsToSharePartitionInformation(groupId, startOffsets);
groupOffsets.put(groupId, new
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 897615deef8..f76a81b9c19 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -19,9 +19,11 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AbstractOptions;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
@@ -30,8 +32,11 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -279,7 +284,7 @@ public class StreamsGroupCommand {
StreamsGroupDescription getDescribeGroup(String group) throws
ExecutionException, InterruptedException {
DescribeStreamsGroupsResult result =
adminClient.describeStreamsGroups(
List.of(group),
- new
DescribeStreamsGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
+ withTimeoutMs(new DescribeStreamsGroupsOptions()));
Map<String, StreamsGroupDescription> descriptionMap =
result.all().get();
return descriptionMap.get(group);
}
@@ -428,8 +433,14 @@ public class StreamsGroupCommand {
earliest.put(tp, OffsetSpec.earliest());
latest.put(tp, OffsetSpec.latest());
}
- Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
earliestResult = adminClient.listOffsets(earliest).all().get();
- Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
latestResult = adminClient.listOffsets(latest).all().get();
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
earliestResult = adminClient.listOffsets(
+ earliest,
+ withTimeoutMs(new ListOffsetsOptions())
+ ).all().get();
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
latestResult = adminClient.listOffsets(
+ latest,
+ withTimeoutMs(new ListOffsetsOptions())
+ ).all().get();
Map<TopicPartition, OffsetAndMetadata> committedOffsets =
getCommittedOffsets(description.groupId());
Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
@@ -449,14 +460,18 @@ public class StreamsGroupCommand {
Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String
groupId) {
try {
- var sourceTopics =
adminClient.describeStreamsGroups(List.of(groupId))
- .all().get().get(groupId)
+ var sourceTopics = adminClient.describeStreamsGroups(
+ List.of(groupId),
+ withTimeoutMs(new DescribeStreamsGroupsOptions())
+ ).all().get().get(groupId)
.subtopologies().stream()
.flatMap(subtopology ->
subtopology.sourceTopics().stream())
.collect(Collectors.toSet());
- var allTopicPartitions =
adminClient.listStreamsGroupOffsets(Map.of(groupId, new
ListStreamsGroupOffsetsSpec()))
- .partitionsToOffsetAndMetadata(groupId).get();
+ var allTopicPartitions = adminClient.listStreamsGroupOffsets(
+ Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
+ withTimeoutMs(new ListStreamsGroupOffsetsOptions())
+ ).partitionsToOffsetAndMetadata(groupId).get();
allTopicPartitions.keySet().removeIf(tp ->
!sourceTopics.contains(tp.topic()));
return allTopicPartitions;
@@ -467,8 +482,10 @@ public class StreamsGroupCommand {
private List<TopicPartition> filterExistingGroupTopics(String groupId,
List<TopicPartition> topicPartitions) {
try {
- var allTopicPartitions =
adminClient.listStreamsGroupOffsets(Map.of(groupId, new
ListStreamsGroupOffsetsSpec()))
- .partitionsToOffsetAndMetadata(groupId).get();
+ var allTopicPartitions = adminClient.listStreamsGroupOffsets(
+ Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
+ withTimeoutMs(new ListStreamsGroupOffsetsOptions())
+ ).partitionsToOffsetAndMetadata(groupId).get();
boolean allPresent =
topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
if (!allPresent) {
printError("One or more topics are not part of the group
'" + groupId + "'.", Optional.empty());
@@ -490,7 +507,8 @@ public class StreamsGroupCommand {
: opts.options.valuesOf(opts.groupOpt);
if (!groupIds.isEmpty()) {
Map<String, KafkaFuture<StreamsGroupDescription>>
streamsGroups = adminClient.describeStreamsGroups(
- groupIds
+ groupIds,
+ withTimeoutMs(new DescribeStreamsGroupsOptions())
).describedGroups();
streamsGroups.forEach((groupId, groupDescription) -> {
@@ -506,7 +524,10 @@ public class StreamsGroupCommand {
List<String> internalTopics =
getInternalTopicsToBeDeleted(groupId);
if (!internalTopics.isEmpty()) {
try {
-
adminClient.deleteTopics(internalTopics).all().get();
+ adminClient.deleteTopics(
+ internalTopics,
+ withTimeoutMs(new
DeleteTopicsOptions())
+ ).all().get();
} catch (InterruptedException |
ExecutionException e) {
if (e.getCause() instanceof
UnknownTopicOrPartitionException) {
printError("Deleting internal
topics for group '" + groupId + "' failed because the topics do not exist.",
Optional.empty());
@@ -742,7 +763,10 @@ public class StreamsGroupCommand {
if (internalTopicsToDelete != null &&
!internalTopicsToDelete.isEmpty()) {
DeleteTopicsResult deleteTopicsResult = null;
try {
- deleteTopicsResult =
adminClient.deleteTopics(internalTopicsToDelete);
+ deleteTopicsResult = adminClient.deleteTopics(
+ internalTopicsToDelete,
+ withTimeoutMs(new DeleteTopicsOptions())
+ );
deleteTopicsResult.all().get();
} catch (InterruptedException | ExecutionException e) {
if (deleteTopicsResult != null) {
@@ -820,7 +844,10 @@ public class StreamsGroupCommand {
Map<String, List<String>> retrieveInternalTopics(List<String>
groupIds) {
Map<String, List<String>> groupToInternalTopics = new HashMap<>();
try {
- Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(groupIds).all().get();
+ Map<String, StreamsGroupDescription> descriptionMap =
adminClient.describeStreamsGroups(
+ groupIds,
+ withTimeoutMs(new DescribeStreamsGroupsOptions())
+ ).all().get();
for (StreamsGroupDescription description :
descriptionMap.values()) {
List<String> sourceTopics =
description.subtopologies().stream()
@@ -848,7 +875,7 @@ public class StreamsGroupCommand {
if (e.getCause() instanceof UnsupportedVersionException) {
try {
// Retrieve internal topic list if possible, and add
the list of topic names to error message
- Set<String> allTopics =
adminClient.listTopics().names().get();
+ Set<String> allTopics =
adminClient.listTopics(withTimeoutMs(new ListTopicsOptions())).names().get();
List<String> internalTopics = allTopics.stream()
.filter(topic ->
groupIds.stream().anyMatch(groupId -> isInferredInternalTopic(topic, groupId)))
.collect(Collectors.toList());
@@ -873,7 +900,8 @@ public class StreamsGroupCommand {
if (!dryRun) {
adminClient.alterStreamsGroupOffsets(
groupId,
- preparedOffsets
+ preparedOffsets,
+ withTimeoutMs(new AlterStreamsGroupOffsetsOptions())
).all().get();
}
@@ -947,22 +975,6 @@ public class StreamsGroupCommand {
||
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
}
- List<String> collectAllTopics(String groupId) {
- try {
- return adminClient.describeStreamsGroups(List.of(groupId))
- .all().get().get(groupId)
- .subtopologies().stream()
- .flatMap(subtopology -> Stream.of(
- subtopology.sourceTopics().stream(),
- subtopology.repartitionSinkTopics().stream(),
-
subtopology.repartitionSourceTopics().keySet().stream(),
- subtopology.stateChangelogTopics().keySet().stream()
- ).flatMap(s -> s)).distinct().collect(Collectors.toList());
- } catch (InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
Collection<StreamsGroupMemberDescription> collectGroupMembers(String
groupId) throws Exception {
return getDescribeGroup(groupId).members();
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index d6715f833d2..a69e0bbe3a4 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -19,9 +19,6 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -32,7 +29,6 @@ import static org.apache.kafka.tools.ToolsUtils.minus;
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
private static final String NL = System.lineSeparator();
- static final Logger LOGGER =
LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The
server(s) to connect to.";
private static final String GROUP_DOC = "The streams group we wish to act
on.";
@@ -144,11 +140,10 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
deleteOpt = parser.accepts("delete", DELETE_DOC);
deleteOffsetsOpt = parser.accepts("delete-offsets",
DELETE_OFFSETS_DOC);
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
- .availableIf(describeOpt)
.withRequiredArg()
.describedAs("timeout (ms)")
.ofType(Long.class)
- .defaultsTo(5000L);
+ .defaultsTo(30000L);
commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
.withRequiredArg()
.describedAs("command config property file")
@@ -214,9 +209,6 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
if (options.has(describeOpt)) {
checkDescribeArgs();
- } else {
- if (options.has(timeoutMsOpt))
- LOGGER.debug("Option " + timeoutMsOpt + " is applicable only
when " + describeOpt + " is used.");
}
if (options.has(deleteOpt)) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index 236d1ce51cc..956bebaad82 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -90,7 +90,7 @@ public class ConsumerGroupServiceTest {
.thenReturn(listGroupOffsetsResult(GROUP));
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
- when(admin.describeTopics(ArgumentMatchers.anySet()))
+ when(admin.describeTopics(ArgumentMatchers.anySet(), any()))
.thenReturn(describeTopicsResult());
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments =
groupService.collectGroupOffsets(GROUP);
@@ -175,7 +175,7 @@ public class ConsumerGroupServiceTest {
any()
)).thenReturn(new
ListOffsetsResult(endOffsets.entrySet().stream().filter(e ->
unassignedTopicPartitions.contains(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
-
when(admin.describeTopics(ArgumentMatchers.anySet())).thenReturn(describeTopicsResult());
+ when(admin.describeTopics(ArgumentMatchers.anySet(),
any())).thenReturn(describeTopicsResult());
Entry<Optional<GroupState>,
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments =
groupService.collectGroupOffsets(GROUP);
Optional<GroupState> state = statesAndAssignments.getKey();
@@ -233,7 +233,7 @@ public class ConsumerGroupServiceTest {
.thenReturn(describeGroupsResult(GroupState.DEAD));
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified),
any()))
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
- when(admin.describeTopics(anySet()))
+ when(admin.describeTopics(anySet(), any()))
.thenReturn(describeTopicsResult(TOPICS));
when(admin.listOffsets(offsetsArgMatcher(), any()))
.thenReturn(listOffsetsResult());
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 29fe151ba2b..7c7d38822b9 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools.consumer.group;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
@@ -31,6 +32,7 @@ import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
@@ -211,7 +213,7 @@ public class ShareGroupCommandTest {
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup,
KafkaFuture.completedFuture(exp)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
@@ -260,7 +262,7 @@ public class ShareGroupCommandTest {
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup,
KafkaFuture.completedFuture(exp)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
try (ShareGroupService service =
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
TestUtils.waitForCondition(() -> {
Entry<String, String> res =
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
@@ -327,7 +329,7 @@ public class ShareGroupCommandTest {
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup,
KafkaFuture.completedFuture(exp1), secondGroup,
KafkaFuture.completedFuture(exp2)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(
+ when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(),
any(ListShareGroupOffsetsOptions.class))).thenAnswer(
invocation -> {
Map<String, Object> argument = invocation.getArgument(0);
if (argument.containsKey(firstGroup)) {
@@ -1082,7 +1084,7 @@ public class ShareGroupCommandTest {
new TopicPartition(topic2, 0), new OffsetAndMetadata(0L)))
)
);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
TopicPartition tp0 = new TopicPartition(topic1, 0);
@@ -1101,25 +1103,22 @@ public class ShareGroupCommandTest {
DescribeShareGroupsResult describeShareGroupsResult =
mock(DescribeShareGroupsResult.class);
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group,
KafkaFuture.completedFuture(exp)));
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
- Map<String, TopicDescription> d1 = Map.of(
+ Map<String, TopicDescription> descriptions = Map.of(
topic1, new TopicDescription(topic1, false, List.of(
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of()))
- ));
- Map<String, TopicDescription> d2 = Map.of(
+ ),
topic2, new TopicDescription(topic2, false, List.of(
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
)));
- DescribeTopicsResult topicsResult1 = mock(DescribeTopicsResult.class);
- DescribeTopicsResult topicsResult2 = mock(DescribeTopicsResult.class);
- when(topicsResult1.allTopicNames()).thenReturn(completedFuture(d1));
- when(topicsResult2.allTopicNames()).thenReturn(completedFuture(d2));
- when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(topicsResult1, topicsResult2);
-
when(adminClient.describeTopics(anyCollection())).thenReturn(topicsResult1,
topicsResult2);
+ DescribeTopicsResult topicsResult = mock(DescribeTopicsResult.class);
+
when(topicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(topicsResult);
+
when(adminClient.describeTopics(anyCollection())).thenReturn(topicsResult);
try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
service.resetOffsets();
- verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
- verify(adminClient).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(),
any(AlterShareGroupOffsetsOptions.class));
+ verify(adminClient, times(3)).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
verify(alterShareGroupOffsetsResult, times(1)).all();
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
}
@@ -1149,7 +1148,7 @@ public class ShareGroupCommandTest {
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(t1,
new OffsetAndMetadata(40L), t2, new OffsetAndMetadata(40L));
@@ -1169,7 +1168,7 @@ public class ShareGroupCommandTest {
topicPartitionOffsets.values().stream().allMatch(offsetSpec ->
offsetSpec instanceof OffsetSpec.LatestSpec);
try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
service.resetOffsets();
- verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(),
any(AlterShareGroupOffsetsOptions.class));
verify(adminClient,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(t1,
t2))), any());
verify(alterShareGroupOffsetsResult, times(1)).all();
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
@@ -1193,7 +1192,7 @@ public class ShareGroupCommandTest {
new TopicPartition(topic2, 0), new OffsetAndMetadata(10L),
new TopicPartition(topic3, 0), new OffsetAndMetadata(10L)))
)
);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
Set<String> topics = Set.of(topic1, topic2, topic3);
when(listTopicsResult.names()).thenReturn(completedFuture(topics));
@@ -1238,7 +1237,7 @@ public class ShareGroupCommandTest {
topicPartitionOffsets.values().stream().allMatch(offsetSpec ->
offsetSpec instanceof OffsetSpec.TimestampSpec);
try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
service.resetOffsets();
- verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(),
any(AlterShareGroupOffsetsOptions.class));
verify(adminClient,
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(tp1,
tp2, tp3, tp4))), any());
verify(alterShareGroupOffsetsResult, times(1)).all();
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
@@ -1259,7 +1258,7 @@ public class ShareGroupCommandTest {
KafkaFuture.completedFuture(Map.of(new TopicPartition(topic,
0), new OffsetAndMetadata(10L)))
)
);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(new
TopicPartition(topic, 0), new OffsetAndMetadata(0L));
ListOffsetsResult listOffsetsResult =
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
@@ -1288,7 +1287,7 @@ public class ShareGroupCommandTest {
try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
service.resetOffsets();
- verify(adminClient, times(0)).alterShareGroupOffsets(any(), any());
+ verify(adminClient, times(0)).alterShareGroupOffsets(any(), any(),
any(AlterShareGroupOffsetsOptions.class));
}
}
@@ -1326,7 +1325,7 @@ public class ShareGroupCommandTest {
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
)
);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
Set<String> topics = Set.of("topic");
when(listTopicsResult.names()).thenReturn(completedFuture(topics));
@@ -1385,7 +1384,7 @@ public class ShareGroupCommandTest {
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
)
);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -1411,8 +1410,8 @@ public class ShareGroupCommandTest {
when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
service.resetOffsets();
- verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
- verify(adminClient).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(),
any(AlterShareGroupOffsetsOptions.class));
+ verify(adminClient, times(3)).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
verify(alterShareGroupOffsetsResult, times(1)).all();
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
}
@@ -1432,7 +1431,7 @@ public class ShareGroupCommandTest {
KafkaFuture.completedFuture(Map.of(new TopicPartition("topic",
0), new OffsetAndMetadata(10L)))
)
);
-
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+ when(adminClient.listShareGroupOffsets(any(),
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
AlterShareGroupOffsetsResult alterShareGroupOffsetsResult =
mockAlterShareGroupOffsets(adminClient, group);
TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -1455,8 +1454,8 @@ public class ShareGroupCommandTest {
when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
try (ShareGroupService service = getShareGroupService(cgcArgs,
adminClient)) {
service.resetOffsets();
- verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
- verify(adminClient).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
+ verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(),
any(AlterShareGroupOffsetsOptions.class));
+ verify(adminClient, times(3)).describeTopics(anyCollection(),
any(DescribeTopicsOptions.class));
verify(alterShareGroupOffsetsResult, times(1)).all();
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(),
any(DescribeShareGroupsOptions.class));
}
@@ -1467,7 +1466,7 @@ public class ShareGroupCommandTest {
KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
resultFuture.complete(null);
when(alterShareGroupOffsetsResult.all()).thenReturn(resultFuture);
- when(client.alterShareGroupOffsets(eq(groupId),
any())).thenReturn(alterShareGroupOffsetsResult);
+ when(client.alterShareGroupOffsets(eq(groupId), any(),
any(AlterShareGroupOffsetsOptions.class))).thenReturn(alterShareGroupOffsetsResult);
return alterShareGroupOffsetsResult;
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index ecfddf725f3..47f8605218d 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
@@ -28,7 +29,9 @@ import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult;
import org.apache.kafka.clients.admin.StreamsGroupDescription;
import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -47,7 +50,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import java.util.ArrayList;
@@ -74,6 +76,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
@@ -175,7 +179,7 @@ public class StreamsGroupCommandTest {
null);
resultMap.put(firstGroup, exp);
when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
-
when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection(),
any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
+ when(ADMIN_CLIENT.describeStreamsGroups(anyCollection(),
any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
StreamsGroupCommandOptions streamsGroupCommandOptions = new
StreamsGroupCommandOptions(
new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group",
firstGroup, "--describe"});
@@ -201,19 +205,19 @@ public class StreamsGroupCommandTest {
when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap));
when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap));
-
when(ADMIN_CLIENT.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset,
endOffset);
+ when(ADMIN_CLIENT.listOffsets(anyMap(),
any(ListOffsetsOptions.class))).thenReturn(startOffset, endOffset);
ListStreamsGroupOffsetsResult result =
mock(ListStreamsGroupOffsetsResult.class);
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new
HashMap<>();
committedOffsetsMap.put(new TopicPartition("topic1", 0), new
OffsetAndMetadata(12, Optional.of(0), ""));
-
when(ADMIN_CLIENT.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
-
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+ when(ADMIN_CLIENT.listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
+
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
DescribeStreamsGroupsResult describeResult =
mock(DescribeStreamsGroupsResult.class);
StreamsGroupDescription groupDescription =
mock(StreamsGroupDescription.class);
StreamsGroupSubtopologyDescription subtopology =
mock(StreamsGroupSubtopologyDescription.class);
-
when(ADMIN_CLIENT.describeStreamsGroups(List.of(groupId))).thenReturn(describeResult);
+ when(ADMIN_CLIENT.describeStreamsGroups(eq(List.of(groupId)),
any(DescribeStreamsGroupsOptions.class))).thenReturn(describeResult);
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
groupDescription)));
when(groupDescription.subtopologies()).thenReturn(List.of(subtopology));
when(subtopology.sourceTopics()).thenReturn(List.of("topic1"));
@@ -302,26 +306,24 @@ public class StreamsGroupCommandTest {
List<String> topics = List.of(topic);
DescribeTopicsResult describeTopicsResult =
mock(DescribeTopicsResult.class);
- when(adminClient.describeStreamsGroups(List.of(groupId)))
+ when(adminClient.describeStreamsGroups(eq(List.of(groupId)),
any(DescribeStreamsGroupsOptions.class)))
.thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
Map<String, TopicDescription> descriptions = Map.of(
topic, new TopicDescription(topic, false, List.of(
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
));
- when(adminClient.describeTopics(anyCollection()))
- .thenReturn(describeTopicsResult);
- when(adminClient.describeTopics(eq(topics),
any(DescribeTopicsOptions.class)))
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class)))
.thenReturn(describeTopicsResult);
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
- when(adminClient.listOffsets(any(), any()))
+ when(adminClient.listOffsets(anyMap(), any(ListOffsetsOptions.class)))
.thenReturn(listOffsetsResult());
ListGroupsResult listGroupsResult = listGroupResult(groupId);
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
ListStreamsGroupOffsetsResult result =
mock(ListStreamsGroupOffsetsResult.class);
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new
HashMap<>();
committedOffsetsMap.put(new TopicPartition(topic, 0),
mock(OffsetAndMetadata.class));
-
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
-
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+ when(adminClient.listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
+
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args.toArray(new String[0]), adminClient);
Map<String, Map<TopicPartition, OffsetAndMetadata>> resetResult =
service.resetOffsets();
@@ -330,10 +332,10 @@ public class StreamsGroupCommandTest {
assertEquals(Set.of(new TopicPartition(topics.get(0), 0)),
resetResult.get(groupId).keySet());
- verify(adminClient, times(1)).describeStreamsGroups(List.of(groupId));
+ verify(adminClient,
times(1)).describeStreamsGroups(eq(List.of(groupId)),
any(DescribeStreamsGroupsOptions.class));
verify(adminClient, times(1)).describeTopics(eq(topics),
any(DescribeTopicsOptions.class));
- verify(adminClient, times(1)).listOffsets(any(), any());
- verify(adminClient, times(1)).listStreamsGroupOffsets(any());
+ verify(adminClient, times(1)).listOffsets(anyMap(),
any(ListOffsetsOptions.class));
+ verify(adminClient, times(1)).listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class));
service.close();
}
@@ -367,7 +369,7 @@ public class StreamsGroupCommandTest {
null));
DescribeStreamsGroupsResult result =
mock(DescribeStreamsGroupsResult.class);
when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
-
when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
+ when(ADMIN_CLIENT.describeStreamsGroups(anyCollection(),
any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args.toArray(new String[0]));
Map<String, List<String>> internalTopics =
service.retrieveInternalTopics(List.of(groupId));
@@ -394,10 +396,10 @@ public class StreamsGroupCommandTest {
when(deleteStreamsGroupsResult.deletedGroups()).thenReturn(Map.of(groupId,
KafkaFuture.completedFuture(null)));
DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
when(deleteTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
-
when(adminClient.deleteTopics(ArgumentMatchers.anyCollection())).thenReturn(deleteTopicsResult);
+ when(adminClient.deleteTopics(anyCollection(),
any(DeleteTopicsOptions.class))).thenReturn(deleteTopicsResult);
DescribeStreamsGroupsResult describeStreamsGroupsResult =
mock(DescribeStreamsGroupsResult.class);
when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
mock(StreamsGroupDescription.class))));
-
when(adminClient.describeStreamsGroups(any())).thenReturn(describeStreamsGroupsResult);
+ when(adminClient.describeStreamsGroups(any(),
any(DescribeStreamsGroupsOptions.class))).thenReturn(describeStreamsGroupsResult);
ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
when(adminClient.listGroups(any())).thenReturn(listGroupsResult);
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(new
GroupListing(groupId, Optional.of(GroupType.STREAMS), "streams",
Optional.of(GroupState.EMPTY)))));
@@ -407,9 +409,9 @@ public class StreamsGroupCommandTest {
verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
verify(adminClient,
times(1)).deleteStreamsGroups(eq(List.of(groupId)),
any(DeleteStreamsGroupsOptions.class));
- verify(adminClient, times(1)).describeStreamsGroups(any());
+ verify(adminClient, times(1)).describeStreamsGroups(any(),
any(DescribeStreamsGroupsOptions.class));
// because of having 0 internal topics, we do not expect deleteTopics
to be called
- verify(adminClient,
times(0)).deleteTopics(ArgumentMatchers.anyCollection());
+ verify(adminClient, times(0)).deleteTopics(anyCollection(),
any(DeleteTopicsOptions.class));
service.close();
}
@@ -434,8 +436,8 @@ public class StreamsGroupCommandTest {
verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
// we do not expect any further API to be called
verify(adminClient,
times(0)).deleteStreamsGroups(eq(List.of(groupId)),
any(DeleteStreamsGroupsOptions.class));
- verify(adminClient, times(0)).describeStreamsGroups(any());
- verify(adminClient,
times(0)).deleteTopics(ArgumentMatchers.anyCollection());
+ verify(adminClient, times(0)).describeStreamsGroups(any(),
any(DescribeStreamsGroupsOptions.class));
+ verify(adminClient, times(0)).deleteTopics(anyCollection(),
any(DeleteTopicsOptions.class));
service.close();
}
@@ -447,7 +449,7 @@ public class StreamsGroupCommandTest {
String topic = "topic";
List<String> args = new
ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group",
groupId, "--reset-offsets", "--input-topic", "topic:3", "--to-latest"));
- when(adminClient.describeStreamsGroups(List.of(groupId)))
+ when(adminClient.describeStreamsGroups(eq(List.of(groupId)),
any(DescribeStreamsGroupsOptions.class)))
.thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
DescribeTopicsResult describeTopicsResult =
mock(DescribeTopicsResult.class);
@@ -455,12 +457,10 @@ public class StreamsGroupCommandTest {
topic, new TopicDescription(topic, false, List.of(
new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
));
- when(adminClient.describeTopics(anyCollection()))
- .thenReturn(describeTopicsResult);
- when(adminClient.describeTopics(eq(List.of(topic)),
any(DescribeTopicsOptions.class)))
+ when(adminClient.describeTopics(anyCollection(),
any(DescribeTopicsOptions.class)))
.thenReturn(describeTopicsResult);
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
- when(adminClient.listOffsets(any(), any()))
+ when(adminClient.listOffsets(anyMap(), any(ListOffsetsOptions.class)))
.thenReturn(listOffsetsResult());
ListStreamsGroupOffsetsResult result =
mock(ListStreamsGroupOffsetsResult.class);
Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = Map.of(
@@ -470,8 +470,8 @@ public class StreamsGroupCommandTest {
new OffsetAndMetadata(12, Optional.of(0), "")
);
-
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
-
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+ when(adminClient.listStreamsGroupOffsets(anyMap(),
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
+
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(args.toArray(new String[0]), adminClient);
assertThrows(UnknownTopicOrPartitionException.class, () ->
service.resetOffsets());
service.close();