dajac commented on code in PR #14122:
URL: https://github.com/apache/kafka/pull/14122#discussion_r1311531193


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -4007,41 +3968,21 @@ public void 
testDynamicMembersJoinGroupWithMaxSizeAndRequiredKnownMember() {
 
         // First round of join requests. Generate member ids. All requests 
will be accepted
         // as the group is still Empty.
-        List<CompletableFuture<JoinGroupResponseData>> firstRoundFutures = new 
ArrayList<>();
-        IntStream.range(0, groupMaxSize + 1).forEach(i -> {
-            CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
-            firstRoundFutures.add(responseFuture);
-            CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, responseFuture, requiredKnownMemberId);
-            assertTrue(responseFuture.isDone());
-            JoinGroupResponseData response = null;
-            try {
-                response = responseFuture.get();
-            } catch (Exception ignored) {
-            }
-            assertNotNull(response);
-            assertEquals(Errors.MEMBER_ID_REQUIRED.code(), 
response.errorCode());
-            assertTrue(result.records().isEmpty());
-        });
+        List<JoinResult> firstRoundJoinResults = IntStream.range(0, 
groupMaxSize + 1)
+            .mapToObj(i -> context.sendGenericGroupJoin(request, 
requiredKnownMemberId)).collect(Collectors.toList());

Review Comment:
   nit: Should we format it as follow to keep the style consistent with the 
following Stream API usaged?
   
   ```
   List<JoinResult> firstRoundJoinResults = IntStream.range(0, groupMaxSize + 
1).mapToObj(i -> context.sendGenericGroupJoin(
       request,
       requiredKnownMemberId
   )).collect(Collectors.toList());
   ```
   
   alternatively, we could just put `.collect(...)` on a new line. I am fine 
either ways.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9119,4 +8602,397 @@ public void 
testHeartbeatDuringRebalanceCausesRebalanceInProgress() throws Excep
         HeartbeatResponseData heartbeatResponse = 
context.sendGenericGroupHeartbeat(heartbeatRequest);
         assertEquals(Errors.REBALANCE_IN_PROGRESS.code(), 
heartbeatResponse.errorCode());
     }
+
+    public static <T> void assertUnorderedListEquals(
+        List<T> expected,
+        List<T> actual
+    ) {
+        assertEquals(new HashSet<>(expected), new HashSet<>(actual));
+    }
+
+    private void assertResponseEquals(

Review Comment:
   nit: Should we make all those helpers static?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -6173,22 +5815,22 @@ public void 
testStaticMemberFenceDuplicateSyncingFollowerAfterMemberIdChanged()
             .withRebalanceTimeoutMs(10000)
             .build();
 
-        CompletableFuture<JoinGroupResponseData> leaderJoinFuture = new 
CompletableFuture<>();
-        CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, leaderJoinFuture);
+        JoinResult leaderJoinResult = context.sendGenericGroupJoin(request);
 
-        assertTrue(result.records().isEmpty());
-        assertFalse(leaderJoinFuture.isDone());
+        assertTrue(leaderJoinResult.records.isEmpty());
+        assertFalse(leaderJoinResult.joinFuture.isDone());
         assertTrue(group.isInState(PREPARING_REBALANCE));
 
         // Old follower rejoins group will match current member.id.
-        CompletableFuture<JoinGroupResponseData> oldFollowerJoinFuture = new 
CompletableFuture<>();
-        result = context.sendGenericGroupJoin(
-                
request.setMemberId(rebalanceResult.followerId).setGroupInstanceId("follower-instance-id"),
-                oldFollowerJoinFuture);
+        JoinResult oldFollowerJoinResult = context.sendGenericGroupJoin(
+                request

Review Comment:
   nit: Indentation seems to be off here.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -4090,13 +4024,8 @@ public void 
testDynamicMembersJoinGroupWithMaxSizeAndNotRequiredKnownMember() {
             .build();
 
         // First round of join requests. This will trigger a rebalance.
-        List<CompletableFuture<JoinGroupResponseData>> firstRoundFutures = new 
ArrayList<>();
-        IntStream.range(0, groupMaxSize + 1).forEach(i -> {
-            CompletableFuture<JoinGroupResponseData> responseFuture = new 
CompletableFuture<>();
-            firstRoundFutures.add(responseFuture);
-            CoordinatorResult<Void, Record> result = 
context.sendGenericGroupJoin(request, responseFuture, requiredKnownMemberId);
-            assertTrue(result.records().isEmpty());
-        });
+        List<JoinResult> firstRoundJoinResults = IntStream.range(0, 
groupMaxSize + 1)

Review Comment:
   ditto.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8179,29 +7770,30 @@ public void testGenerationIdIncrementsOnRebalance() 
throws Exception {
             .build();
         JoinGroupResponseData leaderJoinResponse = 
context.joinGenericGroupAsDynamicMemberAndCompleteRebalance("group-id");
 
-        CompletableFuture<JoinGroupResponseData> joinFuture = new 
CompletableFuture<>();
-        CoordinatorResult<Void, Record> result = context.sendGenericGroupJoin(
+        JoinResult joinResult = context.sendGenericGroupJoin(
             new JoinGroupRequestBuilder()
                 .withGroupId("group-id")
                 .withMemberId(leaderJoinResponse.memberId())
                 .withDefaultProtocolTypeAndProtocols()
-                .build(),
-            joinFuture);
+                .build()
+        );
 
-        assertTrue(result.records().isEmpty());
-        assertTrue(joinFuture.isDone());
-        assertEquals(2, joinFuture.get().generationId());
-        assertEquals(Errors.NONE.code(), joinFuture.get().errorCode());
+        assertTrue(joinResult.records.isEmpty());
+        assertTrue(joinResult.joinFuture.isDone());
+        assertEquals(2, joinResult.joinFuture.get().generationId());
+        assertEquals(Errors.NONE.code(), 
joinResult.joinFuture.get().errorCode());
     }
 
     private List<JoinGroupResponseMember> toJoinResponseMembers(GenericGroup 
group) {

Review Comment:
   nit: This method could be static? checkJoinGroupResponse as well. Are those 
two helpers as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to