This is an automated email from the ASF dual-hosted git repository.
manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96816a3ed74 KAFKA-16742: Add share group describe in coordinator
(KIP-932) (#16797)
96816a3ed74 is described below
commit 96816a3ed74857cc7631174b2c189ed8b1d9807c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Aug 7 10:23:01 2024 +0100
KAFKA-16742: Add share group describe in coordinator (KIP-932) (#16797)
Share group describe functionality for KIP-932
Reviewers: Andrew Schofield <[email protected]>, Manikumar Reddy
<[email protected]>
---
.../coordinator/group/GroupCoordinatorService.java | 45 ++++++-
.../coordinator/group/GroupCoordinatorShard.java | 16 +++
.../coordinator/group/GroupMetadataManager.java | 32 +++++
.../group/GroupCoordinatorServiceTest.java | 140 ++++++++++++++++++++-
.../group/GroupMetadataManagerTest.java | 70 +++++++++++
.../group/GroupMetadataManagerTestContext.java | 5 +
6 files changed, 305 insertions(+), 3 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index e132e85ec21..b7f44544b30 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -40,6 +40,7 @@ import
org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import
org.apache.kafka.common.message.ShareGroupDescribeResponseData.DescribedGroup;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
@@ -54,6 +55,7 @@ import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.ShareGroupDescribeRequest;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.utils.BufferSupplier;
@@ -628,8 +630,47 @@ public class GroupCoordinatorService implements
GroupCoordinator {
public CompletableFuture<List<DescribedGroup>> shareGroupDescribe(
    RequestContext context,
    List<String> groupIds) {
-    // TODO: Implement this method as part of KIP-932.
-    throw new UnsupportedOperationException();
+    if (!isActive.get()) { // coordinator not active: answer every requested id with COORDINATOR_NOT_AVAILABLE
+        return CompletableFuture.completedFuture(ShareGroupDescribeRequest.getErrorDescribedGroupList(
+            groupIds,
+            Errors.COORDINATOR_NOT_AVAILABLE
+        ));
+    }
+
+    final List<CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>> futures =
+        new ArrayList<>(groupIds.size()); // one future per invalid id plus one per partition that has valid ids
+    final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+    groupIds.forEach(groupId -> {
+        if (isGroupIdNotEmpty(groupId)) { // accepted id: bucket it under the partition returned by topicPartitionFor
+            groupsByTopicPartition
+                .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>())
+                .add(groupId);
+        } else { // rejected id: report INVALID_GROUP_ID immediately, without scheduling a read
+            futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                new ShareGroupDescribeResponseData.DescribedGroup()
+                    .setGroupId(null)
+                    .setErrorCode(Errors.INVALID_GROUP_ID.code())
+            )));
+        }
+    });
+
+    groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+        CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>> future =
+            runtime.scheduleReadOperation(
+                "share-group-describe",
+                topicPartition, // the read below must only ask for the ids bucketed under this partition
+                (coordinator, lastCommittedOffset) -> coordinator.shareGroupDescribe(groupList, lastCommittedOffset)
+            ).exceptionally(exception -> handleOperationException( // a failed read becomes an error entry per id in groupList
+                "share-group-describe",
+                groupList,
+                exception,
+                (error, __) -> ShareGroupDescribeRequest.getErrorDescribedGroupList(groupList, error)
+            ));
+
+        futures.add(future);
+    });
+
+    return FutureUtils.combineFutures(futures, ArrayList::new, List::addAll); // concatenate all partial lists into one result
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index c633a4bddd6..8685480ed1e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -36,6 +36,7 @@ import
org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -541,6 +542,21 @@ public class GroupCoordinatorShard implements
CoordinatorShard<CoordinatorRecord
return groupMetadataManager.consumerGroupDescribe(groupIds,
committedOffset);
}
+ /**
+ * Handles a ShareGroupDescribe request by delegating to the group metadata manager.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset The committed offset, passed through unchanged to GroupMetadataManager#shareGroupDescribe.
+ *
+ * @return A list containing the ShareGroupDescribeResponseData.DescribedGroup.
+ */
+ public List<ShareGroupDescribeResponseData.DescribedGroup>
shareGroupDescribe(
+ List<String> groupIds,
+ long committedOffset
+ ) {
+ return groupMetadataManager.shareGroupDescribe(groupIds,
committedOffset);
+ }
+
/**
* Handles a DescribeGroups request.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index ff96a98f28c..47e9d4c0642 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -645,6 +646,37 @@ public class GroupMetadataManager {
return describedGroups;
}
+ /**
+ * Handles a ShareGroupDescribe request by describing each requested group in turn.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset A specified committed offset corresponding to this shard; used for the group lookup and for building each description.
+ *
+ * @return A list containing one ShareGroupDescribeResponseData.DescribedGroup per requested ID, in request order. An ID whose lookup throws GroupIdNotFoundException is reported with the GROUP_ID_NOT_FOUND error code instead of failing the whole call.
+ */
+ public List<ShareGroupDescribeResponseData.DescribedGroup>
shareGroupDescribe(
+ List<String> groupIds,
+ long committedOffset
+ ) {
+ final List<ShareGroupDescribeResponseData.DescribedGroup>
describedGroups = new ArrayList<>();
+ groupIds.forEach(groupId -> {
+ try {
+ describedGroups.add(shareGroup(groupId,
committedOffset).asDescribedGroup(
+ committedOffset,
+ shareGroupAssignor.name(),
+ metadataImage.topics()
+ ));
+ } catch (GroupIdNotFoundException exception) { // per-group failure: record an error entry and keep going
+ describedGroups.add(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(Errors.GROUP_ID_NOT_FOUND.code())
+ );
+ }
+ });
+
+ return describedGroups;
+ }
+
/**
* Handles a DescribeGroup request.
*
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index c1fdc18e349..f3f9aa6c969 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -49,6 +49,7 @@ import
org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -2198,4 +2199,141 @@ public class GroupCoordinatorServiceTest {
future.get(5, TimeUnit.SECONDS)
);
}
-}
\ No newline at end of file
+
+ @Test // Two share groups served by two stubbed partition reads: the combined future completes only once both reads have completed.
+ public void testShareGroupDescribe() throws InterruptedException,
ExecutionException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ int partitionCount = 2; // presumably the two group ids below map to different partitions - TODO confirm
+ service.startup(() -> partitionCount);
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup1 = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-1");
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup2 = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id-2");
+ List<ShareGroupDescribeResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ describedGroup1,
+ describedGroup2
+ );
+
+ when(runtime.scheduleReadOperation( // partition 0: read already completed with group 1
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any() // the read lambda is not inspected, so the ids handed to the shard are not verified here
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+ CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+ when(runtime.scheduleReadOperation( // partition 1: read left pending until completed manually below
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+ )).thenReturn(describedGroupFuture);
+
+ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
future =
+
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE),
Arrays.asList("share-group-id-1", "share-group-id-2"));
+
+ assertFalse(future.isDone()); // partition 1's read has not completed yet
+
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+ assertEquals(expectedDescribedGroups, future.get());
+ }
+
+ @Test // An empty and a null group id are each expected to produce an INVALID_GROUP_ID entry whose group id is null.
+ public void testShareGroupDescribeInvalidGroupId() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ int partitionCount = 1;
+ service.startup(() -> partitionCount);
+
+ ShareGroupDescribeResponseData.DescribedGroup describedGroup = new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code());
+ List<ShareGroupDescribeResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList( // one expected entry per requested id
+ new ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ describedGroup
+ );
+
+ when(runtime.scheduleReadOperation( // NOTE(review): both requested ids look invalid, so this stub is presumably never hit - confirm
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
future =
+
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE),
Arrays.asList("", null));
+
+ assertEquals(expectedDescribedGroups, future.get());
+ }
+
+ @Test // A read that fails with CoordinatorLoadInProgressException is expected to surface as COORDINATOR_LOAD_IN_PROGRESS on the described group.
+ public void testShareGroupDescribeCoordinatorLoadInProgress() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ int partitionCount = 1; // single partition, so the stub for partition 0 below covers the requested group
+ service.startup(() -> partitionCount);
+
+ when(runtime.scheduleReadOperation( // the scheduled read fails instead of returning a description
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
future =
+
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE),
Collections.singletonList("share-group-id"));
+
+ assertEquals( // the failure is reported per group, not thrown from the future
+ Collections.singletonList(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ ),
+ future.get()
+ );
+ }
+
+ @Test // A service that was never started is expected to answer with COORDINATOR_NOT_AVAILABLE for the requested group.
+ public void testShareGroupDescribeCoordinatorNotActive() throws
ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ ); // note: startup() is not called anywhere in this test
+ when(runtime.scheduleReadOperation( // NOTE(review): if the inactive service returns early, this stub is presumably unused - confirm
+ ArgumentMatchers.eq("share-group-describe"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ Errors.COORDINATOR_NOT_AVAILABLE.exception()
+ ));
+
+ CompletableFuture<List<ShareGroupDescribeResponseData.DescribedGroup>>
future =
+
service.shareGroupDescribe(requestContext(ApiKeys.SHARE_GROUP_DESCRIBE),
Collections.singletonList("share-group-id"));
+
+ assertEquals(
+ Collections.singletonList(new
ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupId("share-group-id")
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ ),
+ future.get()
+ );
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 321b6d64324..29340f781a6 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -50,6 +50,7 @@ import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -13506,6 +13507,75 @@ public class GroupMetadataManagerTest {
assertEquals("RackId can't be empty.", ex.getMessage());
}
+ @Test // Covers the two trivial outcomes: an empty request and a request for a group that does not exist.
+ public void testShareGroupDescribeRequest() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder().build();
+
+ // An empty list of group ids is accepted and yields an empty result
+ List<ShareGroupDescribeResponseData.DescribedGroup> groups =
context.sendShareGroupDescribe(Collections.emptyList());
+ assertEquals(0, groups.size());
+
+ // An unknown group id yields a single entry carrying GROUP_ID_NOT_FOUND
+ groups =
context.sendShareGroupDescribe(Collections.singletonList("unknown-group"));
+ assertEquals(1, groups.size());
+ assertEquals(Errors.GROUP_ID_NOT_FOUND.code(),
groups.get(0).errorCode());
+ }
+
+ @Test // Describes two existing share groups, one without members and one with a single member, and compares the full descriptions.
+ public void testShareGroupDescribeNoErrors() {
+ MockPartitionAssignor assignor = new
MockPartitionAssignor("share-range");
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .withShareGroupAssignor(assignor)
+ .build();
+
+ assignor.prepareGroupAssignment(new GroupAssignment(
+ Collections.emptyMap()
+ ));
+
+ List<String> groupIds = Arrays.asList("group-id-1", "group-id-2");
+
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupIds.get(0),
100, GroupType.SHARE)); // group-id-1: replayed at epoch 100, no member is added to it in this test
+
context.replay(CoordinatorRecordHelpers.newGroupEpochRecord(groupIds.get(1),
15, GroupType.SHARE)); // group-id-2: replayed at epoch 15, receives a heartbeat below
+
+ CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord>
result = context.shareGroupHeartbeat(
+ new ShareGroupHeartbeatRequestData()
+ .setGroupId(groupIds.get(1))
+ .setMemberEpoch(0)
+ .setSubscribedTopicNames(Collections.singletonList("foo")));
+
+ // Verify that a member id was generated for the new member.
+ String memberId = result.response().memberId();
+ assertNotNull(memberId);
+ context.commit();
+
+ List<ShareGroupDescribeResponseData.DescribedGroup> expected =
Arrays.asList(
+ new ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupEpoch(100)
+ .setGroupId(groupIds.get(0))
+ .setGroupState(ShareGroup.ShareGroupState.EMPTY.toString())
+ .setAssignorName("share-range"),
+ new ShareGroupDescribeResponseData.DescribedGroup()
+ .setGroupEpoch(16) // one above the replayed epoch 15; presumably bumped by the heartbeat above - confirm
+ .setAssignmentEpoch(16)
+ .setGroupId(groupIds.get(1))
+ .setMembers(Collections.singletonList(
+ new ShareGroupMember.Builder(memberId)
+ .setMemberEpoch(16)
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+
.setSubscribedTopicNames(Collections.singletonList("foo"))
+ .build()
+ .asShareGroupDescribeMember(
+ new MetadataImageBuilder().build().topics()
+ )
+ ))
+ .setGroupState(ShareGroup.ShareGroupState.STABLE.toString())
+ .setAssignorName("share-range") // the name given to the MockPartitionAssignor at the top of the test
+ );
+ List<ShareGroupDescribeResponseData.DescribedGroup> actual =
context.sendShareGroupDescribe(groupIds);
+
+ assertEquals(expected, actual);
+ }
+
@Test
public void testShareGroupMemberIdGeneration() {
MockPartitionAssignor assignor = new MockPartitionAssignor("share");
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
index 192bca9951e..cef907da2c1 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTestContext.java
@@ -33,6 +33,7 @@ import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
@@ -1234,6 +1235,10 @@ public class GroupMetadataManagerTestContext {
return groupMetadataManager.describeGroups(groupIds,
lastCommittedOffset);
}
+ public List<ShareGroupDescribeResponseData.DescribedGroup>
sendShareGroupDescribe(List<String> groupIds) { // test helper: describes the given share groups at this context's lastCommittedOffset
+ return groupMetadataManager.shareGroupDescribe(groupIds,
lastCommittedOffset);
+ }
+
public void verifyHeartbeat(
String groupId,
JoinGroupResponseData joinResponse,