This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2bb9f53477f KAFKA-17465 Refactor getMembersFromGroup to be
non-blocking (#17080)
2bb9f53477f is described below
commit 2bb9f53477f6a71c87c9bbe0d0281652cffb6eb7
Author: TaiJuWu <[email protected]>
AuthorDate: Wed Sep 25 02:40:32 2024 +0800
KAFKA-17465 Refactor getMembersFromGroup to be non-blocking (#17080)
Reviewers: TengYao Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 60 ++++++++++++----------
1 file changed, 33 insertions(+), 27 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1f08ca95126..7ba2b3f7db7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4114,27 +4114,24 @@ public class KafkaAdminClient extends AdminClient {
}
}
- private List<MemberIdentity> getMembersFromGroup(String groupId, String
reason) {
- Collection<MemberDescription> members;
- try {
- members =
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
- } catch (Exception ex) {
- throw new KafkaException("Encounter exception when trying to get
members from group: " + groupId, ex);
- }
+ private KafkaFutureImpl<List<MemberIdentity>> getMembersFromGroup(String
groupId, String reason) {
+ KafkaFutureImpl<List<MemberIdentity>> future = new KafkaFutureImpl<>();
- List<MemberIdentity> membersToRemove = new ArrayList<>();
- for (final MemberDescription member : members) {
- MemberIdentity memberIdentity = new
MemberIdentity().setReason(reason);
-
- if (member.groupInstanceId().isPresent()) {
-
memberIdentity.setGroupInstanceId(member.groupInstanceId().get());
+
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).whenComplete((res,
ex) -> {
+ if (ex != null) {
+ future.completeExceptionally(new KafkaException("Encounter
exception when trying to get members from group: " + groupId, ex));
} else {
- memberIdentity.setMemberId(member.consumerId());
+ List<MemberIdentity> membersToRemove =
res.members().stream().map(member ->
+ member.groupInstanceId().map(id -> new
MemberIdentity().setGroupInstanceId(id))
+ .orElseGet(() -> new
MemberIdentity().setMemberId(member.consumerId()))
+ .setReason(reason)
+ ).collect(Collectors.toList());
+
+ future.complete(membersToRemove);
}
+ });
- membersToRemove.add(memberIdentity);
- }
- return membersToRemove;
+ return future;
}
@Override
@@ -4143,20 +4140,29 @@ public class KafkaAdminClient extends AdminClient {
String reason = options.reason() == null || options.reason().isEmpty()
?
DEFAULT_LEAVE_GROUP_REASON :
JoinGroupRequest.maybeTruncateReason(options.reason());
- List<MemberIdentity> members;
+ final SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity,
Errors>> adminFuture =
+ RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
+
+ KafkaFutureImpl<List<MemberIdentity>> memFuture;
if (options.removeAll()) {
- members = getMembersFromGroup(groupId, reason);
+ memFuture = getMembersFromGroup(groupId, reason);
} else {
- members = options.members().stream()
- .map(m -> m.toMemberIdentity().setReason(reason))
- .collect(Collectors.toList());
+ memFuture = new KafkaFutureImpl<>();
+ memFuture.complete(options.members().stream()
+ .map(m -> m.toMemberIdentity().setReason(reason))
+ .collect(Collectors.toList()));
}
- SimpleAdminApiFuture<CoordinatorKey, Map<MemberIdentity, Errors>>
future =
- RemoveMembersFromConsumerGroupHandler.newFuture(groupId);
- RemoveMembersFromConsumerGroupHandler handler = new
RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
- invokeDriver(handler, future, options.timeoutMs);
- return new
RemoveMembersFromConsumerGroupResult(future.get(CoordinatorKey.byGroupId(groupId)),
options.members());
+ memFuture.whenComplete((members, ex) -> {
+ if (ex != null) {
+
adminFuture.completeExceptionally(Collections.singletonMap(CoordinatorKey.byGroupId(groupId),
ex));
+ } else {
+ RemoveMembersFromConsumerGroupHandler handler = new
RemoveMembersFromConsumerGroupHandler(groupId, members, logContext);
+ invokeDriver(handler, adminFuture, options.timeoutMs());
+ }
+ });
+
+ return new
RemoveMembersFromConsumerGroupResult(adminFuture.get(CoordinatorKey.byGroupId(groupId)),
options.members());
}
@Override