This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dbe0fdadb8c MINOR: Consistently apply timeout in group commands (#20764)
dbe0fdadb8c is described below

commit dbe0fdadb8c78d055d3b77bbd0f5fea6e587a657
Author: Lucas Brutschy <[email protected]>
AuthorDate: Sun Oct 26 11:52:31 2025 +0100

    MINOR: Consistently apply timeout in group commands (#20764)
    
    Group commands sometimes apply the timeout specified via --timeout and
    sometimes don't. This change applies the timeout to every adminClient
    call.
    
    Reviewers: Shivsundar R <[email protected]>, Andrew Schofield
     <[email protected]>, Alieh Saeedi <[email protected]>
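
For context, the pattern the patch applies throughout is a small generic helper
that stamps the value of --timeout onto each options object before it is handed
to the Admin client. Below is a minimal sketch of that idea, assuming an
already-parsed timeout value; the class and field names are illustrative and
not necessarily the exact ones used in the Kafka tools.

    import java.util.Set;

    import org.apache.kafka.clients.admin.AbstractOptions;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.DescribeTopicsOptions;

    class TimeoutExample {
        private final Admin adminClient;
        private final int timeoutMs;   // value parsed from --timeout

        TimeoutExample(Admin adminClient, int timeoutMs) {
            this.adminClient = adminClient;
            this.timeoutMs = timeoutMs;
        }

        // Copies the CLI timeout onto any AbstractOptions subclass.
        private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
            return options.timeoutMs(timeoutMs);
        }

        // Example: the timeout now also bounds the describeTopics call.
        void describeWithTimeout(Set<String> topics) throws Exception {
            adminClient.describeTopics(topics, withTimeoutMs(new DescribeTopicsOptions()))
                .allTopicNames()
                .get();
        }
    }

Because every Admin options class extends AbstractOptions, a single helper like
this covers describeTopics, listOffsets, deleteTopics, and the group-specific
calls alike, which is what the hunks below do.
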
---
 .../java/org/apache/kafka/tools/OffsetsUtils.java  |  4 +-
 .../group/ConsumerGroupCommandOptions.java         |  7 --
 .../tools/consumer/group/ShareGroupCommand.java    | 15 ++++-
 .../kafka/tools/streams/StreamsGroupCommand.java   | 74 +++++++++++++---------
 .../tools/streams/StreamsGroupCommandOptions.java  | 10 +--
 .../consumer/group/ConsumerGroupServiceTest.java   |  6 +-
 .../consumer/group/ShareGroupCommandTest.java      | 57 ++++++++---------
 .../tools/streams/StreamsGroupCommandTest.java     | 60 +++++++++---------
 8 files changed, 119 insertions(+), 114 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java 
b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
index 269cd53875d..bec80b6fc25 100644
--- a/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
+++ b/tools/src/main/java/org/apache/kafka/tools/OffsetsUtils.java
@@ -460,7 +460,7 @@ public class OffsetsUtils {
         Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
 
         try {
-            return 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+            return adminClient.describeTopics(topics, withTimeoutMs(new 
DescribeTopicsOptions())).allTopicNames().get().entrySet()
                 .stream()
                 .flatMap(entry -> entry.getValue().partitions().stream()
                     .filter(partitionInfo -> partitionInfo.leader() == null)
@@ -476,7 +476,7 @@ public class OffsetsUtils {
         // collect all topics
         Set<String> topics = 
topicPartitions.stream().map(TopicPartition::topic).collect(Collectors.toSet());
         try {
-            List<TopicPartition> existPartitions = 
adminClient.describeTopics(topics).allTopicNames().get().entrySet()
+            List<TopicPartition> existPartitions = 
adminClient.describeTopics(topics, withTimeoutMs(new 
DescribeTopicsOptions())).allTopicNames().get().entrySet()
                 .stream()
                 .flatMap(entry -> entry.getValue().partitions().stream()
                     .map(partitionInfo -> new TopicPartition(entry.getKey(), 
partitionInfo.partition())))
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
index 30ba137cd8d..2d51d0525ac 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandOptions.java
@@ -19,9 +19,6 @@ package org.apache.kafka.tools.consumer.group;
 import org.apache.kafka.server.util.CommandDefaultOptions;
 import org.apache.kafka.server.util.CommandLineUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -31,7 +28,6 @@ import joptsimple.OptionSpec;
 import static org.apache.kafka.tools.ToolsUtils.minus;
 
 public class ConsumerGroupCommandOptions extends CommandDefaultOptions {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ConsumerGroupCommandOptions.class);
 
     private static final String BOOTSTRAP_SERVER_DOC = "The server(s) to 
connect to. REQUIRED for all options except for --validate-regex.";
     private static final String GROUP_DOC = "The consumer group we wish to act 
on.";
@@ -230,9 +226,6 @@ public class ConsumerGroupCommandOptions extends 
CommandDefaultOptions {
             if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
                 CommandLineUtils.printUsageAndExit(parser,
                     "Option " + describeOpt + " does not take a value for " + 
stateOpt);
-        } else {
-            if (options.has(timeoutMsOpt))
-                LOGGER.debug("Option " + timeoutMsOpt + " is applicable only 
when " + describeOpt + " is used.");
         }
 
         if (options.has(deleteOpt)) {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 87cf0f1e837..2c3e21fedb1 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools.consumer.group;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AbstractOptions;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
@@ -26,6 +27,7 @@ import 
org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsSpec;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
 import org.apache.kafka.clients.admin.ShareMemberAssignment;
@@ -411,7 +413,8 @@ public class ShareGroupCommand {
                         offsetsToReset.entrySet().stream()
                             .collect(Collectors.toMap(
                                 Entry::getKey, entry -> 
entry.getValue().offset()
-                            ))
+                            )),
+                        withTimeoutMs(new AlterShareGroupOffsetsOptions())
                     ).all().get();
                 }
                 OffsetsUtils.printOffsetsToReset(Map.of(groupId, 
offsetsToReset));
@@ -434,7 +437,10 @@ public class ShareGroupCommand {
                 partitionsToReset = 
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
             } else {
                 Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
-                Map<TopicPartition, OffsetAndMetadata> 
offsetsByTopicPartitions = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+                Map<TopicPartition, OffsetAndMetadata> 
offsetsByTopicPartitions = adminClient.listShareGroupOffsets(
+                    groupSpecs,
+                    withTimeoutMs(new ListShareGroupOffsetsOptions())
+                ).all().get().get(groupId);
                 partitionsToReset = offsetsByTopicPartitions.keySet();
             }
 
@@ -488,7 +494,10 @@ public class ShareGroupCommand {
                 Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
 
                 try {
-                    Map<TopicPartition, OffsetAndMetadata> startOffsets = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+                    Map<TopicPartition, OffsetAndMetadata> startOffsets = 
adminClient.listShareGroupOffsets(
+                        groupSpecs,
+                        withTimeoutMs(new ListShareGroupOffsetsOptions())
+                    ).all().get().get(groupId);
                     Set<SharePartitionOffsetInformation> partitionOffsets = 
mapOffsetsToSharePartitionInformation(groupId, startOffsets);
 
                     groupOffsets.put(groupId, new 
SimpleImmutableEntry<>(shareGroup, partitionOffsets));
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 897615deef8..f76a81b9c19 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -19,9 +19,11 @@ package org.apache.kafka.tools.streams;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AbstractOptions;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterStreamsGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
@@ -30,8 +32,11 @@ import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsSpec;
+import org.apache.kafka.clients.admin.ListTopicsOptions;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -279,7 +284,7 @@ public class StreamsGroupCommand {
         StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
             DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(
                 List.of(group),
-                new 
DescribeStreamsGroupsOptions().timeoutMs(opts.options.valueOf(opts.timeoutMsOpt).intValue()));
+                withTimeoutMs(new DescribeStreamsGroupsOptions()));
             Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
             return descriptionMap.get(group);
         }
@@ -428,8 +433,14 @@ public class StreamsGroupCommand {
                 earliest.put(tp, OffsetSpec.earliest());
                 latest.put(tp, OffsetSpec.latest());
             }
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
earliestResult = adminClient.listOffsets(earliest).all().get();
-            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
latestResult = adminClient.listOffsets(latest).all().get();
+            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
earliestResult = adminClient.listOffsets(
+                earliest,
+                withTimeoutMs(new ListOffsetsOptions())
+            ).all().get();
+            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
latestResult = adminClient.listOffsets(
+                latest,
+                withTimeoutMs(new ListOffsetsOptions())
+            ).all().get();
             Map<TopicPartition, OffsetAndMetadata> committedOffsets = 
getCommittedOffsets(description.groupId());
 
             Map<TopicPartition, OffsetsInfo> output = new HashMap<>();
@@ -449,14 +460,18 @@ public class StreamsGroupCommand {
 
         Map<TopicPartition, OffsetAndMetadata> getCommittedOffsets(String 
groupId) {
             try {
-                var sourceTopics = 
adminClient.describeStreamsGroups(List.of(groupId))
-                    .all().get().get(groupId)
+                var sourceTopics = adminClient.describeStreamsGroups(
+                    List.of(groupId),
+                    withTimeoutMs(new DescribeStreamsGroupsOptions())
+                ).all().get().get(groupId)
                     .subtopologies().stream()
                     .flatMap(subtopology -> 
subtopology.sourceTopics().stream())
                     .collect(Collectors.toSet());
 
-                var allTopicPartitions = 
adminClient.listStreamsGroupOffsets(Map.of(groupId, new 
ListStreamsGroupOffsetsSpec()))
-                    .partitionsToOffsetAndMetadata(groupId).get();
+                var allTopicPartitions = adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
+                    withTimeoutMs(new ListStreamsGroupOffsetsOptions())
+                ).partitionsToOffsetAndMetadata(groupId).get();
 
                 allTopicPartitions.keySet().removeIf(tp -> 
!sourceTopics.contains(tp.topic()));
                 return allTopicPartitions;
@@ -467,8 +482,10 @@ public class StreamsGroupCommand {
 
         private List<TopicPartition> filterExistingGroupTopics(String groupId, 
List<TopicPartition> topicPartitions) {
             try {
-                var allTopicPartitions = 
adminClient.listStreamsGroupOffsets(Map.of(groupId, new 
ListStreamsGroupOffsetsSpec()))
-                    .partitionsToOffsetAndMetadata(groupId).get();
+                var allTopicPartitions = adminClient.listStreamsGroupOffsets(
+                    Map.of(groupId, new ListStreamsGroupOffsetsSpec()),
+                    withTimeoutMs(new ListStreamsGroupOffsetsOptions())
+                ).partitionsToOffsetAndMetadata(groupId).get();
                 boolean allPresent = 
topicPartitions.stream().allMatch(allTopicPartitions::containsKey);
                 if (!allPresent) {
                     printError("One or more topics are not part of the group 
'" + groupId + "'.", Optional.empty());
@@ -490,7 +507,8 @@ public class StreamsGroupCommand {
                 : opts.options.valuesOf(opts.groupOpt);
             if (!groupIds.isEmpty()) {
                 Map<String, KafkaFuture<StreamsGroupDescription>> 
streamsGroups = adminClient.describeStreamsGroups(
-                    groupIds
+                    groupIds,
+                    withTimeoutMs(new DescribeStreamsGroupsOptions())
                 ).describedGroups();
 
                 streamsGroups.forEach((groupId, groupDescription) -> {
@@ -506,7 +524,10 @@ public class StreamsGroupCommand {
                                     List<String> internalTopics = 
getInternalTopicsToBeDeleted(groupId);
                                     if (!internalTopics.isEmpty()) {
                                         try {
-                                            
adminClient.deleteTopics(internalTopics).all().get();
+                                            adminClient.deleteTopics(
+                                                internalTopics,
+                                                withTimeoutMs(new 
DeleteTopicsOptions())
+                                            ).all().get();
                                         } catch (InterruptedException | 
ExecutionException e) {
                                             if (e.getCause() instanceof 
UnknownTopicOrPartitionException) {
                                                 printError("Deleting internal 
topics for group '" + groupId + "' failed because the topics do not exist.", 
Optional.empty());
@@ -742,7 +763,10 @@ public class StreamsGroupCommand {
                     if (internalTopicsToDelete != null && 
!internalTopicsToDelete.isEmpty()) {
                         DeleteTopicsResult deleteTopicsResult = null;
                         try {
-                            deleteTopicsResult = 
adminClient.deleteTopics(internalTopicsToDelete);
+                            deleteTopicsResult = adminClient.deleteTopics(
+                                internalTopicsToDelete,
+                                withTimeoutMs(new DeleteTopicsOptions())
+                            );
                             deleteTopicsResult.all().get();
                         } catch (InterruptedException | ExecutionException e) {
                             if (deleteTopicsResult != null) {
@@ -820,7 +844,10 @@ public class StreamsGroupCommand {
         Map<String, List<String>> retrieveInternalTopics(List<String> 
groupIds) {
             Map<String, List<String>> groupToInternalTopics = new HashMap<>();
             try {
-                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(groupIds).all().get();
+                Map<String, StreamsGroupDescription> descriptionMap = 
adminClient.describeStreamsGroups(
+                    groupIds,
+                    withTimeoutMs(new DescribeStreamsGroupsOptions())
+                ).all().get();
                 for (StreamsGroupDescription description : 
descriptionMap.values()) {
 
                     List<String> sourceTopics = 
description.subtopologies().stream()
@@ -848,7 +875,7 @@ public class StreamsGroupCommand {
                 if (e.getCause() instanceof UnsupportedVersionException) {
                     try {
                         // Retrieve internal topic list if possible, and add 
the list of topic names to error message
-                        Set<String> allTopics = 
adminClient.listTopics().names().get();
+                        Set<String> allTopics = 
adminClient.listTopics(withTimeoutMs(new ListTopicsOptions())).names().get();
                         List<String> internalTopics = allTopics.stream()
                             .filter(topic -> 
groupIds.stream().anyMatch(groupId -> isInferredInternalTopic(topic, groupId)))
                             .collect(Collectors.toList());
@@ -873,7 +900,8 @@ public class StreamsGroupCommand {
                 if (!dryRun) {
                     adminClient.alterStreamsGroupOffsets(
                         groupId,
-                        preparedOffsets
+                        preparedOffsets,
+                        withTimeoutMs(new AlterStreamsGroupOffsetsOptions())
                     ).all().get();
                 }
 
@@ -947,22 +975,6 @@ public class StreamsGroupCommand {
                 || 
topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
         }
 
-        List<String> collectAllTopics(String groupId) {
-            try {
-                return adminClient.describeStreamsGroups(List.of(groupId))
-                    .all().get().get(groupId)
-                    .subtopologies().stream()
-                    .flatMap(subtopology -> Stream.of(
-                        subtopology.sourceTopics().stream(),
-                        subtopology.repartitionSinkTopics().stream(),
-                        
subtopology.repartitionSourceTopics().keySet().stream(),
-                        subtopology.stateChangelogTopics().keySet().stream()
-                    ).flatMap(s -> s)).distinct().collect(Collectors.toList());
-            } catch (InterruptedException | ExecutionException e) {
-                throw new RuntimeException(e);
-            }
-        }
-
         Collection<StreamsGroupMemberDescription> collectGroupMembers(String 
groupId) throws Exception {
             return getDescribeGroup(groupId).members();
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index d6715f833d2..a69e0bbe3a4 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -19,9 +19,6 @@ package org.apache.kafka.tools.streams;
 import org.apache.kafka.server.util.CommandDefaultOptions;
 import org.apache.kafka.server.util.CommandLineUtils;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.util.List;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -32,7 +29,6 @@ import static org.apache.kafka.tools.ToolsUtils.minus;
 
 public class StreamsGroupCommandOptions extends CommandDefaultOptions {
     private static final String NL = System.lineSeparator();
-    static final Logger LOGGER = 
LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
 
     private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server(s) to connect to.";
     private static final String GROUP_DOC = "The streams group we wish to act 
on.";
@@ -144,11 +140,10 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
         deleteOpt = parser.accepts("delete", DELETE_DOC);
         deleteOffsetsOpt = parser.accepts("delete-offsets", 
DELETE_OFFSETS_DOC);
         timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
-            .availableIf(describeOpt)
             .withRequiredArg()
             .describedAs("timeout (ms)")
             .ofType(Long.class)
-            .defaultsTo(5000L);
+            .defaultsTo(30000L);
         commandConfigOpt = parser.accepts("command-config", COMMAND_CONFIG_DOC)
             .withRequiredArg()
             .describedAs("command config property file")
@@ -214,9 +209,6 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
 
         if (options.has(describeOpt)) {
             checkDescribeArgs();
-        } else {
-            if (options.has(timeoutMsOpt))
-                LOGGER.debug("Option " + timeoutMsOpt + " is applicable only 
when " + describeOpt + " is used.");
         }
 
         if (options.has(deleteOpt)) {
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index 236d1ce51cc..956bebaad82 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -90,7 +90,7 @@ public class ConsumerGroupServiceTest {
                 .thenReturn(listGroupOffsetsResult(GROUP));
         when(admin.listOffsets(offsetsArgMatcher(), any()))
                 .thenReturn(listOffsetsResult());
-        when(admin.describeTopics(ArgumentMatchers.anySet()))
+        when(admin.describeTopics(ArgumentMatchers.anySet(), any()))
                 .thenReturn(describeTopicsResult());
 
         Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = 
groupService.collectGroupOffsets(GROUP);
@@ -175,7 +175,7 @@ public class ConsumerGroupServiceTest {
                 any()
         )).thenReturn(new 
ListOffsetsResult(endOffsets.entrySet().stream().filter(e -> 
unassignedTopicPartitions.contains(e.getKey()))
                 .collect(Collectors.toMap(Entry::getKey, Entry::getValue))));
-        
when(admin.describeTopics(ArgumentMatchers.anySet())).thenReturn(describeTopicsResult());
+        when(admin.describeTopics(ArgumentMatchers.anySet(), 
any())).thenReturn(describeTopicsResult());
 
         Entry<Optional<GroupState>, 
Optional<Collection<PartitionAssignmentState>>> statesAndAssignments = 
groupService.collectGroupOffsets(GROUP);
         Optional<GroupState> state = statesAndAssignments.getKey();
@@ -233,7 +233,7 @@ public class ConsumerGroupServiceTest {
                 .thenReturn(describeGroupsResult(GroupState.DEAD));
         
when(admin.describeTopics(ArgumentMatchers.eq(topicsWithoutPartitionsSpecified),
 any()))
                 
.thenReturn(describeTopicsResult(topicsWithoutPartitionsSpecified));
-        when(admin.describeTopics(anySet()))
+        when(admin.describeTopics(anySet(), any()))
                 .thenReturn(describeTopicsResult(TOPICS));
         when(admin.listOffsets(offsetsArgMatcher(), any()))
                 .thenReturn(listOffsetsResult());
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 29fe151ba2b..7c7d38822b9 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.tools.consumer.group;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.AlterShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.AlterShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
@@ -31,6 +32,7 @@ import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListShareGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.ListTopicsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
@@ -211,7 +213,7 @@ public class ShareGroupCommandTest {
 
             
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, 
KafkaFuture.completedFuture(exp)));
             
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-            
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult);
+            when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
             try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
                 TestUtils.waitForCondition(() -> {
                     Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
@@ -260,7 +262,7 @@ public class ShareGroupCommandTest {
 
             
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, 
KafkaFuture.completedFuture(exp)));
             
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-            
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult);
+            when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
             try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
                 TestUtils.waitForCondition(() -> {
                     Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
@@ -327,7 +329,7 @@ public class ShareGroupCommandTest {
             
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
             
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, 
KafkaFuture.completedFuture(exp1), secondGroup, 
KafkaFuture.completedFuture(exp2)));
             
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-            
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenAnswer(
+            when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap(), 
any(ListShareGroupOffsetsOptions.class))).thenAnswer(
                 invocation -> {
                     Map<String, Object> argument = invocation.getArgument(0);
                     if (argument.containsKey(firstGroup)) {
@@ -1082,7 +1084,7 @@ public class ShareGroupCommandTest {
                     new TopicPartition(topic2, 0), new OffsetAndMetadata(0L)))
             )
         );
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
         
         AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
         TopicPartition tp0 = new TopicPartition(topic1, 0);
@@ -1101,25 +1103,22 @@ public class ShareGroupCommandTest {
         DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
         
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
KafkaFuture.completedFuture(exp)));
         when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
-        Map<String, TopicDescription> d1 = Map.of(
+        Map<String, TopicDescription> descriptions = Map.of(
             topic1, new TopicDescription(topic1, false, List.of(
                 new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()),
                 new TopicPartitionInfo(1, Node.noNode(), List.of(), List.of()))
-        ));
-        Map<String, TopicDescription> d2 = Map.of(
+            ),
             topic2, new TopicDescription(topic2, false, List.of(
                 new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
         )));
-        DescribeTopicsResult topicsResult1 = mock(DescribeTopicsResult.class);
-        DescribeTopicsResult topicsResult2 = mock(DescribeTopicsResult.class);
-        when(topicsResult1.allTopicNames()).thenReturn(completedFuture(d1));
-        when(topicsResult2.allTopicNames()).thenReturn(completedFuture(d2));
-        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(topicsResult1, topicsResult2);
-        
when(adminClient.describeTopics(anyCollection())).thenReturn(topicsResult1, 
topicsResult2);
+        DescribeTopicsResult topicsResult = mock(DescribeTopicsResult.class);
+        
when(topicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(topicsResult);
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(topicsResult);
         try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
             service.resetOffsets();
-            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
-            verify(adminClient).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(), 
any(AlterShareGroupOffsetsOptions.class));
+            verify(adminClient, times(3)).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
             verify(alterShareGroupOffsetsResult, times(1)).all();
             
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
         }
@@ -1149,7 +1148,7 @@ public class ShareGroupCommandTest {
         
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
         
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
         when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
 
         AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
         Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(t1, 
new OffsetAndMetadata(40L), t2, new OffsetAndMetadata(40L));
@@ -1169,7 +1168,7 @@ public class ShareGroupCommandTest {
                 topicPartitionOffsets.values().stream().allMatch(offsetSpec -> 
offsetSpec instanceof OffsetSpec.LatestSpec);
         try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
             service.resetOffsets();
-            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(), 
any(AlterShareGroupOffsetsOptions.class));
             verify(adminClient, 
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(t1,
 t2))), any());
             verify(alterShareGroupOffsetsResult, times(1)).all();
             
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
@@ -1193,7 +1192,7 @@ public class ShareGroupCommandTest {
                     new TopicPartition(topic2, 0), new OffsetAndMetadata(10L), 
new TopicPartition(topic3, 0), new OffsetAndMetadata(10L)))
             )
         );
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
         ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
         Set<String> topics = Set.of(topic1, topic2, topic3);
         when(listTopicsResult.names()).thenReturn(completedFuture(topics));
@@ -1238,7 +1237,7 @@ public class ShareGroupCommandTest {
                 topicPartitionOffsets.values().stream().allMatch(offsetSpec -> 
offsetSpec instanceof OffsetSpec.TimestampSpec);
         try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
             service.resetOffsets();
-            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(), 
any(AlterShareGroupOffsetsOptions.class));
             verify(adminClient, 
times(1)).listOffsets(ArgumentMatchers.argThat(offsetsArgMatcher.apply(Set.of(tp1,
 tp2, tp3, tp4))), any());
             verify(alterShareGroupOffsetsResult, times(1)).all();
             
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
@@ -1259,7 +1258,7 @@ public class ShareGroupCommandTest {
                 KafkaFuture.completedFuture(Map.of(new TopicPartition(topic, 
0), new OffsetAndMetadata(10L)))
             )
         );
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
 
         Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(new 
TopicPartition(topic, 0), new OffsetAndMetadata(0L));
         ListOffsetsResult listOffsetsResult = 
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
@@ -1288,7 +1287,7 @@ public class ShareGroupCommandTest {
 
         try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
             service.resetOffsets();
-            verify(adminClient, times(0)).alterShareGroupOffsets(any(), any());
+            verify(adminClient, times(0)).alterShareGroupOffsets(any(), any(), 
any(AlterShareGroupOffsetsOptions.class));
         }
     }
 
@@ -1326,7 +1325,7 @@ public class ShareGroupCommandTest {
                 KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
             )
         );
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
         ListTopicsResult listTopicsResult = mock(ListTopicsResult.class);
         Set<String> topics = Set.of("topic");
         when(listTopicsResult.names()).thenReturn(completedFuture(topics));
@@ -1385,7 +1384,7 @@ public class ShareGroupCommandTest {
                 KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
             )
         );
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
 
         AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
         TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -1411,8 +1410,8 @@ public class ShareGroupCommandTest {
         when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
         try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
             service.resetOffsets();
-            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
-            verify(adminClient).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(), 
any(AlterShareGroupOffsetsOptions.class));
+            verify(adminClient, times(3)).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
             verify(alterShareGroupOffsetsResult, times(1)).all();
             
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
         }
@@ -1432,7 +1431,7 @@ public class ShareGroupCommandTest {
                 KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
             )
         );
-        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+        when(adminClient.listShareGroupOffsets(any(), 
any(ListShareGroupOffsetsOptions.class))).thenReturn(listShareGroupOffsetsResult);
 
         AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
         TopicPartition tp0 = new TopicPartition(topic, 0);
@@ -1455,8 +1454,8 @@ public class ShareGroupCommandTest {
         when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
         try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
             service.resetOffsets();
-            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
-            verify(adminClient).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap(), 
any(AlterShareGroupOffsetsOptions.class));
+            verify(adminClient, times(3)).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
             verify(alterShareGroupOffsetsResult, times(1)).all();
             
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
         }
@@ -1467,7 +1466,7 @@ public class ShareGroupCommandTest {
         KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
         resultFuture.complete(null);
         when(alterShareGroupOffsetsResult.all()).thenReturn(resultFuture);
-        when(client.alterShareGroupOffsets(eq(groupId), 
any())).thenReturn(alterShareGroupOffsetsResult);
+        when(client.alterShareGroupOffsets(eq(groupId), any(), 
any(AlterShareGroupOffsetsOptions.class))).thenReturn(alterShareGroupOffsetsResult);
         return alterShareGroupOffsetsResult;
     }
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
index ecfddf725f3..47f8605218d 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools.streams;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupsOptions;
 import org.apache.kafka.clients.admin.DeleteStreamsGroupsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsOptions;
 import org.apache.kafka.clients.admin.DeleteTopicsResult;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
@@ -28,7 +29,9 @@ import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsOptions;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsOptions;
 import org.apache.kafka.clients.admin.ListStreamsGroupOffsetsResult;
 import org.apache.kafka.clients.admin.StreamsGroupDescription;
 import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
@@ -47,7 +50,6 @@ import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentMatchers;
 import org.mockito.MockedStatic;
 
 import java.util.ArrayList;
@@ -74,6 +76,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyMap;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockStatic;
@@ -175,7 +179,7 @@ public class StreamsGroupCommandTest {
             null);
         resultMap.put(firstGroup, exp);
         when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
-        
when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection(),  
any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
+        when(ADMIN_CLIENT.describeStreamsGroups(anyCollection(),  
any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
 
         StreamsGroupCommandOptions streamsGroupCommandOptions = new 
StreamsGroupCommandOptions(
             new String[]{"--bootstrap-server", BOOTSTRAP_SERVERS, "--group", 
firstGroup, "--describe"});
@@ -201,19 +205,19 @@ public class StreamsGroupCommandTest {
         
when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap));
         
when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap));
 
-        
when(ADMIN_CLIENT.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset,
 endOffset);
+        when(ADMIN_CLIENT.listOffsets(anyMap(), 
any(ListOffsetsOptions.class))).thenReturn(startOffset, endOffset);
 
         ListStreamsGroupOffsetsResult result = 
mock(ListStreamsGroupOffsetsResult.class);
         Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new 
HashMap<>();
         committedOffsetsMap.put(new TopicPartition("topic1", 0), new 
OffsetAndMetadata(12, Optional.of(0), ""));
 
-        
when(ADMIN_CLIENT.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
-        
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+        when(ADMIN_CLIENT.listStreamsGroupOffsets(anyMap(), 
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
+        
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
 
         DescribeStreamsGroupsResult describeResult = 
mock(DescribeStreamsGroupsResult.class);
         StreamsGroupDescription groupDescription = 
mock(StreamsGroupDescription.class);
         StreamsGroupSubtopologyDescription subtopology = 
mock(StreamsGroupSubtopologyDescription.class);
-        
when(ADMIN_CLIENT.describeStreamsGroups(List.of(groupId))).thenReturn(describeResult);
+        when(ADMIN_CLIENT.describeStreamsGroups(eq(List.of(groupId)), 
any(DescribeStreamsGroupsOptions.class))).thenReturn(describeResult);
         
when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
 groupDescription)));
         
when(groupDescription.subtopologies()).thenReturn(List.of(subtopology));
         when(subtopology.sourceTopics()).thenReturn(List.of("topic1"));
@@ -302,26 +306,24 @@ public class StreamsGroupCommandTest {
         List<String> topics = List.of(topic);
 
         DescribeTopicsResult describeTopicsResult = 
mock(DescribeTopicsResult.class);
-        when(adminClient.describeStreamsGroups(List.of(groupId)))
+        when(adminClient.describeStreamsGroups(eq(List.of(groupId)), 
any(DescribeStreamsGroupsOptions.class)))
             .thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
         Map<String, TopicDescription> descriptions = Map.of(
             topic, new TopicDescription(topic, false, List.of(
                 new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
         ));
-        when(adminClient.describeTopics(anyCollection()))
-            .thenReturn(describeTopicsResult);
-        when(adminClient.describeTopics(eq(topics), 
any(DescribeTopicsOptions.class)))
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class)))
             .thenReturn(describeTopicsResult);
         
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
-        when(adminClient.listOffsets(any(), any()))
+        when(adminClient.listOffsets(anyMap(), any(ListOffsetsOptions.class)))
             .thenReturn(listOffsetsResult());
         ListGroupsResult listGroupsResult = listGroupResult(groupId);
         
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(listGroupsResult);
         ListStreamsGroupOffsetsResult result = 
mock(ListStreamsGroupOffsetsResult.class);
         Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = new 
HashMap<>();
         committedOffsetsMap.put(new TopicPartition(topic, 0), 
mock(OffsetAndMetadata.class));
-        
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
-        
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+        when(adminClient.listStreamsGroupOffsets(anyMap(), 
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
+        
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
 
         StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]), adminClient);
         Map<String, Map<TopicPartition, OffsetAndMetadata>> resetResult = 
service.resetOffsets();
@@ -330,10 +332,10 @@ public class StreamsGroupCommandTest {
         assertEquals(Set.of(new TopicPartition(topics.get(0), 0)),
             resetResult.get(groupId).keySet());
 
-        verify(adminClient, times(1)).describeStreamsGroups(List.of(groupId));
+        verify(adminClient, 
times(1)).describeStreamsGroups(eq(List.of(groupId)), 
any(DescribeStreamsGroupsOptions.class));
         verify(adminClient, times(1)).describeTopics(eq(topics), 
any(DescribeTopicsOptions.class));
-        verify(adminClient, times(1)).listOffsets(any(), any());
-        verify(adminClient, times(1)).listStreamsGroupOffsets(any());
+        verify(adminClient, times(1)).listOffsets(anyMap(), 
any(ListOffsetsOptions.class));
+        verify(adminClient, times(1)).listStreamsGroupOffsets(anyMap(), 
any(ListStreamsGroupOffsetsOptions.class));
 
         service.close();
     }
@@ -367,7 +369,7 @@ public class StreamsGroupCommandTest {
             null));
         DescribeStreamsGroupsResult result = 
mock(DescribeStreamsGroupsResult.class);
         when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
-        
when(ADMIN_CLIENT.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
+        when(ADMIN_CLIENT.describeStreamsGroups(anyCollection(), 
any(DescribeStreamsGroupsOptions.class))).thenReturn(result);
 
         StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]));
         Map<String, List<String>> internalTopics = 
service.retrieveInternalTopics(List.of(groupId));
@@ -394,10 +396,10 @@ public class StreamsGroupCommandTest {
         
when(deleteStreamsGroupsResult.deletedGroups()).thenReturn(Map.of(groupId, 
KafkaFuture.completedFuture(null)));
         DeleteTopicsResult deleteTopicsResult = mock(DeleteTopicsResult.class);
         
when(deleteTopicsResult.all()).thenReturn(KafkaFuture.completedFuture(null));
-        
when(adminClient.deleteTopics(ArgumentMatchers.anyCollection())).thenReturn(deleteTopicsResult);
+        when(adminClient.deleteTopics(anyCollection(), 
any(DeleteTopicsOptions.class))).thenReturn(deleteTopicsResult);
         DescribeStreamsGroupsResult describeStreamsGroupsResult = 
mock(DescribeStreamsGroupsResult.class);
         
when(describeStreamsGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(Map.of(groupId,
 mock(StreamsGroupDescription.class))));
-        
when(adminClient.describeStreamsGroups(any())).thenReturn(describeStreamsGroupsResult);
+        when(adminClient.describeStreamsGroups(any(), 
any(DescribeStreamsGroupsOptions.class))).thenReturn(describeStreamsGroupsResult);
         ListGroupsResult listGroupsResult = mock(ListGroupsResult.class);
         when(adminClient.listGroups(any())).thenReturn(listGroupsResult);
         
when(listGroupsResult.all()).thenReturn(KafkaFuture.completedFuture(List.of(new 
GroupListing(groupId, Optional.of(GroupType.STREAMS), "streams", 
Optional.of(GroupState.EMPTY)))));
@@ -407,9 +409,9 @@ public class StreamsGroupCommandTest {
 
         verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
         verify(adminClient, 
times(1)).deleteStreamsGroups(eq(List.of(groupId)), 
any(DeleteStreamsGroupsOptions.class));
-        verify(adminClient, times(1)).describeStreamsGroups(any());
+        verify(adminClient, times(1)).describeStreamsGroups(any(), 
any(DescribeStreamsGroupsOptions.class));
         // because of having 0 internal topics, we do not expect deleteTopics 
to be called
-        verify(adminClient, 
times(0)).deleteTopics(ArgumentMatchers.anyCollection());
+        verify(adminClient, times(0)).deleteTopics(anyCollection(), 
any(DeleteTopicsOptions.class));
 
         service.close();
     }
@@ -434,8 +436,8 @@ public class StreamsGroupCommandTest {
         verify(adminClient, times(1)).listGroups(any(ListGroupsOptions.class));
         // we do not expect any further API to be called
         verify(adminClient, 
times(0)).deleteStreamsGroups(eq(List.of(groupId)), 
any(DeleteStreamsGroupsOptions.class));
-        verify(adminClient, times(0)).describeStreamsGroups(any());
-        verify(adminClient, 
times(0)).deleteTopics(ArgumentMatchers.anyCollection());
+        verify(adminClient, times(0)).describeStreamsGroups(any(), 
any(DescribeStreamsGroupsOptions.class));
+        verify(adminClient, times(0)).deleteTopics(anyCollection(), 
any(DeleteTopicsOptions.class));
 
         service.close();
     }
@@ -447,7 +449,7 @@ public class StreamsGroupCommandTest {
         String topic = "topic";
         List<String> args = new 
ArrayList<>(Arrays.asList("--bootstrap-server", "localhost:9092", "--group", 
groupId, "--reset-offsets", "--input-topic", "topic:3", "--to-latest"));
         
-        when(adminClient.describeStreamsGroups(List.of(groupId)))
+        when(adminClient.describeStreamsGroups(eq(List.of(groupId)), 
any(DescribeStreamsGroupsOptions.class)))
             .thenReturn(describeStreamsResult(groupId, GroupState.DEAD));
         DescribeTopicsResult describeTopicsResult = 
mock(DescribeTopicsResult.class);
 
@@ -455,12 +457,10 @@ public class StreamsGroupCommandTest {
             topic, new TopicDescription(topic, false, List.of(
                 new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of()))
         ));
-        when(adminClient.describeTopics(anyCollection()))
-            .thenReturn(describeTopicsResult);
-        when(adminClient.describeTopics(eq(List.of(topic)), 
any(DescribeTopicsOptions.class)))
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class)))
             .thenReturn(describeTopicsResult);
         
when(describeTopicsResult.allTopicNames()).thenReturn(completedFuture(descriptions));
-        when(adminClient.listOffsets(any(), any()))
+        when(adminClient.listOffsets(anyMap(), any(ListOffsetsOptions.class)))
             .thenReturn(listOffsetsResult());
         ListStreamsGroupOffsetsResult result = 
mock(ListStreamsGroupOffsetsResult.class);
         Map<TopicPartition, OffsetAndMetadata> committedOffsetsMap = Map.of(
@@ -470,8 +470,8 @@ public class StreamsGroupCommandTest {
             new OffsetAndMetadata(12, Optional.of(0), "")  
         );
         
-        
when(adminClient.listStreamsGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(result);
-        
when(result.partitionsToOffsetAndMetadata(ArgumentMatchers.anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
+        when(adminClient.listStreamsGroupOffsets(anyMap(), 
any(ListStreamsGroupOffsetsOptions.class))).thenReturn(result);
+        
when(result.partitionsToOffsetAndMetadata(anyString())).thenReturn(KafkaFuture.completedFuture(committedOffsetsMap));
         StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(args.toArray(new String[0]), adminClient);
         assertThrows(UnknownTopicOrPartitionException.class, () -> 
service.resetOffsets());
         service.close();
