jeffkbkim commented on code in PR #14056:
URL: https://github.com/apache/kafka/pull/14056#discussion_r1276944101


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8540,5 +8568,1182 @@ private static class RebalanceResult {
             this.followerAssignment = followerAssignment;
         }
     }
+
+    @Test
+    public void testStaticMemberHeartbeatLeaderWithInvalidMemberId() throws 
Exception { // A heartbeat carrying the leader's instance id but a wrong member id must get FENCED_INSTANCE_ID.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        context.createGenericGroup("group-id");
+
+        RebalanceResult rebalanceResult = staticMembersJoinAndRebalance(
+            context,
+            "group-id",
+            "leader-instance-id",
+            "follower-instance-id"
+        );
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(rebalanceResult.leaderId)
+            .withGenerationId(rebalanceResult.generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertTrue(result.records().isEmpty()); // This sync is expected to generate no records.
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(rebalanceResult.leaderId)
+            .setGenerationId(rebalanceResult.generationId);
+
+        HeartbeatResponseData validHeartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), validHeartbeatResponse.errorCode()); // Baseline: the leader's real member id is accepted.
+
+        HeartbeatResponseData inValidHeartbeatResponse = 
context.sendGenericGroupHeartbeat(
+            heartbeatRequest.setGroupInstanceId("leader-instance-id")
+                .setMemberId("invalid-member-id")); // Re-send with the leader's instance id and a member id that was never issued.
+
+        assertEquals(Errors.FENCED_INSTANCE_ID.code(), 
inValidHeartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownGroup() { // A heartbeat for a group that was never created must get UNKNOWN_MEMBER_ID.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build(); // No createGenericGroup call here: "group-id" is not created by this test.
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDeadGroup() { // A heartbeat sent to a group in the DEAD state must get COORDINATOR_NOT_AVAILABLE.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        group.transitionTo(DEAD); // Force the state directly; no member ever joins in this test.
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(-1);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.COORDINATOR_NOT_AVAILABLE.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatEmptyGroup() { // A heartbeat sent to a group in the EMPTY state must get UNKNOWN_MEMBER_ID.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+        protocols.add(new JoinGroupRequestProtocol()
+            .setName("range")
+            .setMetadata(new byte[]{0}));
+
+        group.add(new GenericGroupMember( // Member is inserted directly into the group rather than via a JoinGroup request.
+            "member-id",
+            Optional.empty(),
+            "client-id",
+            "client-host",
+            10000,
+            5000,
+            "consumer",
+            protocols
+        ));
+
+        group.transitionTo(PREPARING_REBALANCE);
+        group.transitionTo(EMPTY); // State is forced to EMPTY; the test does not remove "member-id" first.
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("member-id")
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatUnknownMemberExistingGroup() throws Exception { // A stable group must answer UNKNOWN_MEMBER_ID to a member id it never issued.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId("unknown-member-id") // Not the id returned by the join above.
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDuringPreparingRebalance() throws Exception { // A heartbeat while the group is in PREPARING_REBALANCE must get REBALANCE_IN_PROGRESS.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(joinRequest, joinFuture, true); // NOTE(review): the `true` flag presumably requires a known member id - confirm against the helper.
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(joinFuture.isDone());
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
joinFuture.get().errorCode());
+
+        String memberId = joinFuture.get().memberId(); // The MEMBER_ID_REQUIRED response carries the member id used for the second join.
+
+        joinFuture = new CompletableFuture<>();
+        context.sendGenericGroupJoin(joinRequest.setMemberId(memberId), 
joinFuture);
+
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(memberId)
+            .setGenerationId(0);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testHeartbeatDuringCompletingRebalance() throws Exception { // A heartbeat while the group is in COMPLETING_REBALANCE must get a default (no-error) response.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE)); // No SyncGroup is sent in this test, so the group stays in this state.
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(new HeartbeatResponseData(), heartbeatResponse); // Compares the whole response object, not only its error code.
+    }
+
+    @Test
+    public void testHeartbeatIllegalGeneration() throws Exception { // A heartbeat with a generation id other than the group's must get ILLEGAL_GENERATION.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId + 1); // Deliberately one past the generation returned by the join.
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.ILLEGAL_GENERATION.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testValidHeartbeat() throws Exception { // Happy path: correct member id and generation in a stable group yields NONE.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testGenericGroupMemberSessionTimeout() throws Exception { // After its session expires, the member's next heartbeat must get UNKNOWN_MEMBER_ID.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withRebalanceTimeoutMs(10000)
+            .withSessionTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // Advance clock by session timeout to kick member out.
+        verifySessionExpiration(context, group, 5000); // 5000 matches withSessionTimeoutMs(5000) on the join request above.
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testGenericGroupMemberMaintainsSession() throws Exception { // Heartbeating every half session timeout must keep returning NONE.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withRebalanceTimeoutMs(10000)
+            .withSessionTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // Advance clock by 1/2 of session timeout.
+        assertNoOrEmptyResult(context.sleep(2500));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+
+
+        assertNoOrEmptyResult(context.sleep(2500)); // Another 2500 ms: 5000 ms in total, equal to the session timeout set above.
+
+        heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode()); // Still accepted after 5000 ms total because of the heartbeat at 2500 ms.
+    }
+
+    @Test
+    public void testGenericGroupMemberSessionTimeoutDuringRebalance() throws 
Exception { // A member that stops heartbeating mid-rebalance is dropped; the join then completes with one member.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withRebalanceTimeoutMs(10000)
+            .withSessionTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // Add a new member. This should trigger a rebalance. The new member 
has the
+        // `genericGroupNewMemberJoinTimeoutMs` session timeout, so it has a 
longer expiration than the existing member.
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        result = 
context.sendGenericGroupJoin(joinRequest.setMemberId(UNKNOWN_MEMBER_ID), 
joinFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertFalse(joinFuture.isDone()); // The join stays pending until the rebalance completes further down.
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by 1/2 of session timeout.
+        assertNoOrEmptyResult(context.sleep(2500));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
+
+        // Advance clock by first member's session timeout.
+        assertNoOrEmptyResult(context.sleep(5000));
+
+        heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.UNKNOWN_MEMBER_ID.code(), 
heartbeatResponse.errorCode());
+
+        // Advance clock by remaining rebalance timeout to complete join phase.
+        assertNoOrEmptyResult(context.sleep(2500)); // 2500 + 5000 + 2500 = 10000 ms, the rebalance timeout set on the join request.
+
+        assertTrue(joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), joinFuture.get().errorCode());
+        assertEquals(1, group.size()); // Only one member is left in the new generation.
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+    }
+
+    @Test
+    public void testRebalanceCompletesBeforeMemberJoins() throws Exception { // The rebalance finishes without the static first member rejoining; that member is only dropped later.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        // Create a group with a single member
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withRebalanceTimeoutMs(10000)
+            .withSessionTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            context.joinGenericGroupAndCompleteJoin(joinRequest, true, true); // NOTE(review): meaning of the two boolean flags is defined by the helper - confirm.
+
+        String firstMemberId = leaderJoinResponse.memberId();
+        int firstGenerationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, firstGenerationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(firstMemberId)
+            .withGenerationId(firstGenerationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // Add a new dynamic member. This should trigger a rebalance. The new 
member has the
+        // `genericGroupNewMemberJoinTimeoutMs` session timeout, so it has a 
longer expiration than the existing member.
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        result = context.sendGenericGroupJoin(
+            joinRequest.setMemberId(UNKNOWN_MEMBER_ID)
+                .setGroupInstanceId(null) // Null instance id: the second member joins as a dynamic member.
+                .setSessionTimeoutMs(2500),
+            joinFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertFalse(joinFuture.isDone());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Send a couple heartbeats to keep the first member alive while the 
rebalance finishes.
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(firstMemberId)
+            .setGenerationId(firstGenerationId);
+
+        for (int i = 0; i < 2; i++) {
+            assertNoOrEmptyResult(context.sleep(2500));
+            HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+            assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
+        }
+
+        // Advance clock by remaining rebalance timeout to complete join phase.
+        // The second member will become the leader. However, as the first 
member is a static member
+        // it will not be kicked out.
+        assertNoOrEmptyResult(context.sleep(8000));
+
+        assertTrue(joinFuture.isDone());
+        assertEquals(Errors.NONE.code(), joinFuture.get().errorCode());
+        assertEquals(2, group.size()); // Both members are still counted although the first one never rejoined.
+        assertEquals(2, group.generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        String otherMemberId = joinFuture.get().memberId();
+
+        syncFuture = new CompletableFuture<>();
+        result = context.sendGenericGroupSync(
+            syncRequest.setGroupInstanceId(null)
+                .setMemberId(otherMemberId)
+                .setGenerationId(2),
+            syncFuture);
+
+        assertEquals(
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // The unjoined static member should remain in the group before 
session timeout.
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest); // Request still carries firstGenerationId (1) while the group is at generation 2.
+        assertEquals(Errors.ILLEGAL_GENERATION.code(), 
heartbeatResponse.errorCode());
+
+        // Now session timeout the unjoined member. Still keeping the new 
member.
+        List<Errors> expectedErrors = Arrays.asList(Errors.NONE, Errors.NONE, 
Errors.REBALANCE_IN_PROGRESS);
+        for (Errors expectedError : expectedErrors) { // Three heartbeats 2000 ms apart; the third is expected to see a new rebalance.
+            assertNoOrEmptyResult(context.sleep(2000));
+            heartbeatResponse = context.sendGenericGroupHeartbeat(
+                heartbeatRequest.setMemberId(otherMemberId)
+                    .setGenerationId(2));
+
+            assertEquals(expectedError.code(), heartbeatResponse.errorCode());
+        }
+        assertEquals(1, group.size()); // The first member is gone at this point.
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        CompletableFuture<JoinGroupResponseData> otherMemberRejoinFuture = new 
CompletableFuture<>();
+        result = context.sendGenericGroupJoin(
+            joinRequest.setMemberId(otherMemberId)
+                .setGroupInstanceId(null)
+                .setSessionTimeoutMs(2500),
+            otherMemberRejoinFuture);
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(otherMemberRejoinFuture.isDone()); // Unlike the earlier join, this one completes without advancing the clock.
+        assertEquals(Errors.NONE.code(), 
otherMemberRejoinFuture.get().errorCode());
+        assertEquals(3, otherMemberRejoinFuture.get().generationId());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        CompletableFuture<SyncGroupResponseData> otherMemberResyncFuture = new 
CompletableFuture<>();
+        result = context.sendGenericGroupSync(
+            syncRequest.setGroupInstanceId(null)
+                .setMemberId(otherMemberId)
+                .setGenerationId(3),
+            otherMemberResyncFuture);
+
+        assertEquals(
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(otherMemberResyncFuture.isDone());
+        assertEquals(Errors.NONE.code(), 
otherMemberResyncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // The joined member should get heartbeat response with no error. Let 
the new member keep
+        // heartbeating for a while to verify that no new rebalance is 
triggered unexpectedly.
+        for (int i = 0; i < 20; i++) {
+            assertNoOrEmptyResult(context.sleep(2000)); // 2000 ms between heartbeats, below the 2500 ms session timeout this member joined with.
+            heartbeatResponse = context.sendGenericGroupHeartbeat(
+                heartbeatRequest.setMemberId(otherMemberId)
+                    .setGenerationId(3));
+
+            assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        }
+    }
+
+    @Test
+    public void testSyncGroupEmptyAssignment() throws Exception { // A sync answered with a zero-length assignment still leaves the group stable and heartbeats accepted.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withRebalanceTimeoutMs(10000)
+            .withSessionTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            
context.joinGenericGroupAndCompleteJoinAsDynamicMemberAndCompleteJoin(joinRequest);
+
+        String leaderId = leaderJoinResponse.memberId();
+        int generationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, generationId);
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder() // No assignment is set on the builder in this test.
+            .withGroupId("group-id")
+            .withMemberId(leaderId)
+            .withGenerationId(generationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // The leader's sync is expected to yield exactly one group metadata record.
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertEquals(0, syncFuture.get().assignment().length); // The returned assignment is a zero-length byte array.
+        assertTrue(group.isInState(STABLE));
+
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(leaderId)
+            .setGenerationId(generationId);
+
+        HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+        assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+    }
+
+    @Test
+    public void testSecondMemberPartiallyJoinAndTimeout() throws Exception {
+        // Test if the following scenario completes a rebalance correctly: A 
new member starts a JoinGroup request with
+        // an UNKNOWN_MEMBER_ID, attempting to join a stable group. But never 
initiates the second JoinGroup request with
+        // the provided member ID and times out. The test checks if original 
member remains the sole member in this group,
+        // which should remain stable throughout this test.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        // Create a group with a single member; this first member joins with a group instance id.
+        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withGroupInstanceId("leader-instance-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withRebalanceTimeoutMs(10000)
+            .withSessionTimeoutMs(5000)
+            .build();
+
+        JoinGroupResponseData leaderJoinResponse =
+            context.joinGenericGroupAndCompleteJoin(joinRequest, true, true);
+
+        String firstMemberId = leaderJoinResponse.memberId();
+        int firstGenerationId = leaderJoinResponse.generationId();
+
+        assertEquals(1, firstGenerationId); // the completed join is expected to yield generation 1
+        assertTrue(group.isInState(COMPLETING_REBALANCE)); // the sync request has not been sent yet
+
+        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(firstMemberId)
+            .withGenerationId(firstGenerationId)
+            .build();
+
+        CompletableFuture<SyncGroupResponseData> syncFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupSync(syncRequest, syncFuture);
+
+        assertEquals( // the sync is expected to produce exactly one group metadata record
+            
Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
+        // Simulate a successful write to the log by completing the append future.
+        result.appendFuture().complete(null);
+
+        assertTrue(syncFuture.isDone());
+        assertEquals(Errors.NONE.code(), syncFuture.get().errorCode());
+        assertTrue(group.isInState(STABLE));
+
+        // Add a new dynamic pending member (the join request is reused with the instance id cleared).
+        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
+        result = context.sendGenericGroupJoin(
+            joinRequest.setMemberId(UNKNOWN_MEMBER_ID)
+                .setGroupInstanceId(null)
+                .setSessionTimeoutMs(5000),
+            joinFuture,
+            true,
+            true
+        );
+
+        assertTrue(result.records().isEmpty());
+        assertTrue(joinFuture.isDone());
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
joinFuture.get().errorCode());
+        assertEquals(1, group.numPendingJoinMembers()); // the new member is only tracked as pending
+        assertTrue(group.isInState(STABLE)); // the pending join is not expected to trigger a rebalance
+
+        // Heartbeat from the leader to maintain session while timing out 
pending member.
+        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData()
+            .setGroupId("group-id")
+            .setMemberId(firstMemberId)
+            .setGenerationId(firstGenerationId);
+
+        for (int i = 0; i < 2; i++) { // 2 x 2500 ms = 5000 ms, the session timeout used in the join requests above
+            assertNoOrEmptyResult(context.sleep(2500));
+            HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
+            assertEquals(Errors.NONE.code(), heartbeatResponse.errorCode());
+        }
+
+        // At this point the second member should have been removed from 
pending list (session timeout),
+        // and the group should be in Stable state with only the first member 
in it.
+        assertEquals(1, group.size());
+        assertTrue(group.hasMemberId(firstMemberId));
+        assertEquals(1, group.generationId()); // the generation is expected to be unchanged, i.e. no rebalance happened
+        assertTrue(group.isInState(STABLE));
+    }
+
+    @Test
+    public void testRebalanceTimesOutWhenSyncRequestIsNotReceived() throws 
Exception {
+        // This test case ensure that the pending sync expiration does kick 
out all members
+        // if they don't send sync requests before the rebalance timeout. The
+        // group is in the Empty state in this case.
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        int rebalanceTimeoutMs = 5000;
+        int sessionTimeoutMs = 5000;
+        List<JoinGroupResponseData> joinResponses = joinWithNMembers(context, 
group, 3, rebalanceTimeoutMs, sessionTimeoutMs);

Review Comment:
   The join responses are verified inside `joinWithNMembers`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to