This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a6e6c7d8c2 KAFKA-14504: Implement DescribeGroups API (#14462)
4a6e6c7d8c2 is described below

commit 4a6e6c7d8c2d2226046ca88027f2cc44bbea3bf6
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Oct 6 05:25:17 2023 -0400

    KAFKA-14504: Implement DescribeGroups API (#14462)
    
    This patch implements the DescribeGroups API in the new group coordinator.
    
    Reviewers: David Jacot <[email protected]>
---
 .../common/requests/DescribeGroupsRequest.java     |  14 +++
 .../common/requests/DescribeGroupsRequestTest.java |  52 +++++++++
 .../coordinator/group/GroupCoordinatorService.java |  52 ++++++++-
 .../coordinator/group/GroupCoordinatorShard.java   |  18 +++
 .../coordinator/group/GroupMetadataManager.java    |  79 +++++++++++++
 .../group/generic/GenericGroupMember.java          |  20 ++++
 .../group/GroupCoordinatorServiceTest.java         | 105 +++++++++++++++++
 .../group/GroupMetadataManagerTest.java            | 126 +++++++++++++++++++++
 .../group/generic/GenericGroupMemberTest.java      |  62 ++++++++++
 9 files changed, 524 insertions(+), 4 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index 25afb432cb7..3a4b9e38b02 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -23,6 +23,8 @@ import org.apache.kafka.common.protocol.ByteBufferAccessor;
 import org.apache.kafka.common.protocol.Errors;
 
 import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
 
 public class DescribeGroupsRequest extends AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<DescribeGroupsRequest> {
@@ -75,4 +77,16 @@ public class DescribeGroupsRequest extends AbstractRequest {
     public static DescribeGroupsRequest parse(ByteBuffer buffer, short 
version) {
         return new DescribeGroupsRequest(new DescribeGroupsRequestData(new 
ByteBufferAccessor(buffer), version), version);
     }
+
+    public static List<DescribeGroupsResponseData.DescribedGroup> 
getErrorDescribedGroupList(
+        List<String> groupIds,
+        Errors error
+    ) {
+        return groupIds.stream()
+            .map(groupId -> new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setErrorCode(error.code())
+            )
+            .collect(Collectors.toList());
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/DescribeGroupsRequestTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeGroupsRequestTest.java
new file mode 100644
index 00000000000..afce9d99417
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeGroupsRequestTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static 
org.apache.kafka.common.requests.DescribeGroupsRequest.getErrorDescribedGroupList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class DescribeGroupsRequestTest {
+
+    @Test
+    public void testGetErrorDescribedGroupList() {
+        List<DescribeGroupsResponseData.DescribedGroup> 
expectedDescribedGroupList = Arrays.asList(
+            new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId("group-id-1")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+            new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId("group-id-2")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code()),
+            new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId("group-id-3")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+        );
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroupList = 
getErrorDescribedGroupList(
+            Arrays.asList("group-id-1", "group-id-2", "group-id-3"),
+            Errors.COORDINATOR_LOAD_IN_PROGRESS
+        );
+
+        assertEquals(expectedDescribedGroupList, describedGroupList);
+    }
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 1c71f6dee37..e30bb930269 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.message.SyncGroupResponseData;
 import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
 import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
 import org.apache.kafka.common.requests.DeleteGroupsRequest;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.RequestContext;
@@ -77,9 +78,9 @@ import org.slf4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.OptionalInt;
 import java.util.Properties;
 import java.util.Set;
@@ -508,9 +509,52 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
             return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
         }
 
-        return FutureUtils.failedFuture(Errors.UNSUPPORTED_VERSION.exception(
-            "This API is not implemented yet."
-        ));
+        final 
List<CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DescribeGroups for the 
empty group id; only a null group id is rejected with INVALID_GROUP_ID.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new DescribeGroupsResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                )));
+            } else {
+                final TopicPartition topicPartition = 
topicPartitionFor(groupId);
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartition, __ -> new ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> 
future =
+                runtime.scheduleReadOperation(
+                    "describe-groups",
+                    topicPartition,
+                    (coordinator, lastCommittedOffset) -> 
coordinator.describeGroups(context, groupList, lastCommittedOffset)
+                ).exceptionally(exception -> {
+                    if (!(exception instanceof KafkaException)) {
+                        log.error("DescribeGroups request {} hit an unexpected 
exception: {}",
+                            groupList, exception.getMessage());
+                    }
+
+                    return DescribeGroupsRequest.getErrorDescribedGroupList(
+                        groupList,
+                        Errors.forException(exception)
+                    );
+                });
+
+            futures.add(future);
+        });
+
+        final CompletableFuture<Void> allFutures = 
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+        return allFutures.thenApply(v -> {
+            final List<DescribeGroupsResponseData.DescribedGroup> res = new 
ArrayList<>();
+            futures.forEach(future -> res.addAll(future.join()));
+            return res;
+        });
     }
 
     /**
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index a094d7c6e41..c30d348336b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -18,6 +18,7 @@ package org.apache.kafka.coordinator.group;
 
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -385,6 +386,23 @@ public class GroupCoordinatorShard implements 
CoordinatorShard<Record> {
         return groupMetadataManager.listGroups(statesFilter, committedOffset);
     }
 
+    /**
+     * Handles a DescribeGroups request.
+     *
+     * @param context           The request context.
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+     */
+    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
+        RequestContext context,
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        return groupMetadataManager.describeGroups(groupIds, committedOffset);
+    }
+
     /**
      * Handles a LeaveGroup request.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
index 9ab0457aa11..7bfe243d026 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.errors.UnsupportedAssignorException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
@@ -444,6 +445,59 @@ public class GroupMetadataManager {
         return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+    /**
+     * Handles a DescribeGroups request.
+     *
+     * @param groupIds          The IDs of the groups to describe.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A list containing the DescribeGroupsResponseData.DescribedGroup.
+     */
+    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        final List<DescribeGroupsResponseData.DescribedGroup> describedGroups 
= new ArrayList<>();
+        groupIds.forEach(groupId -> {
+            try {
+                GenericGroup group = genericGroup(groupId, committedOffset);
+
+                if (group.isInState(STABLE)) {
+                    if (!group.protocolName().isPresent()) {
+                        throw new IllegalStateException("Invalid null group 
protocol for stable group");
+                    }
+
+                    describedGroups.add(new 
DescribeGroupsResponseData.DescribedGroup()
+                        .setGroupId(groupId)
+                        .setGroupState(group.stateAsString())
+                        .setProtocolType(group.protocolType().orElse(""))
+                        .setProtocolData(group.protocolName().get())
+                        .setMembers(group.allMembers().stream()
+                            .map(member -> 
member.describe(group.protocolName().get()))
+                            .collect(Collectors.toList())
+                        )
+                    );
+                } else {
+                    describedGroups.add(new 
DescribeGroupsResponseData.DescribedGroup()
+                        .setGroupId(groupId)
+                        .setGroupState(group.stateAsString())
+                        .setProtocolType(group.protocolType().orElse(""))
+                        .setMembers(group.allMembers().stream()
+                            .map(member -> member.describeNoMetadata())
+                            .collect(Collectors.toList())
+                        )
+                    );
+                }
+            } catch (GroupIdNotFoundException exception) {
+                describedGroups.add(new 
DescribeGroupsResponseData.DescribedGroup()
+                    .setGroupId(groupId)
+                    .setGroupState(DEAD.toString())
+                );
+            }
+        });
+        return describedGroups;
+    }
+
     /**
      * Gets or maybe creates a consumer group.
      *
@@ -521,6 +575,31 @@ public class GroupMetadataManager {
         }
     }
 
+    /**
+     * Gets a generic group by committed offset.
+     *
+     * @param groupId           The group id.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A GenericGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or is not 
a generic group.
+     */
+    public GenericGroup genericGroup(
+        String groupId,
+        long committedOffset
+    ) throws GroupIdNotFoundException {
+        Group group = group(groupId, committedOffset);
+
+        if (group.type() == GENERIC) {
+            return (GenericGroup) group;
+        } else {
+            // We don't support upgrading/downgrading between protocols at the 
moment so
+            // we throw an exception if a group exists with the wrong type.
+            throw new GroupIdNotFoundException(String.format("Group %s is not 
a generic group.",
+                groupId));
+        }
+    }
+
     /**
      * Removes the group.
      *
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
index 64c3fbcad5b..702bea653db 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/generic/GenericGroupMember.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.coordinator.group.generic;
 
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
 import org.apache.kafka.common.message.JoinGroupResponseData;
@@ -349,6 +350,25 @@ public class GenericGroupMember {
         return isNew;
     }
 
+    /**
+     * @return The described group member without metadata.
+     */
+    public DescribeGroupsResponseData.DescribedGroupMember 
describeNoMetadata() {
+        return new DescribeGroupsResponseData.DescribedGroupMember()
+            .setMemberId(memberId())
+            .setGroupInstanceId(groupInstanceId().orElse(null))
+            .setClientId(clientId())
+            .setClientHost(clientHost())
+            .setMemberAssignment(assignment());
+    }
+
+    /**
+     * @return The described group member with metadata corresponding to the 
provided protocol.
+     */
+    public DescribeGroupsResponseData.DescribedGroupMember describe(String 
protocolName) {
+        return describeNoMetadata().setMemberMetadata(metadata(protocolName));
+    }
+
     /**
      * @param value the new rebalance timeout in milliseconds.
      */
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 755bd2aa56a..d54d574f9e5 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -34,6 +34,7 @@ import 
org.apache.kafka.common.errors.UnknownMemberIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.DeleteGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
@@ -753,6 +754,110 @@ public class GroupCoordinatorServiceTest {
         assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
     }
 
