jeffkbkim commented on code in PR #14017:
URL: https://github.com/apache/kafka/pull/14017#discussion_r1270926383
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -5524,5 +5670,2897 @@ private static Record newGroupMetadataRecord(
)
);
}
+
+ @Test
+ public void testNewMemberTimeoutCompletion() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .withSessionTimeoutMs(context.genericGroupNewMemberJoinTimeoutMs +
5000)
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> joinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result =
context.sendGenericGroupJoin(joinRequest, joinFuture);
+ assertTrue(result.records().isEmpty());
+ assertFalse(joinFuture.isDone());
+
+
context.sleepAndAssertEmptyResult(context.genericGroupInitialRebalanceDelayMs);
+
+ assertTrue(joinFuture.isDone());
+ assertEquals(Errors.NONE.code(), joinFuture.get().errorCode());
+
+ assertEquals(0,
group.allMembers().stream().filter(GenericGroupMember::isNew).count());
+
+ SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(joinFuture.get().memberId())
+ .withGenerationId(joinFuture.get().generationId())
+ .build();
+
+ CompletableFuture<SyncGroupResponseData> syncFuture = new
CompletableFuture<>();
+ result = context.sendGenericGroupSync(syncRequest, syncFuture);
+
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecord(group,
MetadataVersion.latest())),
+ result.records()
+ );
+ // Simulate a successful write to the log.
+ result.appendFuture().complete(null);
+
+ assertTrue(syncFuture.isDone());
+ assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+ assertEquals(1, group.size());
+
+ // Make sure the NewMemberTimeout is not still in effect, and the
member is not kicked
+
context.sleepAndAssertEmptyResult(context.genericGroupNewMemberJoinTimeoutMs);
+ assertEquals(1, group.size());
+
+ // Member should be removed as heartbeat expires.
+ List<ExpiredTimeout<Void, Record>> timeouts = context.sleep(5000);
+ List<Record> expectedRecords =
Collections.singletonList(newGroupMetadataRecord("group-id",
+ new GroupMetadataValue()
+ .setMembers(Collections.emptyList())
+ .setGeneration(2)
+ .setLeader(null)
+ .setProtocolType("consumer")
+ .setProtocol(null)
+ .setCurrentStateTimestamp(context.time.milliseconds()),
+ MetadataVersion.latest()));
+
+ assertEquals(1, timeouts.size());
+ String memberId = joinFuture.get().memberId();
+ timeouts.forEach(timeout -> {
+ assertEquals(genericGroupHeartbeatKey("group-id", memberId),
timeout.key);
+ assertEquals(expectedRecords, timeout.result.records());
+ });
+
+ assertEquals(0, group.size());
+ }
+
+ @Test
+ public void testNewMemberFailureAfterJoinGroupCompletion() throws
Exception {
+ // For old versions of the JoinGroup protocol, new members were subject
+ // to expiration if the rebalance took long enough. This test case
ensures
+ // that following completion of the JoinGroup phase, new members follow
+ // normal heartbeat expiration logic.
+
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .withSessionTimeoutMs(5000)
+ .withRebalanceTimeoutMs(10000)
+ .build();
+
+ JoinGroupResponseData joinResponse =
context.joinGenericGroupAndCompleteJoin(joinRequest, false, false);
+ assertEquals(Errors.NONE.code(), joinResponse.errorCode());
+
+ String memberId = joinResponse.memberId();
+ assertEquals(memberId, joinResponse.leader());
+ assertEquals(1, joinResponse.generationId());
+
+ SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(memberId)
+ .withGenerationId(1)
+ .build();
+
+ CompletableFuture<SyncGroupResponseData> syncResponseFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result =
context.sendGenericGroupSync(syncRequest, syncResponseFuture);
+
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecord(group,
MetadataVersion.latest())),
+ result.records()
+ );
+ // Simulate a successful write to the log.
+ result.appendFuture().complete(null);
+
+ assertTrue(syncResponseFuture.isDone());
+ assertEquals(Errors.NONE.code(), syncResponseFuture.get().errorCode());
+
+ assertTrue(group.isInState(STABLE));
+ assertEquals(1, group.generationId());
+
+ CompletableFuture<JoinGroupResponseData> otherJoinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> otherJoinResult =
context.sendGenericGroupJoin(
+ joinRequest.setMemberId(UNKNOWN_MEMBER_ID),
+ otherJoinFuture);
+
+ CompletableFuture<JoinGroupResponseData> joinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> joinResult =
context.sendGenericGroupJoin(
+ joinRequest.setMemberId(memberId),
+ joinFuture);
+
+ assertTrue(otherJoinResult.records().isEmpty());
+ assertTrue(joinResult.records().isEmpty());
+ assertTrue(joinFuture.isDone());
+ assertTrue(otherJoinFuture.isDone());
+
+ verifySessionExpiration(context, group, 5000);
+ }
+
+ @Test
+ public void testStaticMemberFenceDuplicateRejoinedFollower() throws
Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ // A third member joins. Trigger a rebalance.
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ context.sendGenericGroupJoin(request, new CompletableFuture<>());
+
+ assertTrue(group.isInState(PREPARING_REBALANCE));
+
+ // Old follower rejoins group will be matching current member.id.
+ CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+ request
+ .setMemberId(rebalanceResult.followerId)
+ .setGroupInstanceId("follower-instance-id"),
+ oldFollowerJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertFalse(oldFollowerJoinFuture.isDone());
+
+ // Duplicate follower joins group with unknown member id will trigger
member id replacement.
+ result = context.sendGenericGroupJoin(
+
request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"),
+ new CompletableFuture<>());
+
+ // Old member shall be fenced immediately upon duplicate follower
joins.
+ assertTrue(result.records().isEmpty());
+ assertTrue(oldFollowerJoinFuture.isDone());
+
+ JoinGroupResponseData expectedResponse = new JoinGroupResponseData()
+ .setErrorCode(Errors.FENCED_INSTANCE_ID.code())
+ .setProtocolName(null)
+ .setProtocolType(null)
+ .setLeader(UNKNOWN_MEMBER_ID)
+ .setMemberId(rebalanceResult.followerId)
+ .setGenerationId(-1);
+
+ checkJoinGroupResponse(
+ expectedResponse,
+ oldFollowerJoinFuture.get(),
+ group,
+ PREPARING_REBALANCE,
+ Collections.emptySet()
+ );
+ }
+
+ @Test
+ public void
testStaticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged() throws
Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ // Known leader rejoins will trigger rebalance.
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("leader-instance-id")
+ .withMemberId(rebalanceResult.leaderId)
+ .withDefaultProtocolTypeAndProtocols()
+ .withRebalanceTimeoutMs(10000)
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result =
context.sendGenericGroupJoin(request, leaderJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertFalse(leaderJoinFuture.isDone());
+ assertTrue(group.isInState(PREPARING_REBALANCE));
+
+ // Old follower rejoins group will match current member.id.
+ CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new
CompletableFuture<>();
+ result = context.sendGenericGroupJoin(
+
request.setMemberId(rebalanceResult.followerId).setGroupInstanceId("follower-instance-id"),
+ oldFollowerJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertTrue(oldFollowerJoinFuture.isDone());
+ assertTrue(leaderJoinFuture.isDone());
+
+ JoinGroupResponseData expectedLeaderResponse = new
JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(rebalanceResult.generationId + 1)
+ .setMemberId(rebalanceResult.leaderId)
+ .setLeader(rebalanceResult.leaderId)
+ .setProtocolName("range")
+ .setProtocolType("consumer")
+ .setMembers(toJoinResponseMembers(group));
+
+ Set<String> expectedGroupInstanceIds = new HashSet<>();
+ expectedGroupInstanceIds.add("leader-instance-id");
+ expectedGroupInstanceIds.add("follower-instance-id");
+
+ checkJoinGroupResponse(
+ expectedLeaderResponse,
+ leaderJoinFuture.get(),
+ group,
+ COMPLETING_REBALANCE,
+ expectedGroupInstanceIds
+ );
+
+ assertEquals(rebalanceResult.leaderId,
leaderJoinFuture.get().memberId());
+ assertEquals(rebalanceResult.leaderId,
leaderJoinFuture.get().leader());
+
+ // Old follower should get a successful join group response.
+ assertTrue(oldFollowerJoinFuture.isDone());
+
+ JoinGroupResponseData expectedFollowerResponse = new
JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(rebalanceResult.generationId + 1)
+ .setMemberId(oldFollowerJoinFuture.get().memberId())
+ .setLeader(rebalanceResult.leaderId)
+ .setProtocolName("range")
+ .setProtocolType("consumer");
+
+ checkJoinGroupResponse(
+ expectedFollowerResponse,
+ oldFollowerJoinFuture.get(),
+ group,
+ COMPLETING_REBALANCE,
+ Collections.emptySet()
+ );
+
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertEquals(rebalanceResult.followerId,
oldFollowerJoinFuture.get().memberId());
+ assertEquals(rebalanceResult.leaderId,
oldFollowerJoinFuture.get().leader());
+
+ // Duplicate follower joins group with unknown member id will trigger
member.id replacement,
+ // and will also trigger a rebalance under CompletingRebalance state;
the old follower sync callback
+ // will return fenced exception while broker replaces the member
identity with the duplicate follower joins.
+ SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("follower-instance-id")
+ .withGenerationId(oldFollowerJoinFuture.get().generationId())
+ .withMemberId(oldFollowerJoinFuture.get().memberId())
+ .build();
+
+ CompletableFuture<SyncGroupResponseData> oldFollowerSyncFuture = new
CompletableFuture<>();
+ result = context.sendGenericGroupSync(syncRequest,
oldFollowerSyncFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertFalse(oldFollowerSyncFuture.isDone());
+
+ CompletableFuture<JoinGroupResponseData> duplicateFollowerJoinFuture =
new CompletableFuture<>();
+ result = context.sendGenericGroupJoin(
+
request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"),
+ duplicateFollowerJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertTrue(group.isInState(PREPARING_REBALANCE));
+ assertFalse(duplicateFollowerJoinFuture.isDone());
+ assertTrue(oldFollowerSyncFuture.isDone());
+ assertEquals(Errors.FENCED_INSTANCE_ID.code(),
oldFollowerSyncFuture.get().errorCode());
+
+ // Advance clock by rebalance timeout so that the join phase completes
with duplicate follower.
+ // Leader is kicked out.
+ context.sleepAndAssertEmptyResult(10000);
+
+ assertTrue(duplicateFollowerJoinFuture.isDone());
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertEquals(3, group.generationId());
+ assertEquals(1, group.size());
+
assertTrue(group.hasMemberId(duplicateFollowerJoinFuture.get().memberId()));
+ assertEquals(duplicateFollowerJoinFuture.get().memberId(),
duplicateFollowerJoinFuture.get().leader());
+ }
+
+ @Test
+ public void
testStaticMemberFenceDuplicateRejoiningFollowerAfterMemberIdChanged() throws
Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ // Known leader rejoins will trigger rebalance.
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("leader-instance-id")
+ .withMemberId(rebalanceResult.leaderId)
+ .withDefaultProtocolTypeAndProtocols()
+ .withRebalanceTimeoutMs(10000)
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result =
context.sendGenericGroupJoin(request, leaderJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertFalse(leaderJoinFuture.isDone());
+ assertTrue(group.isInState(PREPARING_REBALANCE));
+
+ // Duplicate follower joins group will trigger member id replacement.
+ CompletableFuture<JoinGroupResponseData> duplicateFollowerJoinFuture =
new CompletableFuture<>();
+ result = context.sendGenericGroupJoin(
+
request.setMemberId(UNKNOWN_MEMBER_ID).setGroupInstanceId("follower-instance-id"),
+ duplicateFollowerJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertTrue(duplicateFollowerJoinFuture.isDone());
+
+ // Old follower rejoins group will fail because member id is already
updated.
+ CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new
CompletableFuture<>();
+ result = context.sendGenericGroupJoin(
+ request.setMemberId(rebalanceResult.followerId),
+ oldFollowerJoinFuture);
+
+ assertTrue(result.records().isEmpty());
+ assertTrue(oldFollowerJoinFuture.isDone());
+ assertTrue(leaderJoinFuture.isDone());
+
+ JoinGroupResponseData expectedLeaderResponse = new
JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(rebalanceResult.generationId + 1)
+ .setMemberId(rebalanceResult.leaderId)
+ .setLeader(rebalanceResult.leaderId)
+ .setProtocolName("range")
+ .setProtocolType("consumer")
+ .setMembers(toJoinResponseMembers(group));
+
+ Set<String> expectedGroupInstanceIds = new HashSet<>();
+ expectedGroupInstanceIds.add("leader-instance-id");
+ expectedGroupInstanceIds.add("follower-instance-id");
+
+ checkJoinGroupResponse(
+ expectedLeaderResponse,
+ leaderJoinFuture.get(),
+ group,
+ COMPLETING_REBALANCE,
+ expectedGroupInstanceIds
+ );
+
+ assertTrue(duplicateFollowerJoinFuture.isDone());
+
+ JoinGroupResponseData expectedDuplicateFollowerResponse = new
JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(rebalanceResult.generationId + 1)
+ .setMemberId(duplicateFollowerJoinFuture.get().memberId())
+ .setLeader(rebalanceResult.leaderId)
+ .setProtocolName("range")
+ .setProtocolType("consumer")
+ .setMembers(Collections.emptyList());
+
+ checkJoinGroupResponse(
+ expectedDuplicateFollowerResponse,
+ duplicateFollowerJoinFuture.get(),
+ group,
+ COMPLETING_REBALANCE,
+ Collections.emptySet()
+ );
+
+ assertTrue(duplicateFollowerJoinFuture.isDone());
+
+ JoinGroupResponseData expectedOldFollowerResponse = new
JoinGroupResponseData()
+ .setErrorCode(Errors.FENCED_INSTANCE_ID.code())
+ .setGenerationId(-1)
+ .setMemberId(rebalanceResult.followerId)
+ .setLeader(UNKNOWN_MEMBER_ID)
+ .setProtocolName(null)
+ .setProtocolType(null)
+ .setMembers(Collections.emptyList());
+
+ checkJoinGroupResponse(
+ expectedOldFollowerResponse,
+ oldFollowerJoinFuture.get(),
+ group,
+ COMPLETING_REBALANCE,
+ Collections.emptySet()
+ );
+ }
+
+ @Test
+ public void testStaticMemberRejoinWithKnownMemberId() throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withGroupInstanceId("group-instance-id")
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ JoinGroupResponseData joinResponse =
context.joinGenericGroupAndCompleteJoin(request, false, false);
+ assertEquals(Errors.NONE.code(), joinResponse.errorCode());
+
+ String memberId = joinResponse.memberId();
+
+ CompletableFuture<JoinGroupResponseData> rejoinResponseFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+ request.setMemberId(memberId),
+ rejoinResponseFuture);
+
+ // The second join group should return immediately since we are using
the same metadata during CompletingRebalance.
+ assertTrue(result.records().isEmpty());
+ assertTrue(group.isInState(COMPLETING_REBALANCE));
+ assertTrue(rejoinResponseFuture.isDone());
+ assertEquals(Errors.NONE.code(),
rejoinResponseFuture.get().errorCode());
+
+ SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withMemberId(memberId)
+ .withGenerationId(joinResponse.generationId())
+ .withGroupInstanceId("group-instance-id")
+ .build();
+
+ CompletableFuture<SyncGroupResponseData> syncFuture = new
CompletableFuture<>();
+ result = context.sendGenericGroupSync(syncRequest, syncFuture);
+
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecord(group,
MetadataVersion.latest())),
+ result.records()
+ );
+ // Successful write to the log.
+ result.appendFuture().complete(null);
+
+ assertTrue(syncFuture.isDone());
+ assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+ assertTrue(group.isInState(STABLE));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testStaticMemberRejoinWithLeaderIdAndUnknownMemberId(
+ boolean supportSkippingAssignment
+ ) throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ // A static leader rejoin with unknown id will not trigger rebalance,
and no assignment will be returned.
+ // As the group was in Stable state and the member id was updated,
this will generate records.
+ JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("leader-instance-id")
+ .withMemberId(UNKNOWN_MEMBER_ID)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> joinResponseFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+ joinRequest,
+ joinResponseFuture,
+ true,
+ supportSkippingAssignment);
+
+ assertEquals(
+ Collections.singletonList(newGroupMetadataRecord(group,
MetadataVersion.latest())),
+ result.records()
+ );
+ // Simulate a successful write to the log.
+ result.appendFuture().complete(null);
+ assertTrue(joinResponseFuture.isDone());
+
+ Set<String> expectedGroupInstanceIds = new HashSet<>();
+ expectedGroupInstanceIds.add("leader-instance-id");
+ expectedGroupInstanceIds.add("follower-instance-id");
+
+ String leader = supportSkippingAssignment ?
+ joinResponseFuture.get().memberId() : rebalanceResult.leaderId;
+
+ List<JoinGroupResponseMember> members = supportSkippingAssignment ?
+ toJoinResponseMembers(group) : Collections.emptyList();
+
+ JoinGroupResponseData expectedJoinResponse = new
JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(rebalanceResult.generationId)
+ .setMemberId(joinResponseFuture.get().memberId())
+ .setLeader(leader)
+ .setProtocolName("range")
+ .setProtocolType("consumer")
+ .setSkipAssignment(supportSkippingAssignment)
+ .setMembers(members);
+
+ checkJoinGroupResponse(
+ expectedJoinResponse,
+ joinResponseFuture.get(),
+ group,
+ STABLE,
+ supportSkippingAssignment ? expectedGroupInstanceIds :
Collections.emptySet()
+ );
+ }
+
+ @Test
+ public void testStaticMemberRejoinWithLeaderIdAndKnownMemberId() throws
Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ // Known static leader rejoin will trigger rebalance.
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("leader-instance-id")
+ .withMemberId(rebalanceResult.leaderId)
+ .withDefaultProtocolTypeAndProtocols()
+ .withRebalanceTimeoutMs(10000)
+ .build();
+
+ JoinGroupResponseData joinResponse =
context.joinGenericGroupAndCompleteJoin(request, true, true, 10000);
+
+ JoinGroupResponseData expectedResponse = new JoinGroupResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGenerationId(rebalanceResult.generationId + 1)
+ .setMemberId(rebalanceResult.leaderId)
+ .setLeader(rebalanceResult.leaderId)
+ .setProtocolName("range")
+ .setProtocolType("consumer")
+ .setMembers(toJoinResponseMembers(group));
+
+ checkJoinGroupResponse(
+ expectedResponse,
+ joinResponse,
+ group,
+ COMPLETING_REBALANCE,
+ Collections.singleton("leader-instance-id")
+ );
+ }
+
+ @Test
+ public void testStaticMemberRejoinWithLeaderIdAndUnexpectedDeadGroup()
throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ group.transitionTo(DEAD);
+
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("leader-instance-id")
+ .withMemberId(rebalanceResult.leaderId)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> joinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result =
context.sendGenericGroupJoin(request, joinFuture, true, true);
+
+ assertTrue(result.records().isEmpty());
+ assertTrue(joinFuture.isDone());
+ assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(),
joinFuture.get().errorCode());
+ }
+
+ @Test
+ public void testStaticMemberRejoinWithLeaderIdAndUnexpectedEmptyGroup()
throws Exception {
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id"
+ );
+
+ group.transitionTo(PREPARING_REBALANCE);
+ group.transitionTo(EMPTY);
+
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("leader-instance-id")
+ .withMemberId(rebalanceResult.leaderId)
+ .withDefaultProtocolTypeAndProtocols()
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> joinFuture = new
CompletableFuture<>();
+ CoordinatorResult<Void, Record> result =
context.sendGenericGroupJoin(request, joinFuture, true, true);
+
+ assertTrue(result.records().isEmpty());
+ assertTrue(joinFuture.isDone());
+ assertEquals(Errors.UNKNOWN_MEMBER_ID.code(),
joinFuture.get().errorCode());
+ }
+
+ @Test
+ public void testStaticMemberRejoinWithFollowerIdAndChangeOfProtocol()
throws Exception {
+ int rebalanceTimeoutMs = 10000;
+ int sessionTimeoutMs = 15000;
+ GroupMetadataManagerTestContext context = new
GroupMetadataManagerTestContext.Builder()
+ .build();
+ GenericGroup group = context.createGenericGroup("group-id");
+
+ RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+ context,
+ "group-id",
+ "leader-instance-id",
+ "follower-instance-id",
+ rebalanceTimeoutMs,
+ sessionTimeoutMs
+ );
+
+ // A static follower rejoin with changed protocol will trigger
rebalance.
+ JoinGroupRequestProtocolCollection protocols = new
JoinGroupRequestProtocolCollection(0);
+ protocols.add(new JoinGroupRequestProtocol()
+ .setName("roundrobin")
+ .setMetadata(ConsumerProtocol.serializeSubscription(new
ConsumerPartitionAssignor.Subscription(
+ Collections.singletonList("foo"))).array())
+ );
+ JoinGroupRequestData request = new JoinGroupRequestBuilder()
+ .withGroupId("group-id")
+ .withGroupInstanceId("follower-instance-id")
+ .withMemberId(rebalanceResult.followerId)
+ .withProtocols(protocols)
+ .withRebalanceTimeoutMs(rebalanceTimeoutMs)
+ .withSessionTimeoutMs(sessionTimeoutMs)
+ .build();
+
+ CompletableFuture<JoinGroupResponseData> responseFuture = new
CompletableFuture<>();
Review Comment:
This corresponds to the code comment
```
// A static follower rejoin with changed protocol will trigger rebalance.
```
above. Do you think that's insufficient?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]