jeffkbkim commented on code in PR #13870:
URL: https://github.com/apache/kafka/pull/13870#discussion_r1265554449


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -3022,143 +3242,2087 @@ public void testOnLoaded() {
         
assertNotNull(context.timer.timeout(consumerGroupRevocationTimeoutKey("foo", 
"foo-1")));
     }
 
-    private <T> void assertUnorderedListEquals(
-        List<T> expected,
-        List<T> actual
-    ) {
-        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
-    }
-
-    private void assertResponseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (!responseEquals(expected, actual)) {
-            assertionFailure()
-                .expected(expected)
-                .actual(actual)
-                .buildAndThrow();
-        }
-    }
+    @Test
+    public void testGenerateRecordsOnNewGroup() throws Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .build();
 
-    private boolean responseEquals(
-        ConsumerGroupHeartbeatResponseData expected,
-        ConsumerGroupHeartbeatResponseData actual
-    ) {
-        if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false;
-        if (expected.errorCode() != actual.errorCode()) return false;
-        if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) 
return false;
-        if (!Objects.equals(expected.memberId(), actual.memberId())) return 
false;
-        if (expected.memberEpoch() != actual.memberEpoch()) return false;
-        if (expected.shouldComputeAssignment() != 
actual.shouldComputeAssignment()) return false;
-        if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) 
return false;
-        // Unordered comparison of the assignments.
-        return responseAssignmentEquals(expected.assignment(), 
actual.assignment());
-    }
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
 
-    private boolean responseAssignmentEquals(
-        ConsumerGroupHeartbeatResponseData.Assignment expected,
-        ConsumerGroupHeartbeatResponseData.Assignment actual
-    ) {
-        if (expected == actual) return true;
-        if (expected == null) return false;
-        if (actual == null) return false;
+        CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, responseFuture, true);
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
responseFuture.get().errorCode());
 
-        if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), 
fromAssignment(actual.pendingTopicPartitions())))
-            return false;
+        GenericGroup group = context.createGenericGroup("group-id");
 
-        return 
Objects.equals(fromAssignment(expected.assignedTopicPartitions()), 
fromAssignment(actual.assignedTopicPartitions()));
+        assertEquals(
+            
Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, 
MetadataVersion.latest())),
+            result.records()
+        );
     }
 
-    private Map<Uuid, Set<Integer>> fromAssignment(
-        List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment
-    ) {
-        if (assignment == null) return null;
+    @Test
+    public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws 
Exception {
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(10)
+            .build();
+        context.createGenericGroup("group-id");
 
-        Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>();
-        assignment.forEach(topicPartitions -> {
-            assignmentMap.put(topicPartitions.topicId(), new 
HashSet<>(topicPartitions.partitions()));
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .withReason("exceed max group size")
+            .build();
+
+        IntStream.range(0, 10).forEach(i -> {
+            CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+            CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, responseFuture);
+            assertFalse(responseFuture.isDone());
+            assertTrue(result.records().isEmpty());
         });
-        return assignmentMap;
+
+        CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, responseFuture);
+        assertTrue(result.records().isEmpty());
+        assertTrue(responseFuture.isDone());
+        assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), 
responseFuture.get(5, TimeUnit.SECONDS).errorCode());
     }
 
-    private void assertRecordsEquals(
-        List<Record> expectedRecords,
-        List<Record> actualRecords
-    ) {
-        try {
-            assertEquals(expectedRecords.size(), actualRecords.size());
+    @Test
+    public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() 
{
+        boolean requiredKnownMemberId = true;
+        int groupMaxSize = 10;
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withGenericGroupMaxSize(groupMaxSize)
+            .withGenericGroupInitialRebalanceDelayMs(50)
+            .build();
+
+        GenericGroup group = context.createGenericGroup("group-id");
+
+        JoinGroupRequestData request = new JoinGroupRequestBuilder()
+            .withGroupId("group-id")
+            .withMemberId(UNKNOWN_MEMBER_ID)
+            .withDefaultProtocolTypeAndProtocols()
+            .build();
+
+        // First round of join requests. Generate member ids. All requests 
will be accepted
+        // as the group is still Empty.
+        List<CompletableFuture<JoinGroupResponseData>> firstRoundFutures = new 
ArrayList<>();
+        IntStream.range(0, groupMaxSize + 1).forEach(i -> {
+            CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
+            firstRoundFutures.add(responseFuture);
+            CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, responseFuture, requiredKnownMemberId);
+            assertTrue(responseFuture.isDone());
+            try {
+                assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
responseFuture.get().errorCode());
+            } catch (Exception ignored) {

Review Comment:
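   This try/catch is required if we want to use the Streams API, because the
   lambda passed to `forEach` cannot throw the checked exceptions declared by
   `responseFuture.get()`. Let me know if we should just use a for-each loop
   instead.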



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to