dajac commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1345506634
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData =
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(responseData.protocolType(),
describedGroups.get(0).protocolType());
+ assertEquals(responseData.protocolName(),
describedGroups.get(0).protocolData());
+ assertEquals(responseData.members().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ );
+ }
+
+ @Test
+ public void testDescribeGroupStableForStaticMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals("consumer", describedGroups.get(0).protocolType());
+ assertEquals("range", describedGroups.get(0).protocolData());
+ assertEquals(group.allMembers().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ );
+ assertEquals(group.allMembers().stream().map(member ->
member.groupInstanceId().get()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.groupInstanceId()).collect(Collectors.toList())
+ );
+ }
+
+ @Test
+ public void testDescribeGroupRebalancing() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+ JoinGroupResponseData responseData =
context.joinGenericGroupAsDynamicMemberAndCompleteJoin(
+ new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build()
+ );
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertEquals(responseData.protocolType(),
describedGroups.get(0).protocolType());
+ assertEquals("", describedGroups.get(0).protocolData());
+ assertTrue(describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ .contains(responseData.memberId()));
+ describedGroups.get(0).members().forEach(member ->
assertTrue(member.memberMetadata().length == 0));
+ describedGroups.get(0).members().forEach(member ->
assertTrue(member.memberAssignment().length == 0));
+ }
+
+ @Test
+ public void testDescribeGroupsGroupIdNotFoundException() {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.createSnapshot();
Review Comment:
nit: Instead of doing this, would it make sense to create the first snapshot
in the constructor of the test context?
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData =
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(responseData.protocolType(),
describedGroups.get(0).protocolType());
+ assertEquals(responseData.protocolName(),
describedGroups.get(0).protocolData());
+ assertEquals(responseData.members().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ );
Review Comment:
nit:
```
assertEquals(
responseData.members().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
);
```
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
Review Comment:
It would be great if we could add a test similar to `testListGroups` which
verifies that the committed offset works as expected.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData =
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(responseData.protocolType(),
describedGroups.get(0).protocolType());
+ assertEquals(responseData.protocolName(),
describedGroups.get(0).protocolData());
+ assertEquals(responseData.members().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ );
+ }
+
+ @Test
+ public void testDescribeGroupStableForStaticMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals("consumer", describedGroups.get(0).protocolType());
+ assertEquals("range", describedGroups.get(0).protocolData());
+ assertEquals(group.allMembers().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ );
+ assertEquals(group.allMembers().stream().map(member ->
member.groupInstanceId().get()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.groupInstanceId()).collect(Collectors.toList())
+ );
Review Comment:
nit: format as before.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData =
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(responseData.protocolType(),
describedGroups.get(0).protocolType());
+ assertEquals(responseData.protocolName(),
describedGroups.get(0).protocolData());
+ assertEquals(responseData.members().stream().map(member ->
member.memberId()).collect(Collectors.toList()),
+ describedGroups.get(0).members().stream().map(member ->
member.memberId()).collect(Collectors.toList())
+ );
+ }
+
+ @Test
+ public void testDescribeGroupStableForStaticMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+ GenericGroup group =
context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
Review Comment:
nit: Let's move this one to right before the assertions as it is only used
there.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java:
##########
@@ -751,6 +752,111 @@ public void testListGroupsFailedImmediately()
assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
}
+ @Test
+ public void testDescribeGroups() throws Exception {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime =
mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime
+ );
+ int partitionCount = 2;
+ service.startup(() -> partitionCount);
+
+ DescribeGroupsResponseData.DescribedGroup describedGroup1 = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-1");
+ DescribeGroupsResponseData.DescribedGroup describedGroup2 = new
DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id-2");
+ List<DescribeGroupsResponseData.DescribedGroup>
expectedDescribedGroups = Arrays.asList(
+ describedGroup1,
+ describedGroup2
+ );
+
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.any()
+
)).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(describedGroup1)));
+
+ CompletableFuture<Object> describedGroupFuture = new
CompletableFuture<>();
+ when(runtime.scheduleReadOperation(
+ ArgumentMatchers.eq("describe-groups"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
+ ArgumentMatchers.any()
+ )).thenReturn(describedGroupFuture);
+
+ CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>>
future =
+ service.describeGroups(requestContext(ApiKeys.DESCRIBE_GROUPS),
Arrays.asList("group-id-1", "group-id-2"));
+
+ assertFalse(future.isDone());
+
describedGroupFuture.complete(Collections.singletonList(describedGroup2));
+
Review Comment:
nit: Should we remove this empty line?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -521,6 +575,31 @@ GenericGroup getOrMaybeCreateGenericGroup(
}
}
+ /**
+ * Gets a generic group by committed offset.
+ *
+ * @param groupId The group id.
+ * @param committedOffset A specified committed offset corresponding to
this shard.
+ *
+ * @return A GenericGroup.
+ * @throws GroupIdNotFoundException if the group does not exist or is not
a generic group.
+ */
+ public GenericGroup getGenericGroupByCommittedOffset(
Review Comment:
nit: We could call it `genericGroup` to be aligned with `group`.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8686,6 +8695,88 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData =
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(responseData.protocolType(),
describedGroups.get(0).protocolType());
Review Comment:
Would it be possible to validate all the fields? One way would be to
construct the expected response and to use assertEquals? This comment applies
to all the tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]