This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch kip1071
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/kip1071 by this push:
new f7b303eff3a Implement kafka-streams-groups.sh --describe (#18231)
f7b303eff3a is described below
commit f7b303eff3aba1891eb41bf8d5091a5b8b354f44
Author: Alieh Saeedi <[email protected]>
AuthorDate: Thu Jan 23 10:35:13 2025 +0100
Implement kafka-streams-groups.sh --describe (#18231)
Implement --describe and its options (--state, --offsets, --members, and the
combination of each of them with --verbose).
Reviewers: Lucas Brutschy <[email protected]>
---
.../kafka/tools/streams/StreamsGroupCommand.java | 188 ++++++++++++++++++++-
.../tools/streams/StreamsGroupCommandOptions.java | 59 ++++++-
.../tools/streams/StreamsGroupCommandUnitTest.java | 105 +++++++++++-
3 files changed, 338 insertions(+), 14 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index 6c3d1a5631b..1c0f36dd5a5 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -18,11 +18,19 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
+import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
+import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.util.CommandLineUtils;
@@ -30,6 +38,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -48,12 +58,12 @@ public class StreamsGroupCommand {
try {
opts.checkArgs();
- CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
list all streams groups.");
+ CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to
list all streams groups, or describe a streams group.");
// should have exactly one action
- long actions =
Stream.of(opts.listOpt).filter(opts.options::has).count();
+ long actions = Stream.of(opts.listOpt,
opts.describeOpt).filter(opts.options::has).count();
if (actions != 1)
- CommandLineUtils.printUsageAndExit(opts.parser, "Command must
include exactly one action: --list.");
+ CommandLineUtils.printUsageAndExit(opts.parser, "Command must
include exactly one action: --list, or --describe.");
run(opts);
} catch (OptionException e) {
@@ -65,6 +75,8 @@ public class StreamsGroupCommand {
try (StreamsGroupService streamsGroupService = new
StreamsGroupService(opts, Map.of())) {
if (opts.options.has(opts.listOpt)) {
streamsGroupService.listGroups();
+ } else if (opts.options.has(opts.describeOpt)) {
+ streamsGroupService.describeGroups();
} else {
throw new IllegalArgumentException("Unknown action!");
}
@@ -81,7 +93,7 @@ public class StreamsGroupCommand {
Set<GroupState> validStates =
GroupState.groupStatesForType(GroupType.STREAMS);
if (!validStates.containsAll(parsedStates)) {
throw new IllegalArgumentException("Invalid state list '" + input
+ "'. Valid states are: " +
-
validStates.stream().map(GroupState::toString).collect(Collectors.joining(",
")));
+
validStates.stream().map(GroupState::toString).collect(Collectors.joining(",
")));
}
return parsedStates;
}
@@ -156,6 +168,174 @@ public class StreamsGroupCommand {
}
}
+ /**
+  * Describes the streams group named by the group option.
+  * Prints the members when the members option is present, the group state when the
+  * state option is present, and the offset information otherwise. Nothing is printed
+  * when no description is returned for the group.
+  */
+ public void describeGroups() throws ExecutionException, InterruptedException {
+     final StreamsGroupDescription groupDescription = getDescribeGroup(opts.options.valueOf(opts.groupOpt));
+     if (groupDescription == null) {
+         return;
+     }
+
+     final boolean isVerbose = opts.options.has(opts.verboseOpt);
+     if (opts.options.has(opts.membersOpt)) {
+         printMembers(groupDescription, isVerbose);
+         return;
+     }
+     if (opts.options.has(opts.stateOpt)) {
+         printStates(groupDescription, isVerbose);
+         return;
+     }
+     // Offsets are the default sub-action when neither members nor state was requested.
+     printOffsets(groupDescription, isVerbose);
+ }
+
+ /**
+  * Asks the admin client to describe a single streams group and waits for the result.
+  *
+  * @param group the id of the group to describe
+  * @return the description stored under {@code group} in the result map, or {@code null} if the map has no such entry
+  */
+ StreamsGroupDescription getDescribeGroup(String group) throws ExecutionException, InterruptedException {
+     return adminClient.describeStreamsGroups(List.of(group))
+         .all()
+         .get()
+         .get(group);
+ }
+
+ private void printMembers(StreamsGroupDescription description, boolean
verbose) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxMemberIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
+ Collection<StreamsGroupMemberDescription> members =
description.members();
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), description.members().size())) {
+ for (StreamsGroupMemberDescription member : members) {
+ maxMemberIdLen = Math.max(maxMemberIdLen,
member.memberId().length());
+ maxHostLen = Math.max(maxHostLen,
member.processId().length());
+ maxClientIdLen = Math.max(maxClientIdLen,
member.clientId().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -maxMemberIdLen +
"s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, "GROUP", "MEMBER", "PROCESS",
"CLIENT-ID");
+ System.out.printf(fmt, description.groupId(),
member.memberId(), member.processId(), member.clientId());
+ printTasks(member.assignment(), false);
+ System.out.println();
+ }
+ } else {
+ String fmt = "%" + -groupLen + "s %s %-15s%" +
-maxMemberIdLen + "s %s %15s %" + -maxHostLen + "s %" + -maxClientIdLen + "s\n";
+ for (StreamsGroupMemberDescription member : members) {
+ System.out.printf(fmt, "GROUP",
"TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL",
"MEMBER-EPOCH", "PROCESS", "CLIENT-ID");
+ System.out.printf(fmt, description.groupId(),
description.targetAssignmentEpoch(), description.topologyEpoch(),
member.memberId(),
+ member.isClassic() ? "classic" : "streams",
member.memberEpoch(), member.processId(), member.clientId());
+ printTasks(member.assignment(), false);
+ printTasks(member.targetAssignment(), true);
+ System.out.println();
+ }
+ }
+ }
+ }
+
+ /**
+  * Prints a single line listing the given tasks under a task-type label.
+  * Each entry is written as the subtopology id, a colon, and the string form of its
+  * partitions; entries are separated by ", ". An empty task list prints the label only.
+  *
+  * @param tasks    the tasks to list
+  * @param taskType the label for the line, with or without a trailing colon
+  */
+ private void printTaskType(List<StreamsGroupMemberAssignment.TaskIds> tasks, String taskType) {
+     // Callers may already terminate the label with ':'; emit exactly one colon either way.
+     String label = taskType.endsWith(":") ? taskType : taskType + ":";
+     // Render each entry completely before joining so the delimiters stay balanced for any number of tasks.
+     String renderedTasks = tasks.stream()
+         .map(taskId -> taskId.subtopologyId() + ": " + taskId.partitions())
+         .collect(Collectors.joining(", "));
+     System.out.printf("%s\n", label + " " + renderedTasks);
+ }
+
+ private void printTasks(StreamsGroupMemberAssignment assignment,
boolean isTarget) {
+ String typePrefix = isTarget ? "TARGET-" : "";
+ printTaskType(assignment.activeTasks(), typePrefix +
"ACTIVE-TASKS:");
+ printTaskType(assignment.standbyTasks(), typePrefix +
"STANDBY-TASKS:");
+ printTaskType(assignment.warmupTasks(), typePrefix +
"WARMUP-TASKS:");
+ }
+
+ private void printStates(StreamsGroupDescription description, boolean
verbose) {
+ maybePrintEmptyGroupState(description.groupId(),
description.groupState(), 1);
+
+ int groupLen = Math.max(15, description.groupId().length());
+ String coordinator = description.coordinator().host() + ":" +
description.coordinator().port() + " (" + description.coordinator().idString()
+ ")";
+ int coordinatorLen = Math.max(25, coordinator.length());
+
+ if (!verbose) {
+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s
%-15s %s\n";
+ System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE",
"#MEMBERS");
+ System.out.printf(fmt, description.groupId(), coordinator,
description.groupState().toString(), description.members().size());
+ } else {
+ String fmt = "%" + -groupLen + "s %" + -coordinatorLen + "s
%-15s %-15s %-15s %s\n";
+ System.out.printf(fmt, "GROUP", "COORDINATOR (ID)", "STATE",
"GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
+ System.out.printf(fmt, description.groupId(), coordinator,
description.groupState().toString(), description.groupEpoch(),
description.targetAssignmentEpoch(), description.members().size());
+ }
+ }
+
+ private void printOffsets(StreamsGroupDescription description, boolean
verbose) throws ExecutionException, InterruptedException {
+ Map<TopicPartition, Long> offsets =
getOffsets(description.members(), description);
+ if (maybePrintEmptyGroupState(description.groupId(),
description.groupState(), offsets.size())) {
+ int groupLen = Math.max(15, description.groupId().length());
+ int maxTopicLen = 15;
+ for (TopicPartition topicPartition : offsets.keySet()) {
+ maxTopicLen = Math.max(maxTopicLen,
topicPartition.topic().length());
+ }
+
+ if (!verbose) {
+ String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) +
"s %-10s %s\n";
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"OFFSET-LAG");
+ for (Map.Entry<TopicPartition, Long> offset :
offsets.entrySet()) {
+ System.out.printf(fmt, description.groupId(),
offset.getKey().topic(), offset.getKey().partition(), offset.getValue());
+ }
+ } else {
+ String fmt = "%" + (-groupLen) + "s %" + (-maxTopicLen) +
"s %-10s %-15s %s\n";
+ System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION",
"LEADER-EPOCH", "OFFSET-LAG");
+ for (Map.Entry<TopicPartition, Long> offset :
offsets.entrySet()) {
+ System.out.printf(fmt, description.groupId(),
offset.getKey().topic(), offset.getKey().partition(), "", offset.getValue());
+ }
+ }
+ }
+ }
+
+ Map<TopicPartition, Long>
getOffsets(Collection<StreamsGroupMemberDescription> members,
StreamsGroupDescription description) throws ExecutionException,
InterruptedException {
+ Set<TopicPartition> allTp = new HashSet<>();
+ for (StreamsGroupMemberDescription memberDescription : members) {
+
allTp.addAll(getTopicPartitions(memberDescription.assignment().activeTasks(),
description));
+ }
+ // fetch latest and earliest offsets
+ Map<TopicPartition, OffsetSpec> earliest = new HashMap<>();
+ Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
+
+ for (TopicPartition tp : allTp) {
+ earliest.put(tp, OffsetSpec.earliest());
+ latest.put(tp, OffsetSpec.latest());
+ }
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
earliestResult = adminClient.listOffsets(earliest).all().get();
+ Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>
latestResult = adminClient.listOffsets(latest).all().get();
+
+ Map<TopicPartition, Long> lag = new HashMap<>();
+ for (Map.Entry<TopicPartition,
ListOffsetsResult.ListOffsetsResultInfo> tp : earliestResult.entrySet()) {
+ lag.put(tp.getKey(), latestResult.get(tp.getKey()).offset() -
earliestResult.get(tp.getKey()).offset());
+ }
+ return lag;
+ }
+
+
+ /**
+  * Reports a dead group as non-existent, or an empty group as having no active members,
+  * and tells the caller whether there is anything further to print.
+  *
+  * @param group   the group id used in the printed message
+  * @param state   the current state of the group
+  * @param numRows the number of detail rows the caller would print
+  * @return true only when the group is not dead and {@code numRows} is positive
+  */
+ public static boolean maybePrintEmptyGroupState(String group, GroupState state, int numRows) {
+     final boolean groupIsDead = state == GroupState.DEAD;
+     final boolean groupIsEmpty = state == GroupState.EMPTY;
+
+     if (groupIsDead) {
+         printError("Streams group '" + group + "' does not exist.", Optional.empty());
+     } else if (groupIsEmpty) {
+         System.err.println("\nStreams group '" + group + "' has no active members.");
+     }
+
+     // equals() is invoked on state first, so a null state throws here instead of yielding a result.
+     return !state.equals(GroupState.DEAD) && numRows > 0;
+ }
+
+ private static Set<TopicPartition>
getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds,
StreamsGroupDescription description) {
+ Map<String, List<String>> allSourceTopics = new HashMap<>();
+ for (StreamsGroupSubtopologyDescription subtopologyDescription :
description.subtopologies()) {
+ allSourceTopics.put(subtopologyDescription.subtopologyId(),
subtopologyDescription.sourceTopics());
+ }
+ Set<TopicPartition> topicPartitions = new HashSet<>();
+
+ for (StreamsGroupMemberAssignment.TaskIds task : taskIds) {
+ List<String> sourceTopics =
allSourceTopics.get(task.subtopologyId());
+ if (sourceTopics == null) {
+ throw new IllegalArgumentException("Subtopology " +
task.subtopologyId() + " not found in group description!");
+ }
+ for (String topic : sourceTopics) {
+ for (Integer partition : task.partitions()) {
+ topicPartitions.add(new TopicPartition(topic,
partition));
+ }
+ }
+ }
+ return topicPartitions;
+ }
+
public void close() {
adminClient.close();
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
index ced73b44bf1..c97e99e65a8 100644
---
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommandOptions.java
@@ -19,22 +19,46 @@ package org.apache.kafka.tools.streams;
import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
import joptsimple.OptionSpec;
public class StreamsGroupCommandOptions extends CommandDefaultOptions {
+ public static final Logger LOGGER =
LoggerFactory.getLogger(StreamsGroupCommandOptions.class);
+
public static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s)
to connect to.";
+ public static final String GROUP_DOC = "The streams group we wish to act
on.";
public static final String LIST_DOC = "List all streams groups.";
+ public static final String DESCRIBE_DOC = "Describe streams group and list
offset lag related to given group.";
public static final String TIMEOUT_MS_DOC = "The timeout that can be set
for some use cases. For example, it can be used when describing the group " +
"to specify the maximum amount of time in milliseconds to wait before
the group stabilizes.";
public static final String COMMAND_CONFIG_DOC = "Property file containing
configs to be passed to Admin Client.";
public static final String STATE_DOC = "When specified with '--list', it
displays the state of all groups. It can also be used to list groups with
specific states. " +
"Valid values are Empty, NotReady, Stable, Assigning, Reconciling, and
Dead.";
+ public static final String MEMBERS_DOC = "Describe members of the group.
This option may be used with the '--describe' option only.";
+ // Help text for the offsets option. The two literals are concatenated, so the first
+ // ends with a space to keep the sentences apart in the rendered usage output.
+ public static final String OFFSETS_DOC = "Describe the group and list all topic partitions in the group along with their offset information. " +
+     "This is the default sub-action and may be used with the '--describe' option only.";
+ public static final String VERBOSE_DOC = """
+ Use with --describe --state to show group epoch and target assignment
epoch.
+ Use with --describe --members to show for each member the member
epoch, target assignment epoch, current assignment, target assignment, and
whether member is still using the classic rebalance protocol.
+ Use with --describe --offsets and --describe to show leader epochs
for each partition.""";
public final OptionSpec<String> bootstrapServerOpt;
+ public final OptionSpec<String> groupOpt;
public final OptionSpec<Void> listOpt;
+ public final OptionSpec<Void> describeOpt;
public final OptionSpec<Long> timeoutMsOpt;
public final OptionSpec<String> commandConfigOpt;
public final OptionSpec<String> stateOpt;
+ public final OptionSpec<Void> membersOpt;
+ public final OptionSpec<Void> offsetsOpt;
+ public final OptionSpec<Void> verboseOpt;
+
public StreamsGroupCommandOptions(String[] args) {
@@ -44,8 +68,14 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
+ groupOpt = parser.accepts("group", GROUP_DOC)
+ .withRequiredArg()
+ .describedAs("streams group")
+ .ofType(String.class);
listOpt = parser.accepts("list", LIST_DOC);
+ describeOpt = parser.accepts("describe", DESCRIBE_DOC);
timeoutMsOpt = parser.accepts("timeout", TIMEOUT_MS_DOC)
+ .availableIf(describeOpt)
.withRequiredArg()
.describedAs("timeout (ms)")
.ofType(Long.class)
@@ -55,16 +85,41 @@ public class StreamsGroupCommandOptions extends
CommandDefaultOptions {
.describedAs("command config property file")
.ofType(String.class);
stateOpt = parser.accepts("state", STATE_DOC)
- .availableIf(listOpt)
+ .availableIf(listOpt, describeOpt)
.withOptionalArg()
.ofType(String.class);
+ membersOpt = parser.accepts("members", MEMBERS_DOC)
+ .availableIf(describeOpt);
+ offsetsOpt = parser.accepts("offsets", OFFSETS_DOC)
+ .availableIf(describeOpt);
+ verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
+ .availableIf(describeOpt);
options = parser.parse(args);
}
public void checkArgs() {
- CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
list streams groups.");
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool helps to
list, or describe streams groups.");
CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt);
+
+ if (options.has(describeOpt)) {
+ if (!options.has(groupOpt))
+ CommandLineUtils.printUsageAndExit(parser,
+ "Option " + describeOpt + " takes the option: " +
groupOpt);
+ List<OptionSpec<?>> mutuallyExclusiveOpts =
Arrays.asList(membersOpt, offsetsOpt, stateOpt);
+ if (mutuallyExclusiveOpts.stream().mapToInt(o -> options.has(o) ?
1 : 0).sum() > 1) {
+ CommandLineUtils.printUsageAndExit(parser,
+ "Option " + describeOpt + " takes at most one of these
options: " +
mutuallyExclusiveOpts.stream().map(Object::toString).collect(Collectors.joining(",
")));
+ }
+ if (options.has(stateOpt) && options.valueOf(stateOpt) != null)
+ CommandLineUtils.printUsageAndExit(parser,
+ "Option " + describeOpt + " does not take a value for " +
stateOpt);
+ } else {
+ if (options.has(timeoutMsOpt))
+ LOGGER.debug("Option " + timeoutMsOpt + " is applicable only
when " + describeOpt + " is used.");
+ }
+
+ CommandLineUtils.checkInvalidArgs(parser, options, listOpt,
membersOpt, offsetsOpt);
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
index ff84df127aa..2a1c3047d85 100644
---
a/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/streams/StreamsGroupCommandUnitTest.java
@@ -17,21 +17,33 @@
package org.apache.kafka.tools.streams;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DescribeStreamsGroupsResult;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.clients.admin.ListGroupsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.StreamsGroupDescription;
+import org.apache.kafka.clients.admin.StreamsGroupMemberAssignment;
+import org.apache.kafka.clients.admin.StreamsGroupMemberDescription;
+import org.apache.kafka.clients.admin.StreamsGroupSubtopologyDescription;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -39,7 +51,9 @@ import java.util.Set;
import joptsimple.OptionException;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -56,8 +70,8 @@ public class StreamsGroupCommandUnitTest {
Admin adminClient = mock(KafkaAdminClient.class);
ListGroupsResult result = mock(ListGroupsResult.class);
when(result.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
- new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
- new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
+ new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(result);
StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(cgcArgs, adminClient);
@@ -88,14 +102,14 @@ public class StreamsGroupCommandUnitTest {
Admin adminClient = mock(KafkaAdminClient.class);
ListGroupsResult resultWithAllStates = mock(ListGroupsResult.class);
when(resultWithAllStates.all()).thenReturn(KafkaFuture.completedFuture(Arrays.asList(
- new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
- new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
+ new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithAllStates);
StreamsGroupCommand.StreamsGroupService service =
getStreamsGroupService(cgcArgs, adminClient);
Set<GroupListing> expectedListing = new HashSet<>(Arrays.asList(
- new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
- new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))));
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)),
+ new GroupListing(secondGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.EMPTY))));
final Set[] foundListing = new Set[]{Collections.emptySet()};
TestUtils.waitForCondition(() -> {
@@ -105,11 +119,11 @@ public class StreamsGroupCommandUnitTest {
ListGroupsResult resultWithStableState = mock(ListGroupsResult.class);
when(resultWithStableState.all()).thenReturn(KafkaFuture.completedFuture(Collections.singletonList(
- new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE))
)));
when(adminClient.listGroups(any(ListGroupsOptions.class))).thenReturn(resultWithStableState);
Set<GroupListing> expectedListingStable = Collections.singleton(
- new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)));
+ new GroupListing(firstGroup, Optional.of(GroupType.STREAMS),
"streams", Optional.of(GroupState.STABLE)));
foundListing[0] = Collections.emptySet();
@@ -120,6 +134,80 @@ public class StreamsGroupCommandUnitTest {
service.close();
}
+
+ /**
+  * Checks that getDescribeGroup returns the description the admin client reports for the requested group id.
+  */
+ @Test
+ public void testDescribeStreamsGroups() throws Exception {
+     final String groupId = "group1";
+     final StreamsGroupDescription expected = new StreamsGroupDescription(
+         groupId,
+         0,
+         0,
+         0,
+         List.of(new StreamsGroupSubtopologyDescription("foo", List.of(), List.of(), Map.of(), Map.of())),
+         List.of(),
+         GroupState.STABLE,
+         new Node(0, "bar", 0),
+         null);
+     final Map<String, StreamsGroupDescription> descriptions = new HashMap<>();
+     descriptions.put(groupId, expected);
+
+     // Stub the admin client so that any describe call yields the prepared map.
+     final DescribeStreamsGroupsResult describeResult = mock(DescribeStreamsGroupsResult.class);
+     when(describeResult.all()).thenReturn(KafkaFuture.completedFuture(descriptions));
+     final Admin adminClient = mock(KafkaAdminClient.class);
+     when(adminClient.describeStreamsGroups(ArgumentMatchers.anyCollection())).thenReturn(describeResult);
+
+     final StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
+     assertEquals(expected, service.getDescribeGroup(groupId));
+     service.close();
+ }
+
+ /**
+  * Checks that getOffsets reports the latest offset minus the earliest offset for each
+  * partition the admin client returns (30 - 10 = 20 for topic1-0).
+  */
+ @Test
+ public void testDescribeStreamsGroupsGetOffsets() throws Exception {
+     final TopicPartition partition = new TopicPartition("topic1", 0);
+
+     final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> earliestOffsets = new HashMap<>();
+     earliestOffsets.put(partition, new ListOffsetsResult.ListOffsetsResultInfo(10, -1, Optional.empty()));
+     final ListOffsetsResult earliestResult = mock(ListOffsetsResult.class);
+     when(earliestResult.all()).thenReturn(KafkaFuture.completedFuture(earliestOffsets));
+
+     final Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets = new HashMap<>();
+     latestOffsets.put(partition, new ListOffsetsResult.ListOffsetsResultInfo(30, -1, Optional.empty()));
+     final ListOffsetsResult latestResult = mock(ListOffsetsResult.class);
+     when(latestResult.all()).thenReturn(KafkaFuture.completedFuture(latestOffsets));
+
+     // The first listOffsets call receives the earliest result, the second the latest result.
+     final Admin adminClient = mock(KafkaAdminClient.class);
+     when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(earliestResult, latestResult);
+
+     final StreamsGroupMemberAssignment emptyAssignment = new StreamsGroupMemberAssignment(List.of(), List.of(), List.of());
+     final StreamsGroupMemberAssignment emptyTargetAssignment = new StreamsGroupMemberAssignment(List.of(), List.of(), List.of());
+     final StreamsGroupMemberDescription member = new StreamsGroupMemberDescription("foo", 0, Optional.empty(),
+         Optional.empty(), "bar", "baz", 0, "qux",
+         Optional.empty(), Map.of(), List.of(), List.of(),
+         emptyAssignment, emptyTargetAssignment,
+         false);
+     final StreamsGroupDescription groupDescription = new StreamsGroupDescription(
+         "group1",
+         0,
+         0,
+         0,
+         List.of(new StreamsGroupSubtopologyDescription("id", List.of("topic1"), List.of(), Map.of(), Map.of())),
+         List.of(member),
+         GroupState.STABLE,
+         new Node(0, "host", 0),
+         null);
+
+     final StreamsGroupCommand.StreamsGroupService service = new StreamsGroupCommand.StreamsGroupService(null, adminClient);
+     final Map<TopicPartition, Long> lags = service.getOffsets(List.of(member), groupDescription);
+     assertEquals(1, lags.size());
+     assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
+     service.close();
+ }
+
+ /**
+  * Checks the return value of maybePrintEmptyGroupState: false whenever there are no rows
+  * or the group is dead, true for a non-dead group with at least one row.
+  */
+ @Test
+ public void testPrintEmptyGroupState() {
+     final String groupId = "group";
+
+     // No rows to print: always false, whatever the state.
+     assertFalse(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState(groupId, GroupState.EMPTY, 0));
+     assertFalse(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState(groupId, GroupState.DEAD, 0));
+     assertFalse(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState(groupId, GroupState.STABLE, 0));
+
+     // At least one row and a state other than DEAD: true.
+     assertTrue(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState(groupId, GroupState.STABLE, 1));
+     assertTrue(StreamsGroupCommand.StreamsGroupService.maybePrintEmptyGroupState(groupId, GroupState.UNKNOWN, 1));
+ }
+
@Test
public void testGroupStatesFromString() {
Set<GroupState> result =
StreamsGroupCommand.groupStatesFromString("empty");
@@ -163,6 +251,7 @@ public class StreamsGroupCommandUnitTest {
assertThrows(IllegalArgumentException.class, () ->
StreamsGroupCommand.groupStatesFromString(" , ,"));
}
+
StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[]
args, Admin adminClient) {
StreamsGroupCommandOptions opts = new StreamsGroupCommandOptions(args);
return new StreamsGroupCommand.StreamsGroupService(opts, adminClient);