+    @Test
+    public void testDescribeGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 2;
+        service.startup(() -> partitionCount);
+
+        DescribeGroupsResponseData.DescribedGroup describedGroup1 = new 
DescribeGroupsResponseData.DescribedGroup()
+            .setGroupId("group-id-1");
+        DescribeGroupsResponseData.DescribedGroup describedGroup2 = new 
DescribeGroupsResponseData.DescribedGroup()
+            .setGroupId("group-id-2");
+        List<DescribeGroupsResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            describedGroup1,
+            describedGroup2
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("describe-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("describe-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        )).thenReturn(describedGroupFuture);
+
+        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> 
future =
+            service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), 
Arrays.asList("group-id-1", "group-id-2"));
+
+        assertFalse(future.isDone());
+        
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testDescribeGroupsInvalidGroupId() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        DescribeGroupsResponseData.DescribedGroup describedGroup = new 
DescribeGroupsResponseData.DescribedGroup()
+            .setGroupId("");
+        List<DescribeGroupsResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId(null)
+                .setErrorCode(Errors.INVALID_GROUP_ID.code()),
+            describedGroup
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("describe-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> 
future =
+            service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), 
Arrays.asList("", null));
+
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testDescribeGroupCoordinatorLoadInProgress() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("describe-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> 
future =
+            service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), 
Collections.singletonList("group-id"));
+
+        assertEquals(
+            Collections.singletonList(new 
DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId("group-id")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+            ),
+            future.get()
+        );
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {true, false})
     public void testFetchOffsets(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
index 3a3db59855e..f299705016d 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java
@@ -33,6 +33,7 @@ import 
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedAssignorException;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
 import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import org.apache.kafka.common.message.HeartbeatRequestData;
 import org.apache.kafka.common.message.HeartbeatResponseData;
 import org.apache.kafka.common.message.JoinGroupRequestData;
@@ -425,6 +426,7 @@ public class GroupMetadataManagerTest {
             this.groupMetadataManager = groupMetadataManager;
             this.genericGroupInitialRebalanceDelayMs = 
genericGroupInitialRebalanceDelayMs;
             this.genericGroupNewMemberJoinTimeoutMs = 
genericGroupNewMemberJoinTimeoutMs;
+            snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
         }
 
         public void commit() {
@@ -1029,6 +1031,10 @@ public class GroupMetadataManagerTest {
             return groupMetadataManager.listGroups(statesFilter, 
lastCommittedOffset);
         }
 
+        public List<DescribeGroupsResponseData.DescribedGroup> 
describeGroups(List<String> groupIds) {
+            return groupMetadataManager.describeGroups(groupIds, 
lastCommittedOffset);
+        }
+
         public void verifyHeartbeat(
             String groupId,
             JoinGroupResponseData joinResponse,
@@ -1134,6 +1140,19 @@ public class GroupMetadataManagerTest {
             return groupMetadataManager.genericGroupLeave(context, request);
         }
 
+        private void verifyDescribeGroupsReturnsDeadGroup(String groupId) {
+            List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+                describeGroups(Collections.singletonList(groupId));
+
+            assertEquals(
+                Collections.singletonList(new 
DescribeGroupsResponseData.DescribedGroup()
+                    .setGroupId("group-id")
+                    .setGroupState(DEAD.toString())
+                ),
+                describedGroups
+            );
+        }
+
         private ApiMessage messageOrNull(ApiMessageAndVersion 
apiMessageAndVersion) {
             if (apiMessageAndVersion == null) {
                 return null;
@@ -8641,6 +8660,113 @@ public class GroupMetadataManagerTest {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testDescribeGroupStable() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        GroupMetadataValue.MemberMetadata memberMetadata = new 
GroupMetadataValue.MemberMetadata()
+            .setMemberId("member-id")
+            .setGroupInstanceId("group-instance-id")
+            .setClientHost("client-host")
+            .setClientId("client-id")
+            .setAssignment(new byte[]{0})
+            .setSubscription(new byte[]{0, 1, 2});
+        GroupMetadataValue groupMetadataValue = new GroupMetadataValue()
+            .setMembers(Collections.singletonList(memberMetadata))
+            .setProtocolType("consumer")
+            .setProtocol("range")
+            .setCurrentStateTimestamp(context.time.milliseconds());
+
+        context.replay(newGroupMetadataRecord(
+            "group-id",
+            groupMetadataValue,
+            MetadataVersion.latest()
+        ));
+        context.verifyDescribeGroupsReturnsDeadGroup("group-id");
+        context.commit();
+
+        List<DescribeGroupsResponseData.DescribedGroup> 
expectedDescribedGroups = Collections.singletonList(
+            new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId("group-id")
+                .setGroupState(STABLE.toString())
+                .setProtocolType(groupMetadataValue.protocolType())
+                .setProtocolData(groupMetadataValue.protocol())
+                .setMembers(Collections.singletonList(
+                    new DescribeGroupsResponseData.DescribedGroupMember()
+                        .setMemberId(memberMetadata.memberId())
+                        .setGroupInstanceId(memberMetadata.groupInstanceId())
+                        .setClientId(memberMetadata.clientId())
+                        .setClientHost(memberMetadata.clientHost())
+                        .setMemberMetadata(memberMetadata.subscription())
+                        .setMemberAssignment(memberMetadata.assignment())
+                ))
+        );
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(expectedDescribedGroups, describedGroups);
+    }
+
+    @Test
+    public void testDescribeGroupRebalancing() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        GroupMetadataValue.MemberMetadata memberMetadata = new 
GroupMetadataValue.MemberMetadata()
+            .setMemberId("member-id")
+            .setGroupInstanceId("group-instance-id")
+            .setClientHost("client-host")
+            .setClientId("client-id")
+            .setAssignment(new byte[]{0})
+            .setSubscription(new byte[]{0, 1, 2});
+        GroupMetadataValue groupMetadataValue = new GroupMetadataValue()
+            .setMembers(Collections.singletonList(memberMetadata))
+            .setProtocolType("consumer")
+            .setProtocol("range")
+            .setCurrentStateTimestamp(context.time.milliseconds());
+
+        context.replay(newGroupMetadataRecord(
+            "group-id",
+            groupMetadataValue,
+            MetadataVersion.latest()
+        ));
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+        context.groupMetadataManager.prepareRebalance(group, "trigger 
rebalance");
+
+        context.verifyDescribeGroupsReturnsDeadGroup("group-id");
+        context.commit();
+
+        List<DescribeGroupsResponseData.DescribedGroup> 
expectedDescribedGroups = Collections.singletonList(
+            new DescribeGroupsResponseData.DescribedGroup()
+                .setGroupId("group-id")
+                .setGroupState(PREPARING_REBALANCE.toString())
+                .setProtocolType(groupMetadataValue.protocolType())
+                .setProtocolData("")
+                .setMembers(Collections.singletonList(
+                    new DescribeGroupsResponseData.DescribedGroupMember()
+                        .setMemberId(memberMetadata.memberId())
+                        .setGroupInstanceId(memberMetadata.groupInstanceId())
+                        .setClientId(memberMetadata.clientId())
+                        .setClientHost(memberMetadata.clientHost())
+                        .setMemberAssignment(memberMetadata.assignment())
+                ))
+        );
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(expectedDescribedGroups, describedGroups);
+    }
+
+    @Test
+    public void testDescribeGroupsGroupIdNotFoundException() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.verifyDescribeGroupsReturnsDeadGroup("group-id");
+    }
+
     public static <T> void assertUnorderedListEquals(
         List<T> expected,
         List<T> actual
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
index 7263f5a07d9..075f418d4ac 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/generic/GenericGroupMemberTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.coordinator.group.generic;
 
+import org.apache.kafka.common.message.DescribeGroupsResponseData;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
 import 
org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
 import org.junit.jupiter.api.Test;
@@ -258,4 +259,65 @@ public class GenericGroupMemberTest {
         member.setAwaitingSyncFuture(new CompletableFuture<>());
         assertTrue(member.hasSatisfiedHeartbeat());
     }
+
+    @Test
+    public void testDescribeNoMetadata() {
+        GenericGroupMember member = new GenericGroupMember(
+            "member",
+            Optional.of("group-instance-id"),
+            "client-id",
+            "client-host",
+            10,
+            4500,
+            "generic",
+            new JoinGroupRequestProtocolCollection(),
+            new byte[0]
+        );
+
+        DescribeGroupsResponseData.DescribedGroupMember 
expectedDescribedGroupMember =
+            new DescribeGroupsResponseData.DescribedGroupMember()
+                .setMemberId("member")
+                .setGroupInstanceId("group-instance-id")
+                .setClientId("client-id")
+                .setClientHost("client-host")
+                .setMemberAssignment(new byte[0]);
+
+        DescribeGroupsResponseData.DescribedGroupMember describedGroupMember = 
member.describeNoMetadata();
+
+        assertEquals(expectedDescribedGroupMember, describedGroupMember);
+    }
+
+    @Test
+    public void testDescribe() {
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection(Collections.singletonList(
+            new JoinGroupRequestProtocol()
+                .setName("range")
+                .setMetadata(new byte[]{0})
+        ).iterator());
+
+        GenericGroupMember member = new GenericGroupMember(
+            "member",
+            Optional.of("group-instance-id"),
+            "client-id",
+            "client-host",
+            10,
+            4500,
+            "generic",
+            protocols,
+            new byte[0]
+        );
+
+        DescribeGroupsResponseData.DescribedGroupMember 
expectedDescribedGroupMember =
+            new DescribeGroupsResponseData.DescribedGroupMember()
+                .setMemberId("member")
+                .setGroupInstanceId("group-instance-id")
+                .setClientId("client-id")
+                .setClientHost("client-host")
+                .setMemberAssignment(new byte[0])
+                .setMemberMetadata(member.metadata("range"));
+
+        DescribeGroupsResponseData.DescribedGroupMember describedGroupMember = 
member.describe("range");
+
+        assertEquals(expectedDescribedGroupMember, describedGroupMember);
+    }
 }


Reply via email to