dajac commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1264968675


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -2377,156 +2606,2165 @@ public void testOnNewMetadataImage() {
 
         // Verify the groups.
         Arrays.asList("group1", "group2", "group3", "group4").forEach(groupId 
-> {
-            ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroup group = context.groupMetadataManager
+                .getOrMaybeCreateConsumerGroup(groupId, false);
             assertTrue(group.hasMetadataExpired(context.time.milliseconds()));
         });
 
         Arrays.asList("group5").forEach(groupId -> {
-            ConsumerGroup group = 
context.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
+            ConsumerGroup group = context.groupMetadataManager
+                .getOrMaybeCreateConsumerGroup(groupId, false);
             assertFalse(group.hasMetadataExpired(context.time.milliseconds()));
         });
 
         // Verify image.
         assertEquals(image, context.groupMetadataManager.image());
     }
 
-    private <T> void assertUnorderedListEquals(
-        List<T> expected,
-        List<T> actual
-    ) {
-        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-    }
+    @Test
+    public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(10)
+            .build();
 
-    private void assertResponseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (!responseEquals(expected, actual)) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withReason("exceed max group size")
+            .build();
+
+        for (int i = 0; i < 10; i++) {
+            CompletableFuture<JoinGroupResponseData> responseFuture;
+            if (i == 0) {
+                responseFuture = context.sendGenericGroupJoin(
+                    request,
+                    false,
+                    false,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                );
+            } else {
+                responseFuture = context.sendGenericGroupJoin(request);
+            }
+            assertFalse(responseFuture.isDone());
         }
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
     }
 
-    private boolean responseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-        if (expected.errorCode() != actual.errorCode()) return false;
-        if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-        if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-        if (expected.memberEpoch() != actual.memberEpoch()) return false;
-        if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-        if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-        // Unordered comparison of the assignments.
-        return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-    }
+    @Test
+    public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+        boolean requiredKnownMemberId = true;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
 
-    private boolean responseAssignmentEquals(
-        ConsumerGroupHeartbeatResponseData.Assignment expected,
-        ConsumerGroupHeartbeatResponseData.Assignment actual
-    ) {
-        if (expected == actual) return true;
-        if (expected == null) return false;
-        if (actual == null) return false;
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
 
-        if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions())))
-            return false;
+        // First round of join requests. Generate member ids. All requests 
will be accepted
+        // as the group is still Empty.
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        for (int i = 0; i < groupMaxSize + 1; i++) {
+            if (i == 0) {
+                responseFutures.add(context.sendGenericGroupJoin(
+                    request,
+                    requiredKnownMemberId,
+                    false,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                ));
+            } else {
+                responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+            }
+        }
 
-        return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 
fromAssignment(actual.assignedTopicPartitions()));
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            false
+        );
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, 0, Errors.MEMBER_ID_REQUIRED);
+        assertEquals(groupMaxSize + 1, memberIds.size());
+        assertEquals(0, group.size());
+        assertTrue(group.isInState(EMPTY));
+        assertEquals(groupMaxSize + 1, group.numPendingJoinMembers());
+
+        // Second round of join requests with the generated member ids.
+        // One of them will fail, reaching group max size.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        // Advance clock by group initial rebalance delay to complete first 
initial delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
initial delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // the join group phase will complete.
+        context.timer.advanceClock(50);
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numPendingJoinMembers());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+
+        // Members that were accepted can rejoin while others are rejected in 
CompletingRebalance state.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
     }
 
