[ https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508215#comment-16508215 ]
ASF GitHub Bot commented on KAFKA-6788: --------------------------------------- cyrusv closed pull request #4878: KAFKA-6788: Combine queries for describe and delete groups in AdminCl… URL: https://github.com/apache/kafka/pull/4878 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index fa3f943555b..cd79453d29a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -2252,14 +2252,17 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri } } - // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of - // all consumer groups this coordinator host + final Set<String> describedGroupIds = new HashSet<>(); + for (final Map.Entry<String, KafkaFutureImpl<ConsumerGroupDescription>> entry : futures.entrySet()) { // skip sending request for those futures that already failed. 
if (entry.getValue().isCompletedExceptionally()) continue; final String groupId = entry.getKey(); + if (describedGroupIds.contains(groupId)) { + continue; + } final long startFindCoordinatorMs = time.milliseconds(); final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); @@ -2274,53 +2277,82 @@ public DescribeConsumerGroupsResult describeConsumerGroups(final Collection<Stri void handleResponse(AbstractResponse abstractResponse) { final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; - final long nowDescribeConsumerGroups = time.milliseconds(); - + final long nowListConsumerGroups = time.milliseconds(); final int nodeId = response.node().id(); - runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { + runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(nodeId)) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)); + return new ListGroupsRequest.Builder(); } @Override void handleResponse(AbstractResponse abstractResponse) { - final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; + final ListGroupsResponse listResponse = (ListGroupsResponse) abstractResponse; + final long nowDescribeConsumerGroups = time.milliseconds(); - KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); - final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId); + final Set<String> groupIdsToDescribe = new HashSet<>(); + for (ListGroupsResponse.Group group : listResponse.groups()) { + groupIdsToDescribe.add(group.groupId()); + } - final Errors groupError = groupMetadata.error(); - if (groupError != Errors.NONE) { - // TODO: KAFKA-6789, we can retry based on the error code - future.completeExceptionally(groupError.exception()); - } else { - final String protocolType = groupMetadata.protocolType(); - if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { - final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members(); - final List<MemberDescription> consumers = new ArrayList<>(members.size()); - - for (DescribeGroupsResponse.GroupMember groupMember : members) { - final PartitionAssignor.Assignment assignment = - ConsumerProtocol.deserializeAssignment( - ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); - - final MemberDescription memberDescription = - new MemberDescription( - groupMember.memberId(), - groupMember.clientId(), - groupMember.clientHost(), - new MemberAssignment(assignment.partitions())); - consumers.add(memberDescription); + runnable.call(new Call("describeConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { + + @Override + AbstractRequest.Builder createRequest(int timeoutMs) { + return new DescribeGroupsRequest.Builder(Collections.singletonList(groupId)); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final DescribeGroupsResponse response = (DescribeGroupsResponse) abstractResponse; + for (String describedCandidate : groupIdsToDescribe) { + if (response.groups().containsKey(describedCandidate)) { + describedGroupIds.add(describedCandidate); + } + } + + KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); + final DescribeGroupsResponse.GroupMetadata groupMetadata = response.groups().get(groupId); + + final Errors groupError = groupMetadata.error(); + if (groupError != Errors.NONE) { + // TODO: KAFKA-6789, we can retry based on the error code + future.completeExceptionally(groupError.exception()); + } else { + final String protocolType = groupMetadata.protocolType(); + if (protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) { + final List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members(); + final List<MemberDescription> consumers = new ArrayList<>(members.size()); + + for 
(DescribeGroupsResponse.GroupMember groupMember : members) { + final PartitionAssignor.Assignment assignment = + ConsumerProtocol.deserializeAssignment( + ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment()))); + + final MemberDescription memberDescription = + new MemberDescription( + groupMember.memberId(), + groupMember.clientId(), + groupMember.clientHost(), + new MemberAssignment(assignment.partitions())); + consumers.add(memberDescription); + } + final String protocol = groupMetadata.protocol(); + final ConsumerGroupDescription consumerGroupDescription = + new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol); + future.complete(consumerGroupDescription); + } } - final String protocol = groupMetadata.protocol(); - final ConsumerGroupDescription consumerGroupDescription = - new ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol); - future.complete(consumerGroupDescription); } - } + + @Override + void handleFailure(Throwable throwable) { + KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); + future.completeExceptionally(throwable); + } + }, nowDescribeConsumerGroups); } @Override @@ -2328,7 +2360,7 @@ void handleFailure(Throwable throwable) { KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); future.completeExceptionally(throwable); } - }, nowDescribeConsumerGroups); + }, nowListConsumerGroups); } @Override @@ -2336,6 +2368,7 @@ void handleFailure(Throwable throwable) { KafkaFutureImpl<ConsumerGroupDescription> future = futures.get(groupId); future.completeExceptionally(throwable); } + }, startFindCoordinatorMs); } @@ -2548,13 +2581,18 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI } } - // TODO: KAFKA-6788, we should consider grouping the request per coordinator and send one request with a list of - // all consumer groups this coordinator host + final Set<String> deletedGroupIds = new HashSet<>(); + for (final String 
groupId : groupIds) { // skip sending request for those futures that already failed. if (futures.get(groupId).isCompletedExceptionally()) continue; + // Skip ones we deleted on the same coordinator + if (deletedGroupIds.contains(groupId)) { + continue; + } + final long startFindCoordinatorMs = time.milliseconds(); final long deadline = calcDeadlineMs(startFindCoordinatorMs, options.timeoutMs()); @@ -2568,29 +2606,60 @@ public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> groupI void handleResponse(AbstractResponse abstractResponse) { final FindCoordinatorResponse response = (FindCoordinatorResponse) abstractResponse; - final long nowDeleteConsumerGroups = time.milliseconds(); - + final long nowListGroups = time.milliseconds(); final int nodeId = response.node().id(); - runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { - + runnable.call(new Call("listGroups", deadline, new ConstantNodeIdProvider(nodeId)) { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new DeleteGroupsRequest.Builder(Collections.singleton(groupId)); + return new ListGroupsRequest.Builder(); } @Override void handleResponse(AbstractResponse abstractResponse) { - final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; - - KafkaFutureImpl<Void> future = futures.get(groupId); - final Errors groupError = response.get(groupId); + final ListGroupsResponse listResponse = (ListGroupsResponse) abstractResponse; + final long nowDeleteConsumerGroups = time.milliseconds(); - if (groupError != Errors.NONE) { - future.completeExceptionally(groupError.exception()); - } else { - future.complete(null); + final Set<String> groupIdsToDelete = new HashSet<>(); + for (ListGroupsResponse.Group group : listResponse.groups()) { + groupIdsToDelete.add(group.groupId()); } + runnable.call(new Call("deleteConsumerGroups", deadline, new ConstantNodeIdProvider(nodeId)) { + + @Override + AbstractRequest.Builder 
createRequest(int timeoutMs) { + + + return new DeleteGroupsRequest.Builder(groupIdsToDelete); + } + + @Override + void handleResponse(AbstractResponse abstractResponse) { + final DeleteGroupsResponse response = (DeleteGroupsResponse) abstractResponse; + + // If we submitted it and it wasn't an error + for (final String delCandidateGroupId : groupIdsToDelete) { + if (!response.errors().containsKey(delCandidateGroupId)) { + deletedGroupIds.add(delCandidateGroupId); + } + } + + KafkaFutureImpl<Void> future = futures.get(groupId); + final Errors groupError = response.get(groupId); + + if (groupError != Errors.NONE) { + future.completeExceptionally(groupError.exception()); + } else { + future.complete(null); + } + } + + @Override + void handleFailure(Throwable throwable) { + KafkaFutureImpl<Void> future = futures.get(groupId); + future.completeExceptionally(throwable); + } + }, nowDeleteConsumerGroups); } @Override @@ -2598,7 +2667,7 @@ void handleFailure(Throwable throwable) { KafkaFutureImpl<Void> future = futures.get(groupId); future.completeExceptionally(throwable); } - }, nowDeleteConsumerGroups); + }, nowListGroups); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d2789b62621..2af29509908 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -794,6 +794,14 @@ public void testDescribeConsumerGroups() throws Exception { new DescribeGroupsResponse.GroupMember("0", "clientId0", "clientHost", null, memberAssignment), new DescribeGroupsResponse.GroupMember("1", "clientId1", "clientHost", null, memberAssignment)))); + + env.kafkaClient().prepareResponse( + new ListGroupsResponse( + Errors.NONE, + Arrays.asList( + new ListGroupsResponse.Group("group-0", ConsumerProtocol.PROTOCOL_TYPE) + ))); + 
env.kafkaClient().prepareResponse(new DescribeGroupsResponse(groupMetadataMap)); final DescribeConsumerGroupsResult result = env.adminClient().describeConsumerGroups(Collections.singletonList("group-0")); @@ -867,6 +875,15 @@ public void testDeleteConsumerGroups() throws Exception { env.kafkaClient().prepareResponse(new FindCoordinatorResponse(Errors.NONE, env.cluster().controller())); + env.kafkaClient().prepareResponse( + new ListGroupsResponse( + Errors.NONE, + Arrays.asList( + new ListGroupsResponse.Group("group-1", ConsumerProtocol.PROTOCOL_TYPE), + new ListGroupsResponse.Group("group-connect-1", "connector") + ))); + + final Map<String, Errors> response = new HashMap<>(); response.put("group-0", Errors.NONE); env.kafkaClient().prepareResponse(new DeleteGroupsResponse(response)); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Grouping consumer requests per consumer coordinator in admin client > ------------------------------------------------------------------- > > Key: KAFKA-6788 > URL: https://issues.apache.org/jira/browse/KAFKA-6788 > Project: Kafka > Issue Type: Improvement > Components: admin > Reporter: Guozhang Wang > Priority: Major > Labels: newbie++ > > In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we > will first try to get the coordinator for each requested group id, and then > send the corresponding request for that group id. However, different group > ids could be hosted on the same coordinator, and these requests do support > multiple group ids to be sent within the same request. So we can consider optimizing > it by grouping the requests per coordinator destination. -- This message was sent by Atlassian JIRA (v7.6.3#76005)