[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16439027#comment-16439027
 ] 

ASF GitHub Bot commented on KAFKA-6788:
---------------------------------------

cyrusv closed pull request #4875: KAFKA-6788: Grouping consumer requests per 
consumer coordinator in admin client
URL: https://github.com/apache/kafka/pull/4875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943555b..8fd92572586 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection<Stri
             }
         }
 
-        // TODO: KAFKA-6788, we should consider grouping the request per 
coordinator and send one request with a list of
-        // all consumer groups this coordinator host
+        final Set<String> describedGroupIds = new HashSet<>();
+
         for (final Map.Entry<String, 
KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) {
             // skip sending request for those futures that already failed.
             if (entry.getValue().isCompletedExceptionally())
                 continue;
 
             final String groupId = entry.getKey();
+            if (describedGroupIds.contains(groupId)) {
+                continue;
+            }
 
             final long startFindCoordinatorMs = time.milliseconds();
             final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
@@ -2274,61 +2277,83 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection<Stri
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
 
-                    final long nowDescribeConsumerGroups = time.milliseconds();
+                    final long nowListConsumerGroups = time.milliseconds();
 
                     final int nodeId = response.node().id();
 
-                    runnable.call(new Call("describeConsumerGroups", deadline, 
new ConstantNodeIdProvider(nodeId)) {
 
+                    runnable.call(new Call("listGroups", deadline, new 
ConstantNodeIdProvider(nodeId)) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+                            return new ListGroupsRequest.Builder();
                         }
 
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) 
{
-                            final DescribeGroupsResponse response = 
(DescribeGroupsResponse) abstractResponse;
+                            final ListGroupsResponse listResponse = 
(ListGroupsResponse) abstractResponse;
+                            final long nowDescribeConsumerGroups = 
time.milliseconds();
 
-                            KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
-                            final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
+                            final Set<String> groupIdsToDescribe = new 
HashSet<>();
+                            for (ListGroupsResponse.Group group : 
listResponse.groups()) {
+                                groupIdsToDescribe.add(group.groupId());
+                            }
 
-                            final Errors groupError = groupMetadata.error();
-                            if (groupError != Errors.NONE) {
-                                // TODO: KAFKA-6789, we can retry based on the 
error code
-                                
future.completeExceptionally(groupError.exception());
-                            } else {
-                                final String protocolType = 
groupMetadata.protocolType();
-                                if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
-                                    final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
-                                    final List<MemberDescription> consumers = 
new ArrayList<>(members.size());
-
-                                    for (DescribeGroupsResponse.GroupMember 
groupMember : members) {
-                                        final PartitionAssignor.Assignment 
assignment =
-                                                
ConsumerProtocol.deserializeAssignment(
-                                                        
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
-
-                                        final MemberDescription 
memberDescription =
-                                                new MemberDescription(
-                                                        groupMember.memberId(),
-                                                        groupMember.clientId(),
-                                                        
groupMember.clientHost(),
-                                                        new 
MemberAssignment(assignment.partitions()));
-                                        consumers.add(memberDescription);
+                            runnable.call(new Call("describeConsumerGroups", 
deadline, new ConstantNodeIdProvider(nodeId)) {
+
+                                @Override
+                                AbstractRequest.Builder createRequest(int 
timeoutMs) {
+                                    return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
+                                }
+
+                                @Override
+                                void handleResponse(AbstractResponse 
abstractResponse) {
+                                    final DescribeGroupsResponse response = 
(DescribeGroupsResponse) abstractResponse;
+                                    for (String describedCandidate : 
groupIdsToDescribe) {
+                                        if 
(response.groups().containsKey(describedCandidate)) {
+                                            
describedGroupIds.add(describedCandidate);
+                                        }
+                                    }
+
+                                    KafkaFutureImpl<ConsumerGroupDescription> 
future = futures.get(groupId);
+                                    final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
+
+                                    final Errors groupError = 
groupMetadata.error();
+                                    if (groupError != Errors.NONE) {
+                                        // TODO: KAFKA-6789, we can retry 
based on the error code
+                                        
future.completeExceptionally(groupError.exception());
+                                    } else {
+                                        final String protocolType = 
groupMetadata.protocolType();
+                                        if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
+                                            final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+                                            final List<MemberDescription> 
consumers = new ArrayList<>(members.size());
+
+                                            for 
(DescribeGroupsResponse.GroupMember groupMember : members) {
+                                                final 
PartitionAssignor.Assignment assignment =
+                                                        
ConsumerProtocol.deserializeAssignment(
+                                                                
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
+
+                                                final MemberDescription 
memberDescription =
+                                                        new MemberDescription(
+                                                                
groupMember.memberId(),
+                                                                
groupMember.clientId(),
+                                                                
groupMember.clientHost(),
+                                                                new 
MemberAssignment(assignment.partitions()));
+                                                
consumers.add(memberDescription);
+                                            }
+                                            final String protocol = 
groupMetadata.protocol();
+                                            final ConsumerGroupDescription 
consumerGroupDescription =
+                                                    new 
ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+                                            
future.complete(consumerGroupDescription);
+                                        }
                                     }
-                                    final String protocol = 
groupMetadata.protocol();
-                                    final ConsumerGroupDescription 
consumerGroupDescription =
-                                            new 
ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
-                                    future.complete(consumerGroupDescription);
                                 }
-                            }
-                        }
 
-                        @Override
-                        void handleFailure(Throwable throwable) {
-                            KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
-                            future.completeExceptionally(throwable);
-                        }
-                    }, nowDescribeConsumerGroups);
+                                @Override
+                                void handleFailure(Throwable throwable) {
+                                    KafkaFutureImpl<ConsumerGroupDescription> 
future = futures.get(groupId);
+                                    future.completeExceptionally(throwable);
+                                }
+                            }, nowDescribeConsumerGroups);
                 }
 
                 @Override
@@ -2548,13 +2573,18 @@ public DeleteConsumerGroupsResult 
deleteConsumerGroups(Collection<String> groupI
             }
         }
 