-    private Map<Uuid, Set<Integer>> fromAssignment(
-        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
-    ) {
-        if (assignment == null) return null;
+    @Test
+    public void 
testDynamicMembersJoinGroupWithMaxSizeAndNotRequiredKnownMember() {
+        boolean requiredKnownMemberId = false;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
 
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions -> {
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
-        });
-        return assignmentMap;
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        // First round of join requests. This will trigger a rebalance.
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        responseFutures.add(context.sendGenericGroupJoin(
+            request,
+            requiredKnownMemberId,
+            true,
+            new ExpectedGenericGroupResult(Errors.NONE, true)
+        ));
+        for (int i = 0; i < groupMaxSize; i++) {
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            
+            false
+        );
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by group initial rebalance delay to complete first 
initial delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
initial delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // we will complete the rebalance.
+        context.timer.advanceClock(50);
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+
+        // Members that were accepted can rejoin while others are rejected in 
CompletingRebalance state.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        verifyGenericGroupJoinResponses(responseFutures, 10, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
     }
 
-    private void assertRecordsEquals(
-        List<Record> expectedRecords,
-        List<Record> actualRecords
-    ) {
-        try {
-            assertEquals(expectedRecords.size(), actualRecords.size());
+    @Test
+    public void testStaticMembersJoinGroupWithMaxSize() {
+        int groupMaxSize = 10;
 
-            for (int i = 0; i < expectedRecords.size(); i++) {
-                Record expectedRecord = expectedRecords.get(i);
-                Record actualRecord = actualRecords.get(i);
-                assertRecordEquals(expectedRecord, actualRecord);
+        List<String> groupInstanceIds = IntStream.range(0, groupMaxSize + 1)
+            .mapToObj(i -> "instance-id-" + i)
+            .collect(Collectors.toList());
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        // First round of join requests. The group metadata manager will 
prepare a rebalance.
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        for (int i = 0; i < groupMaxSize + 1; i++) {
+            String instanceId = groupInstanceIds.get(i);
+            request = request.setGroupInstanceId(instanceId);
+            if (i == 0) {
+                responseFutures.add(context.sendGenericGroupJoin(
+                    request,
+                    false,
+                    true,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                ));
+            } else {
+                responseFutures.add(context.sendGenericGroupJoin(request));
             }
-        } catch (AssertionFailedError e) {
-            assertionFailure()
-                .expected(expectedRecords)
-                .actual(actualRecords)
-                .buildAndThrow();
         }
-    }
 
-    private void assertRecordEquals(
-        Record expected,
-        Record actual
-    ) {
-        try {
-            assertApiMessageAndVersionEquals(expected.key(), actual.key());
-            assertApiMessageAndVersionEquals(expected.value(), actual.value());
-        } catch (AssertionFailedError e) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            false
+        );
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by group initial rebalance delay to complete first 
initial delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
initial delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // we will complete the rebalance.
+        context.timer.advanceClock(50);
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+
+        // Members which were accepted can rejoin, others are rejected, while
+        // completing rebalance
+        responseFutures = new ArrayList<>();
+        for (int i = 0; i < groupMaxSize; i++) {
+            String memberId = memberIds.get(i);
+            String instanceId = groupInstanceIds.get(i);
+
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withGroupInstanceId(instanceId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request));
         }
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
     }
 
-    private void assertApiMessageAndVersionEquals(
-        ApiMessageAndVersion expected,
-        ApiMessageAndVersion actual
-    ) {
-        if (expected == actual) return;
+    @Test
+    public void testDynamicMembersCanRejoinGroupWithMaxSizeWhileRebalancing() {
+        boolean requiredKnownMemberId = true;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
 
-        assertEquals(expected.version(), actual.version());
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
 
-        if (actual.message() instanceof 
ConsumerGroupCurrentMemberAssignmentValue) {
-            // The order of the topics stored in 
ConsumerGroupCurrentMemberAssignmentValue is not
-            // always guaranteed. Therefore, we need a special comparator.
-            ConsumerGroupCurrentMemberAssignmentValue expectedValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) expected.message();
-            ConsumerGroupCurrentMemberAssignmentValue actualValue =
-                (ConsumerGroupCurrentMemberAssignmentValue) actual.message();
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        // First round of join requests. Generate member ids.
+        for (int i = 0; i < groupMaxSize + 1; i++) {
+            if (i == 0) {
+                responseFutures.add(context.sendGenericGroupJoin(
+                    request,
+                    requiredKnownMemberId,
+                    false,
+                    new ExpectedGenericGroupResult(Errors.NONE, true)
+                ));
+            } else {
+                responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+            }
+        }
 
-            assertEquals(expectedValue.memberEpoch(), 
actualValue.memberEpoch());
-            assertEquals(expectedValue.previousMemberEpoch(), 
actualValue.previousMemberEpoch());
-            assertEquals(expectedValue.targetMemberEpoch(), 
actualValue.targetMemberEpoch());
-            assertEquals(expectedValue.error(), actualValue.error());
-            assertEquals(expectedValue.metadataVersion(), 
actualValue.metadataVersion());
-            assertEquals(expectedValue.metadataBytes(), 
actualValue.metadataBytes());
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            false
+        );
 
-            // We transform those to Maps before comparing them.
-            
assertEquals(fromTopicPartitions(expectedValue.assignedPartitions()),
-                fromTopicPartitions(actualValue.assignedPartitions()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingRevocation()),
-                
fromTopicPartitions(actualValue.partitionsPendingRevocation()));
-            
assertEquals(fromTopicPartitions(expectedValue.partitionsPendingAssignment()),
-                
fromTopicPartitions(actualValue.partitionsPendingAssignment()));
-        } else {
-            assertEquals(expected.message(), actual.message());
+        assertEquals(0, group.size());
+        assertEquals(groupMaxSize + 1, group.numPendingJoinMembers());
+        assertTrue(group.isInState(EMPTY));
+
+        List<String> memberIds = 
verifyGenericGroupJoinResponses(responseFutures, 0, Errors.MEMBER_ID_REQUIRED);
+        assertEquals(groupMaxSize + 1, memberIds.size());
+
+        // Second round of join requests with the generated member ids.
+        // One of them will fail, reaching group max size.
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            
responseFutures.add(context.sendGenericGroupJoin(request.setMemberId(memberId), 
requiredKnownMemberId));
         }
-    }
 
-    private Map<Uuid, Set<Integer>> fromTopicPartitions(
-        List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions> 
assignment
-    ) {
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions -> {
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
-        });
-        return assignmentMap;
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Members can rejoin while rebalancing
+        responseFutures = new ArrayList<>();
+        for (String memberId : memberIds) {
+            request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request, 
requiredKnownMemberId));
+        }
+
+        // Advance clock by group initial rebalance delay to complete first 
initial delayed join.
+        // This will extend the initial rebalance as new members have joined.
+        context.timer.advanceClock(50);
+        // Advance clock by group initial rebalance delay to complete second 
initial delayed join.
+        // Since there are no new members that joined since the previous 
delayed join,
+        // we will complete the rebalance.
+        context.timer.advanceClock(50);
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(0, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
     }
