This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2bb9f53477f KAFKA-17465 Refactor getMembersFromGroup to be non-blocking (#17080)
2bb9f53477f is described below

commit 2bb9f53477f6a71c87c9bbe0d0281652cffb6eb7
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Sep 25 02:40:32 2024 +0800

    KAFKA-17465 Refactor getMembersFromGroup to be non-blocking (#17080)
    
    Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 60 ++++++++++++----------
 1 file changed, 33 insertions(+), 27 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1f08ca95126..7ba2b3f7db7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4114,27 +4114,24 @@ public class KafkaAdminClient extends AdminClient {
         }
     }
 
-    private List<MemberIdentity> getMembersFromGroup(String groupId, String 
reason) {
-        Collection<MemberDescription> members;
-        try {
-            members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
-        } catch (Exception ex) {
-            throw new KafkaException("Encounter exception when trying to get 
members from group: " + groupId, ex);
-        }
+    private KafkaFutureImpl<List<MemberIdentity>> getMembersFromGroup(String 
groupId, String reason) {
+        KafkaFutureImpl<List<MemberIdentity>> future = new KafkaFutureImpl<>();
 
-        List<MemberIdentity> membersToRemove = new ArrayList<>();
-        for (final MemberDescription member : members) {
-            MemberIdentity memberIdentity = new 
MemberIdentity().setReason(reason);
-
-            if (member.groupInstanceId().isPresent()) {
-                
memberIdentity.setGroupInstanceId(member.groupInstanceId().get());
+        
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).whenComplete((res,
 ex) -> {
+            if (ex != null) {
+                future.completeExceptionally(new KafkaException("Encounter 
exception when trying to get members from group: " + groupId, ex));
             } else {
-                memberIdentity.setMemberId(member.consumerId());
+                List<MemberIdentity> membersToRemove = 
res.members().stream().map(member ->
+                    member.groupInstanceId().map(id -> new 
MemberIdentity().setGroupInstanceId(id))
+                    .orElseGet(() -> new 
MemberIdentity().setMemberId(member.consumerId()))
+                    .setReason(reason)
+                ).collect(Collectors.toList());
+
+                future.complete(membersToRemove);
             }
+        });
 
-            membersToRemove.add(memberIdentity);
-        }
-        return membersToRemove;
+        return future;
     }
 
     @Override
@@ -4143,20 +4140,29 @@ public class KafkaAdminClient extends AdminClient {
         String reason = options.reason() == null || options.reason().isEmpty() 
?
             DEFAULT_LEAVE_GROUP_REASON : 
JoinGroupRequest.maybeTruncateReason(options.reason());
 
-        List<MemberIdentity> members;
+        final SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, 
Errors>> adminFuture =
+                RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
+
+        KafkaFutureImpl<List<MemberIdentity>> memFuture;
         if (options.removeAll()) {
-            members = getMembersFromGroup(groupId, reason);
+            memFuture = getMembersFromGroup(groupId, reason);
         } else {
-            members = options.members().stream()
-                .map(m -> m.toMemberIdentity().setReason(reason))
-                .collect(Collectors.toList());
+            memFuture = new KafkaFutureImpl<>();
+            memFuture.complete(options.members().stream()
+                    .map(m -> m.toMemberIdentity().setReason(reason))
+                    .collect(Collectors.toList()));
         }
 
-        SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, Errors>> 
future =
-                RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
-        RemoveMembersFromConsumerGroupHandler handler = new 
RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
-        invokeDriver(handler, future, options.timeoutMs);
-        return new 
RemoveMembersFromConsumerGroupResult(future.get(CoordinatorKey.byGroupId(groupId)),
 options.members());
+        memFuture.whenComplete((members, ex) -> {
+            if (ex != null) {
+                
adminFuture.completeExceptionally(Collections.singletonMap(CoordinatorKey.byGroupId(groupId),
 ex));
+            } else {
+                RemoveMembersFromConsumerGroupHandler handler = new 
RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
+                invokeDriver(handler, adminFuture, options.timeoutMs());
+            }
+        });
+
+        return new 
RemoveMembersFromConsumerGroupResult(adminFuture.get(CoordinatorKey.byGroupId(groupId)),
 options.members());
     }
 
     @Override

Reply via email to