jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1272854832
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -5205,11 +5245,102 @@ public void
testReplaceStaticMemberInStableStateErrors() throws Exception {
GenericGroupMember revertedMember =
group.member(group.staticMemberId("group-instance-id"));
- assertEquals(oldMember.memberId(), revertedMember.memberId());
- assertEquals(oldMember.groupInstanceId(),
revertedMember.groupInstanceId());
- assertEquals(oldMember.rebalanceTimeoutMs(),
revertedMember.rebalanceTimeoutMs());
- assertEquals(oldMember.sessionTimeoutMs(),
revertedMember.sessionTimeoutMs());
- assertEquals(oldMember.supportedProtocols(),
revertedMember.supportedProtocols());
+ assertEquals(oldMemberId, revertedMember.memberId());
+ assertEquals(Optional.of("group-instance-id"),
revertedMember.groupInstanceId());
+ assertEquals(4000, revertedMember.rebalanceTimeoutMs());
+ assertEquals(3000, revertedMember.sessionTimeoutMs());
+ assertEquals(protocols, revertedMember.supportedProtocols());
+ assertEquals(1, group.size());
+ assertEquals(1, group.generationId());
+ assertTrue(group.isInState(STABLE));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testReplaceStaticMemberInStableStateSucceeds(
+ boolean supportSkippingAssignment
+ ) throws Exception {
+ // If the append future succeeds, the soft state is updated with the
new member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection(0);
+
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("foo"))).array())
+ );
+
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("group-instance-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType("consumer")
+ .withProtocols(protocols)
+ .build();
+
+ JoinGroupResponseData response =
context.joinGenericGroupAndCompleteJoin(
+ request,
+ true,
+ supportSkippingAssignment
+ );
+
+ assertEquals(Errors.NONE.code(), response.errorCode());
+ assertEquals(1, group.size());
+ assertEquals(1, group.generationId());
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+ String oldMemberId = response.memberId();
+ // Simulate successful sync group phase
+ group.transitionTo(STABLE);
+
+ // Static member rejoins with UNKNOWN_MEMBER_ID and the append
succeeds.
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("roundrobin")
+ .setMetadata(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("bar"))).array()));
+
+ CompletableFuture<JoinGroupResponseData> responseFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+ request
+ .setProtocols(protocols)
+ .setRebalanceTimeoutMs(7000)
+ .setSessionTimeoutMs(6000),
+ responseFuture,
+ true,
+ supportSkippingAssignment);
+
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecord(group,
MetadataVersion.latest())),
+ result.records()
+ );
+ assertFalse(responseFuture.isDone());
+
+ // Simulate a successful write to the log.
+ result.appendFuture().complete(null);
Review Comment:
i like it explicitly stated since it shows when we expect the append future
to be completed.
we only replay in `sleep()`. the append future can be completed from both
join group and sync group requests along with sleep(), and follower sync
requests do not generate records so it doesn't help simplify. also i think it
confuses the reader more on what is actually going under the hood
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]