dongnuo123 commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1375454303


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3686,8 +3686,51 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleConsumerGroupDescribe(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val consumerGroupDescribeRequest = 
request.body[ConsumerGroupDescribeRequest]
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val response = new ConsumerGroupDescribeResponseData()
+
+      val authorizedGroups = new ArrayBuffer[String]()
+      consumerGroupDescribeRequest.data.groupIds.forEach { groupId =>
+        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+          response.groups.add(new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+            .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message)
+          )
+        } else {
+          authorizedGroups += groupId
+        }
+      }
+
+      val future = groupCoordinator.consumerGroupDescribe(
+        request.context,
+        authorizedGroups.asJava
+      )
+
+      future.handle[Unit] { (results, exception) =>

Review Comment:
   nit: Is `future` used anywhere else? If not, could we write it like
   ```
   groupCoordinator.consumerGroupDescribe(
           request.context,
           authorizedGroups.asJava
   ).handle[Unit] {...}
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -445,6 +446,44 @@ public List<ListGroupsResponseData.ListedGroup> 
listGroups(List<String> statesFi
         return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> 
consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new 
ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = 
new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at 
the moment so
+                // we set an error if a group exists with the wrong type.
+                
describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());
+                describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code());
+            } else {
+                ConsumerGroup consumerGroup = (ConsumerGroup) group;
+                describedGroup.setGroupState(consumerGroup.stateAsString())
+                    .setGroupEpoch(consumerGroup.groupEpoch())
+                    .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+                    .setAssignorName(
+                        consumerGroup.preferredServerAssignor().isPresent() ?
+                            consumerGroup.preferredServerAssignor().get() : 
null
+                    );
+                consumerGroup.members().forEach(
+                    (id, member) -> 
describedGroup.members().add(member.asConsumerGroupDescribeMember())
+                );

Review Comment:
   Attributes like group state, epoch, and members are timeline objects in 
ConsumerGroup, so you can read them as of the committedOffset. An example is 
`public String stateAsString(long committedOffset)` in ConsumerGroup.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,68 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 
epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 
epoch + 1));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupEpoch(epoch + 1);
+        describedGroup.setGroupId(consumerGroupId);
+        
describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember()));
+        describedGroup.setAssignorName(null);
+        describedGroup.setGroupState("assigning");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }

Review Comment:
   Could we add more tests for describing groups by committedOffset?
   For instance, if we've added a group but not updated the last committed offset, 
then we shouldn't be able to describe this group. After updating the last 
committed offset, we should be able to describe the group.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -444,6 +445,43 @@ public List<ListGroupsResponseData.ListedGroup> 
listGroups(List<String> statesFi
         return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> 
consumerGroupDescribe(
+        List<String> groupIds
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new 
ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = 
new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at 
the moment so
+                // we set an error if a group exists with the wrong type.
+                
describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());
+                describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code());
+            } else {
+                ConsumerGroup consumerGroup = (ConsumerGroup) group;
+                describedGroup.setGroupState(consumerGroup.stateAsString())
+                    .setGroupEpoch(consumerGroup.groupEpoch())
+                    .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+                    .setAssignorName(
+                        consumerGroup.preferredServerAssignor().isPresent() ?
+                            consumerGroup.preferredServerAssignor().get() : 
null

Review Comment:
   I think so, though we need to confirm with @dajac 



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> 
"true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+    val describedGroups = List(new DescribedGroup()).asJava
+    val consumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
+      .setGroups(describedGroups)
+    future.complete(describedGroups)
+
+    assertEquals(consumerGroupDescribeResponseData, response.data)
+  }
+

Review Comment:
   Could we add some extra tests covering a request with unauthorized groups and 
a future that completes with an exception?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -1046,6 +1047,112 @@ public void 
testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
         assertEquals(expectedResponse, future.get());
     }
 
+    @Test
+    public void testConsumerGroupDescribe() throws InterruptedException, 
ExecutionException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 2;
+        service.startup(() -> partitionCount);
+
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup1 = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("group-id-1");
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup2 = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("group-id-2");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            describedGroup1,
+            describedGroup2
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("consumer-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("consumer-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        )).thenReturn(describedGroupFuture);
+
+        
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), 
Arrays.asList("group-id-1", "group-id-2"));
+
+        assertFalse(future.isDone());
+        
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testConsumerGroupDescribeInvalidGroupId() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+            .setGroupId("");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(null)
+                .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                .setErrorMessage(Errors.INVALID_GROUP_ID.message()),
+            describedGroup
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("consumer-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup)));
+
+        
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), 
Arrays.asList("", null));
+
+        assertEquals(expectedDescribedGroups, future.get());
+    }
+
+    @Test
+    public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 1;
+        service.startup(() -> partitionCount);
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("consumer-group-describe"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        )).thenReturn(FutureUtils.failedFuture(
+            new CoordinatorLoadInProgressException(null)
+        ));
+
+        
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
future =
+            
service.consumerGroupDescribe(requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), 
Collections.singletonList("group-id"));
+
+        assertEquals(
+            Collections.singletonList(new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId("group-id")
+                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+                .setErrorMessage(Errors.COORDINATOR_LOAD_IN_PROGRESS.message())
+            ),
+            future.get()
+        );
+    }
+

Review Comment:
   Could we add another test for when the coordinator is not active?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -497,6 +499,66 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, 
List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
consumerGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception());
+        }

Review Comment:
   We should return a completed future with a response whose error code is set to 
Errors.COORDINATOR_NOT_AVAILABLE.code(). We'll fix this in the other APIs in 
https://github.com/apache/kafka/pull/14589



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> 
"true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+    val describedGroups = List(new DescribedGroup()).asJava
+    val consumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
+      .setGroups(describedGroups)
+    future.complete(describedGroups)

Review Comment:
   I guess the "mock not invoked" error occurs because the future is not completed 
when the response is sent (when calling `verifyNoThrottling`). Some mock that sends 
the response is not invoked, which causes the error. Something like the following 
should help.
   ```
   val describedGroups = List(new DescribedGroup()).asJava
   val consumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
     .setGroups(describedGroups)
   future.complete(describedGroups)
   
   val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
   
   assert...
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,26 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember() {
+        return new ConsumerGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(Uuid.fromString(memberId))
+            .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+                .setTopicPartitions(
+                    assignedPartitions.entrySet().stream().map(
+                        item -> new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                            .setTopicId(item.getKey())
+                            .setPartitions(new ArrayList<>(item.getValue()))

Review Comment:
   I guess we don't need to set the topic name here. Need to confirm with 
@dajac 



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)

Review Comment:
   You can change it back to the way it was. This is actually invoked. See 
comment on line 6226.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,26 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember() {
+        return new ConsumerGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(Uuid.fromString(memberId))
+            .setAssignment(new ConsumerGroupDescribeResponseData.Assignment()
+                .setTopicPartitions(
+                    assignedPartitions.entrySet().stream().map(
+                        item -> new 
ConsumerGroupDescribeResponseData.TopicPartitions()
+                            .setTopicId(item.getKey())
+                            .setPartitions(new ArrayList<>(item.getValue()))
+                    ).collect(Collectors.toList())
+                ))
+            .setClientHost(clientHost)
+            .setClientId(clientId)
+            .setInstanceId(instanceId)
+            .setRackId(rackId)
+            .setSubscribedTopicNames(subscribedTopicNames)
+            .setSubscribedTopicRegex(subscribedTopicRegex);

Review Comment:
   We have target assignments stored in ConsumerGroup. Can we pass it in as a 
parameter, or use any other way that lets it access the `targetAssignment` in 
ConsumerGroup?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to