This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 79e853d68e1 KAFKA-18761: Complete listing of share group offsets [1/N] (#18894)
79e853d68e1 is described below

commit 79e853d68e1a9a8d81d731cbf3a4591aeb2c85dc
Author: Andrew Schofield <aschofi...@confluent.io>
AuthorDate: Fri Feb 14 18:55:20 2025 +0000

    KAFKA-18761: Complete listing of share group offsets [1/N] (#18894)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../tools/consumer/group/ShareGroupCommand.java    | 51 ++++++++----
 .../consumer/group/ShareGroupCommandOptions.java   | 11 ++-
 .../consumer/group/ShareGroupCommandTest.java      | 91 +++++++++++++++++-----
 3 files changed, 111 insertions(+), 42 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 967095040ee..e5b01a12d9e 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -55,6 +55,8 @@ import joptsimple.OptionException;
 
 public class ShareGroupCommand {
 
+    static final String MISSING_COLUMN_VALUE = "-";
+
     public static void main(String[] args) {
         ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args);
         try {
@@ -202,7 +204,7 @@ public class ShareGroupCommand {
             } else {
                 TreeMap<String, Entry<ShareGroupDescription, 
Collection<SharePartitionOffsetInformation>>> offsets
                     = collectGroupsOffsets(groupIds);
-                printOffsets(offsets);
+                printOffsets(offsets, opts.options.has(opts.verboseOpt));
             }
         }
 
@@ -250,7 +252,7 @@ public class ShareGroupCommand {
                             groupId,
                             tp.getKey().topic(),
                             tp.getKey().partition(),
-                            earliestResult.get(tp.getKey())
+                            
Optional.ofNullable(earliestResult.get(tp.getKey()))
                         );
                         partitionOffsets.add(partitionOffsetInfo);
                     }
@@ -263,34 +265,53 @@ public class ShareGroupCommand {
             return groupOffsets;
         }
 
-        private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, 
Collection<SharePartitionOffsetInformation>>> offsets) {
+        private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, 
Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) {
             offsets.forEach((groupId, tuple) -> {
                 ShareGroupDescription description = tuple.getKey();
                 Collection<SharePartitionOffsetInformation> offsetsInfo = 
tuple.getValue();
                 if (maybePrintEmptyGroupState(groupId, 
description.groupState(), offsetsInfo.size())) {
-                    String fmt = printOffsetFormat(groupId, offsetsInfo);
-                    System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"START-OFFSET");
+                    String fmt = printOffsetFormat(groupId, offsetsInfo, 
verbose);
+
+                    if (verbose) {
+                        System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"LEADER-EPOCH", "START-OFFSET");
+                    } else {
+                        System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", 
"START-OFFSET");
+                    }
 
                     for (SharePartitionOffsetInformation info : offsetsInfo) {
-                        System.out.printf(fmt,
-                            groupId,
-                            info.topic,
-                            info.partition,
-                            info.offset
-                        );
+                        if (verbose) {
+                            System.out.printf(fmt,
+                                groupId,
+                                info.topic,
+                                info.partition,
+                                MISSING_COLUMN_VALUE, // Temporary
+                                
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+                            );
+                        } else {
+                            System.out.printf(fmt,
+                                groupId,
+                                info.topic,
+                                info.partition,
+                                
info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE)
+                            );
+                        }
                     }
                     System.out.println();
                 }
             });
         }
 
