This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/kip1071 by this push:
     new f7b303eff3a Implement kafka-streams-groups.sh --describe (#18231)
f7b303eff3a is described below

commit f7b303eff3aba1891eb41bf8d5091a5b8b354f44
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Jan 23 10:35:13 2025 +0100

    Implement kafka-streams-groups.sh --describe (#18231)
    
    Implement --describe and its options (--state, --offsets, --members, and
    the combination of each of them with --verbose).
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../kafka/tools/streams/StreamsGroupCommand.java   | 188 ++++++++++++++++++++-
 .../tools/streams/StreamsGroupCommandOptions.java  |  59 ++++++-
 .../tools/streams/StreamsGroupCommandUnitTest.java | 105 +++++++++++-
 3 files changed, 338 insertions(+), 14 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 6c3d1a5631b..1c0f36dd5a5 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -18,11 +18,19 @@ package org.apache.kafka.tools.streams;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
+import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
+import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
 
@@ -30,6 +38,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -48,12 +58,12 @@ public class StreamsGroupCommand {
         try {
             opts.checkArgs();
 
-            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
list all streams groups.");
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to 
list all streams groups, or describe a streams group.");
 
             // should have exactly one action
-            long actions = 
Stream.of(opts.listOpt).filter(opts.options::has).count();
+            long actions = Stream.of(opts.listOpt, 
opts.describeOpt).filter(opts.options::has).count();
             if (actions != 1)
-                CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one action: --list.");
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must 
include exactly one action: --list, or --describe.");
 
             run(opts);
         } catch (OptionException e) {
@@ -65,6 +75,8 @@ public class StreamsGroupCommand {
         try (StreamsGroupService streamsGroupService = new 
StreamsGroupService(opts, Map.of())) {
             if (opts.options.has(opts.listOpt)) {
                 streamsGroupService.listGroups();
+            } else if (opts.options.has(opts.describeOpt)) {
+                streamsGroupService.describeGroups();
             } else {
                 throw new IllegalArgumentException("Unknown action!");
             }
@@ -81,7 +93,7 @@ public class StreamsGroupCommand {
         Set<GroupState> validStates = 
GroupState.groupStatesForType(GroupType.STREAMS);
         if (!validStates.containsAll(parsedStates)) {
             throw new IllegalArgumentException("Invalid state list '" + input 
+ "'. Valid states are: " +
-                    
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", 
")));
+                
validStates.stream().map(GroupState::toString).collect(Collectors.joining(", 
")));
         }
         return parsedStates;
     }
@@ -156,6 +168,174 @@ public class StreamsGroupCommand {
             }
         }
 
+        public void describeGroups() throws ExecutionException, 
InterruptedException {
+            String group = opts.options.valueOf(opts.groupOpt);
+            StreamsGroupDescription description = getDescribeGroup(group);
+            if (description == null)
+                return;
+            boolean verbose =  opts.options.has(opts.verboseOpt);
+            if (opts.options.has(opts.membersOpt)) {
+                printMembers(description, verbose);
+            } else if (opts.options.has(opts.stateOpt)) {
+                printStates(description, verbose);
+            } else {
+                printOffsets(description, verbose);
+            }
+        }
+
+        StreamsGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
+            DescribeStreamsGroupsResult result = 
adminClient.describeStreamsGroups(List.of(group));
+            Map<String, StreamsGroupDescription> descriptionMap = 
result.all().get();
+            return descriptionMap.get(group);
+        }
+
+        private void printMembers(StreamsGroupDescription description, boolean 
verbose) {
+            int groupLen = Math.max(15, description.groupId().length());
+            int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+            Collection<StreamsGroupMemberDescription> members = 
description.members();
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
+                for (StreamsGroupMemberDescription member : members) {
+                    maxMemberIdLen = Math.max(maxMemberIdLen, 
member.memberId().length());
+                    maxHostLen = Math.max(maxHostLen, 
member.processId().length());
+                    maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
+                }
+
+                if (!verbose) {
+                    String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen + 
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS", 
"CLIENT-ID");
+                        System.out.printf(fmt, description.groupId(), 
member.memberId(), member.processId(), member.clientId());
+                        printTasks(member.assignment(), false);
+                        System.out.println();
+                    }
+                } else {
+                    String fmt = "%" + -groupLen + "s %s %-15s%" + 
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+                    for (StreamsGroupMemberDescription member : members) {
+                        System.out.printf(fmt, "GROUP", 
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", 
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID");
+                        System.out.printf(fmt, description.groupId(), 
description.targetAssignmentEpoch(), description.topologyEpoch(), 
member.memberId(),
+                            member.isClassic() ? "classic" : "streams", 
member.memberEpoch(), member.processId(), member.clientId());
+                        printTasks(member.assignment(), false);
+                        printTasks(member.targetAssignment(), true);
+                        System.out.println();
+                    }
+                }
+            }
+        }
+
+        private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> 
tasks, String taskType) {
+            System.out.printf("%s\n", taskType + ": " + 
tasks.stream().map(taskId -> taskId.subtopologyId() + ": [" + 
taskId.partitions()).collect(Collectors.joining(",")) + "] ");
+        }
+
+        private void printTasks(StreamsGroupMemberAssignment assignment, 
boolean isTarget) {
+            String typePrefix = isTarget ? "TARGET-" : "";
+            printTaskType(assignment.activeTasks(), typePrefix + 
"ACTIVE-TASKS:");
+            printTaskType(assignment.standbyTasks(), typePrefix + 
"STANDBY-TASKS:");
+            printTaskType(assignment.warmupTasks(), typePrefix + 
"WARMUP-TASKS:");
+        }
+
+        private void printStates(StreamsGroupDescription description, boolean 
verbose) {
+            maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), 1);
+
+            int groupLen = Math.max(15, description.groupId().length());
+            String coordinator = description.coordinator().host() + ":" + 
description.coordinator().port() + "  (" + description.coordinator().idString() 
+ ")";
+            int coordinatorLen = Math.max(25, coordinator.length());
+
+            if (!verbose) {
+                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s 
%-15s %s\n";
+                System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", 
"#MEMBERS");
+                System.out.printf(fmt, description.groupId(), coordinator, 
description.groupState().toString(), description.members().size());
+            } else {
+                String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s 
%-15s %-15s %-15s %s\n";
+                System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE", 
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
+                System.out.printf(fmt, description.groupId(), coordinator, 
description.groupState().toString(), description.groupEpoch(), 
description.targetAssignmentEpoch(), description.members().size());
+            }
+        }
+
+        private void printOffsets(StreamsGroupDescription description, boolean 
verbose) throws ExecutionException, InterruptedException {
+            Map<TopicPartition, Long> offsets = 
getOffsets(description.members(), description);
+            if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), offsets.size())) {
+                int groupLen = Math.max(15, description.groupId().length());
+                int maxTopicLen = 15;
+                for (TopicPartition topicPartition : offsets.keySet()) {
+                    maxTopicLen = Math.max(maxTopicLen, 
topicPartition.topic().length());
+                }
+
+                if (!verbose) {
+                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %s\n";
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"OFFSET-LAG");
+                    for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
+                    }
+                } else {
+                    String fmt =  "%" + (-groupLen) + "s %" + (-maxTopicLen) + 
"s %-10s %-15s %s\n";
+                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"LEADER-EPOCH", "OFFSET-LAG");
+                    for (Map.Entry<TopicPartition, Long> offset : 
offsets.entrySet()) {
+                        System.out.printf(fmt, description.groupId(), 
offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+                    }
+                }
+            }
+        }
+
+        Map<TopicPartition, Long> 
getOffsets(Collection<StreamsGroupMemberDescription> members, 
StreamsGroupDescription description) throws ExecutionException, 
InterruptedException {
+            Set<TopicPartition> allTp = new HashSet<>();
+            for (StreamsGroupMemberDescription memberDescription : members) {
+                
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(), 
description));
+            }
+            // fetch latest and earliest offsets
+            Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
+            Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
+
+            for (TopicPartition tp : allTp) {
+                earliest.put(tp, OffsetSpec.earliest());
+                latest.put(tp, OffsetSpec.latest());
+            }
+            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
earliestResult = adminClient.listOffsets(earliest).all().get();
+            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
latestResult = adminClient.listOffsets(latest).all().get();
+
+            Map<TopicPartition, Long> lag = new HashMap<>();
+            for (Map.Entry<TopicPartition, 
ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
+                lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() - 
earliestResult.get(tp.getKey()).offset());
+            }
+            return lag;
+        }
+
+
+        /**
+         * Prints a summary of the state for situations where the group is 
empty or dead.
+         *
+         * @return Whether the group detail should be printed
+         */
+        public static boolean maybePrintEmptyGroupState(String group, 
GroupState state, int numRows) {
+            if (state == GroupState.DEAD) {
+                printError("Streams group '" + group + "' does not exist.", 
Optional.empty());
+            } else if (state == GroupState.EMPTY) {
+                System.err.println("\nStreams group '" + group + "' has no 
active members.");
+            }
+
+            return !state.equals(GroupState.DEAD) && numRows > 0;
+        }
+
+        private static Set<TopicPartition> 
getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, 
StreamsGroupDescription description) {
+            Map<String, List<String>> allSourceTopics = new HashMap<>();
+            for (StreamsGroupSubtopologyDescription subtopologyDescription : 
description.subtopologies()) {
+                allSourceTopics.put(subtopologyDescription.subtopologyId(), 
subtopologyDescription.sourceTopics());
+            }
+            Set<TopicPartition> topicPartitions = new HashSet<>();
+
+            for (StreamsGroupMemberAssignment.TaskIds task : taskIds) {
+                List<String> sourceTopics = 
allSourceTopics.get(task.subtopologyId());
+                if (sourceTopics == null) {
+                    throw new IllegalArgumentException("Subtopology " + 
task.subtopologyId() + " not found in group description!");
+                }
+                for (String topic : sourceTopics) {
+                    for (Integer partition : task.partitions()) {
+                        topicPartitions.add(new TopicPartition(topic, 
partition));
+                    }
+                }
+            }
+            return topicPartitions;
+        }
+
         public void close() {
             adminClient.close();
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index ced73b44bf1..c97e99e65a8 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -19,22 +19,46 @@ package org.apache.kafka.tools.streams;
 import org.apache.kafka.server.util.CommandDefaultOptions;
 import org.apache.kafka.server.util.CommandLineUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import joptsimple.OptionSpec;
 
 public class StreamsGroupCommandOptions extends CommandDefaultOptions {
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
+
     public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) 
to connect to.";
+    public static final String GROUP_DOC = "The streams group we wish to act 
on.";
     public static final String LIST_DOC = "List all streams groups.";
+    public static final String DESCRIBE_DOC = "Describe streams group and list 
offset lag related to given group.";
     public static final String TIMEOUT_MS_DOC = "The timeout that can be set 
for some use cases. For example, it can be used when describing the group " +
         "to specify the maximum amount of time in milliseconds to wait before 
the group stabilizes.";
     public static final String COMMAND_CONFIG_DOC = "Property file containing 
configs to be passed to Admin Client.";
     public static final String STATE_DOC = "When specified with '--list', it 
displays the state of all groups. It can also be used to list groups with 
specific states. " +
         "Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and 
Dead.";
+    public static final String MEMBERS_DOC = "Describe members of the group. 
This option may be used with the '--describe' option only.";
+    public static final String OFFSETS_DOC = "Describe the group and list all 
topic partitions in the group along with their offset information." +
+        "This is the default sub-action and may be used with the '--describe' 
option only.";
+    public static final String VERBOSE_DOC = """
+        Use with --describe --state  to show group epoch and target assignment 
epoch.
+        Use with --describe --members to show for each member the member 
epoch, target assignment epoch, current assignment, target assignment, and 
whether member is still using the classic rebalance protocol.
+        Use with --describe --offsets  and --describe  to show leader epochs 
for each partition.""";
 
     public final OptionSpec<String> bootstrapServerOpt;
+    public final OptionSpec<String> groupOpt;
     public final OptionSpec<Void> listOpt;
+    public final OptionSpec<Void> describeOpt;
     public final OptionSpec<Long> timeoutMsOpt;
     public final OptionSpec<String> commandConfigOpt;
     public final OptionSpec<String> stateOpt;
+    public final OptionSpec<Void> membersOpt;
+    public final OptionSpec<Void> offsetsOpt;
+    public final OptionSpec<Void> verboseOpt;
+
 
 
     public StreamsGroupCommandOptions(String[] args) {
@@ -44,8 +68,14 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
             .withRequiredArg()
             .describedAs("server to connect to")
             .ofType(String.class);
+        groupOpt = parser.accepts("group", GROUP_DOC)
+            .withRequiredArg()
+            .describedAs("streams group")
+            .ofType(String.class);
         listOpt = parser.accepts("list", LIST_DOC);
+        describeOpt = parser.accepts("describe", DESCRIBE_DOC);
         timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
+            .availableIf(describeOpt)
             .withRequiredArg()
             .describedAs("timeout (ms)")
             .ofType(Long.class)
@@ -55,16 +85,41 @@ public class StreamsGroupCommandOptions extends 
CommandDefaultOptions {
             .describedAs("command config property file")
             .ofType(String.class);
         stateOpt = parser.accepts("state", STATE_DOC)
-            .availableIf(listOpt)
+            .availableIf(listOpt, describeOpt)
             .withOptionalArg()
             .ofType(String.class);
+        membersOpt = parser.accepts("members", MEMBERS_DOC)
+            .availableIf(describeOpt);
+        offsetsOpt = parser.accepts("offsets", OFFSETS_DOC)
+            .availableIf(describeOpt);
+        verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
+            .availableIf(describeOpt);
 
         options = parser.parse(args);
     }
 
     public void checkArgs() {
-        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
list streams groups.");
+        CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to 
list, or describe streams groups.");
 
         CommandLineUtils.checkRequiredArgs(parser, options, 
bootstrapServerOpt);
+
+        if (options.has(describeOpt)) {
+            if (!options.has(groupOpt))
+                CommandLineUtils.printUsageAndExit(parser,
+                    "Option " + describeOpt + " takes the option: " + 
groupOpt);
+            List<OptionSpec<?>> mutuallyExclusiveOpts = 
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+            if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ? 
1 : 0).sum() > 1) {
+                CommandLineUtils.printUsageAndExit(parser,
+                    "Option " + describeOpt + " takes at most one of these 
options: " + 
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
 ")));
+            }
+            if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
+                CommandLineUtils.printUsageAndExit(parser,
+                    "Option " + describeOpt + " does not take a value for " + 
stateOpt);
+        } else {
+            if (options.has(timeoutMsOpt))
+                LOGGER.debug("Option " + timeoutMsOpt + " is applicable only 
when " + describeOpt + " is used.");
+        }
+
+        CommandLineUtils.checkInvalidArgs(parser, options, listOpt, 
membersOpt, offsetsOpt);
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
index ff84df127aa..2a1c3047d85 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
@@ -17,21 +17,33 @@
 package org.apache.kafka.tools.streams;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
 import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
 import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
+import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
+import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.test.TestUtils;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
@@ -39,7 +51,9 @@ import java.util.Set;
 import joptsimple.OptionException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -56,8 +70,8 @@ public class StreamsGroupCommandUnitTest {
         Admin adminClient = mock(KafkaAdminClient.class);
         ListGroupsResult result = mock(ListGroupsResult.class);
         
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
-                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
-                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))
+            new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
+            new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))
         )));
         
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
         StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(cgcArgs, adminClient);
