This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fa17bd44b30 KAFKA-16718-4/n: ShareGroupCommand changes for 
DeleteShareGroupOffsets admin call (#19587)
fa17bd44b30 is described below

commit fa17bd44b30702fdfa393ae8ce7ab6e3a1f884d8
Author: Chirag Wadhwa <cwad...@confluent.io>
AuthorDate: Wed Apr 30 20:49:11 2025 +0530

    KAFKA-16718-4/n: ShareGroupCommand changes for DeleteShareGroupOffsets 
admin call (#19587)
    
    This PR is the last in the series to implement the DeleteShareGroupOffsets
    request. It includes the changes in ShareGroupCommand, which internally
    calls the admin API to delete the offsets. Now, any end user will be able
    to delete share group offsets for topics subscribed to by a share group
    using the kafka-share-groups.sh --delete-offsets command.
    
    Reviewers: Andrew Schofield <aschofi...@confluent.io>
---
 .../tools/consumer/group/ShareGroupCommand.java    |  84 ++++++++-
 .../consumer/group/ShareGroupCommandOptions.java   |   4 +-
 .../consumer/group/ShareGroupCommandTest.java      | 205 +++++++++++++++++++++
 3 files changed, 290 insertions(+), 3 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index dcebff0d3ea..cfe3fee5812 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -19,6 +19,8 @@ package org.apache.kafka.tools.consumer.group;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.AbstractOptions;
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.GroupListing;
@@ -33,6 +35,7 @@ import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.util.CommandLineUtils;
 
@@ -89,7 +92,7 @@ public class ShareGroupCommand {
             } else if (opts.options.has(opts.resetOffsetsOpt)) {
                 throw new UnsupportedOperationException("--reset-offsets 
option is not yet implemented");
             } else if (opts.options.has(opts.deleteOffsetsOpt)) {
-                throw new UnsupportedOperationException("--delete-offsets 
option is not yet implemented");
+                shareGroupService.deleteOffsets();
             }
         } catch (IllegalArgumentException e) {
             CommandLineUtils.printUsageAndExit(opts.parser, e.getMessage());
@@ -285,6 +288,85 @@ public class ShareGroupCommand {
             return failed;
         }
 
+        void deleteOffsets() {
+            String groupId = opts.options.valueOf(opts.groupOpt);
+            List<String> topics = opts.options.valuesOf(opts.topicOpt);
+
+            Entry<Throwable, Map<String, Throwable>> res = 
sendDeleteShareGroupOffsetsRequest(groupId, new HashSet<>(topics));
+
+            Throwable topLevelResult = res.getKey();
+            Map<String, Throwable> topicLevelResult = res.getValue();
+
+            if (topLevelResult != null) {
+                Errors topLevelError = Errors.forException(topLevelResult);
+                switch (topLevelError) {
+                    case INVALID_GROUP_ID:
+                    case GROUP_ID_NOT_FOUND:
+                    case GROUP_AUTHORIZATION_FAILED:
+                    case NON_EMPTY_GROUP:
+                        printError(topLevelResult.getMessage(), 
Optional.empty());
+                        break;
+                    case TOPIC_AUTHORIZATION_FAILED:
+                    case UNKNOWN_TOPIC_OR_PARTITION:
+                        // These are expected topic-level errors which will be 
reported in the topic-level results
+                        break;
+                    default:
+                        printError("Encounter some unknown error: " + 
topLevelResult, Optional.empty());
+                }
+            }
+
+            if (topicLevelResult != null && !topicLevelResult.isEmpty()) {
+                int maxTopicLen = 15;
+                for (String topic : topicLevelResult.keySet()) {
+                    maxTopicLen = Math.max(maxTopicLen, topic.length());
+                }
+
+                String format = "%n%" + (-maxTopicLen) + "s %s";
+
+                System.out.printf(format, "TOPIC", "STATUS");
+                topicLevelResult.entrySet().stream()
+                    .sorted(Entry.comparingByKey())
+                    .forEach(e -> {
+                        String topic = e.getKey();
+                        Throwable error = e.getValue();
+                        System.out.printf(format,
+                            topic,
+                            error != null ? "Error: " + error.getMessage() : 
"Successful"
+                        );
+                    });
+            }
+
+            System.out.println();
+        }
+
+        /**
+         * Calls the admin client's deleteShareGroupOffsets for the given group and topics and
+         * waits for the results.
+         *
+         * @param groupId the share group whose offsets are to be deleted
+         * @param topics  the topics whose offsets are to be deleted
+         * @return an entry whose key is the top-level failure ({@code null} if the overall call
+         *         succeeded) and whose value maps every requested topic to its failure
+         *         ({@code null} if that topic succeeded)
+         */
+        Entry<Throwable, Map<String, Throwable>> sendDeleteShareGroupOffsetsRequest(String groupId, Set<String> topics) {
+            Map<String, Throwable> topicLevelResult = new HashMap<>();
+
+            DeleteShareGroupOffsetsResult deleteResult = adminClient.deleteShareGroupOffsets(
+                groupId,
+                new HashSet<>(topics),
+                withTimeoutMs(new DeleteShareGroupOffsetsOptions()));
+
+            Throwable topLevelException = null;
+
+            try {
+                deleteResult.all().get();
+            } catch (ExecutionException e) {
+                // Never record null for a failure: null means "succeeded" to the caller.
+                topLevelException = e.getCause() != null ? e.getCause() : e;
+            } catch (InterruptedException e) {
+                // An InterruptedException has no cause, so recording getCause() would report success.
+                // Restore the interrupt flag and surface the interruption itself.
+                Thread.currentThread().interrupt();
+                topLevelException = e;
+            }
+
+            for (String topic : topics) {
+                try {
+                    deleteResult.topicResult(topic).get();
+                    topicLevelResult.put(topic, null);
+                } catch (ExecutionException e) {
+                    topicLevelResult.put(topic, e.getCause() != null ? e.getCause() : e);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    topicLevelResult.put(topic, e);
+                }
+            }
+
+            return new SimpleImmutableEntry<>(topLevelException, topicLevelResult);
+        }
+
         private <T extends AbstractOptions<T>> T withTimeoutMs(T options) {
             int t = opts.options.valueOf(opts.timeoutMsOpt).intValue();
             return options.timeoutMs(t);
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
index 3ba0a707ee5..be99d2946a7 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandOptions.java
@@ -145,7 +145,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
             .availableIf(describeOpt);
 
         allGroupSelectionScopeOpts = new HashSet<>(Arrays.asList(groupOpt, 
allGroupsOpt));
-        allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt, deleteOffsetsOpt, resetOffsetsOpt));
+        allShareGroupLevelOpts = new HashSet<>(Arrays.asList(listOpt, 
describeOpt, deleteOpt, resetOffsetsOpt));
         allResetOffsetScenarioOpts = new 
HashSet<>(Arrays.asList(resetToDatetimeOpt, resetToEarliestOpt, 
resetToLatestOpt));
         allDeleteOffsetsOpts = new HashSet<>(Arrays.asList(groupOpt, 
topicOpt));
 
@@ -208,7 +208,7 @@ public class ShareGroupCommandOptions extends 
CommandDefaultOptions {
         }
 
         CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
minus(allGroupSelectionScopeOpts, groupOpt));
-        CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
minus(allShareGroupLevelOpts, describeOpt, deleteOpt, deleteOffsetsOpt, 
resetOffsetsOpt));
+        CommandLineUtils.checkInvalidArgs(parser, options, groupOpt, 
minus(allShareGroupLevelOpts, describeOpt, deleteOpt, resetOffsetsOpt));
         CommandLineUtils.checkInvalidArgs(parser, options, topicOpt, 
minus(allShareGroupLevelOpts, deleteOpt, resetOffsetsOpt));
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index c3690d953ab..f1f91217511 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.tools.consumer.group;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsResult;
 import org.apache.kafka.clients.admin.DeleteShareGroupsResult;
 import org.apache.kafka.clients.admin.DescribeShareGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeShareGroupsResult;
@@ -38,6 +39,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ToolsTestUtils;
@@ -77,6 +79,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -612,6 +615,204 @@ public class ShareGroupCommandTest {
         assertThrows(IllegalArgumentException.class, () -> 
ShareGroupCommand.groupStatesFromString("   ,   ,"));
     }
 
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutTopic() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // --topic is deliberately omitted: the test expects a non-zero exit whose message lists both [topic] and [group]
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", "groupId"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true); // records that the exit procedure was actually invoked
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsArgsWithoutGroup() {
+        String bootstrapServer = "localhost:9092";
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        // --group is deliberately omitted: the test expects a non-zero exit whose message lists both [topic] and [group]
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--topic", "t1"};
+        AtomicBoolean exited = new AtomicBoolean(false);
+        Exit.setExitProcedure(((statusCode, message) -> {
+            assertNotEquals(0, statusCode);
+            assertTrue(message.contains("Option [delete-offsets] takes the 
following options: [topic], [group]"));
+            exited.set(true); // records that the exit procedure was actually invoked
+        }));
+        try {
+            getShareGroupService(cgcArgs, adminClient);
+        } finally {
+            assertTrue(exited.get());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsets() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        
when(result.topicResult(eq(secondTopic))).thenReturn(KafkaFuture.completedFuture(null));
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 3 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+                List<String> expectedResultValues1 = List.of(firstTopic, 
"Successful");
+                List<String> expectedResultValues2 = List.of(secondTopic, 
"Successful");
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
 &&
+                    
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValues1)
 &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValues2);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsMultipleGroups() {
+        String firstGroup = "first-group";
+        String secondGroup = "second-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--group", secondGroup, "--topic", 
firstTopic, "--topic", secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            service.deleteOffsets();
+            fail("Expected error was not detected while trying delete offsets 
multiple groups");
+        } catch (Exception e) {
+            String expectedErrorMessage = "Found multiple arguments for option 
group, but you asked for only one";
+            assertEquals(expectedErrorMessage, e.getMessage());
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = "Group g3 not found.";
+        GroupIdNotFoundException exception = new 
GroupIdNotFoundException(errorMessage);
+
+        resultFuture.completeExceptionally(exception);
+        when(result.all()).thenReturn(resultFuture);
+
+        when(result.topicResult(eq(firstTopic))).thenReturn(resultFuture);
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> error = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                    ).toList();
+
+                List<String> errorLine = new ArrayList<>(error);
+                List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+                List<String> expectedResultValue1 =  new ArrayList<>();
+                expectedResultValue1.add(firstTopic);
+                expectedResultValue1.addAll(error);
+                List<String> expectedResultValue2 =  new ArrayList<>();
+                expectedResultValue2.add(secondTopic);
+                expectedResultValue2.addAll(error);
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(errorLine) &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultHeader)
 &&
+                    
Arrays.stream(lines[3].trim().split("\\s+")).toList().equals(expectedResultValue1)
 &&
+                    
Arrays.stream(lines[4].trim().split("\\s+")).toList().equals(expectedResultValue2);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
+    @Test
+    public void testDeleteShareGroupOffsetsTopicLevelError() throws Exception {
+        String firstGroup = "first-group";
+        String firstTopic = "t1";
+        String secondTopic = "t2";
+        String bootstrapServer = "localhost:9092";
+
+        List<String> cgcArgs = new 
ArrayList<>(Arrays.asList("--bootstrap-server", bootstrapServer, 
"--delete-offsets", "--group", firstGroup, "--topic", firstTopic, "--topic", 
secondTopic));
+        Admin adminClient = mock(KafkaAdminClient.class);
+        DeleteShareGroupOffsetsResult result = 
mock(DeleteShareGroupOffsetsResult.class);
+
+        KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();
+        String errorMessage = Errors.UNKNOWN_TOPIC_OR_PARTITION.message();
+
+        
resultFuture.completeExceptionally(Errors.UNKNOWN_TOPIC_OR_PARTITION.exception());
+        when(result.all()).thenReturn(resultFuture);
+
+        
when(result.topicResult(eq(firstTopic))).thenReturn(KafkaFuture.completedFuture(null));
+        when(result.topicResult(eq(secondTopic))).thenReturn(resultFuture);
+
+        when(adminClient.deleteShareGroupOffsets(any(), any(), 
any())).thenReturn(result);
+
+        try (ShareGroupService service = 
getShareGroupService(cgcArgs.toArray(new String[0]), adminClient)) {
+            TestUtils.waitForCondition(() -> {
+                Entry<String, String> res = 
ToolsTestUtils.grabConsoleOutputAndError(deleteOffsets(service));
+                String[] lines = res.getKey().trim().split("\n");
+                if (lines.length != 5 && !res.getValue().isEmpty()) {
+                    return false;
+                }
+
+                List<String> error = Stream.concat(
+                    Stream.of("Error:"),
+                    Arrays.stream(errorMessage.trim().split("\\s+"))
+                ).toList();
+
+                List<String> expectedResultHeader = List.of("TOPIC", "STATUS");
+                List<String> expectedResultValue1 =  List.of(firstTopic, 
"Successful");
+                List<String> expectedResultValue2 =  new ArrayList<>();
+                expectedResultValue2.add(secondTopic);
+                expectedResultValue2.addAll(error);
+
+                return 
Arrays.stream(lines[0].trim().split("\\s+")).toList().equals(expectedResultHeader)
 &&
+                    
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedResultValue1)
 &&
+                    
Arrays.stream(lines[2].trim().split("\\s+")).toList().equals(expectedResultValue2);
+            }, "Expected a data row and no error in delete offsets result with 
group: " + firstGroup + " and topic: " + firstTopic);
+        }
+    }
+
     @Test
     public void testDeleteShareGroupsArgs() {
         String bootstrapServer = "localhost:9092";
@@ -873,6 +1074,10 @@ public class ShareGroupCommandTest {
         return () -> Assertions.assertDoesNotThrow(service::describeGroups);
     }
 
+    /**
+     * Wraps {@code service.deleteOffsets()} in a Runnable that asserts the call does not throw.
+     */
+    private Runnable deleteOffsets(ShareGroupCommand.ShareGroupService service) {
+        Runnable guardedCall = () -> Assertions.assertDoesNotThrow(service::deleteOffsets);
+        return guardedCall;
+    }
+
     private boolean checkArgsHeaderOutput(List<String> args, String output) {
         if (!output.contains("GROUP")) {
             return false;

Reply via email to