This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 610780668e MINOR: Fix options for old-style 
Admin.listConsumerGroupOffsets (#12406)
610780668e is described below

commit 610780668efa7c1e8d1be193985eb6e4d971fa0a
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
AuthorDate: Fri Jul 15 09:21:35 2022 +0100

    MINOR: Fix options for old-style Admin.listConsumerGroupOffsets (#12406)
    
    Reviewers: David Jacot <dja...@confluent.io>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  7 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 52 +++++++++++++++-------
 2 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index 0698d29702..1d469a6643 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -921,12 +921,13 @@ public interface Admin extends AutoCloseable {
      * @return The ListGroupOffsetsResult
      */
     default ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
-        ListConsumerGroupOffsetsOptions listOptions = new 
ListConsumerGroupOffsetsOptions()
-            .requireStable(options.requireStable());
         @SuppressWarnings("deprecation")
         ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
             .topicPartitions(options.topicPartitions());
-        return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), listOptions);
+
+        // We can use the provided options with the batched API, which uses 
topic partitions from
+        // the group spec and ignores any topic partitions set in the options.
+        return listConsumerGroupOffsets(Collections.singletonMap(groupId, 
groupSpec), options);
     }
 
     /**
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 3d285a45f7..de57813679 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -131,6 +131,8 @@ import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
 import 
org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
 import org.apache.kafka.common.message.OffsetFetchRequestData;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestGroup;
+import 
org.apache.kafka.common.message.OffsetFetchRequestData.OffsetFetchRequestTopics;
 import org.apache.kafka.common.message.UnregisterBrokerResponseData;
 import org.apache.kafka.common.message.WriteTxnMarkersResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -3075,7 +3077,17 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testListConsumerGroupOffsetsOptions() throws Exception {
+    public void testListConsumerGroupOffsetsOptionsWithUnbatchedApi() throws 
Exception {
+        verifyListConsumerGroupOffsetsOptions(false);
+    }
+
+    @Test
+    public void testListConsumerGroupOffsetsOptionsWithBatchedApi() throws 
Exception {
+        verifyListConsumerGroupOffsetsOptions(true);
+    }
+
+    @SuppressWarnings("deprecation")
+    private void verifyListConsumerGroupOffsetsOptions(boolean batchedApi) 
throws Exception {
         final Cluster cluster = mockCluster(3, 0);
         final Time time = new MockTime();
 
@@ -3085,24 +3097,32 @@ public class KafkaAdminClientTest {
 
             
env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, 
env.cluster().controller()));
 
-            final TopicPartition tp1 = new TopicPartition("A", 0);
+            final List<TopicPartition> partitions = 
Collections.singletonList(new TopicPartition("A", 0));
             final ListConsumerGroupOffsetsOptions options = new 
ListConsumerGroupOffsetsOptions()
-                    .requireStable(true);
-            final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
-                    .topicPartitions(Collections.singletonList(tp1));
-            
env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, 
groupSpec), options);
+                    .requireStable(true)
+                    .timeoutMs(300);
+            if (batchedApi) {
+                final ListConsumerGroupOffsetsSpec groupSpec = new 
ListConsumerGroupOffsetsSpec()
+                        .topicPartitions(partitions);
+                
env.adminClient().listConsumerGroupOffsets(Collections.singletonMap(GROUP_ID, 
groupSpec), options);
+            } else {
+                env.adminClient().listConsumerGroupOffsets(GROUP_ID, 
options.topicPartitions(partitions));
+            }
 
             final MockClient mockClient = env.kafkaClient();
-            TestUtils.waitForCondition(() -> {
-                final ClientRequest clientRequest = 
mockClient.requests().peek();
-                if (clientRequest != null) {
-                    OffsetFetchRequestData data = 
((OffsetFetchRequest.Builder) clientRequest.requestBuilder()).data;
-                    return data.requireStable() &&
-                        
data.groups().get(0).topics().get(0).name().equals("A") &&
-                        
data.groups().get(0).topics().get(0).partitionIndexes().equals(Collections.singletonList(0));
-                }
-                return false;
-            }, "Failed awaiting ListConsumerGroupOfsets request");
+            waitForRequest(mockClient, ApiKeys.OFFSET_FETCH);
+
+            ClientRequest clientRequest = mockClient.requests().peek();
+            assertNotNull(clientRequest);
+            assertEquals(300, clientRequest.requestTimeoutMs());
+            OffsetFetchRequestData data = ((OffsetFetchRequest.Builder) 
clientRequest.requestBuilder()).data;
+            assertTrue(data.requireStable());
+            assertEquals(Collections.singletonList(GROUP_ID),
+                    
data.groups().stream().map(OffsetFetchRequestGroup::groupId).collect(Collectors.toList()));
+            assertEquals(Collections.singletonList("A"),
+                    
data.groups().get(0).topics().stream().map(OffsetFetchRequestTopics::name).collect(Collectors.toList()));
+            assertEquals(Collections.singletonList(0),
+                    data.groups().get(0).topics().get(0).partitionIndexes());
         }
     }
 

Reply via email to