This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 443c01ca80c MINOR: add repartitionSourceTopics to Streams group 
description (#19561)
443c01ca80c is described below

commit 443c01ca80cbeecd8c1b4272742cd5b05f43568e
Author: Alieh Saeedi <107070585+aliehsaee...@users.noreply.github.com>
AuthorDate: Mon Apr 28 11:54:53 2025 +0200

    MINOR: add repartitionSourceTopics to Streams group description (#19561)
    
    This is a follow-up of this #19433  This PR aims at adding the
    `repartition source topics` to the output of `--describe` for streams
    groups.
    
    Reviewers: Lucas Brutschy <lbruts...@confluent.io>
---
 .../java/org/apache/kafka/tools/streams/StreamsGroupCommand.java  | 4 +++-
 .../org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java  | 8 ++++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
index bbf09c3f36a..1a8be4104aa 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/streams/StreamsGroupCommand.java
@@ -368,7 +368,9 @@ public class StreamsGroupCommand {
         private static Set<TopicPartition> 
getTopicPartitions(List<StreamsGroupMemberAssignment.TaskIds> taskIds, 
StreamsGroupDescription description) {
             Map<String, List<String>> allSourceTopics = new HashMap<>();
             for (StreamsGroupSubtopologyDescription subtopologyDescription : 
description.subtopologies()) {
-                allSourceTopics.put(subtopologyDescription.subtopologyId(), 
subtopologyDescription.sourceTopics());
+                List<String> topics = new 
ArrayList<>(subtopologyDescription.sourceTopics());
+                
topics.addAll(subtopologyDescription.repartitionSourceTopics().keySet());
+                allSourceTopics.put(subtopologyDescription.subtopologyId(), 
topics);
             }
             Set<TopicPartition> topicPartitions = new HashSet<>();
 
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 09efaebd0b1..6af3232dd4c 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -95,7 +95,9 @@ public class DescribeStreamsGroupTest {
         final List<String> expectedHeader = List.of("GROUP", "TOPIC", 
"PARTITION", "OFFSET-LAG");
         final Set<List<String>> expectedRows = Set.of(
             List.of(APP_ID, INPUT_TOPIC, "0", "0"),
-            List.of(APP_ID, INPUT_TOPIC, "1", "0"));
+            List.of(APP_ID, INPUT_TOPIC, "1", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "0", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "1", "0"));
 
         validateDescribeOutput(
             Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe"), expectedHeader, expectedRows, List.of());
@@ -109,7 +111,9 @@ public class DescribeStreamsGroupTest {
         final List<String> expectedHeader = List.of("GROUP", "TOPIC", 
"PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
         final Set<List<String>> expectedRows = Set.of(
             List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
-            List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"));
+            List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "0", "-", "-", "0", "0"),
+            List.of(APP_ID, 
"streams-group-command-test-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition",
 "1", "-", "-", "0", "0"));
 
         validateDescribeOutput(
             Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), 
"--describe", "--verbose"), expectedHeader, expectedRows, List.of());

Reply via email to