@@ -88,14 +102,14 @@ public class StreamsGroupCommandUnitTest {
         Admin adminClient = mock(KafkaAdminClient.class);
         ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
         
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
-                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
-                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))
+            new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
+            new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))
         )));
         
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
         StreamsGroupCommand.StreamsGroupService service = 
getStreamsGroupService(cgcArgs, adminClient);
         Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
-                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
-                new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))));
+            new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)),
+            new GroupListing(secondGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.EMPTY))));
 
         final Set[] foundListing = new Set[]{Collections.emptySet()};
         TestUtils.waitForCondition(() -> {
@@ -105,11 +119,11 @@ public class StreamsGroupCommandUnitTest {
 
         ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
         
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
-                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
+            new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE))
         )));
         
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
         Set<GroupListing> expectedListingStable = Collections.singleton(
-                new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)));
+            new GroupListing(firstGroup, Optional.of(GroupType.STREAMS), 
"streams", Optional.of(GroupState.STABLE)));
 
         foundListing[0] = Collections.emptySet();
 
@@ -120,6 +134,80 @@ public class StreamsGroupCommandUnitTest {
         service.close();
     }
 
+
+    @Test
+    public void testDescribeStreamsGroups() throws Exception {
+        String firstGroup = "group1";
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DescribeStreamsGroupsResult result = 
mock(DescribeStreamsGroupsResult.class);
+        Map<String, StreamsGroupDescription> resultMap = new HashMap<>();
+        StreamsGroupDescription exp = new StreamsGroupDescription(
+            firstGroup,
+            0,
+            0,
+            0,
+            List.of(new StreamsGroupSubtopologyDescription("foo", List.of(), 
List.of(), Map.of(), Map.of())),
+            List.of(),
+            GroupState.STABLE,
+            new Node(0, "bar", 0),
+            null);
+        resultMap.put(firstGroup, exp);
+
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(resultMap));
+        
when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(result);
+        StreamsGroupCommand.StreamsGroupService service = new 
StreamsGroupCommand.StreamsGroupService(null, adminClient);
+        assertEquals(exp, service.getDescribeGroup(firstGroup));
+        service.close();
+    }
+
+    @Test
+    public void testDescribeStreamsGroupsGetOffsets() throws Exception {
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListOffsetsResult startOffset = mock(ListOffsetsResult.class);
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
startOffsetResultMap = new HashMap<>();
+        startOffsetResultMap.put(new TopicPartition("topic1", 0), new 
ListOffsetsResult.ListOffsetsResultInfo(10, -1, Optional.empty()));
+
+        ListOffsetsResult endOffset = mock(ListOffsetsResult.class);
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> 
endOffsetResultMap = new HashMap<>();
+        endOffsetResultMap.put(new TopicPartition("topic1", 0), new 
ListOffsetsResult.ListOffsetsResultInfo(30, -1, Optional.empty()));
+
+        
when(startOffset.all()).thenReturn(KafkaFuture.completedFuture(startOffsetResultMap));
+        
when(endOffset.all()).thenReturn(KafkaFuture.completedFuture(endOffsetResultMap));
+
+        
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset,
 endOffset);
