This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 96816a3ed74 KAFKA-16742: Add share group describe in coordinator (KIP-932) (#16797)
96816a3ed74 is described below

commit 96816a3ed74857cc7631174b2c189ed8b1d9807c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Aug 7 10:23:01 2024 +0100

    KAFKA-16742: Add share group describe in coordinator (KIP-932) (#16797)
    
    Share group describe functionality for KIP-932
    
    Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy <[email protected]>
---
 .../coordinator/group/GroupCoordinatorService.java |  45 ++++++-
 .../coordinator/group/GroupCoordinatorShard.java   |  16 +++
 .../coordinator/group/GroupMetadataManager.java    |  32 +++++
 .../group/GroupCoordinatorServiceTest.java         | 140 ++++++++++++++++++++-
 .../group/GroupMetadataManagerTest.java            |  70 +++++++++++
 .../group/GroupMetadataManagerTestContext.java     |   5 +
 6 files changed, 305 insertions(+), 3 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e132e85ec21..b7f44544b30 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -40,6 +40,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import 
org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
@@ -54,6 +55,7 @@ import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
 import org.apache.kafka.common.requests.TransactionResult;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.utils.BufferSupplier;
@@ -628,8 +630,47 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
     public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
         RequestContext context,
         List<String> groupIds) {
-        // TODO: Implement this method as part of KIP-932.
-        throw new UnsupportedOperationException();
+        if (!isActive.get()) { // Inactive coordinator: answer every group with COORDINATOR_NOT_AVAILABLE.
+            return CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final List<CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>> futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); // One read operation per partition.
+        groupIds.forEach(groupId -> {
+            if (isGroupIdNotEmpty(groupId)) {
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
+                    .add(groupId);
+            } else { // Ids rejected by isGroupIdNotEmpty are answered locally with INVALID_GROUP_ID.
+                futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new ShareGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                )));
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> future =
+                runtime.scheduleReadOperation(
+                    "share-group-describe",
+                    topicPartition,
+                    (coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupList, lastCommittedOffset) // groupList, not groupIds: describe only the groups mapped to this partition.
+                ).exceptionally(exception -> handleOperationException(
+                    "share-group-describe",
+                    groupList,
+                    exception,
+                    (error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList(groupList, error)
+                ));
+
+            futures.add(future);
+        });
+
+        return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); // Merge the per-partition results into one list.
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c633a4bddd6..8685480ed1e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -36,6 +36,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -541,6 +542,21 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<CoordinatorRecord
         return groupMetadataManager.consumerGroupDescribe(groupIds, 
committedOffset);
     }
 
