This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6d16f687aa1 KAFKA-19662: Reset share group offsets for unsubscribed 
topics (#20708)
6d16f687aa1 is described below

commit 6d16f687aa1a0df26f2f665436b7efaf0aec0c56
Author: Andrew Schofield <[email protected]>
AuthorDate: Thu Oct 23 18:37:25 2025 +0100

    KAFKA-19662: Reset share group offsets for unsubscribed topics (#20708)
    
    This PR allows the kafka-share-groups.sh --reset-offsets tool to be used
    to set offsets for topics which are not currently subscribed in a share
    group. It also works if the share group does not yet exist. This brings
    the capability in line with the equivalent function in
    Kafka-consumer-groups.sh. The primary purpose is to allow offsets to be
    set before the share group is first used as a way of initialising in a
    known state.
    
    Reviewers: Jimmy Wang <[email protected]>,
     Kuan-Po Tseng <[email protected]>, Apoorv Mittal
     <[email protected]>
---
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 12 ++---
 .../coordinator/group/GroupCoordinatorShard.java   | 19 +------
 .../coordinator/group/GroupMetadataManager.java    | 59 ++++++++++++++++++----
 .../coordinator/group/modern/share/ShareGroup.java |  4 --
 .../tools/consumer/group/ShareGroupCommand.java    | 34 ++++++++++---
 .../consumer/group/ShareGroupCommandTest.java      | 46 ++++++++++++++++-
 6 files changed, 128 insertions(+), 46 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index d48ddaf58ad..a480a3b5ffb 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -2934,7 +2934,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     val testTopicName = "test_topic"
     val testGroupId = "test_group_id"
     val testClientId = "test_client_id"
-    val fakeGroupId = "fake_group_id"
+    val nonexistentGroupId = "nonexistent_group_id"
     val fakeTopicName = "foo"
 
     val tp1 = new TopicPartition(testTopicName, 0)
@@ -2968,12 +2968,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
         assertFutureThrows(classOf[GroupNotEmptyException], 
offsetAlterResult.partitionResult(tp1))
         assertFutureThrows(classOf[GroupNotEmptyException], 
offsetAlterResult.partitionResult(tp2))
 
-        // Test the fake group ID
-        val fakeAlterResult = client.alterShareGroupOffsets(fakeGroupId, 
util.Map.of(tp1, 0, tp2, 0))
+        // Test the non-existent group ID
+        val nonexistentAlterResult = 
client.alterShareGroupOffsets(nonexistentGroupId, util.Map.of(tp1, 0, tp2, 0))
 
-        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeAlterResult.all())
-        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeAlterResult.partitionResult(tp1))
-        assertFutureThrows(classOf[GroupIdNotFoundException], 
fakeAlterResult.partitionResult(tp2))
+        assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
nonexistentAlterResult.all())
+        assertNull(nonexistentAlterResult.partitionResult(tp1).get())
+        assertFutureThrows(classOf[UnknownTopicOrPartitionException], 
nonexistentAlterResult.partitionResult(tp2))
       }
 
       // Test offset alter when group is empty
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c591f8d3767..3165d10d8f4 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -779,10 +779,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
     }
 
     /**
-     * Make the following checks to make sure the 
AlterShareGroupOffsetsRequest request is valid:
-     * 1. Checks whether the provided group is empty
-     * 2. Checks the requested topics are presented in the metadataImage
-     * 3. Checks the corresponding share partitions in 
AlterShareGroupOffsetsRequest are existing
+     * Alters the offsets for a share group.
      *
      * @param groupId - The group ID
      * @param alterShareGroupOffsetsRequestData - The request data for 
AlterShareGroupOffsetsRequestData
@@ -793,19 +790,7 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         String groupId,
         AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequestData
     ) {
-        List<CoordinatorRecord> records = new ArrayList<>();
-        ShareGroup group = groupMetadataManager.shareGroup(groupId);
-        group.validateOffsetsAlterable();
-
-        Map.Entry<AlterShareGroupOffsetsResponseData, 
InitializeShareGroupStateParameters> response = 
groupMetadataManager.completeAlterShareGroupOffsets(
-            groupId,
-            alterShareGroupOffsetsRequestData,
-            records
-        );
-        return new CoordinatorResult<>(
-            records,
-            response
-        );
+        return groupMetadataManager.alterShareGroupOffsets(groupId, 
alterShareGroupOffsetsRequestData.topics());
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 66f755148e9..09b459a27f7 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -25,6 +25,7 @@ import 
org.apache.kafka.common.errors.FencedInstanceIdException;
 import org.apache.kafka.common.errors.FencedMemberEpochException;
 import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupMaxSizeReachedException;
+import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.errors.IllegalGenerationException;
 import org.apache.kafka.common.errors.InconsistentGroupProtocolException;
 import org.apache.kafka.common.errors.InvalidRequestException;
@@ -1513,6 +1514,21 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Checks whether the share group is empty.
+     *
+     * @param group     The share group.
+     *
+     * @throws GroupNotEmptyException if the group is not empty.
+     */
+    private void throwIfShareGroupIsNotEmpty(
+        ShareGroup group
+    ) throws GroupNotEmptyException {
+        if (group.numMembers() > 0) {
+            throw new GroupNotEmptyException(Errors.NON_EMPTY_GROUP.message());
+        }
+    }
+
     /**
      * Validates the member epoch provided in the heartbeat request.
      *
@@ -8300,19 +8316,37 @@ public class GroupMetadataManager {
         return deleteShareGroupStateRequestTopicsData;
     }
 
-    public Map.Entry<AlterShareGroupOffsetsResponseData, 
InitializeShareGroupStateParameters> completeAlterShareGroupOffsets(
+    /**
+     * Handles an AlterShareGroupOffsets request.
+     *
+     * Make the following checks to make sure the 
AlterShareGroupOffsetsRequest request is valid:
+     * 1. Checks whether the provided group is empty
+     * 2. Checks the requested topics are presented in the metadataImage
+     * 3. Checks the corresponding share partitions in 
AlterShareGroupOffsetsRequest are existing
+     *
+     * @param groupId   The group id from the request.
+     * @param topics    The topic information for altering the share group's 
offsets from the request.
+     *
+     * @return A Result containing a pair of ShareGroupHeartbeat response and 
maybe InitializeShareGroupStateParameters
+     *         and a list of records to update the state machine.
+     */
+    public CoordinatorResult<Map.Entry<AlterShareGroupOffsetsResponseData, 
InitializeShareGroupStateParameters>, CoordinatorRecord> alterShareGroupOffsets(
         String groupId,
-        AlterShareGroupOffsetsRequestData alterShareGroupOffsetsRequest,
-        List<CoordinatorRecord> records
-    ) {
+        
AlterShareGroupOffsetsRequestData.AlterShareGroupOffsetsRequestTopicCollection 
topics
+    ) throws ApiException {
         final long currentTimeMs = time.milliseconds();
-        Group group = groups.get(groupId);
+        final List<CoordinatorRecord> records = new ArrayList<>();
+
+        // Get or create the share group. If the group exists, check that it's 
empty. If it is created, it is empty.
+        final ShareGroup group = getOrMaybeCreateShareGroup(groupId, true);
+        throwIfShareGroupIsNotEmpty(group);
+
         
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection
 alterShareGroupOffsetsResponseTopics = new 
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponseTopicCollection();
 
         Map<Uuid, InitMapValue> initializingTopics = new HashMap<>();
         Map<Uuid, Map<Integer, Long>> offsetByTopicPartitions = new 
HashMap<>();
 
-        alterShareGroupOffsetsRequest.topics().forEach(topic -> {
+        topics.forEach(topic -> {
             Optional<CoordinatorMetadataImage.TopicMetadata> topicMetadataOpt 
= metadataImage.topicMetadata(topic.topicName());
             if (topicMetadataOpt.isPresent()) {
                 var topicMetadata = topicMetadataOpt.get();
@@ -8366,10 +8400,13 @@ public class GroupMetadataManager {
         });
 
         addInitializingTopicsRecords(groupId, records, initializingTopics);
-        return Map.entry(
-            new AlterShareGroupOffsetsResponseData()
-                .setResponses(alterShareGroupOffsetsResponseTopics),
-            buildInitializeShareGroupState(groupId, ((ShareGroup) 
group).groupEpoch(), offsetByTopicPartitions)
+        return new CoordinatorResult<>(
+            records,
+            Map.entry(
+                new AlterShareGroupOffsetsResponseData()
+                    .setResponses(alterShareGroupOffsetsResponseTopics),
+                buildInitializeShareGroupState(groupId, group.groupEpoch(), 
offsetByTopicPartitions)
+            )
         );
     }
 
@@ -8432,7 +8469,7 @@ public class GroupMetadataManager {
         return new CoordinatorResult<>(records);
     }
 
-    /*
+    /**
      * Returns a list of {@link 
DeleteShareGroupOffsetsResponseData.DeleteShareGroupOffsetsResponseTopic} 
corresponding to the
      * topics for which persister delete share group state request was 
successful
      * @param groupId                    group ID of the share group
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
index efe2474abef..57c9cf5cb65 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/share/ShareGroup.java
@@ -245,10 +245,6 @@ public class ShareGroup extends 
ModernGroup<ShareGroupMember> {
         validateEmptyGroup();
     }
 
-    public void validateOffsetsAlterable() throws ApiException {
-        validateEmptyGroup();
-    }
-
     public void validateEmptyGroup() {
         if (state() != ShareGroupState.EMPTY) {
             throw Errors.NON_EMPTY_GROUP.exception();
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 50565dcc0ba..87cf0f1e837 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.errors.GroupNotEmptyException;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.utils.Utils;
@@ -385,10 +386,25 @@ public class ShareGroupCommand {
                 if 
(!(GroupState.EMPTY.equals(shareGroupDescription.groupState()) || 
GroupState.DEAD.equals(shareGroupDescription.groupState()))) {
                     CommandLineUtils.printErrorAndExit(String.format("Share 
group '%s' is not empty.", groupId));
                 }
-                Map<TopicPartition, OffsetAndMetadata> offsetsToReset = 
prepareOffsetsToReset(groupId);
-                if (offsetsToReset == null) {
-                    return;
+                resetOffsetsForInactiveGroup(groupId);
+            } catch (InterruptedException ie) {
+                throw new RuntimeException(ie);
+            } catch (ExecutionException ee) {
+                Throwable cause = ee.getCause();
+                if (cause instanceof GroupIdNotFoundException) {
+                    resetOffsetsForInactiveGroup(groupId);
+                } else if (cause instanceof KafkaException) {
+                    CommandLineUtils.printErrorAndExit(cause.getMessage());
+                } else {
+                    throw new RuntimeException(cause);
                 }
+            }
+        }
+
+        private void resetOffsetsForInactiveGroup(String groupId) {
+            try {
+                Collection<TopicPartition> partitionsToReset = 
getPartitionsToReset(groupId);
+                Map<TopicPartition, OffsetAndMetadata> offsetsToReset = 
prepareOffsetsToReset(partitionsToReset);
                 boolean dryRun = opts.options.has(opts.dryRunOpt) || 
!opts.options.has(opts.executeOpt);
                 if (!dryRun) {
                     adminClient.alterShareGroupOffsets(groupId,
@@ -404,24 +420,28 @@ public class ShareGroupCommand {
             } catch (ExecutionException ee) {
                 Throwable cause = ee.getCause();
                 if (cause instanceof KafkaException) {
-                    CommandLineUtils.printErrorAndExit(cause.getMessage());
+                    throw (KafkaException) cause;
                 } else {
                     throw new RuntimeException(cause);
                 }
             }
         }
 
-        protected Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(String groupId) throws ExecutionException, 
InterruptedException {
-            Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
-            Map<TopicPartition, OffsetAndMetadata> offsetsByTopicPartitions = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
+        private Collection<TopicPartition> getPartitionsToReset(String 
groupId) throws ExecutionException, InterruptedException {
             Collection<TopicPartition> partitionsToReset;
 
             if (opts.options.has(opts.topicOpt)) {
                 partitionsToReset = 
offsetsUtils.parseTopicPartitionsToReset(opts.options.valuesOf(opts.topicOpt));
             } else {
+                Map<String, ListShareGroupOffsetsSpec> groupSpecs = 
Map.of(groupId, new ListShareGroupOffsetsSpec());
+                Map<TopicPartition, OffsetAndMetadata> 
offsetsByTopicPartitions = 
adminClient.listShareGroupOffsets(groupSpecs).all().get().get(groupId);
                 partitionsToReset = offsetsByTopicPartitions.keySet();
             }
 
+            return partitionsToReset;
+        }
+
+        private Map<TopicPartition, OffsetAndMetadata> 
prepareOffsetsToReset(Collection<TopicPartition> partitionsToReset) {
             offsetsUtils.checkAllTopicPartitionsValid(partitionsToReset);
             if (opts.options.has(opts.resetToEarliestOpt)) {
                 return offsetsUtils.resetToEarliest(partitionsToReset);
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 3b1843b95cd..29fe151ba2b 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -1313,7 +1313,7 @@ public class ShareGroupCommandTest {
     }
 
     @Test
-    public void testAlterShareGroupOffsetsFailureWithNoneEmptyGroup() {
+    public void testAlterShareGroupOffsetsFailureWithNonEmptyGroup() {
         String group = "share-group";
         String topic = "topic";
         String bootstrapServer = "localhost:9092";
@@ -1418,6 +1418,50 @@ public class ShareGroupCommandTest {
         }
     }
 
+    @Test
+    public void testAlterShareGroupNonExistentGroupSuccess() {
+        String group = "share-group";
+        String topic = "none";
+        String bootstrapServer = "localhost:9092";
+        String[] cgcArgs = new String[]{"--bootstrap-server", bootstrapServer, 
"--reset-offsets", "--to-earliest", "--execute", "--topic", topic, "--group", 
group};
+        Admin adminClient = mock(KafkaAdminClient.class);
+
+        ListShareGroupOffsetsResult listShareGroupOffsetsResult = 
AdminClientTestUtils.createListShareGroupOffsetsResult(
+            Map.of(
+                group,
+                KafkaFuture.completedFuture(Map.of(new TopicPartition("topic", 
0), new OffsetAndMetadata(10L)))
+            )
+        );
+        
when(adminClient.listShareGroupOffsets(any())).thenReturn(listShareGroupOffsetsResult);
+
+        AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mockAlterShareGroupOffsets(adminClient, group);
+        TopicPartition tp0 = new TopicPartition(topic, 0);
+        Map<TopicPartition, OffsetAndMetadata> partitionOffsets = Map.of(tp0, 
new OffsetAndMetadata(0L));
+        ListOffsetsResult listOffsetsResult = 
AdminClientTestUtils.createListOffsetsResult(partitionOffsets);
+        when(adminClient.listOffsets(any(), 
any(ListOffsetsOptions.class))).thenReturn(listOffsetsResult);
+
+        KafkaFutureImpl<ShareGroupDescription> missingGroupFuture = new 
KafkaFutureImpl<>();
+        missingGroupFuture.completeExceptionally(new 
GroupIdNotFoundException("Group " + group + " not found."));
+        DescribeShareGroupsResult describeShareGroupsResult = 
mock(DescribeShareGroupsResult.class);
+        
when(describeShareGroupsResult.describedGroups()).thenReturn(Map.of(group, 
missingGroupFuture));
+        when(adminClient.describeShareGroups(any(), 
any(DescribeShareGroupsOptions.class))).thenReturn(describeShareGroupsResult);
+        Map<String, TopicDescription> descriptions = Map.of(
+            topic, new TopicDescription(topic, false, List.of(
+                new TopicPartitionInfo(0, Node.noNode(), List.of(), List.of())
+            )));
+        DescribeTopicsResult describeTopicResult = 
mock(DescribeTopicsResult.class);
+        
when(describeTopicResult.allTopicNames()).thenReturn(completedFuture(descriptions));
+        
when(adminClient.describeTopics(anyCollection())).thenReturn(describeTopicResult);
+        when(adminClient.describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class))).thenReturn(describeTopicResult);
+        try (ShareGroupService service = getShareGroupService(cgcArgs, 
adminClient)) {
+            service.resetOffsets();
+            verify(adminClient).alterShareGroupOffsets(eq(group), anyMap());
+            verify(adminClient).describeTopics(anyCollection(), 
any(DescribeTopicsOptions.class));
+            verify(alterShareGroupOffsetsResult, times(1)).all();
+            
verify(adminClient).describeShareGroups(ArgumentMatchers.anyCollection(), 
any(DescribeShareGroupsOptions.class));
+        }
+    }
+
     private AlterShareGroupOffsetsResult mockAlterShareGroupOffsets(Admin 
client, String groupId) {
         AlterShareGroupOffsetsResult alterShareGroupOffsetsResult = 
mock(AlterShareGroupOffsetsResult.class);
         KafkaFutureImpl<Void> resultFuture = new KafkaFutureImpl<>();

Reply via email to