+
+        StreamsGroupMemberDescription description = new 
StreamsGroupMemberDescription("foo", 0, Optional.empty(),
+            Optional.empty(), "bar", "baz", 0, "qux",
+            Optional.empty(), Map.of(), List.of(), List.of(),
+            new StreamsGroupMemberAssignment(List.of(), List.of(), List.of()), 
new StreamsGroupMemberAssignment(List.of(), List.of(), List.of()),
+            false);
+        StreamsGroupDescription x = new StreamsGroupDescription(
+            "group1",
+            0,
+            0,
+            0,
+            List.of(new StreamsGroupSubtopologyDescription("id", 
List.of("topic1"), List.of(), Map.of(), Map.of())),
+            List.of(description),
+            GroupState.STABLE,
+            new Node(0, "host", 0),
+            null);
+        StreamsGroupCommand.StreamsGroupService service = new 
StreamsGroupCommand.StreamsGroupService(null, adminClient);
+        Map<TopicPartition, Long> lags = 
service.getOffsets(List.of(description), x);
+        assertEquals(1, lags.size());
+        assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
+        service.close();
+    }
+
+    @Test
+    public void testPrintEmptyGroupState() {
+        
assertFalse(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState("group",
 GroupState.EMPTY, 0));
+        
assertFalse(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState("group",
 GroupState.DEAD, 0));
+        
assertFalse(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState("group",
 GroupState.STABLE, 0));
+        
assertTrue(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState("group",
 GroupState.STABLE, 1));
+        
assertTrue(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState("group",
 GroupState.UNKNOWN, 1));
+    }
+
     @Test
     public void testGroupStatesFromString() {
         Set<GroupState> result = 
StreamsGroupCommand.groupStatesFromString("empty");
@@ -163,6 +251,7 @@ public class StreamsGroupCommandUnitTest {
         assertThrows(IllegalArgumentException.class, () -> 
StreamsGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+
     StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] 
args, Admin adminClient) {
         StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
         return new StreamsGroupCommand.StreamsGroupService(opts, adminClient);

Reply via email to