This is an automated email from the ASF dual-hosted git repository. lucasbru pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 443c01ca80c MINOR: add repartitionSourceTopics to Streams group description (#19561) 443c01ca80c is described below commit 443c01ca80cbeecd8c1b4272742cd5b05f43568e Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com> AuthorDate: Mon Apr 28 11:54:53 2025 +0200 MINOR: add repartitionSourceTopics to Streams group description (#19561) This is a follow-up to #19433. This PR adds the repartition source topics to the output of `--describe` for streams groups. Reviewers: Lucas Brutschy <lbruts...@confluent.io> --- .../java/org/apache/kafka/tools/streams/StreamsGroupCommand.java | 4 +++- .../org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java index bbf09c3f36a..1a8be4104aa 100644 --- a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java @@ -368,7 +368,9 @@ public class StreamsGroupCommand { private static Set<TopicPartition> getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, StreamsGroupDescription description) { Map<String, List<String>> allSourceTopics = new HashMap<>(); for (StreamsGroupSubtopologyDescription subtopologyDescription : description.subtopologies()) { - allSourceTopics.put(subtopologyDescription.subtopologyId(), subtopologyDescription.sourceTopics()); + List<String> topics = new ArrayList<>(subtopologyDescription.sourceTopics()); + topics.addAll(subtopologyDescription.repartitionSourceTopics().keySet()); + allSourceTopics.put(subtopologyDescription.subtopologyId(), topics); } Set<TopicPartition> topicPartitions = new HashSet<>(); diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java index 09efaebd0b1..6af3232dd4c 100644 --- a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java @@ -95,7 +95,9 @@ public class DescribeStreamsGroupTest { final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG"); final Set<List<String>> expectedRows = Set.of( List.of(APP_ID, INPUT_TOPIC, "0", "0"), - List.of(APP_ID, INPUT_TOPIC, "1", "0")); + List.of(APP_ID, INPUT_TOPIC, "1", "0"), + List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "0"), + List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "0")); validateDescribeOutput( Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe"), expectedHeader, expectedRows, List.of()); @@ -109,7 +111,9 @@ public class DescribeStreamsGroupTest { final List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG"); final Set<List<String>> expectedRows = Set.of( List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"), - List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0")); + List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"), + List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "0", "-", "-", "0", "0"), + List.of(APP_ID, "streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition", "1", "-", "-", "0", "0")); validateDescribeOutput( Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--describe", "--verbose"), expectedHeader, expectedRows, List.of());