dajac commented on code in PR #10964:
URL: https://github.com/apache/kafka/pull/10964#discussion_r920378132
##########
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java:
##########
@@ -3263,6 +3282,95 @@ public void testListConsumerGroupOffsets() throws
Exception {
}
}
+ @Test
+ public void testBatchedListConsumerGroupOffsets() throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster, AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
env.kafkaClient().prepareResponse(prepareBatchedFindCoordinatorResponse(Errors.NONE,
env.cluster().controller(), groupSpecs.keySet()));
+
+ ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(groupSpecs, new
ListConsumerGroupOffsetsOptions());
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, true);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ @Test
+ public void testBatchedListConsumerGroupOffsetsWithNoBatchingSupport()
throws Exception {
+ Cluster cluster = mockCluster(1, 0);
+ Time time = new MockTime();
+ Map<String, ListConsumerGroupOffsetsSpec> groupSpecs =
batchedListConsumerGroupOffsetsSpec();
+
+ ApiVersion findCoordinatorV3 = new ApiVersion()
+ .setApiKey(ApiKeys.FIND_COORDINATOR.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 3);
+ ApiVersion offsetFetchV7 = new ApiVersion()
+ .setApiKey(ApiKeys.OFFSET_FETCH.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 7);
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster, AdminClientConfig.RETRY_BACKOFF_MS_CONFIG, "0")) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Arrays.asList(findCoordinatorV3,
offsetFetchV7)));
+
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.COORDINATOR_NOT_AVAILABLE,
Node.noNode()));
+
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
env.kafkaClient().prepareResponse(prepareOldFindCoordinatorResponse(Errors.NONE,
env.cluster().controller()));
+
+ ListConsumerGroupOffsetsResult result =
env.adminClient().listConsumerGroupOffsets(groupSpecs);
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false);
+ sendOffsetFetchResponse(env.kafkaClient(), groupSpecs, false);
+
+ verifyListOffsetsForMultipleGroups(groupSpecs, result);
+ }
+ }
+
+ private Map<String, ListConsumerGroupOffsetsSpec>
batchedListConsumerGroupOffsetsSpec() {
+ Set<TopicPartition> groupAPartitions = Collections.singleton(new
TopicPartition("A", 1));
+ Set<TopicPartition> groupBPartitions = Collections.singleton(new
TopicPartition("B", 2));
+
+ ListConsumerGroupOffsetsSpec groupASpec = new
ListConsumerGroupOffsetsSpec().topicPartitions(groupAPartitions);
+ ListConsumerGroupOffsetsSpec groupBSpec = new
ListConsumerGroupOffsetsSpec().topicPartitions(groupBPartitions);
+ return Utils.mkMap(Utils.mkEntry("groupA", groupASpec),
Utils.mkEntry("groupB", groupBSpec));
+ }
+
+ private void sendOffsetFetchResponse(MockClient mockClient, Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs, boolean batched) throws Exception {
+ TestUtils.waitForCondition(() -> {
+ ClientRequest clientRequest = mockClient.requests().peek();
+ return clientRequest != null && clientRequest.apiKey() ==
ApiKeys.OFFSET_FETCH;
+ }, "Failed awaiting OffsetFetch request");
+
+ ClientRequest clientRequest = mockClient.requests().peek();
+ OffsetFetchRequestData data = ((OffsetFetchRequest.Builder)
clientRequest.requestBuilder()).data;
+ Map<String, Map<TopicPartition, PartitionData>> results = new
HashMap<>();
+ Map<String, Errors> errors = new HashMap<>();
+ data.groups().forEach(group -> {
+ Map<TopicPartition, PartitionData> partitionResults = new
HashMap<>();
+ for (TopicPartition tp :
groupSpecs.get(group.groupId()).topicPartitions()) {
+ partitionResults.put(tp, new PartitionData(10,
Optional.empty(), "", Errors.NONE));
+ }
+ results.put(group.groupId(), partitionResults);
+ errors.put(group.groupId(), Errors.NONE);
+ });
+ if (!batched) {
+ assertEquals(1, data.groups().size());
+ mockClient.respond(new OffsetFetchResponse(THROTTLE, Errors.NONE,
results.values().iterator().next()));
+ } else
+ mockClient.respond(new OffsetFetchResponse(THROTTLE, errors,
results));
+ }
+
+ private void verifyListOffsetsForMultipleGroups(Map<String,
ListConsumerGroupOffsetsSpec> groupSpecs,
+
ListConsumerGroupOffsetsResult result) throws Exception {
+ assertEquals(2, result.all().get(10, TimeUnit.SECONDS).size());
Review Comment:
nit: Should we replace 2 by groupSpecs.size()?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]