dongnuo123 commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1361149813


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java:
##########
@@ -151,6 +152,19 @@ 
CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroup
         List<String> groupIds
     );
 
+    /**
+     * Describe consumer groups.
+     *
+     * @param context           The coordinator request context.
+     * @param groupIds          The group ids.
+     *
+     * @return A list of the described groups.

Review Comment:
   nit: @return A future yielding the results or an exception.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java:
##########
@@ -410,6 +411,18 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(
         return groupMetadataManager.listGroups(statesFilter, committedOffset);
     }
 
+    /**
+     * Handles a ConsumerGroupDescribe request.
+     *
+     * @param

Review Comment:
   nit: the javadoc hasn't been finished



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -444,6 +445,43 @@ public List<ListGroupsResponseData.ListedGroup> 
listGroups(List<String> statesFi
         return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> 
consumerGroupDescribe(
+        List<String> groupIds
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new 
ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = 
new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at 
the moment so
+                // we set an error if a group exists with the wrong type.
+                
describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());
+                describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code());
+            } else {
+                ConsumerGroup consumerGroup = (ConsumerGroup) group;
+                describedGroup.setGroupState(consumerGroup.stateAsString())
+                    .setGroupEpoch(consumerGroup.groupEpoch())
+                    .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+                    .setAssignorName(
+                        consumerGroup.preferredServerAssignor().isPresent() ?
+                            consumerGroup.preferredServerAssignor().get() : 
null

Review Comment:
   No, assignorName is non-nullable (see ConsumerGroupDescribeResponseData#read 
for reference)



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,26 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember() {
+        return new ConsumerGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(Uuid.fromString(memberId))
+            .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+                .setTopicPartitions(
+                    assignedPartitions.entrySet().stream().map(
+                        item -> new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                            .setTopicId(item.getKey())
+                            .setPartitions(new ArrayList<>(item.getValue()))
+                    ).collect(Collectors.toList())
+                ))

Review Comment:
   nit: parenthesis alignment



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##########
@@ -304,4 +306,53 @@ public void 
testUpdateWithConsumerGroupCurrentMemberAssignmentValue() {
         assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), 
member.partitionsPendingRevocation());
         assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)), 
member.partitionsPendingAssignment());
     }
+
+    @Test
+    void testAsConsumerGroupDescribeMember() {

Review Comment:
   Could we make it public?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -497,6 +499,66 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, 
List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
consumerGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }
+
+        final 
List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DescribeGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new ConsumerGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                        .setErrorMessage(Errors.INVALID_GROUP_ID.message())
+                )));
+            } else {
+                groupsByTopicPartition
+                    .computeIfAbsent(topicPartitionFor(groupId), __ -> new 
ArrayList<>())
+                    .add(groupId);
+            }
+        });
+
+        groupsByTopicPartition.forEach((topicPartition, groupList) -> {
+            
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
future =
+                runtime.scheduleReadOperation(
+                    "consumer-group-describe",
+                    topicPartition,
+                    (coordinator, __) -> 
coordinator.consumerGroupDescribe(groupIds)

Review Comment:
   Not 100% sure here, I guess we can't ignore lastCommittedOffset, because we 
only want to describe groups with info that's been fully committed. You can 
check ListGroups for getting group info by committedOffsets.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3686,8 +3686,39 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleConsumerGroupDescribe(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val consumerGroupDescribeRequest = 
request.body[ConsumerGroupDescribeRequest]
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val response = new ConsumerGroupDescribeResponseData()
+
+      val authorizedGroups = new ArrayBuffer[String]()
+      consumerGroupDescribeRequest.data.groupIds.forEach { groupId =>
+        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+          response.groups.add(new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+            .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message)
+          )
+        } else {
+          authorizedGroups += groupId
+        }
+      }
+
+      groupCoordinator.consumerGroupDescribe(
+        request.context,
+        authorizedGroups.asJava
+      ).handle[Unit] { (response, exception) =>
+        if (exception != null) {
+          requestHelper.sendMaybeThrottle(request, 
consumerGroupDescribeRequest.getErrorResponse(exception))
+        }

Review Comment:
   Do we need an else statement to send the response when exception == null?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to