dajac commented on code in PR #14122: URL: https://github.com/apache/kafka/pull/14122#discussion_r1311531193
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -4007,41 +3968,21 @@ public void testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() { // First round of join requests. Generate member ids. All requests will be accepted // as the group is still Empty. - List<CompletableFuture<JoinGroupResponseData>> firstRoundFutures = new ArrayList<>(); - IntStream.range(0, groupMaxSize + 1).forEach(i -> { - CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); - firstRoundFutures.add(responseFuture); - CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture, requiredKnownMemberId); - assertTrue(responseFuture.isDone()); - JoinGroupResponseData response = null; - try { - response = responseFuture.get(); - } catch (Exception ignored) { - } - assertNotNull(response); - assertEquals(Errors.MEMBER_ID_REQUIRED.code(), response.errorCode()); - assertTrue(result.records().isEmpty()); - }); + List<JoinResult> firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1) + .mapToObj(i -> context.sendGenericGroupJoin(request, requiredKnownMemberId)).collect(Collectors.toList()); Review Comment: nit: Should we format it as follows to keep the style consistent with the following Stream API usages? ``` List<JoinResult> firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1).mapToObj(i -> context.sendGenericGroupJoin( request, requiredKnownMemberId )).collect(Collectors.toList()); ``` Alternatively, we could just put `.collect(...)` on a new line. I am fine either way. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -9119,4 +8602,397 @@ public void testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep HeartbeatResponseData heartbeatResponse = context.sendGenericGroupHeartbeat(heartbeatRequest); assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), heartbeatResponse.errorCode()); } + + public static <T> void assertUnorderedListEquals( + List<T> expected, + List<T> actual + ) { + assertEquals(new HashSet<>(expected), new HashSet<>(actual)); + } + + private void assertResponseEquals( Review Comment: nit: Should we make all those helpers static? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -6173,22 +5815,22 @@ public void testStaticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged() .withRebalanceTimeoutMs(10000) .build(); - CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new CompletableFuture<>(); - CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, leaderJoinFuture); + JoinResult leaderJoinResult = context.sendGenericGroupJoin(request); - assertTrue(result.records().isEmpty()); - assertFalse(leaderJoinFuture.isDone()); + assertTrue(leaderJoinResult.records.isEmpty()); + assertFalse(leaderJoinResult.joinFuture.isDone()); assertTrue(group.isInState(PREPARING_REBALANCE)); // Old follower rejoins group will match current member.id. - CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new CompletableFuture<>(); - result = context.sendGenericGroupJoin( - request.setMemberId(rebalanceResult.followerId).setGroupInstanceId("follower-instance-id"), - oldFollowerJoinFuture); + JoinResult oldFollowerJoinResult = context.sendGenericGroupJoin( + request Review Comment: nit: Indentation seems to be off here. 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -4090,13 +4024,8 @@ public void testDynamicMembersJoinGroupWithMaxSizeAndNotRequiredKnownMember() { .build(); // First round of join requests. This will trigger a rebalance. - List<CompletableFuture<JoinGroupResponseData>> firstRoundFutures = new ArrayList<>(); - IntStream.range(0, groupMaxSize + 1).forEach(i -> { - CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>(); - firstRoundFutures.add(responseFuture); - CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(request, responseFuture, requiredKnownMemberId); - assertTrue(result.records().isEmpty()); - }); + List<JoinResult> firstRoundJoinResults = IntStream.range(0, groupMaxSize + 1) Review Comment: ditto. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java: ########## @@ -8179,29 +7770,30 @@ public void testGenerationIdIncrementsOnRebalance() throws Exception { .build(); JoinGroupResponseData leaderJoinResponse = context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id"); - CompletableFuture<JoinGroupResponseData> joinFuture = new CompletableFuture<>(); - CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin( + JoinResult joinResult = context.sendGenericGroupJoin( new JoinGroupRequestBuilder() .withGroupId("group-id") .withMemberId(leaderJoinResponse.memberId()) .withDefaultProtocolTypeAndProtocols() - .build(), - joinFuture); + .build() + ); - assertTrue(result.records().isEmpty()); - assertTrue(joinFuture.isDone()); - assertEquals(2, joinFuture.get().generationId()); - assertEquals(Errors.NONE.code(), joinFuture.get().errorCode()); + assertTrue(joinResult.records.isEmpty()); + assertTrue(joinResult.joinFuture.isDone()); + assertEquals(2, joinResult.joinFuture.get().generationId()); + assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode()); 
} private List<JoinGroupResponseMember> toJoinResponseMembers(GenericGroup group) { Review Comment: nit: Could this method be static? checkJoinGroupResponse as well. Are those two helpers as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org