This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 610780668e MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406) 610780668e is described below commit 610780668efa7c1e8d1be193985eb6e4d971fa0a Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri Jul 15 09:21:35 2022 +0100 MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406) Reviewers: David Jacot <dja...@confluent.io> --- .../java/org/apache/kafka/clients/admin/Admin.java | 7 +-- .../kafka/clients/admin/KafkaAdminClientTest.java | 52 +++++++++++++++------- 2 files changed, 40 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java index 0698d29702..1d469a6643 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java @@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable { * @return The ListGroupOffsetsResult */ default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions options) { - ListConsumerGroupOffsetsOptions listOptions = new ListConsumerGroupOffsetsOptions() - .requireStable(options.requireStable()); @SuppressWarnings("deprecation") ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() .topicPartitions(options.topicPartitions()); - return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), listOptions); + + // We can use the provided options with the batched API, which uses topic partitions from + // the group spec and ignores any topic partitions set in the options. + return listConsumerGroupOffsets(Collections.singletonMap(groupId, groupSpec), options); } /** diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 3d285a45f7..de57813679 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -131,6 +131,8 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic; import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection; import org.apache.kafka.common.message.OffsetFetchRequestData; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup; +import org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics; import org.apache.kafka.common.message.UnregisterBrokerResponseData; import org.apache.kafka.common.message.WriteTxnMarkersResponseData; import org.apache.kafka.common.protocol.ApiKeys; @@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest { } @Test - public void testListConsumerGroupOffsetsOptions() throws Exception { + public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws Exception { + verifyListConsumerGroupOffsetsOptions(false); + } + + @Test + public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws Exception { + verifyListConsumerGroupOffsetsOptions(true); + } + + @SuppressWarnings("deprecation") + private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) throws Exception { final Cluster cluster = mockCluster(3, 0); final Time time = new MockTime(); @@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest { env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); - final TopicPartition tp1 = new TopicPartition("A", 0); + final List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("A", 0)); final ListConsumerGroupOffsetsOptions options = new ListConsumerGroupOffsetsOptions() - .requireStable(true); - final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() - .topicPartitions(Collections.singletonList(tp1)); - env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options); + .requireStable(true) + .timeoutMs(300); + if (batchedApi) { + final ListConsumerGroupOffsetsSpec groupSpec = new ListConsumerGroupOffsetsSpec() + .topicPartitions(partitions); + env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, groupSpec), options); + } else { + env.adminClient().listConsumerGroupOffsets(GROUP_ID, options.topicPartitions(partitions)); + } final MockClient mockClient = env.kafkaClient(); - TestUtils.waitForCondition(() -> { - final ClientRequest clientRequest = mockClient.requests().peek(); - if (clientRequest != null) { - OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; - return data.requireStable() && - data.groups().get(0).topics().get(0).name().equals("A") && - data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0)); - } - return false; - }, "Failed awaiting ListConsumerGroupOfsets request"); + waitForRequest(mockClient, ApiKeys.OFFSET_FETCH); + + ClientRequest clientRequest = mockClient.requests().peek(); + assertNotNull(clientRequest); + assertEquals(300, clientRequest.requestTimeoutMs()); + OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data; + assertTrue(data.requireStable()); + assertEquals(Collections.singletonList(GROUP_ID), + data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList())); + assertEquals(Collections.singletonList("A"), + data.groups().get(0).topics().stream().map(OffsetFetchRequestTopics::name).collect(Collectors.toList())); + assertEquals(Collections.singletonList(0), + data.groups().get(0).topics().get(0).partitionIndexes()); } }