This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b1796ce6d2c KAFKA-15849: Fix ListGroups API when runtime partition 
size is zero (#14785)
b1796ce6d2c is described below

commit b1796ce6d2c04444a62393fbfd7c61811e001d67
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Fri Nov 17 07:48:02 2023 -0500

    KAFKA-15849: Fix ListGroups API when runtime partition size is zero (#14785)
    
    When the group coordinator does not host any __consumer_offsets partitions, 
the existing ListGroup implementation won't schedule any operation, thus a `new 
CompletableFuture<>()` is returned directly and never gets completed. This 
patch fixes the issue.
    
    Reviewers: David Jacot <[email protected]>
---
 .../coordinator/group/GroupCoordinatorService.java |  4 ++++
 .../group/GroupCoordinatorServiceTest.java         | 24 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index b092316bb31..b184ab9b83b 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -501,6 +501,10 @@ public class GroupCoordinatorService implements 
GroupCoordinator {
         final Set<TopicPartition> existingPartitionSet = runtime.partitions();
         final AtomicInteger cnt = new 
AtomicInteger(existingPartitionSet.size());
 
+        if (existingPartitionSet.isEmpty()) {
+            return CompletableFuture.completedFuture(new 
ListGroupsResponseData());
+        }
+
         for (TopicPartition tp : existingPartitionSet) {
             runtime.scheduleReadOperation(
                 "list-groups",
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index e3e13b82685..98174b07508 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -832,6 +832,30 @@ public class GroupCoordinatorServiceTest {
         assertEquals(Collections.emptyList(), listGroupsResponseData.groups());
     }
 
+    @Test
+    public void testListGroupsWithEmptyTopicPartitions() throws 
ExecutionException, InterruptedException {
+        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();
+        GroupCoordinatorService service = new GroupCoordinatorService(
+            new LogContext(),
+            createConfig(),
+            runtime
+        );
+        int partitionCount = 0;
+        service.startup(() -> partitionCount);
+
+        ListGroupsRequestData request = new ListGroupsRequestData();
+
+        CompletableFuture<ListGroupsResponseData> future = service.listGroups(
+            requestContext(ApiKeys.LIST_GROUPS),
+            request
+        );
+
+        assertEquals(
+            new ListGroupsResponseData(),
+            future.get()
+        );
+    }
+
     @Test
     public void testListGroupsWhenNotStarted() throws ExecutionException, 
InterruptedException {
         CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = 
mockRuntime();

Reply via email to