+    /**
+     * Handles a ShareGroupDescribe request by delegating to the group metadata manager.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   The committed offset passed through to the group metadata manager.
+     *
+     * @return A list containing the ShareGroupDescribeResponseData.DescribedGroup.
+     */
+    public List<ShareGroupDescribeResponseData.DescribedGroup> 
shareGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        return groupMetadataManager.shareGroupDescribe(groupIds, 
committedOffset);
+    }
+
     /**
      * Handles a DescribeGroups request.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ff96a98f28c..47e9d4c0642 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -645,6 +646,37 @@ public class GroupMetadataManager {
         return describedGroups;
     }
 
+    /**
+     * Handles a ShareGroupDescribe request; ids with no matching share group are reported with GROUP_ID_NOT_FOUND.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to this shard.
+     *
+     * @return A list containing one ShareGroupDescribeResponseData.DescribedGroup per requested group id.
+     */
+    public List<ShareGroupDescribeResponseData.DescribedGroup> 
shareGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        final List<ShareGroupDescribeResponseData.DescribedGroup> 
describedGroups = new ArrayList<>();
+        groupIds.forEach(groupId -> {
+            try {
+                describedGroups.add(shareGroup(groupId, 
committedOffset).asDescribedGroup(
+                    committedOffset,
+                    shareGroupAssignor.name(),
+                    metadataImage.topics()
+                ));
+            } catch (GroupIdNotFoundException exception) { // Report the missing group in-band instead of propagating the exception.
+                describedGroups.add(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(groupId)
+                    .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+                );
+            }
+        });
+
+        return describedGroups;
+    }
+
     /**
      * Handles a DescribeGroup request.
      *
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index c1fdc18e349..f3f9aa6c969 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.message.OffsetDeleteRequestData;
 import org.apache.kafka.common.message.OffsetDeleteResponseData;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
 import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -2198,4 +2199,141 @@ public class GroupCoordinatorServiceTest {
             future.get(5, TimeUnit.SECONDS)
         );
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testShareGroupDescribe() throws InterruptedException, 
ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        int partitionCount = 2;
+        service.startup(() -> partitionCount); // Two partitions: reads on partitions 0 and 1 are stubbed separately below.
+
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-1");
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("share-group-id-2");
+        List<ShareGroupDescribeResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            describedGroup1,
+            describedGroup2
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1))); // Partition 0 answers immediately.
+
+        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>(); // Partition 1 stays pending until completed manually below.
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        )).thenReturn(describedGroupFuture);
+
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE), 
Arrays.asList("share-group-id-1", "share-group-id-2"));
+
+        assertFalse(future.isDone()); // The combined future must wait for the still-pending partition 1 read.
+        
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testShareGroupDescribeInvalidGroupId() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        ShareGroupDescribeResponseData.DescribedGroup describedGroup = new 
ShareGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(null)
+            .setErrorCode(Errors.INVALID_GROUP_ID.code());
+        List<ShareGroupDescribeResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(null)
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            describedGroup
+        );
+
+        when(runtime.scheduleReadOperation( // NOTE(review): this stub looks unused for "" and null ids - confirm.
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE), 
Arrays.asList("", null));
+
+        assertEquals(expectedDescribedGroups, future.get()); // Both "" and null are expected back as INVALID_GROUP_ID with a null group id.
+    }
+
+    @Test
+    public void testShareGroupDescribeCoordinatorLoadInProgress() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture( // The read operation fails with a load-in-progress exception.
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE), 
Collections.singletonList("share-group-id"));
+
+        assertEquals( // The failure is expected as a per-group COORDINATOR_LOAD_IN_PROGRESS error, not a failed future.
+            Collections.singletonList(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("share-group-id")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+            ),
+            future.get()
+        );
+    }
+
+    @Test
+    public void testShareGroupDescribeCoordinatorNotActive() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime,
+            new GroupCoordinatorMetrics()
+        ); // NOTE(review): startup() is never called here, so the service presumably stays inactive - confirm.
+        when(runtime.scheduleReadOperation( // NOTE(review): likely never reached if the isActive check short-circuits - confirm.
+            ArgumentMatchers.eq("share-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            Errors.COORDINATOR_NOT_AVAILABLE.exception()
+        ));
+
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE), 
Collections.singletonList("share-group-id"));
+
+        assertEquals( // Every requested group is expected back with COORDINATOR_NOT_AVAILABLE.
+            Collections.singletonList(new 
ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("share-group-id")
+                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+            ),
+            future.get()
+        );
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 321b6d64324..29340f781a6 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -13506,6 +13507,75 @@ public class GroupMetadataManagerTest {
         assertEquals("RackId can't be empty.", ex.getMessage());
     }
 
+    @Test
+    public void testShareGroupDescribeRequest() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder().build();
+
+        // An empty list of group ids yields an empty response.
+        List<ShareGroupDescribeResponseData.DescribedGroup> groups = 
context.sendShareGroupDescribe(Collections.emptyList());
+        assertEquals(0, groups.size());
+
+        // An unknown group id is reported with GROUP_ID_NOT_FOUND rather than thrown.
+        groups = 
context.sendShareGroupDescribe(Collections.singletonList("unknown-group"));
+        assertEquals(1, groups.size());
+        assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), 
groups.get(0).errorCode());
+    }
+
+    @Test
+    public void testShareGroupDescribeNoErrors() {
+        MockPartitionAssignor assignor = new 
MockPartitionAssignor("share-range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withShareGroupAssignor(assignor)
+            .build();
+
+        assignor.prepareGroupAssignment(new GroupAssignment(
+            Collections.emptyMap()
+        ));
+
+        List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+        
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupIds.get(0), 
100, GroupType.SHARE)); // First group: replayed at epoch 100 and never joined.
+        
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupIds.get(1), 
15, GroupType.SHARE)); // Second group: replayed at epoch 15, then joined by one member below.
+
+        CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> 
result = context.shareGroupHeartbeat(
+            new ShareGroupHeartbeatRequestData()
+                .setGroupId(groupIds.get(1))
+                .setMemberEpoch(0)
+                .setSubscribedTopicNames(Collections.singletonList("foo")));
+
+        // Verify that a member id was generated for the new member.
+        String memberId = result.response().memberId();
+        assertNotNull(memberId);
+        context.commit(); // Presumably makes the heartbeat's records visible at lastCommittedOffset - confirm.
+
+        List<ShareGroupDescribeResponseData.DescribedGroup> expected = 
Arrays.asList(
+            new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(100)
+                .setGroupId(groupIds.get(0))
+                .setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
+                .setAssignorName("share-range"),
+            new ShareGroupDescribeResponseData.DescribedGroup()
+                .setGroupEpoch(16) // One above the replayed epoch 15; presumably bumped by the member join - confirm.
+                .setAssignmentEpoch(16)
+                .setGroupId(groupIds.get(1))
+                .setMembers(Collections.singletonList(
+                    new ShareGroupMember.Builder(memberId)
+                        .setMemberEpoch(16)
+                        .setClientId("client")
+                        .setClientHost("localhost/127.0.0.1")
+                        
.setSubscribedTopicNames(Collections.singletonList("foo"))
+                        .build()
+                        .asShareGroupDescribeMember(
+                            new MetadataImageBuilder().build().topics()
+                        )
+                ))
+                .setGroupState(ShareGroup.ShareGroupState.STABLE.toString())
+                .setAssignorName("share-range")
+        );
+        List<ShareGroupDescribeResponseData.DescribedGroup> actual = 
context.sendShareGroupDescribe(groupIds);
+
+        assertEquals(expected, actual);
+    }
+
     @Test
     public void testShareGroupMemberIdGeneration() {
         MockPartitionAssignor assignor = new MockPartitionAssignor("share");
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 192bca9951e..cef907da2c1 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.message.LeaveGroupRequestData;
 import org.apache.kafka.common.message.LeaveGroupResponseData;
 import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
 import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -1234,6 +1235,10 @@ public class GroupMetadataManagerTestContext {
         return groupMetadataManager.describeGroups(groupIds, 
lastCommittedOffset);
     }
 
+    public List<ShareGroupDescribeResponseData.DescribedGroup> 
sendShareGroupDescribe(List<String> groupIds) {
+        return groupMetadataManager.shareGroupDescribe(groupIds, 
lastCommittedOffset); // Test helper: describes the groups as of this context's lastCommittedOffset.
+    }
+
     public void verifyHeartbeat(
         String groupId,
         JoinGroupResponseData joinResponse,

Reply via email to