dajac commented on code in PR #13870: URL: https://github.com/apache/kafka/pull/13870#discussion_r1264961072
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -3022,143 +3242,2087 @@ public void testOnLoaded() { assertNotNull(context.timer.timeout(consumerGroupRevocationTimeoutKey("foo", "foo-1"))); } - private <T> void assertUnorderedListEquals( - List<T> expected, - List<T> actual - ) { - assertEquals(new HashSet<>(expected), new HashSet<>(actual)); - } - - private void assertResponseEquals( - ConsumerGroupHeartbeatResponseData expected, - ConsumerGroupHeartbeatResponseData actual - ) { - if (!responseEquals(expected, actual)) { - assertionFailure() - .expected(expected) - .actual(actual) - .buildAndThrow(); - } - } + @Test + public void testGenerateRecordsOnNewGroup() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .build(); - private boolean responseEquals( - ConsumerGroupHeartbeatResponseData expected, - ConsumerGroupHeartbeatResponseData actual - ) { - if (expected.throttleTimeMs() != actual.throttleTimeMs()) return false; - if (expected.errorCode() != actual.errorCode()) return false; - if (!Objects.equals(expected.errorMessage(), actual.errorMessage())) return false; - if (!Objects.equals(expected.memberId(), actual.memberId())) return false; - if (expected.memberEpoch() != actual.memberEpoch()) return false; - if (expected.shouldComputeAssignment() != actual.shouldComputeAssignment()) return false; - if (expected.heartbeatIntervalMs() != actual.heartbeatIntervalMs()) return false; - // Unordered comparison of the assignments. - return responseAssignmentEquals(expected.assignment(), actual.assignment()); - } + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); - private boolean responseAssignmentEquals( - ConsumerGroupHeartbeatResponseData.Assignment expected, - ConsumerGroupHeartbeatResponseData.Assignment actual - ) { - if (expected == actual) return true; - if (expected == null) return false; - if (actual == null) return false; + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture, true); + assertTrue(responseFuture.isDone()); + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get().errorCode()); - if (!Objects.equals(fromAssignment(expected.pendingTopicPartitions()), fromAssignment(actual.pendingTopicPartitions()))) - return false; + GenericGroup group = context.createGenericGroup("group-id"); - return Objects.equals(fromAssignment(expected.assignedTopicPartitions()), fromAssignment(actual.assignedTopicPartitions())); + assertEquals( + Collections.singletonList(RecordHelpers.newEmptyGroupMetadataRecord(group, MetadataVersion.latest())), + result.records() + ); } - private Map<Uuid, Set<Integer>> fromAssignment( - List<ConsumerGroupHeartbeatResponseData.TopicPartitions> assignment - ) { - if (assignment == null) return null; + @Test + public void testJoinGroupShouldReceiveErrorIfGroupOverMaxSize() throws Exception { + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(10) + .build(); + context.createGenericGroup("group-id"); - Map<Uuid, Set<Integer>> assignmentMap = new HashMap<>(); - assignment.forEach(topicPartitions -> { - assignmentMap.put(topicPartitions.topicId(), new HashSet<>(topicPartitions.partitions())); + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .withReason("exceed max group size") + .build(); + + IntStream.range(0, 10).forEach(i -> { + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture); + assertFalse(responseFuture.isDone()); + assertTrue(result.records().isEmpty()); }); - return assignmentMap; + + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture); + assertTrue(result.records().isEmpty()); + assertTrue(responseFuture.isDone()); + assertEquals(Errors.GROUP_MAX_SIZE_REACHED.code(), responseFuture.get(5, TimeUnit.SECONDS).errorCode()); } - private void assertRecordsEquals( - List<Record> expectedRecords, - List<Record> actualRecords - ) { - try { - assertEquals(expectedRecords.size(), actualRecords.size()); + @Test + public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { + boolean requiredKnownMemberId = true; + int groupMaxSize = 10; + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withGenericGroupMaxSize(groupMaxSize) + .withGenericGroupInitialRebalanceDelayMs(50) + .build(); + + GenericGroup group = context.createGenericGroup("group-id"); + + JoinGroupRequestData request = new JoinGroupRequestBuilder() + .withGroupId("group-id") + .withMemberId(UNKNOWN_MEMBER_ID) + .withDefaultProtocolTypeAndProtocols() + .build(); + + // First round of join requests. Generate member ids. All requests will be accepted + // as the group is still Empty. + List<CompletableFuture<JoinGroupResponseData>> firstRoundFutures = new ArrayList<>(); + IntStream.range(0, groupMaxSize + 1).forEach(i -> { + CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); + firstRoundFutures.add(responseFuture); + CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture, requiredKnownMemberId); + assertTrue(responseFuture.isDone()); + try { + assertEquals(Errors.MEMBER_ID_REQUIRED.code(), responseFuture.get().errorCode()); + } catch (Exception ignored) { Review Comment: Do we really need to keep the try..catch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org