-        private static String printOffsetFormat(String groupId, 
Collection<SharePartitionOffsetInformation> offsetsInfo) {
+        private static String printOffsetFormat(String groupId, 
Collection<SharePartitionOffsetInformation> offsetsInfo, boolean verbose) {
             int groupLen = Math.max(15, groupId.length());
             int maxTopicLen = 15;
             for (SharePartitionOffsetInformation info : offsetsInfo) {
                 maxTopicLen = Math.max(maxTopicLen, info.topic.length());
             }
-            return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s";
+            if (verbose) {
+                return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%-13s %s";
+            } else {
+                return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s 
%s";
+            }
         }
 
         private void printStates(Map<String, ShareGroupDescription> 
descriptions, boolean verbose) {
@@ -380,13 +401,13 @@ public class ShareGroupCommand {
         final String group;
         final String topic;
         final int partition;
-        final long offset;
+        final Optional<Long> offset;
 
         SharePartitionOffsetInformation(
             String group,
             String topic,
             int partition,
-            long offset
+            Optional<Long> offset
         ) {
             this.group = group;
             this.topic = topic;
diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index 2852725a5d1..f4500654dd7 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -37,7 +37,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
 
     private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The 
server(s) to connect to.";
     private static final String GROUP_DOC = "The share group we wish to act 
on.";
-    private static final String TOPIC_DOC = "The topic whose share group 
information should be deleted or topic whose should be included in the reset 
offset process. " +
+    private static final String TOPIC_DOC = "The topic whose offset 
information should be deleted or included in the reset offset process. " +
         "When resetting offsets, partitions can be specified using this 
format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included.";
     private static final String ALL_TOPICS_DOC = "Consider all topics assigned 
to a share group in the 'reset-offsets' process.";
     private static final String LIST_DOC = "List all share groups.";
@@ -64,7 +64,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
         "When specified with '--list', it displays the state of all groups. It 
can also be used to list groups with specific states. " +
         "Valid values are Empty, Stable and Dead.";
     private static final String VERBOSE_DOC = "Provide additional information, 
if any, when describing the group. This option may be used " +
-        "with the '--describe --state' and '--describe --members' options 
only.";
+        "with the '--describe' option only.";
     private static final String DELETE_OFFSETS_DOC = "Delete offsets of share 
group. Supports one share group at the time, and multiple topics.";
 
     final OptionSpec<String> bootstrapServerOpt;
@@ -142,8 +142,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
             .withOptionalArg()
             .ofType(String.class);
         verboseOpt = parser.accepts("verbose", VERBOSE_DOC)
-            .availableIf(membersOpt, stateOpt)
-            .availableUnless(listOpt);
+            .availableIf(describeOpt);
 
         allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, 
allGroupsOpt));
         allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt));
@@ -178,8 +177,8 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
                 CommandLineUtils.printUsageAndExit(parser,
                     "Option " + deleteOpt + " takes the option: " + groupOpt);
             if (options.has(topicOpt))
-                CommandLineUtils.printUsageAndExit(parser, "The consumer does 
not support topic-specific offset " +
-                    "deletion from a share group.");
+                CommandLineUtils.printUsageAndExit(parser,
+                    "Option " + deleteOpt + " does not take the option: " + 
topicOpt);
         }
 
         if (options.has(deleteOffsetsOpt)) {
diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index bfb8c30f109..db58847d34a 100644
--- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -47,6 +47,7 @@ import org.mockito.ArgumentMatchers;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -70,7 +71,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class ShareGroupCommandTest {
-    private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = 
List.of(List.of(""), List.of("--offsets"));
+    private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = 
List.of(List.of(""), List.of("--offsets"), List.of("--verbose"), 
List.of("--offsets", "--verbose"));
     private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = 
List.of(List.of("--members"), List.of("--members", "--verbose"));
     private static final List<List<String>> DESCRIBE_TYPE_STATE = 
List.of(List.of("--state"), List.of("--state", "--verbose"));
     private static final List<List<String>> DESCRIBE_TYPES = 
Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, 
DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
@@ -178,7 +179,61 @@ public class ShareGroupCommandTest {
                         return false;
                     }
 
-                    List<String> expectedValues = List.of(firstGroup, 
"topic1", "0", "0");
+                    List<String> expectedValues;
+                    if (describeType.contains("--verbose")) {
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-", "0");
+                    } else {
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"0");
+                    }
+                    return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
+                        
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
+                }, "Expected a data row and no error in describe results with 
describe type " + String.join(" ", describeType) + ".");
+            }
+        }
+    }
+
+    @Test
+    public void testDescribeOffsetsOfExistingGroupWithNulls() throws Exception 
{
+        String firstGroup = "group1";
+        String bootstrapServer = "localhost:9092";
+
+        for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) {
+            List<String> cgcArgs = new 
ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", 
"--group", firstGroup));
+            cgcArgs.addAll(describeType);
+            Admin adminClient = mock(KafkaAdminClient.class);
+            DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+            ShareGroupDescription exp = new ShareGroupDescription(
+                firstGroup,
+                List.of(new ShareMemberDescription("memid1", "clId1", "host1", 
new ShareMemberAssignment(
+                    Set.of(new TopicPartition("topic1", 0))
+                ), 0)),
+                GroupState.STABLE,
+                new Node(0, "host1", 9090), 0, 0);
+            // The null here indicates a topic-partition for which offset 
information could not be retrieved, typically due to an error
+            ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+                Map.of(
+                    firstGroup,
+                    KafkaFuture.completedFuture(Collections.singletonMap(new 
TopicPartition("topic1", 0), null))
+                )
+            );
+
+            
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, 
KafkaFuture.completedFuture(exp)));
+            
when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+            
when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult);
+            try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+                TestUtils.waitForCondition(() -> {
+                    Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service));
+                    String[] lines = res.getKey().trim().split("\n");
+                    if (lines.length != 2 && !res.getValue().isEmpty()) {
+                        return false;
+                    }
+
+                    List<String> expectedValues;
+                    if (describeType.contains("--verbose")) {
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-", "-");
+                    } else {
+                        expectedValues = List.of(firstGroup, "topic1", "0", 
"-");
+                    }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
                 }, "Expected a data row and no error in describe results with 