-        // TODO: KAFKA-6788, we should consider grouping the request per 
coordinator and send one request with a list of
-        // all consumer groups this coordinator host
+        final Set<String> deletedGroupIds = new HashSet<>();
+
         for (final String groupId : groupIds) {
             // skip sending request for those futures that already failed.
             if (futures.get(groupId).isCompletedExceptionally())
                 continue;
 
+            // Skip ones we deleted on the same coordinator
+            if (deletedGroupIds.contains(groupId)) {
+                continue;
+            }
+
             final long startFindCoordinatorMs = time.milliseconds();
             final long deadline = calcDeadlineMs(startFindCoordinatorMs, 
options.timeoutMs());
 
@@ -2568,29 +2598,60 @@ public DeleteConsumerGroupsResult 
deleteConsumerGroups(Collection<String> groupI
                 void handleResponse(AbstractResponse abstractResponse) {
                     final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
 
-                    final long nowDeleteConsumerGroups = time.milliseconds();
-
+                    final long nowListGroups = time.milliseconds();
                     final int nodeId = response.node().id();
 
-                    runnable.call(new Call("deleteConsumerGroups", deadline, 
new ConstantNodeIdProvider(nodeId)) {
-
+                    runnable.call(new Call("listGroups", deadline, new 
ConstantNodeIdProvider(nodeId)) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
-                            return new 
DeleteGroupsRequest.Builder(Collections.singleton(groupId));
+                            return new ListGroupsRequest.Builder();
                         }
 
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) 
{
-                            final DeleteGroupsResponse response = 
(DeleteGroupsResponse) abstractResponse;
-
-                            KafkaFutureImpl<Void> future = 
futures.get(groupId);
-                            final Errors groupError = response.get(groupId);
+                            final ListGroupsResponse listResponse = 
(ListGroupsResponse) abstractResponse;
+                            final long nowDeleteConsumerGroups = 
time.milliseconds();
 
-                            if (groupError != Errors.NONE) {
-                                
future.completeExceptionally(groupError.exception());
-                            } else {
-                                future.complete(null);
+                            final Set<String> groupIdsToDelete = new 
HashSet<>();
+                            for (ListGroupsResponse.Group group : 
listResponse.groups()) {
+                                groupIdsToDelete.add(group.groupId());
                             }
+                            runnable.call(new Call("deleteConsumerGroups", 
deadline, new ConstantNodeIdProvider(nodeId)) {
+
+                                @Override
+                                AbstractRequest.Builder createRequest(int 
timeoutMs) {
+
+
+                                    return new 
DeleteGroupsRequest.Builder(groupIdsToDelete);
+                                }
+
+                                @Override
+                                void handleResponse(AbstractResponse 
abstractResponse) {
+                                    final DeleteGroupsResponse response = 
(DeleteGroupsResponse) abstractResponse;
+
+                                    // If we submitted it and it wasn't an 
error
+                                    for (final String delCandidateGroupId : 
groupIdsToDelete) {
+                                        if 
(!response.errors().containsKey(delCandidateGroupId)) {
+                                            
deletedGroupIds.add(delCandidateGroupId);
+                                        }
+                                    }
+
+                                    KafkaFutureImpl<Void> future = 
futures.get(groupId);
+                                    final Errors groupError = 
response.get(groupId);
+
+                                    if (groupError != Errors.NONE) {
+                                        
future.completeExceptionally(groupError.exception());
+                                    } else {
+                                        future.complete(null);
+                                    }
+                                }
+
+                                @Override
+                                void handleFailure(Throwable throwable) {
+                                    KafkaFutureImpl<Void> future = 
futures.get(groupId);
+                                    future.completeExceptionally(throwable);
+                                }
+                            }, nowDeleteConsumerGroups);
                         }
 
                         @Override
@@ -2598,7 +2659,7 @@ void handleFailure(Throwable throwable) {
                             KafkaFutureImpl<Void> future = 
futures.get(groupId);
                             future.completeExceptionally(throwable);
                         }
-                    }, nowDeleteConsumerGroups);
+                    }, nowListGroups);
                 }
 
                 @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2789b62621..be44b37496f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -867,6 +867,15 @@ public void testDeleteConsumerGroups() throws Exception {
 
             env.kafkaClient().prepareResponse(new 
FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
 
+            env.kafkaClient().prepareResponse(
+                new ListGroupsResponse(
+                    Errors.NONE,
+                    Arrays.asList(
+                        new ListGroupsResponse.Group("group-1", 
ConsumerProtocol.PROTOCOL_TYPE),
+                        new ListGroupsResponse.Group("group-connect-1", 
"connector")
+                    )));
+
+
             final Map<String, Errors> response = new HashMap<>();
             response.put("group-0", Errors.NONE);
             env.kafkaClient().prepareResponse(new 
DeleteGroupsResponse(response));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Grouping consumer requests per consumer coordinator in admin client
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6788
>             Project: Kafka
>          Issue Type: Improvement
>          Components: admin
>            Reporter: Guozhang Wang
>            Priority: Major
>              Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multi group ids be sent within the same request. So we can consider optimize 
> it by grouping the requests per coordinator destination.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to