This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a6e6c7d8c2 KAFKA-14504: Implement DescribeGroups API (#14462)
4a6e6c7d8c2 is described below
commit 4a6e6c7d8c2d2226046ca88027f2cc44bbea3bf6
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Oct 6 05:25:17 2023 -0400
KAFKA-14504: Implement DescribeGroups API (#14462)
This patch implements the DescribeGroups API in the new group coordinator.
Reviewers: David Jacot <[email protected]>
---
.../common/requests/DescribeGroupsRequest.java | 14 +++
.../common/requests/DescribeGroupsRequestTest.java | 52 +++++++++
.../coordinator/group/GroupCoordinatorService.java | 52 ++++++++-
.../coordinator/group/GroupCoordinatorShard.java | 18 +++
.../coordinator/group/GroupMetadataManager.java | 79 +++++++++++++
.../group/generic/GenericGroupMember.java | 20 ++++
.../group/GroupCoordinatorServiceTest.java | 105 +++++++++++++++++
.../group/GroupMetadataManagerTest.java | 126 +++++++++++++++++++++
.../group/generic/GenericGroupMemberTest.java | 62 ++++++++++
9 files changed, 524 insertions(+), 4 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 25afb432cb7..3a4b9e38b02 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
public class DescribeGroupsRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<DescribeGroupsRequest> {
@@ -75,4 +77,16 @@ public class DescribeGroupsRequest extends AbstractRequest {
public static DescribeGroupsRequest parse(ByteBuffer buffer, short
version) {
return new DescribeGroupsRequest(new DescribeGroupsRequestData(new
ByteBufferAccessor(buffer), version), version);
}
+
+ public static List<DescribeGroupsResponseData.DescribedGroup>
getErrorDescribedGroupList(
+ List<String> groupIds,
+ Errors error
+ ) {
+ return groupIds.stream()
+ .map(groupId -> new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setErrorCode(error.code())
+ )
+ .collect(Collectors.toList());
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/DescribeGroupsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeGroupsRequestTest.java
new file mode 100644
index 00000000000..afce9d99417
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeGroupsRequestTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static
org.apache.kafka.common.requests.DescribeGroupsRequest.getErrorDescribedGroupList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DescribeGroupsRequestTest {
+
+ @Test
+ public void testGetErrorDescribedGroupList() {
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroupList = Arrays.asList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-1")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-2")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-3")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ );
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroupList =
getErrorDescribedGroupList(
+ Arrays.asList("group-id-1", "group-id-2", "group-id-3"),
+ Errors.COORDINATOR_LOAD_IN_PROGRESS
+ );
+
+ assertEquals(expectedDescribedGroupList, describedGroupList);
+ }
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 1c71f6dee37..e30bb930269 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.RequestContext;
@@ -77,9 +78,9 @@ import org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.HashMap;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.Set;
@@ -508,9 +509,52 @@ public class GroupCoordinatorService implements
GroupCoordinator {
return
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
}
- return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
- "This API is not implemented yet."
- ));
+ final
List<CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>>
futures =
+ new ArrayList<>(groupIds.size());
+ final Map<TopicPartition, List<String>> groupsByTopicPartition = new
HashMap<>();
+ groupIds.forEach(groupId -> {
+ // For backwards compatibility, we support DescribeGroups for the
empty group id.
+ if (groupId == null) {
+
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code())
+ )));
+ } else {
+ final TopicPartition topicPartition =
topicPartitionFor(groupId);
+ groupsByTopicPartition
+ .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+ .add(groupId);
+ }
+ });
+
+ groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>
future =
+ runtime.scheduleReadOperation(
+ "describe-groups",
+ topicPartition,
+ (coordinator, lastCommittedOffset) ->
coordinator.describeGroups(context, groupList, lastCommittedOffset)
+ ).exceptionally(exception -> {
+ if (!(exception instanceof KafkaException)) {
+ log.error("DescribeGroups request {} hit an unexpected
exception: {}",
+ groupList, exception.getMessage());
+ }
+
+ return DescribeGroupsRequest.getErrorDescribedGroupList(
+ groupList,
+ Errors.forException(exception)
+ );
+ });
+
+ futures.add(future);
+ });
+
+ final CompletableFuture<Void> allFutures =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ return allFutures.thenApply(v -> {
+ final List<DescribeGroupsResponseData.DescribedGroup> res = new
ArrayList<>();
+ futures.forEach(future -> res.addAll(future.join()));
+ return res;
+ });
}
/**
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index a094d7c6e41..c30d348336b 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -385,6 +386,23 @@ public class GroupCoordinatorShard implements
CoordinatorShard<Record> {
return groupMetadataManager.listGroups(statesFilter, committedOffset);
}
+ /**
+ * Handles a DescribeGroups request.
+ *
+ * @param context The request context.
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+ */
+ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
+ RequestContext context,
+ List<String> groupIds,
+ long committedOffset
+ ) {
+ return groupMetadataManager.describeGroups(groupIds, committedOffset);
+ }
+
/**
* Handles a LeaveGroup request.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9ab0457aa11..7bfe243d026 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@@ -444,6 +445,59 @@ public class GroupMetadataManager {
return groupStream.map(group ->
group.asListedGroup(committedOffset)).collect(Collectors.toList());
}
+ /**
+ * Handles a DescribeGroup request.
+ *
+ * @param groupIds The IDs of the groups to describe.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+ */
+ public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
+ List<String> groupIds,
+ long committedOffset
+ ) {
+ final List<DescribeGroupsResponseData.DescribedGroup> describedGroups
= new ArrayList<>();
+ groupIds.forEach(groupId -> {
+ try {
+ GenericGroup group = genericGroup(groupId, committedOffset);
+
+ if (group.isInState(STABLE)) {
+ if (!group.protocolName().isPresent()) {
+ throw new IllegalStateException("Invalid null group
protocol for stable group");
+ }
+
+ describedGroups.add(new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setGroupState(group.stateAsString())
+ .setProtocolType(group.protocolType().orElse(""))
+ .setProtocolData(group.protocolName().get())
+ .setMembers(group.allMembers().stream()
+ .map(member ->
member.describe(group.protocolName().get()))
+ .collect(Collectors.toList())
+ )
+ );
+ } else {
+ describedGroups.add(new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setGroupState(group.stateAsString())
+ .setProtocolType(group.protocolType().orElse(""))
+ .setMembers(group.allMembers().stream()
+ .map(member -> member.describeNoMetadata())
+ .collect(Collectors.toList())
+ )
+ );
+ }
+ } catch (GroupIdNotFoundException exception) {
+ describedGroups.add(new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(groupId)
+ .setGroupState(DEAD.toString())
+ );
+ }
+ });
+ return describedGroups;
+ }
+
/**
* Gets or maybe creates a consumer group.
*
@@ -521,6 +575,31 @@ public class GroupMetadataManager {
}
}
+ /**
+ * Gets a generic group by committed offset.
+ *
+ * @param groupId The group id.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A GenericGroup.
+ * @throws GroupIdNotFoundException if the group does not exist or is not
a generic group.
+ */
+ public GenericGroup genericGroup(
+ String groupId,
+ long committedOffset
+ ) throws GroupIdNotFoundException {
+ Group group = group(groupId, committedOffset);
+
+ if (group.type() == GENERIC) {
+ return (GenericGroup) group;
+ } else {
+ // We don't support upgrading/downgrading between protocols at the
moment so
+ // we throw an exception if a group exists with the wrong type.
+ throw new GroupIdNotFoundException(String.format("Group %s is not
a generic group.",
+ groupId));
+ }
+ }
+
/**
* Removes the group.
*
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
index 64c3fbcad5b..702bea653db 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.generic;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -349,6 +350,25 @@ public class GenericGroupMember {
return isNew;
}
+ /**
+ * @return The described group member without metadata.
+ */
+ public DescribeGroupsResponseData.DescribedGroupMember
describeNoMetadata() {
+ return new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId(memberId())
+ .setGroupInstanceId(groupInstanceId().orElse(null))
+ .setClientId(clientId())
+ .setClientHost(clientHost())
+ .setMemberAssignment(assignment());
+ }
+
+ /**
+ * @return The described group member with metadata corresponding to the
provided protocol.
+ */
+ public DescribeGroupsResponseData.DescribedGroupMember describe(String
protocolName) {
+ return describeNoMetadata().setMemberMetadata(metadata(protocolName));
+ }
+
/**
* @param value the new rebalance timeout in milliseconds.
*/
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 755bd2aa56a..d54d574f9e5 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -34,6 +34,7 @@ import
org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -753,6 +754,110 @@ public class GroupCoordinatorServiceTest {
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
}
+ @Test
+ public void testDescribeGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 2;
+ service.startup(() -> partitionCount);
+
+ DescribeGroupsResponseData.DescribedGroup describedGroup1 = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-1");
+ DescribeGroupsResponseData.DescribedGroup describedGroup2 = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-2");
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ describedGroup1,
+ describedGroup2
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+ CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+ )).thenReturn(describedGroupFuture);
+
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>
future =
+ service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS),
Arrays.asList("group-id-1", "group-id-2"));
+
+ assertFalse(future.isDone());
+
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+ assertEquals(expectedDescribedGroups, future.get());
+ }
+
+ @Test
+ public void testDescribeGroupsInvalidGroupId() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 1;
+ service.startup(() -> partitionCount);
+
+ DescribeGroupsResponseData.DescribedGroup describedGroup = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("");
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId(null)
+ .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+ describedGroup
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>
future =
+ service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS),
Arrays.asList("", null));
+
+ assertEquals(expectedDescribedGroups, future.get());
+ }
+
+ @Test
+ public void testDescribeGroupCoordinatorLoadInProgress() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 1;
+ service.startup(() -> partitionCount);
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(
+ new CoordinatorLoadInProgressException(null)
+ ));
+
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>
future =
+ service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS),
Collections.singletonList("group-id"));
+
+ assertEquals(
+ Collections.singletonList(new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ ),
+ future.get()
+ );
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFetchOffsets(
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3a3db59855e..f299705016d 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -33,6 +33,7 @@ import
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnsupportedAssignorException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -425,6 +426,7 @@ public class GroupMetadataManagerTest {
this.groupMetadataManager = groupMetadataManager;
this.genericGroupInitialRebalanceDelayMs =
genericGroupInitialRebalanceDelayMs;
this.genericGroupNewMemberJoinTimeoutMs =
genericGroupNewMemberJoinTimeoutMs;
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
}
public void commit() {
@@ -1029,6 +1031,10 @@ public class GroupMetadataManagerTest {
return groupMetadataManager.listGroups(statesFilter,
lastCommittedOffset);
}
+ public List<DescribeGroupsResponseData.DescribedGroup>
describeGroups(List<String> groupIds) {
+ return groupMetadataManager.describeGroups(groupIds,
lastCommittedOffset);
+ }
+
public void verifyHeartbeat(
String groupId,
JoinGroupResponseData joinResponse,
@@ -1134,6 +1140,19 @@ public class GroupMetadataManagerTest {
return groupMetadataManager.genericGroupLeave(context, request);
}
+ private void verifyDescribeGroupsReturnsDeadGroup(String groupId) {
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ describeGroups(Collections.singletonList(groupId));
+
+ assertEquals(
+ Collections.singletonList(new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setGroupState(DEAD.toString())
+ ),
+ describedGroups
+ );
+ }
+
private ApiMessage messageOrNull(ApiMessageAndVersion
apiMessageAndVersion) {
if (apiMessageAndVersion == null) {
return null;
@@ -8641,6 +8660,113 @@ public class GroupMetadataManagerTest {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStable() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ GroupMetadataValue.MemberMetadata memberMetadata = new
GroupMetadataValue.MemberMetadata()
+ .setMemberId("member-id")
+ .setGroupInstanceId("group-instance-id")
+ .setClientHost("client-host")
+ .setClientId("client-id")
+ .setAssignment(new byte[]{0})
+ .setSubscription(new byte[]{0, 1, 2});
+ GroupMetadataValue groupMetadataValue = new GroupMetadataValue()
+ .setMembers(Collections.singletonList(memberMetadata))
+ .setProtocolType("consumer")
+ .setProtocol("range")
+ .setCurrentStateTimestamp(context.time.milliseconds());
+
+ context.replay(newGroupMetadataRecord(
+ "group-id",
+ groupMetadataValue,
+ MetadataVersion.latest()
+ ));
+ context.verifyDescribeGroupsReturnsDeadGroup("group-id");
+ context.commit();
+
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroups = Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setGroupState(STABLE.toString())
+ .setProtocolType(groupMetadataValue.protocolType())
+ .setProtocolData(groupMetadataValue.protocol())
+ .setMembers(Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId(memberMetadata.memberId())
+ .setGroupInstanceId(memberMetadata.groupInstanceId())
+ .setClientId(memberMetadata.clientId())
+ .setClientHost(memberMetadata.clientHost())
+ .setMemberMetadata(memberMetadata.subscription())
+ .setMemberAssignment(memberMetadata.assignment())
+ ))
+ );
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(expectedDescribedGroups, describedGroups);
+ }
+
+ @Test
+ public void testDescribeGroupRebalancing() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+
+ GroupMetadataValue.MemberMetadata memberMetadata = new
GroupMetadataValue.MemberMetadata()
+ .setMemberId("member-id")
+ .setGroupInstanceId("group-instance-id")
+ .setClientHost("client-host")
+ .setClientId("client-id")
+ .setAssignment(new byte[]{0})
+ .setSubscription(new byte[]{0, 1, 2});
+ GroupMetadataValue groupMetadataValue = new GroupMetadataValue()
+ .setMembers(Collections.singletonList(memberMetadata))
+ .setProtocolType("consumer")
+ .setProtocol("range")
+ .setCurrentStateTimestamp(context.time.milliseconds());
+
+ context.replay(newGroupMetadataRecord(
+ "group-id",
+ groupMetadataValue,
+ MetadataVersion.latest()
+ ));
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+ context.groupMetadataManager.prepareRebalance(group, "trigger
rebalance");
+
+ context.verifyDescribeGroupsReturnsDeadGroup("group-id");
+ context.commit();
+
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroups = Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setGroupState(PREPARING_REBALANCE.toString())
+ .setProtocolType(groupMetadataValue.protocolType())
+ .setProtocolData("")
+ .setMembers(Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId(memberMetadata.memberId())
+ .setGroupInstanceId(memberMetadata.groupInstanceId())
+ .setClientId(memberMetadata.clientId())
+ .setClientHost(memberMetadata.clientHost())
+ .setMemberAssignment(memberMetadata.assignment())
+ ))
+ );
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(expectedDescribedGroups, describedGroups);
+ }
+
+ @Test
+ public void testDescribeGroupsGroupIdNotFoundException() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.verifyDescribeGroupsReturnsDeadGroup("group-id");
+ }
+
public static <T> void assertUnorderedListEquals(
List<T> expected,
List<T> actual
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
index 7263f5a07d9..075f418d4ac 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.coordinator.group.generic;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
import
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
import org.junit.jupiter.api.Test;
@@ -258,4 +259,65 @@ public class GenericGroupMemberTest {
member.setAwaitingSyncFuture(new CompletableFuture<>());
assertTrue(member.hasSatisfiedHeartbeat());
}
+
+ @Test
+ public void testDescribeNoMetadata() {
+ GenericGroupMember member = new GenericGroupMember(
+ "member",
+ Optional.of("group-instance-id"),
+ "client-id",
+ "client-host",
+ 10,
+ 4500,
+ "generic",
+ new JoinGroupRequestProtocolCollection(),
+ new byte[0]
+ );
+
+ DescribeGroupsResponseData.DescribedGroupMember
expectedDescribedGroupMember =
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId("member")
+ .setGroupInstanceId("group-instance-id")
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setMemberAssignment(new byte[0]);
+
+ DescribeGroupsResponseData.DescribedGroupMember describedGroupMember =
member.describeNoMetadata();
+
+ assertEquals(expectedDescribedGroupMember, describedGroupMember);
+ }
+
+ @Test
+ public void testDescribe() {
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection(Collections.singletonList(
+ new JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(new byte[]{0})
+ ).iterator());
+
+ GenericGroupMember member = new GenericGroupMember(
+ "member",
+ Optional.of("group-instance-id"),
+ "client-id",
+ "client-host",
+ 10,
+ 4500,
+ "generic",
+ protocols,
+ new byte[0]
+ );
+
+ DescribeGroupsResponseData.DescribedGroupMember
expectedDescribedGroupMember =
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId("member")
+ .setGroupInstanceId("group-instance-id")
+ .setClientId("client-id")
+ .setClientHost("client-host")
+ .setMemberAssignment(new byte[0])
+ .setMemberMetadata(member.metadata("range"));
+
+ DescribeGroupsResponseData.DescribedGroupMember describedGroupMember =
member.describe("range");
+
+ assertEquals(expectedDescribedGroupMember, describedGroupMember);
+ }
}