dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r920378132


##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws 
Exception {
         }
     }
 
+    @Test
+    public void testBatchedListConsumerGroupOffsets() throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller(), groupSpecs.keySet()));
+
+            ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs, new 
ListConsumerGroupOffsetsOptions());
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    @Test
+    public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport() 
throws Exception {
+        Cluster cluster = mockCluster(1, 0);
+        Time time = new MockTime();
+        Map<String, ListConsumerGroupOffsetsSpec> groupSpecs = 
batchedListConsumerGroupOffsetsSpec();
+
+        ApiVersion findCoordinatorV3 = new ApiVersion()
+                .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 3);
+        ApiVersion offsetFetchV7 = new ApiVersion()
+                .setApiKey(ApiKeys.OFFSET_FETCH.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 7);
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, 
cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+            
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3,
 offsetFetchV7)));
+            
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
  Node.noNode()));
+            
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller()));
+            
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
 env.cluster().controller()));
+
+            ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets(groupSpecs);
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false);
+            sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false);
+
+            verifyListOffsetsForMultipleGroups(groupSpecs, result);
+        }
+    }
+
+    private Map<String, ListConsumerGroupOffsetsSpec> 
batchedListConsumerGroupOffsetsSpec() {
+        Set<TopicPartition> groupAPartitions = Collections.singleton(new 
TopicPartition("A", 1));
+        Set<TopicPartition> groupBPartitions =  Collections.singleton(new 
TopicPartition("B", 2));
+
+        ListConsumerGroupOffsetsSpec groupASpec = new 
ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
+        ListConsumerGroupOffsetsSpec groupBSpec = new 
ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
+        return Utils.mkMap(Utils.mkEntry("groupA", groupASpec), 
Utils.mkEntry("groupB", groupBSpec));
+    }
+
+    private void sendOffsetFetchResponse(MockClient mockClient, Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            ClientRequest clientRequest = mockClient.requests().peek();
+            return clientRequest != null && clientRequest.apiKey() == 
ApiKeys.OFFSET_FETCH;
+        }, "Failed awaiting OffsetFetch request");
+
+        ClientRequest clientRequest = mockClient.requests().peek();
+        OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).data;
+        Map<String, Map<TopicPartition, PartitionData>> results = new 
HashMap<>();
+        Map<String, Errors> errors = new HashMap<>();
+        data.groups().forEach(group -> {
+            Map<TopicPartition, PartitionData> partitionResults = new 
HashMap<>();
+            for (TopicPartition tp : 
groupSpecs.get(group.groupId()).topicPartitions()) {
+                partitionResults.put(tp, new PartitionData(10, 
Optional.empty(), "", Errors.NONE));
+            }
+            results.put(group.groupId(), partitionResults);
+            errors.put(group.groupId(), Errors.NONE);
+        });
+        if (!batched) {
+            assertEquals(1, data.groups().size());
+            mockClient.respond(new OffsetFetchResponse(THROTTLE, Errors.NONE, 
results.values().iterator().next()));
+        } else
+            mockClient.respond(new OffsetFetchResponse(THROTTLE, errors, 
results));
+    }
+
+    private void verifyListOffsetsForMultipleGroups(Map<String, 
ListConsumerGroupOffsetsSpec> groupSpecs,
+                                                    
ListConsumerGroupOffsetsResult result) throws Exception {
+        assertEquals(2, result.all().get(10, TimeUnit.SECONDS).size());

Review Comment:
   nit: Should we replace the hard-coded `2` with `groupSpecs.size()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to