This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 79e853d68e1 KAFKA-18761: Complete listing of share group offsets [1/N] (#18894) 79e853d68e1 is described below commit 79e853d68e1a9a8d81d731cbf3a4591aeb2c85dc Author: Andrew Schofield <aschofi...@confluent.io> AuthorDate: Fri Feb 14 18:55:20 2025 +0000 KAFKA-18761: Complete listing of share group offsets [1/N] (#18894) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../tools/consumer/group/ShareGroupCommand.java | 51 ++++++++---- .../consumer/group/ShareGroupCommandOptions.java | 11 ++- .../consumer/group/ShareGroupCommandTest.java | 91 +++++++++++++++++----- 3 files changed, 111 insertions(+), 42 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index 967095040ee..e5b01a12d9e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -55,6 +55,8 @@ import joptsimple.OptionException; public class ShareGroupCommand { + static final String MISSING_COLUMN_VALUE = "-"; + public static void main(String[] args) { ShareGroupCommandOptions opts = new ShareGroupCommandOptions(args); try { @@ -202,7 +204,7 @@ public class ShareGroupCommand { } else { TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets = collectGroupsOffsets(groupIds); - printOffsets(offsets); + printOffsets(offsets, opts.options.has(opts.verboseOpt)); } } @@ -250,7 +252,7 @@ public class ShareGroupCommand { groupId, tp.getKey().topic(), tp.getKey().partition(), - earliestResult.get(tp.getKey()) + Optional.ofNullable(earliestResult.get(tp.getKey())) ); partitionOffsets.add(partitionOffsetInfo); } @@ -263,34 +265,53 @@ public class ShareGroupCommand { return groupOffsets; } - private void printOffsets(TreeMap<String, 
Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets) { + private void printOffsets(TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> offsets, boolean verbose) { offsets.forEach((groupId, tuple) -> { ShareGroupDescription description = tuple.getKey(); Collection<SharePartitionOffsetInformation> offsetsInfo = tuple.getValue(); if (maybePrintEmptyGroupState(groupId, description.groupState(), offsetsInfo.size())) { - String fmt = printOffsetFormat(groupId, offsetsInfo); - System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + String fmt = printOffsetFormat(groupId, offsetsInfo, verbose); + + if (verbose) { + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET"); + } else { + System.out.printf(fmt, "GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + } for (SharePartitionOffsetInformation info : offsetsInfo) { - System.out.printf(fmt, - groupId, - info.topic, - info.partition, - info.offset - ); + if (verbose) { + System.out.printf(fmt, + groupId, + info.topic, + info.partition, + MISSING_COLUMN_VALUE, // Temporary + info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE) + ); + } else { + System.out.printf(fmt, + groupId, + info.topic, + info.partition, + info.offset.map(Object::toString).orElse(MISSING_COLUMN_VALUE) + ); + } } System.out.println(); } }); } - private static String printOffsetFormat(String groupId, Collection<SharePartitionOffsetInformation> offsetsInfo) { + private static String printOffsetFormat(String groupId, Collection<SharePartitionOffsetInformation> offsetsInfo, boolean verbose) { int groupLen = Math.max(15, groupId.length()); int maxTopicLen = 15; for (SharePartitionOffsetInformation info : offsetsInfo) { maxTopicLen = Math.max(maxTopicLen, info.topic.length()); } - return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s"; + if (verbose) { + return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %-13s 
%s"; + } else { + return "\n%" + (-groupLen) + "s %" + (-maxTopicLen) + "s %-10s %s"; + } } private void printStates(Map<String, ShareGroupDescription> descriptions, boolean verbose) { @@ -380,13 +401,13 @@ public class ShareGroupCommand { final String group; final String topic; final int partition; - final long offset; + final Optional<Long> offset; SharePartitionOffsetInformation( String group, String topic, int partition, - long offset + Optional<Long> offset ) { this.group = group; this.topic = topic; diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index 2852725a5d1..f4500654dd7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -37,7 +37,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { private static final String BOOTSTRAP_SERVER_DOC = "REQUIRED: The server(s) to connect to."; private static final String GROUP_DOC = "The share group we wish to act on."; - private static final String TOPIC_DOC = "The topic whose share group information should be deleted or topic whose should be included in the reset offset process. " + + private static final String TOPIC_DOC = "The topic whose offset information should be deleted or included in the reset offset process. " + "When resetting offsets, partitions can be specified using this format: 'topic1:0,1,2', where 0,1,2 are the partitions to be included."; private static final String ALL_TOPICS_DOC = "Consider all topics assigned to a share group in the 'reset-offsets' process."; private static final String LIST_DOC = "List all share groups."; @@ -64,7 +64,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { "When specified with '--list', it displays the state of all groups. 
It can also be used to list groups with specific states. " + "Valid values are Empty, Stable and Dead."; private static final String VERBOSE_DOC = "Provide additional information, if any, when describing the group. This option may be used " + - "with the '--describe --state' and '--describe --members' options only."; + "with the '--describe' option only."; private static final String DELETE_OFFSETS_DOC = "Delete offsets of share group. Supports one share group at the time, and multiple topics."; final OptionSpec<String> bootstrapServerOpt; @@ -142,8 +142,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { .withOptionalArg() .ofType(String.class); verboseOpt = parser.accepts("verbose", VERBOSE_DOC) - .availableIf(membersOpt, stateOpt) - .availableUnless(listOpt); + .availableIf(describeOpt); allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt)); @@ -178,8 +177,8 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { CommandLineUtils.printUsageAndExit(parser, "Option " + deleteOpt + " takes the option: " + groupOpt); if (options.has(topicOpt)) - CommandLineUtils.printUsageAndExit(parser, "The consumer does not support topic-specific offset " + - "deletion from a share group."); + CommandLineUtils.printUsageAndExit(parser, + "Option " + deleteOpt + " does not take the option: " + topicOpt); } if (options.has(deleteOffsetsOpt)) { diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index bfb8c30f109..db58847d34a 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -47,6 +47,7 @@ import org.mockito.ArgumentMatchers; 
import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -70,7 +71,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class ShareGroupCommandTest { - private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = List.of(List.of(""), List.of("--offsets")); + private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = List.of(List.of(""), List.of("--offsets"), List.of("--verbose"), List.of("--offsets", "--verbose")); private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = List.of(List.of("--members"), List.of("--members", "--verbose")); private static final List<List<String>> DESCRIBE_TYPE_STATE = List.of(List.of("--state"), List.of("--state", "--verbose")); private static final List<List<String>> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList(); @@ -178,7 +179,61 @@ public class ShareGroupCommandTest { return false; } - List<String> expectedValues = List.of(firstGroup, "topic1", "0", "0"); + List<String> expectedValues; + if (describeType.contains("--verbose")) { + expectedValues = List.of(firstGroup, "topic1", "0", "-", "0"); + } else { + expectedValues = List.of(firstGroup, "topic1", "0", "0"); + } + return checkArgsHeaderOutput(cgcArgs, lines[0]) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); + }, "Expected a data row and no error in describe results with describe type " + String.join(" ", describeType) + "."); + } + } + } + + @Test + public void testDescribeOffsetsOfExistingGroupWithNulls() throws Exception { + String firstGroup = "group1"; + String bootstrapServer = "localhost:9092"; + + for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) { + List<String> cgcArgs = new ArrayList<>(List.of("--bootstrap-server", bootstrapServer, "--describe", "--group", 
firstGroup)); + cgcArgs.addAll(describeType); + Admin adminClient = mock(KafkaAdminClient.class); + DescribeShareGroupsResult describeShareGroupsResult = mock(DescribeShareGroupsResult.class); + ShareGroupDescription exp = new ShareGroupDescription( + firstGroup, + List.of(new ShareMemberDescription("memid1", "clId1", "host1", new ShareMemberAssignment( + Set.of(new TopicPartition("topic1", 0)) + ), 0)), + GroupState.STABLE, + new Node(0, "host1", 9090), 0, 0); + // The null here indicates a topic-partition for which offset information could not be retrieved, typically due to an error + ListShareGroupOffsetsResult listShareGroupOffsetsResult = AdminClientTestUtils.createListShareGroupOffsetsResult( + Map.of( + firstGroup, + KafkaFuture.completedFuture(Collections.singletonMap(new TopicPartition("topic1", 0), null)) + ) + ); + + when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(firstGroup, KafkaFuture.completedFuture(exp))); + when(adminClient.describeShareGroups(ArgumentMatchers.anyCollection(), any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult); + when(adminClient.listShareGroupOffsets(ArgumentMatchers.anyMap())).thenReturn(listShareGroupOffsetsResult); + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(describeGroups(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 2 && !res.getValue().isEmpty()) { + return false; + } + + List<String> expectedValues; + if (describeType.contains("--verbose")) { + expectedValues = List.of(firstGroup, "topic1", "0", "-", "-"); + } else { + expectedValues = List.of(firstGroup, "topic1", "0", "-"); + } return checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues); }, "Expected a data row and no error in describe results with 
describe type " + String.join(" ", describeType) + "."); @@ -249,8 +304,14 @@ public class ShareGroupCommandTest { return false; } - List<String> expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); - List<String> expectedValues2 = List.of(secondGroup, "topic1", "0", "0"); + List<String> expectedValues1, expectedValues2; + if (describeType.contains("--verbose")) { + expectedValues1 = List.of(firstGroup, "topic1", "0", "-", "0"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "-", "0"); + } else { + expectedValues1 = List.of(firstGroup, "topic1", "0", "0"); + expectedValues2 = List.of(secondGroup, "topic1", "0", "0"); + } return checkArgsHeaderOutput(cgcArgs, lines[0]) && checkArgsHeaderOutput(cgcArgs, lines[3]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) && Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedValues2); @@ -488,20 +549,6 @@ public class ShareGroupCommandTest { } } - @Test - public void testDescribeShareGroupsInvalidVerboseOption() { - String bootstrapServer = "localhost:9092"; - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--describe", "--group", "group1", "--verbose"}; - assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient())); - } - - @Test - public void testDescribeShareGroupsOffsetsInvalidVerboseOption() { - String bootstrapServer = "localhost:9092"; - String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--describe", "--group", "group1", "--offsets", "--verbose"}; - assertThrows(OptionException.class, () -> getShareGroupService(cgcArgs, new MockAdminClient())); - } - @Test public void testPrintEmptyGroupState() { assertFalse(ShareGroupService.maybePrintEmptyGroupState("group", GroupState.EMPTY, 0)); @@ -564,11 +611,13 @@ public class ShareGroupCommandTest { } // --offsets or no arguments - return checkOffsetsArgsHeaderOutput(output); + return checkOffsetsArgsHeaderOutput(output, 
args.contains("--verbose")); } - private boolean checkOffsetsArgsHeaderOutput(String output) { - List<String> expectedKeys = List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET"); + private boolean checkOffsetsArgsHeaderOutput(String output, boolean verbose) { + List<String> expectedKeys = verbose ? + List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "START-OFFSET") : + List.of("GROUP", "TOPIC", "PARTITION", "START-OFFSET"); return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys); }