dajac commented on code in PR #14462:
URL: https://github.com/apache/kafka/pull/14462#discussion_r1347070423
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1210,6 +1229,11 @@ private void replay(
lastWrittenOffset++;
snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
}
+
+ private void createSnapshot() {
+ lastWrittenOffset++;
+ snapshotRegistry.getOrCreateSnapshot(lastWrittenOffset);
+ }
Review Comment:
This is weird because we should only increment lastWrittenOffset when
records are written. I think that we should remove this method. See my other
comments for a way to avoid it.
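To illustrate the invariant, here is a minimal, self-contained sketch. It is not the real test context: `OffsetTrackingSketch` and its methods are hypothetical names, and the snapshot registry is reduced to a list of offsets. The point is only that replaying a record is the single place where the offset moves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical test helper; the names are illustrative and not part of Kafka.
class OffsetTrackingSketch {
    // Offsets at which a snapshot was taken (stand-in for a snapshot registry).
    private final List<Long> snapshotOffsets = new ArrayList<>();
    private long lastWrittenOffset = 0L;

    // One record written means the offset advances by one and a snapshot
    // is taken at the new offset. The record content is ignored here.
    void replay(String record) {
        lastWrittenOffset++;
        snapshotOffsets.add(lastWrittenOffset);
    }

    long lastWrittenOffset() {
        return lastWrittenOffset;
    }

    // Returns a copy so callers cannot mutate the internal list.
    List<Long> snapshotOffsets() {
        return new ArrayList<>(snapshotOffsets);
    }
}
```

With no separate method that bumps the offset, a test cannot end up with a snapshot that does not correspond to a written record.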
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8641,6 +8665,130 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData = context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
Review Comment:
The weirdness comes from the fact that
joinGenericGroupAsDynamicMemberAndCompleteRebalance does not replay the
generated records, so lastWrittenOffset is not updated and a new snapshot is
not created. I tried adding the logic to replay them, but that breaks a few
tests: they assume that the GenericGroup object is updated in place, whereas it
is replaced when records are replayed. We should fix this, but that is for
another PR. Instead, we could just replay the relevant records to build
the state that we want. Example:
```
// Create the first group.
context.replay(newGroupMetadataRecord(....));
// Commit it.
context.commit();
// Create the second group.
context.replay(newGroupMetadataRecord(....));
// Verify that the first group is returned and the second one is returned as dead.
// Commit.
// Verify that both groups are returned.
```
What do you think?
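To make the expected visibility concrete, here is a small self-contained model of the replay/commit flow. It is only a sketch under assumptions: `CommittedViewSketch` is a hypothetical class, group state is reduced to a string, and the snapshot registry is approximated by a map of copies keyed by offset. It does not use the real SnapshotRegistry or GroupMetadataManager.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for the test context; not Kafka's actual classes.
class CommittedViewSketch {
    // Copy of the group table taken after each replayed record, keyed by offset.
    private final TreeMap<Long, Map<String, String>> snapshots = new TreeMap<>();
    private final Map<String, String> latest = new LinkedHashMap<>();
    private long lastWrittenOffset = 0L;
    private long lastCommittedOffset = 0L;

    CommittedViewSketch() {
        // Empty state at offset 0, before any record is written.
        snapshots.put(0L, new LinkedHashMap<>());
    }

    // Replaying a record is the only operation that advances lastWrittenOffset.
    void replay(String groupId, String state) {
        latest.put(groupId, state);
        lastWrittenOffset++;
        snapshots.put(lastWrittenOffset, new LinkedHashMap<>(latest));
    }

    // Committing makes everything written so far visible to readers.
    void commit() {
        lastCommittedOffset = lastWrittenOffset;
    }

    // Reads only see committed state; unknown or uncommitted groups are
    // reported as "Dead".
    List<String> describeGroups(List<String> groupIds) {
        Map<String, String> committed = snapshots.get(lastCommittedOffset);
        List<String> result = new ArrayList<>();
        for (String id : groupIds) {
            result.add(id + ":" + committed.getOrDefault(id, "Dead"));
        }
        return result;
    }
}
```

In this model, replaying a first group, committing, and then replaying a second group leaves the second group described as dead until the next commit, which is the sequence the test would assert on.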
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -1134,6 +1140,19 @@ public CoordinatorResult<LeaveGroupResponseData, Record> sendGenericGroupLeave(
return groupMetadataManager.genericGroupLeave(context, request);
}
+ private void testDescribeGroupsUncommittedGroup(String groupId) {
Review Comment:
nit: `verifyDescribeGroupsReturnsDeadGroup(...)`? We use the verify prefix for
similar methods in the context, and this method is not aware of the
"uncommitted" state, so that word should not be in the name, in my opinion.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8641,6 +8665,130 @@ public void testListGroups() {
assertEquals(expectAllGroupMap, actualAllGroupMap);
}
+ @Test
+ public void testDescribeGroupStableForDynamicMember() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ JoinGroupResponseData responseData = context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
+ context.testDescribeGroupsUncommittedGroup("group-id");
+
+ context.createSnapshot();
+ context.commit();
+
+ List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setGroupState(STABLE.toString())
+ .setProtocolType(responseData.protocolType())
+ .setProtocolData(responseData.protocolName())
+ .setMembers(responseData.members().stream().map(member ->
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId())
+ .setClientId("client")
+ .setClientHost("localhost/127.0.0.1")
+ .setMemberMetadata(member.metadata())
+ ).collect(Collectors.toList()))
+ );
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(expectedDescribedGroups, describedGroups);
+ }
+
+ @Test
+ public void testDescribeGroupStableForStaticMember() throws Exception {
+ GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+ .build();
+ context.staticMembersJoinAndRebalance(
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+ context.testDescribeGroupsUncommittedGroup("group-id");
+
+ context.createSnapshot();
+ context.commit();
+
+ GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("group-id", false);
+ List<DescribeGroupsResponseData.DescribedGroup> expectedDescribedGroups = Collections.singletonList(
+ new DescribeGroupsResponseData.DescribedGroup()
+ .setGroupId("group-id")
+ .setGroupState(STABLE.toString())
+ .setProtocolType(group.protocolType().get())
+ .setProtocolData(group.protocolName().get())
+ .setMembers(group.allMembers().stream().map(member ->
+ new DescribeGroupsResponseData.DescribedGroupMember()
+ .setMemberId(member.memberId())
+ .setGroupInstanceId(member.groupInstanceId().get())
+ .setClientId(member.clientId())
+ .setClientHost(member.clientHost())
+ .setMemberMetadata(member.metadata(group.protocolName().get()))
+ .setMemberAssignment(member.assignment())
+ ).collect(Collectors.toList()))
+ );
+
+ List<DescribeGroupsResponseData.DescribedGroup> describedGroups =
+ context.describeGroups(Collections.singletonList("group-id"));
+
+ assertEquals(expectedDescribedGroups, describedGroups);
+ }
+
+ @Test
+ public void testDescribeGroupRebalancing() throws Exception {
Review Comment:
In this case, you can probably replay and then trigger a rebalance to get
the state that you want.