describe type " + String.join(" ", describeType) + ".");
@@ -249,8 +304,14 @@ public class ShareGroupCommandTest {
                         return false;
                     }
 
-                    List<String> expectedValues1 = List.of(firstGroup, 
"topic1", "0", "0");
-                    List<String> expectedValues2 = List.of(secondGroup, 
"topic1", "0", "0");
+                    List<String> expectedValues1, expectedValues2;
+                    if (describeType.contains("--verbose")) {
+                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"-", "0");
+                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"-", "0");
+                    } else {
+                        expectedValues1 = List.of(firstGroup, "topic1", "0", 
"0");
+                        expectedValues2 = List.of(secondGroup, "topic1", "0", 
"0");
+                    }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) && 
checkArgsHeaderOutput(cgcArgs, lines[3]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
                         
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2);
@@ -488,20 +549,6 @@ public class ShareGroupCommandTest {
         }
     }
 
-    @Test
-    public void testDescribeShareGroupsInvalidVerboseOption() {
-        String bootstrapServer = "localhost:9092";
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--describe", "--group", "group1", "--verbose"};
-        assertThrows(OptionException.class, () -> 
getShareGroupService(cgcArgs, new MockAdminClient()));
-    }
-
-    @Test
-    public void testDescribeShareGroupsOffsetsInvalidVerboseOption() {
-        String bootstrapServer = "localhost:9092";
-        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--describe", "--group", "group1", "--offsets", "--verbose"};
-        assertThrows(OptionException.class, () -> 
getShareGroupService(cgcArgs, new MockAdminClient()));
-    }
-
     @Test
     public void testPrintEmptyGroupState() {
         assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", 
GroupState.EMPTY, 0));
@@ -564,11 +611,13 @@ public class ShareGroupCommandTest {
         }
 
         // --offsets or no arguments
-        return checkOffsetsArgsHeaderOutput(output);
+        return checkOffsetsArgsHeaderOutput(output, 
args.contains("--verbose"));
     }
 
-    private boolean checkOffsetsArgsHeaderOutput(String output) {
-        List<String> expectedKeys = List.of("GROUP", "TOPIC", "PARTITION", 
"START-OFFSET");
+    private boolean checkOffsetsArgsHeaderOutput(String output, boolean 
verbose) {
+        List<String> expectedKeys = verbose ?
+            List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", 
"START-OFFSET") :
+            List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET");
         return 
Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
     }
 

Reply via email to