dongnuo123 commented on code in PR #14544: URL: https://github.com/apache/kafka/pull/14544#discussion_r1361149813
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java: ########## @@ -151,6 +152,19 @@ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> describeGroup List<String> groupIds ); + /** + * Describe consumer groups. + * + * @param context The coordinator request context. + * @param groupIds The group ids. + * + * @return A list of the described groups. Review Comment: nit: @return A future yielding the results or an exception. ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java: ########## @@ -410,6 +411,18 @@ public List<ListGroupsResponseData.ListedGroup> listGroups( return groupMetadataManager.listGroups(statesFilter, committedOffset); } + /** + * Handles a ConsumerGroupDescribe request. + * + * @param Review Comment: nit: the javadoc hasn't been finished ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ########## @@ -444,6 +445,43 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList()); } + + public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe( + List<String> groupIds + ) { + List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>(); + + for (String groupId: groupIds) { + Group group = groups.get(groupId); + + ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId); + + if (group == null || !CONSUMER.equals(group.type())) { + // We don't support upgrading/downgrading between protocols at the moment so + // we set an error if a group exists with the wrong type. 
+ describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message()); + describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code()); + } else { + ConsumerGroup consumerGroup = (ConsumerGroup) group; + describedGroup.setGroupState(consumerGroup.stateAsString()) + .setGroupEpoch(consumerGroup.groupEpoch()) + .setAssignmentEpoch(consumerGroup.assignmentEpoch()) + .setAssignorName( + consumerGroup.preferredServerAssignor().isPresent() ? + consumerGroup.preferredServerAssignor().get() : null Review Comment: No, assignorName is non-nullable (see ConsumerGroupDescribeResponseData#read for reference) ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java: ########## @@ -545,6 +547,26 @@ public String currentAssignmentSummary() { ')'; } + public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember() { + return new ConsumerGroupDescribeResponseData.Member() + .setMemberEpoch(memberEpoch) + .setMemberId(Uuid.fromString(memberId)) + .setAssignment(new ConsumerGroupDescribeResponseData.Assignment() + .setTopicPartitions( + assignedPartitions.entrySet().stream().map( + item -> new ConsumerGroupDescribeResponseData.TopicPartitions() + .setTopicId(item.getKey()) + .setPartitions(new ArrayList<>(item.getValue())) + ).collect(Collectors.toList()) + )) Review Comment: nit: parenthesis alignment ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java: ########## @@ -304,4 +306,53 @@ public void testUpdateWithConsumerGroupCurrentMemberAssignmentValue() { assertEquals(mkAssignment(mkTopicAssignment(topicId2, 3, 4, 5)), member.partitionsPendingRevocation()); assertEquals(mkAssignment(mkTopicAssignment(topicId3, 6, 7, 8)), member.partitionsPendingAssignment()); } + + @Test + void testAsConsumerGroupDescribeMember() { Review Comment: Could we make it public? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java: ########## @@ -497,6 +499,66 @@ public CompletableFuture<ListGroupsResponseData> listGroups( return future; } + /** + * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}. + */ + @Override + public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe( + RequestContext context, + List<String> groupIds + ) { + if (!isActive.get()) { + return FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()); + } + + final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures = + new ArrayList<>(groupIds.size()); + final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>(); + groupIds.forEach(groupId -> { + // For backwards compatibility, we support DescribeGroups for the empty group id. + if (groupId == null) { + futures.add(CompletableFuture.completedFuture(Collections.singletonList( + new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(null) + .setErrorCode(Errors.INVALID_GROUP_ID.code()) + .setErrorMessage(Errors.INVALID_GROUP_ID.message()) + ))); + } else { + groupsByTopicPartition + .computeIfAbsent(topicPartitionFor(groupId), __ -> new ArrayList<>()) + .add(groupId); + } + }); + + groupsByTopicPartition.forEach((topicPartition, groupList) -> { + CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> future = + runtime.scheduleReadOperation( + "consumer-group-describe", + topicPartition, + (coordinator, __) -> coordinator.consumerGroupDescribe(groupIds) Review Comment: I'm not 100% sure here, but I don't think we can ignore lastCommittedOffset, because we only want to describe groups using info that has been fully committed. You can check how ListGroups gets group info by committed offset.
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -3686,8 +3686,39 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleConsumerGroupDescribe(request: RequestChannel.Request): CompletableFuture[Unit] = { - requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) - CompletableFuture.completedFuture[Unit](()) + val consumerGroupDescribeRequest = request.body[ConsumerGroupDescribeRequest] + + if (!config.isNewGroupCoordinatorEnabled) { + // The API is not supported by the "old" group coordinator (the default). If the + // new one is not enabled, we fail directly here. + requestHelper.sendMaybeThrottle(request, request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val response = new ConsumerGroupDescribeResponseData() + + val authorizedGroups = new ArrayBuffer[String]() + consumerGroupDescribeRequest.data.groupIds.forEach { groupId => + if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) { + response.groups.add(new ConsumerGroupDescribeResponseData.DescribedGroup() + .setGroupId(groupId) + .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code) + .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message) + ) + } else { + authorizedGroups += groupId + } + } + + groupCoordinator.consumerGroupDescribe( + request.context, + authorizedGroups.asJava + ).handle[Unit] { (response, exception) => + if (exception != null) { + requestHelper.sendMaybeThrottle(request, consumerGroupDescribeRequest.getErrorResponse(exception)) + } Review Comment: Do we need an else statement to send the response when exception == null? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org