dajac commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1273950275
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -5205,11 +5245,102 @@ public void
testReplaceStaticMemberInStableStateErrors() throws Exception {
GenericGroupMember revertedMember =
group.member(group.staticMemberId("group-instance-id"));
- assertEquals(oldMember.memberId(), revertedMember.memberId());
- assertEquals(oldMember.groupInstanceId(),
revertedMember.groupInstanceId());
- assertEquals(oldMember.rebalanceTimeoutMs(),
revertedMember.rebalanceTimeoutMs());
- assertEquals(oldMember.sessionTimeoutMs(),
revertedMember.sessionTimeoutMs());
- assertEquals(oldMember.supportedProtocols(),
revertedMember.supportedProtocols());
+ assertEquals(oldMemberId, revertedMember.memberId());
+ assertEquals(Optional.of("group-instance-id"),
revertedMember.groupInstanceId());
+ assertEquals(4000, revertedMember.rebalanceTimeoutMs());
+ assertEquals(3000, revertedMember.sessionTimeoutMs());
+ assertEquals(protocols, revertedMember.supportedProtocols());
+ assertEquals(1, group.size());
+ assertEquals(1, group.generationId());
+ assertTrue(group.isInState(STABLE));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testReplaceStaticMemberInStableStateSucceeds(
+ boolean supportSkippingAssignment
+ ) throws Exception {
+ // If the append future succeeds, the soft state is updated with the
new member.
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection(0);
+
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("range")
+ .setMetadata(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("foo"))).array())
+ );
+
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("group-instance-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withProtocolType("consumer")
+ .withProtocols(protocols)
+ .build();
+
+ JoinGroupResponseData response =
context.joinGenericGroupAndCompleteJoin(
+ request,
+ true,
+ supportSkippingAssignment
+ );
+
+ assertEquals(Errors.NONE.code(), response.errorCode());
+ assertEquals(1, group.size());
+ assertEquals(1, group.generationId());
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+ String oldMemberId = response.memberId();
+ // Simulate successful sync group phase
+ group.transitionTo(STABLE);
+
+ // Static member rejoins with UNKNOWN_MEMBER_ID and the append
succeeds.
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("roundrobin")
+ .setMetadata(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("bar"))).array()));
+
+ CompletableFuture<JoinGroupResponseData> responseFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+ request
+ .setProtocols(protocols)
+ .setRebalanceTimeoutMs(7000)
+ .setSessionTimeoutMs(6000),
+ responseFuture,
+ true,
+ supportSkippingAssignment);
+
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecord(group,
MetadataVersion.latest())),
+ result.records()
+ );
+ assertFalse(responseFuture.isDone());
+
+ // Simulate a successful write to the log.
+ result.appendFuture().complete(null);
Review Comment:
That's fair.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]