dajac commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1345506634


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testDescribeGroupStableForDynamicMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        JoinGroupResponseData responseData = 
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(responseData.protocolType(), 
describedGroups.get(0).protocolType());
+        assertEquals(responseData.protocolName(), 
describedGroups.get(0).protocolData());
+        assertEquals(responseData.members().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+        );
+    }
+
+    @Test
+    public void testDescribeGroupStableForStaticMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals("consumer", describedGroups.get(0).protocolType());
+        assertEquals("range", describedGroups.get(0).protocolData());
+        assertEquals(group.allMembers().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+        );
+        assertEquals(group.allMembers().stream().map(member -> 
member.groupInstanceId().get()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.groupInstanceId()).collect(Collectors.toList())
+        );
+    }
+
+    @Test
+    public void testDescribeGroupRebalancing() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+        JoinGroupResponseData responseData = 
context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
+            new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(UNKNOWN_MEMBER_ID)
+                .withDefaultProtocolTypeAndProtocols()
+                .build()
+        );
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+        assertEquals(responseData.protocolType(), 
describedGroups.get(0).protocolType());
+        assertEquals("", describedGroups.get(0).protocolData());
+        assertTrue(describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+            .contains(responseData.memberId()));
+        describedGroups.get(0).members().forEach(member -> 
assertTrue(member.memberMetadata().length == 0));
+        describedGroups.get(0).members().forEach(member -> 
assertTrue(member.memberAssignment().length == 0));
+    }
+
+    @Test
+    public void testDescribeGroupsGroupIdNotFoundException() {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createSnapshot();

Review Comment:
   nit: Instead of doing this, would it make sense to create the first snapshot 
in the constructor of the test context?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testDescribeGroupStableForDynamicMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        JoinGroupResponseData responseData = 
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(responseData.protocolType(), 
describedGroups.get(0).protocolType());
+        assertEquals(responseData.protocolName(), 
describedGroups.get(0).protocolData());
+        assertEquals(responseData.members().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+        );

Review Comment:
   nit:
   
   ```
   assertEquals(
        responseData.members().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
        describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
   );
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test

Review Comment:
   It would be great if we could add a test similar to `testListGroups` which 
verifies that the committed offset works as expected.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testDescribeGroupStableForDynamicMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        JoinGroupResponseData responseData = 
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(responseData.protocolType(), 
describedGroups.get(0).protocolType());
+        assertEquals(responseData.protocolName(), 
describedGroups.get(0).protocolData());
+        assertEquals(responseData.members().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+        );
+    }
+
+    @Test
+    public void testDescribeGroupStableForStaticMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals("consumer", describedGroups.get(0).protocolType());
+        assertEquals("range", describedGroups.get(0).protocolData());
+        assertEquals(group.allMembers().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+        );
+        assertEquals(group.allMembers().stream().map(member -> 
member.groupInstanceId().get()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.groupInstanceId()).collect(Collectors.toList())
+        );

Review Comment:
   nit: format as before.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testDescribeGroupStableForDynamicMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        JoinGroupResponseData responseData = 
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(responseData.protocolType(), 
describedGroups.get(0).protocolType());
+        assertEquals(responseData.protocolName(), 
describedGroups.get(0).protocolData());
+        assertEquals(responseData.members().stream().map(member -> 
member.memberId()).collect(Collectors.toList()),
+            describedGroups.get(0).members().stream().map(member -> 
member.memberId()).collect(Collectors.toList())
+        );
+    }
+
+    @Test
+    public void testDescribeGroupStableForStaticMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.staticMembersJoinAndRebalance(
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);

Review Comment:
   nit: Let's move this one to right before the assertions as it is only used 
there.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -751,6 +752,111 @@ public void testListGroupsFailedImmediately()
         assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
     }
 
+    @Test
+    public void testDescribeGroups() throws Exception {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 2;
+        service.startup(() -> partitionCount);
+
+        DescribeGroupsResponseData.DescribedGroup describedGroup1 = new 
DescribeGroupsResponseData.DescribedGroup()
+            .setGroupId("group-id-1");
+        DescribeGroupsResponseData.DescribedGroup describedGroup2 = new 
DescribeGroupsResponseData.DescribedGroup()
+            .setGroupId("group-id-2");
+        List<DescribeGroupsResponseData.DescribedGroup> 
expectedDescribedGroups = Arrays.asList(
+            describedGroup1,
+            describedGroup2
+        );
+
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("describe-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+            ArgumentMatchers.any()
+        
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+        CompletableFuture<Object> describedGroupFuture = new 
CompletableFuture<>();
+        when(runtime.scheduleReadOperation(
+            ArgumentMatchers.eq("describe-groups"),
+            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+            ArgumentMatchers.any()
+        )).thenReturn(describedGroupFuture);
+
+        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> 
future =
+            service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS), 
Arrays.asList("group-id-1", "group-id-2"));
+
+        assertFalse(future.isDone());
+        
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+

Review Comment:
   nit: Should we remove this empty line?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -521,6 +575,31 @@ GenericGroup getOrMaybeCreateGenericGroup(
         }
     }
 
+    /**
+     * Gets a generic group by committed offset.
+     *
+     * @param groupId           The group id.
+     * @param committedOffset   A specified committed offset corresponding to 
this shard.
+     *
+     * @return A GenericGroup.
+     * @throws GroupIdNotFoundException if the group does not exist or is not 
a generic group.
+     */
+    public GenericGroup getGenericGroupByCommittedOffset(

Review Comment:
   nit: We could call it `genericGroup` to be aligned with `group`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testDescribeGroupStableForDynamicMember() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        JoinGroupResponseData responseData = 
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+        List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+            context.describeGroups(Collections.singletonList("group-id"));
+
+        assertEquals(responseData.protocolType(), 
describedGroups.get(0).protocolType());

Review Comment:
   Would it be possible to validate all the fields? One way would be to 
construct the expected response and to use assertEquals? This comment applies 
to all the tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to