+
+    @Test
+    public void 
testLastJoiningMembersAreKickedOutWhenRejoiningGroupWithMaxSize() {
+        int groupMaxSize = 10;
+
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
+
+        // Create a group and add members that exceed the group max size.
+        GenericGroup group = 
context.groupMetadataManager.getOrMaybeCreateGenericGroup(
+            "group-id",
+            true
+        );
+
+        List<String> memberIds = IntStream.range(0, groupMaxSize + 2)
+            .mapToObj(i -> group.generateMemberId("client-id", 
Optional.empty()))
+            .collect(Collectors.toList());
+
+        memberIds.forEach(memberId -> {
+            JoinGroupRequestProtocolCollection protocols = new 
JoinGroupRequestProtocolCollection();
+            protocols.add(new JoinGroupRequestProtocol()
+                .setName("range")
+                .setMetadata(new byte[0]));
+
+            group.add(
+                new GenericGroupMember(
+                    memberId,
+                    Optional.empty(),
+                    "client-id",
+                    "client-host",
+                    10000,
+                    5000,
+                    "consumer",
+                    protocols
+                )
+            );
+        });
+
+        context.groupMetadataManager.prepareRebalance(group, "test");
+
+        List<CompletableFuture<JoinGroupResponseData>> responseFutures = new 
ArrayList<>();
+        for (String memberId : memberIds) {
+            JoinGroupRequestData request = new JoinGroupRequestBuilder()
+                .withGroupId("group-id")
+                .withMemberId(memberId)
+                .withDefaultProtocolTypeAndProtocols()
+                .withRebalanceTimeoutMs(10000)
+                .build();
+
+            responseFutures.add(context.sendGenericGroupJoin(request));
+        }
+
+        assertEquals(groupMaxSize, group.size());
+        assertEquals(groupMaxSize, group.numAwaitingJoinResponse());
+        assertTrue(group.isInState(PREPARING_REBALANCE));
+
+        // Advance clock by rebalance timeout to complete join phase.
+        context.timer.advanceClock(10000);
+
+        verifyGenericGroupJoinResponses(responseFutures, groupMaxSize, 
Errors.GROUP_MAX_SIZE_REACHED);
+
+        assertEquals(groupMaxSize, group.size());
+        assertTrue(group.isInState(COMPLETING_REBALANCE));
+
+        memberIds.subList(groupMaxSize, groupMaxSize + 2)
+            .forEach(memberId -> assertFalse(group.hasMemberId(memberId)));
+
+        memberIds.subList(0, groupMaxSize)
+            .forEach(memberId -> assertTrue(group.hasMemberId(memberId)));
+    }
+
+    @Test
+    public void testJoinGroupSessionTimeoutTooSmall() throws Exception {
+        int minSessionTimeout = 50;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMinSessionTimeoutMs(minSessionTimeout)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withSessionTimeoutMs(minSessionTimeout - 1)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INVALID_SESSION_TIMEOUT.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupSessionTimeoutTooLarge() throws Exception {
+        int maxSessionTimeout = 50;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSessionTimeoutMs(maxSessionTimeout)
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withSessionTimeoutMs(maxSessionTimeout + 1)
+            .build();
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = 
context.sendGenericGroupJoin(request);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.INVALID_SESSION_TIMEOUT.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
+    }
+
+    @Test
+    public void testJoinGroupUnknownConsumerNewGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId("member-id")
+            .build();
+
+        JoinGroupResponseData response = 
context.joinGenericGroupAsDynamicMember(request);
+
+        assertEquals(Errors.GROUP_ID_NOT_FOUND.code(), response.errorCode());

Review Comment:
   Yeah, we have to stick to the old one here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to