This is an automated email from the ASF dual-hosted git repository. schofielaj pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fa17bd44b30 KAFKA-16718-4/n: ShareGroupCommand changes for DeleteShareGroupOffsets admin call (#19587) fa17bd44b30 is described below commit fa17bd44b30702fdfa393ae8ce7ab6e3a1f884d8 Author: Chirag Wadhwa <cwad...@confluent.io> AuthorDate: Wed Apr 30 20:49:11 2025 +0530 KAFKA-16718-4/n: ShareGroupCommand changes for DeleteShareGroupOffsets admin call (#19587) This PR is the last in series to implement the DeleteShareGroupOffsets request. This PR includes the changes in ShareGroupCommand which internally calls the admin api to delete the offsets. Now, any enduser will be able to delete share group offsets for topics subscribed by a share group using kafka-share-groups.sh --delete-offsets command. Reviewers: Andrew Schofield <aschofi...@confluent.io> --- .../tools/consumer/group/ShareGroupCommand.java | 84 ++++++++- .../consumer/group/ShareGroupCommandOptions.java | 4 +- .../consumer/group/ShareGroupCommandTest.java | 205 +++++++++++++++++++++ 3 files changed, 290 insertions(+), 3 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java index dcebff0d3ea..cfe3fee5812 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java @@ -19,6 +19,8 @@ package org.apache.kafka.tools.consumer.group; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.AbstractOptions; import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions; +import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult; import org.apache.kafka.clients.admin.DeleteShareGroupsOptions; import org.apache.kafka.clients.admin.DescribeShareGroupsOptions; import org.apache.kafka.clients.admin.GroupListing; @@ -33,6 +35,7 @@ import org.apache.kafka.common.GroupType; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupNotEmptyException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.server.util.CommandLineUtils; @@ -89,7 +92,7 @@ public class ShareGroupCommand { } else if (opts.options.has(opts.resetOffsetsOpt)) { throw new UnsupportedOperationException("--reset-offsets option is not yet implemented"); } else if (opts.options.has(opts.deleteOffsetsOpt)) { - throw new UnsupportedOperationException("--delete-offsets option is not yet implemented"); + shareGroupService.deleteOffsets(); } } catch (IllegalArgumentException e) { CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage()); @@ -285,6 +288,85 @@ public class ShareGroupCommand { return failed; } + void deleteOffsets() { + String groupId = opts.options.valueOf(opts.groupOpt); + List<String> topics = opts.options.valuesOf(opts.topicOpt); + + Entry<Throwable, Map<String, Throwable>> res = sendDeleteShareGroupOffsetsRequest(groupId, new HashSet<>(topics)); + + Throwable topLevelResult = res.getKey(); + Map<String, Throwable> topicLevelResult = res.getValue(); + + if (topLevelResult != null) { + Errors topLevelError = Errors.forException(topLevelResult); + switch (topLevelError) { + case INVALID_GROUP_ID: + case GROUP_ID_NOT_FOUND: + case GROUP_AUTHORIZATION_FAILED: + case NON_EMPTY_GROUP: + printError(topLevelResult.getMessage(), Optional.empty()); + break; + case TOPIC_AUTHORIZATION_FAILED: + case UNKNOWN_TOPIC_OR_PARTITION: + // These are expected topic-level errors which will be reported in the topic-level results + break; + default: + printError("Encounter some unknown error: " + topLevelResult, Optional.empty()); + } + } + + if (topicLevelResult != null && !topicLevelResult.isEmpty()) { + int maxTopicLen = 15; + for (String topic : topicLevelResult.keySet()) { + maxTopicLen = Math.max(maxTopicLen, topic.length()); + } + + String format = "%n%" + (-maxTopicLen) + "s %s"; + + System.out.printf(format, "TOPIC", "STATUS"); + topicLevelResult.entrySet().stream() + .sorted(Entry.comparingByKey()) + .forEach(e -> { + String topic = e.getKey(); + Throwable error = e.getValue(); + System.out.printf(format, + topic, + error != null ? "Error: " + error.getMessage() : "Successful" + ); + }); + } + + System.out.println(); + } + + Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(String groupId, Set<String> topics) { + Map<String, Throwable> topicLevelResult = new HashMap<>(); + + DeleteShareGroupOffsetsResult deleteResult = adminClient.deleteShareGroupOffsets( + groupId, + new HashSet<>(topics), + withTimeoutMs(new DeleteShareGroupOffsetsOptions())); + + Throwable topLevelException = null; + + try { + deleteResult.all().get(); + } catch (ExecutionException | InterruptedException e) { + topLevelException = e.getCause(); + } + + topics.forEach(topic -> { + try { + deleteResult.topicResult(topic).get(); + topicLevelResult.put(topic, null); + } catch (ExecutionException | InterruptedException e) { + topicLevelResult.put(topic, e.getCause()); + } + }); + + return new SimpleImmutableEntry<>(topLevelException, topicLevelResult); + } + private <T extends AbstractOptions<T>> T withTimeoutMs(T options) { int t = opts.options.valueOf(opts.timeoutMsOpt).intValue(); return options.timeoutMs(t); diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java index 3ba0a707ee5..be99d2946a7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java @@ -145,7 +145,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { .availableIf(describeOpt); allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, allGroupsOpt)); - allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt)); + allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, describeOpt, deleteOpt, resetOffsetsOpt)); allResetOffsetScenarioOpts = new HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, resetToLatestOpt)); allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, topicOpt)); @@ -208,7 +208,7 @@ public class ShareGroupCommandOptions extends CommandDefaultOptions { } CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allGroupSelectionScopeOpts, groupOpt)); - CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt)); + CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt)); CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt)); } } diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java index c3690d953ab..f1f91217511 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.tools.consumer.group; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.AdminClientTestUtils; +import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult; import org.apache.kafka.clients.admin.DeleteShareGroupsResult; import org.apache.kafka.clients.admin.DescribeShareGroupsOptions; import org.apache.kafka.clients.admin.DescribeShareGroupsResult; @@ -38,6 +39,7 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.GroupIdNotFoundException; import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.test.TestUtils; import org.apache.kafka.tools.ToolsTestUtils; @@ -77,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -612,6 +615,204 @@ public class ShareGroupCommandTest { assertThrows(IllegalArgumentException.class, () -> ShareGroupCommand.groupStatesFromString(" , ,")); } + @Test + public void testDeleteShareGroupOffsetsArgsWithoutTopic() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", "groupId"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsetsArgsWithoutGroup() { + String bootstrapServer = "localhost:9092"; + Admin adminClient = mock(KafkaAdminClient.class); + + // no group spec args + String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, "--delete-offsets", "--topic", "t1"}; + AtomicBoolean exited = new AtomicBoolean(false); + Exit.setExitProcedure(((statusCode, message) -> { + assertNotEquals(0, statusCode); + assertTrue(message.contains("Option [delete-offsets] takes the following options: [topic], [group]")); + exited.set(true); + })); + try { + getShareGroupService(cgcArgs, adminClient); + } finally { + assertTrue(exited.get()); + } + } + + @Test + public void testDeleteShareGroupOffsets() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + when(result.all()).thenReturn(KafkaFuture.completedFuture(null)); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null)); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 3 && !res.getValue().isEmpty()) { + return false; + } + + List<String> expectedResultHeader = List.of("TOPIC", "STATUS"); + List<String> expectedResultValues1 = List.of(firstTopic, "Successful"); + List<String> expectedResultValues2 = List.of(secondTopic, "Successful"); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsMultipleGroups() { + String firstGroup = "first-group"; + String secondGroup = "second-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--group", secondGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + service.deleteOffsets(); + fail("Expected error was not detected while trying delete offsets multiple groups"); + } catch (Exception e) { + String expectedErrorMessage = "Found multiple arguments for option group, but you asked for only one"; + assertEquals(expectedErrorMessage, e.getMessage()); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = "Group g3 not found."; + GroupIdNotFoundException exception = new GroupIdNotFoundException(errorMessage); + + resultFuture.completeExceptionally(exception); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> error = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> errorLine = new ArrayList<>(error); + List<String> expectedResultHeader = List.of("TOPIC", "STATUS"); + List<String> expectedResultValue1 = new ArrayList<>(); + expectedResultValue1.add(firstTopic); + expectedResultValue1.addAll(error); + List<String> expectedResultValue2 = new ArrayList<>(); + expectedResultValue2.add(secondTopic); + expectedResultValue2.addAll(error); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(errorLine) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultHeader) && + Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(expectedResultValue1) && + Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedResultValue2); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + + @Test + public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception { + String firstGroup = "first-group"; + String firstTopic = "t1"; + String secondTopic = "t2"; + String bootstrapServer = "localhost:9092"; + + List<String> cgcArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, "--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", secondTopic)); + Admin adminClient = mock(KafkaAdminClient.class); + DeleteShareGroupOffsetsResult result = mock(DeleteShareGroupOffsetsResult.class); + + KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>(); + String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message(); + + resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception()); + when(result.all()).thenReturn(resultFuture); + + when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null)); + when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture); + + when(adminClient.deleteShareGroupOffsets(any(), any(), any())).thenReturn(result); + + try (ShareGroupService service = getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) { + TestUtils.waitForCondition(() -> { + Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service)); + String[] lines = res.getKey().trim().split("\n"); + if (lines.length != 5 && !res.getValue().isEmpty()) { + return false; + } + + List<String> error = Stream.concat( + Stream.of("Error:"), + Arrays.stream(errorMessage.trim().split("\\s+")) + ).toList(); + + List<String> expectedResultHeader = List.of("TOPIC", "STATUS"); + List<String> expectedResultValue1 = List.of(firstTopic, "Successful"); + List<String> expectedResultValue2 = new ArrayList<>(); + expectedResultValue2.add(secondTopic); + expectedResultValue2.addAll(error); + + return Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader) && + Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValue1) && + Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValue2); + }, "Expected a data row and no error in delete offsets result with group: " + firstGroup + " and topic: " + firstTopic); + } + } + @Test public void testDeleteShareGroupsArgs() { String bootstrapServer = "localhost:9092"; @@ -873,6 +1074,10 @@ public class ShareGroupCommandTest { return () -> Assertions.assertDoesNotThrow(service::describeGroups); } + private Runnable deleteOffsets(ShareGroupCommand.ShareGroupService service) { + return () -> Assertions.assertDoesNotThrow(service::deleteOffsets); + } + private boolean checkArgsHeaderOutput(List<String> args, String output) { if (!output.contains("GROUP")) { return false;