This is an automated email from the ASF dual-hosted git repository.

manikumar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e7bbcdb2514 KAFKA-18090: Add ShareMemberDescription and Assignment 
(#17975)
e7bbcdb2514 is described below

commit e7bbcdb2514804ea9e010d170dc576183cfb6637
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Nov 29 04:50:01 2024 +0000

    KAFKA-18090: Add ShareMemberDescription and Assignment (#17975)
    
    Introduce ShareMemberDescription and ShareMemberAssignment as distinct 
classes for share groups. Although the correspondence with consumer groups is 
fairly close, the concepts are likely to diverge over time and separating these 
concepts now makes sense.
    
    Reviewers: Manikumar Reddy <[email protected]>
---
 .../kafka/clients/admin/MemberDescription.java     |  2 +-
 .../kafka/clients/admin/ShareGroupDescription.java | 10 +--
 .../kafka/clients/admin/ShareMemberAssignment.java | 69 +++++++++++++++++++
 ...escription.java => ShareMemberDescription.java} | 77 +++++-----------------
 .../internals/DescribeShareGroupsHandler.java      | 10 +--
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 ++++---
 .../tools/consumer/group/ShareGroupCommand.java    | 19 +++---
 .../consumer/group/ShareGroupCommandTest.java      | 15 +++--
 8 files changed, 124 insertions(+), 101 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 5320b7f711d..5ca7dba86f8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -21,7 +21,7 @@ import java.util.Objects;
 import java.util.Optional;
 
 /**
- * A detailed description of a single group instance in the cluster.
+ * A detailed description of a single group member in the cluster.
  */
 public class MemberDescription {
     private final String memberId;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
index 4a2292a4d45..913667bcf47 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareGroupDescription.java
@@ -35,20 +35,20 @@ import java.util.stream.Collectors;
 @InterfaceStability.Evolving
 public class ShareGroupDescription {
     private final String groupId;
-    private final Collection<MemberDescription> members;
+    private final Collection<ShareMemberDescription> members;
     private final GroupState groupState;
     private final Node coordinator;
     private final Set<AclOperation> authorizedOperations;
 
     public ShareGroupDescription(String groupId,
-                                 Collection<MemberDescription> members,
+                                 Collection<ShareMemberDescription> members,
                                  GroupState groupState,
                                  Node coordinator) {
         this(groupId, members, groupState, coordinator, 
Collections.emptySet());
     }
 
     public ShareGroupDescription(String groupId,
-                                 Collection<MemberDescription> members,
+                                 Collection<ShareMemberDescription> members,
                                  GroupState groupState,
                                  Node coordinator,
                                  Set<AclOperation> authorizedOperations) {
@@ -86,7 +86,7 @@ public class ShareGroupDescription {
     /**
      * A list of the members of the share group.
      */
-    public Collection<MemberDescription> members() {
+    public Collection<ShareMemberDescription> members() {
         return members;
     }
 
@@ -114,7 +114,7 @@ public class ShareGroupDescription {
     @Override
     public String toString() {
         return "(groupId=" + groupId +
-            ", members=" + 
members.stream().map(MemberDescription::toString).collect(Collectors.joining(","))
 +
+            ", members=" + 
members.stream().map(ShareMemberDescription::toString).collect(Collectors.joining(","))
 +
             ", groupState=" + groupState +
             ", coordinator=" + coordinator +
             ", authorizedOperations=" + authorizedOperations +
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberAssignment.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberAssignment.java
new file mode 100644
index 00000000000..de3be9d73f4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberAssignment.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collections;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * A description of the assignments of a specific share group member.
+ */
+@InterfaceStability.Evolving
+public class ShareMemberAssignment {
+    private final Set<TopicPartition> topicPartitions;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param topicPartitions List of topic partitions
+     */
+    public ShareMemberAssignment(Set<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions == null ? 
Collections.emptySet() : Set.copyOf(topicPartitions);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ShareMemberAssignment that = (ShareMemberAssignment) o;
+
+        return Objects.equals(topicPartitions, that.topicPartitions);
+    }
+
+    @Override
+    public int hashCode() {
+        return topicPartitions != null ? topicPartitions.hashCode() : 0;
+    }
+
+    /**
+     * The topic partitions assigned to a group member.
+     */
+    public Set<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public String toString() {
+        return "(topicPartitions=" + 
topicPartitions.stream().map(TopicPartition::toString).collect(Collectors.joining(","))
 + ")";
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java
similarity index 50%
copy from 
clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
copy to 
clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java
index 5320b7f711d..57f2d90ae86 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ShareMemberDescription.java
@@ -16,77 +16,48 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.annotation.InterfaceStability;
+
 import java.util.Collections;
 import java.util.Objects;
-import java.util.Optional;
 
 /**
- * A detailed description of a single group instance in the cluster.
+ * A detailed description of a single share group member in the cluster.
  */
-public class MemberDescription {
+@InterfaceStability.Evolving
+public class ShareMemberDescription {
     private final String memberId;
-    private final Optional<String> groupInstanceId;
     private final String clientId;
     private final String host;
-    private final MemberAssignment assignment;
-    private final Optional<MemberAssignment> targetAssignment;
+    private final ShareMemberAssignment assignment;
 
-    public MemberDescription(String memberId,
-        Optional<String> groupInstanceId,
+    public ShareMemberDescription(
+        String memberId,
         String clientId,
         String host,
-        MemberAssignment assignment,
-        Optional<MemberAssignment> targetAssignment
+        ShareMemberAssignment assignment
     ) {
         this.memberId = memberId == null ? "" : memberId;
-        this.groupInstanceId = groupInstanceId;
         this.clientId = clientId == null ? "" : clientId;
         this.host = host == null ? "" : host;
         this.assignment = assignment == null ?
-            new MemberAssignment(Collections.emptySet()) : assignment;
-        this.targetAssignment = targetAssignment;
-    }
-
-    public MemberDescription(
-        String memberId,
-        Optional<String> groupInstanceId,
-        String clientId,
-        String host,
-        MemberAssignment assignment
-    ) {
-        this(
-            memberId,
-            groupInstanceId,
-            clientId,
-            host,
-            assignment,
-            Optional.empty()
-        );
-    }
-
-    public MemberDescription(String memberId,
-                             String clientId,
-                             String host,
-                             MemberAssignment assignment) {
-        this(memberId, Optional.empty(), clientId, host, assignment);
+            new ShareMemberAssignment(Collections.emptySet()) : assignment;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-        MemberDescription that = (MemberDescription) o;
+        ShareMemberDescription that = (ShareMemberDescription) o;
         return memberId.equals(that.memberId) &&
-            groupInstanceId.equals(that.groupInstanceId) &&
             clientId.equals(that.clientId) &&
             host.equals(that.host) &&
-            assignment.equals(that.assignment) &&
-            targetAssignment.equals(that.targetAssignment);
+            assignment.equals(that.assignment);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(memberId, groupInstanceId, clientId, host, 
assignment, targetAssignment);
+        return Objects.hash(memberId, clientId, host, assignment);
     }
 
     /**
@@ -96,13 +67,6 @@ public class MemberDescription {
         return memberId;
     }
 
-    /**
-     * The instance id of the group member.
-     */
-    public Optional<String> groupInstanceId() {
-        return groupInstanceId;
-    }
-
     /**
      * The client id of the group member.
      */
@@ -118,26 +82,17 @@ public class MemberDescription {
     }
 
     /**
-     * The assignment of the group member. Provided for both classic group and 
consumer group.
+     * The assignment of the group member.
      */
-    public MemberAssignment assignment() {
+    public ShareMemberAssignment assignment() {
         return assignment;
     }
 
-    /**
-     * The target assignment of the member. Provided only for consumer group.
-     */
-    public Optional<MemberAssignment> targetAssignment() {
-        return targetAssignment;
-    }
-
     @Override
     public String toString() {
         return "(memberId=" + memberId +
-            ", groupInstanceId=" + groupInstanceId.orElse("null") +
             ", clientId=" + clientId +
             ", host=" + host +
-            ", assignment=" + assignment +
-            ", targetAssignment=" + targetAssignment + ")";
+            ", assignment=" + assignment + ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
index 2557c39729b..f1c9e5d45ca 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeShareGroupsHandler.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.clients.admin.internals;
 
-import org.apache.kafka.clients.admin.MemberAssignment;
-import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
+import org.apache.kafka.clients.admin.ShareMemberAssignment;
+import org.apache.kafka.clients.admin.ShareMemberDescription;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -114,15 +114,15 @@ public class DescribeShareGroupsHandler extends 
AdminApiHandler.Batched<Coordina
                 continue;
             }
 
-            final List<MemberDescription> memberDescriptions = new 
ArrayList<>(describedGroup.members().size());
+            final List<ShareMemberDescription> memberDescriptions = new 
ArrayList<>(describedGroup.members().size());
             final Set<AclOperation> authorizedOperations = 
validAclOperations(describedGroup.authorizedOperations());
 
             describedGroup.members().forEach(groupMember ->
-                memberDescriptions.add(new MemberDescription(
+                memberDescriptions.add(new ShareMemberDescription(
                     groupMember.memberId(),
                     groupMember.clientId(),
                     groupMember.clientHost(),
-                    new 
MemberAssignment(convertAssignment(groupMember.assignment()))
+                    new 
ShareMemberAssignment(convertAssignment(groupMember.assignment()))
                 ))
             );
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 782dd00d0a3..cf5fbc938e3 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4974,11 +4974,11 @@ public class KafkaAdminClientTest {
             expectedTopicPartitions.add(1, new TopicPartition("my_topic", 1));
             expectedTopicPartitions.add(2, new TopicPartition("my_topic", 2));
 
-            List<MemberDescription> expectedMemberDescriptions = new 
ArrayList<>();
-            
expectedMemberDescriptions.add(convertToMemberDescriptions(memberOne,
-                new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
-            
expectedMemberDescriptions.add(convertToMemberDescriptions(memberTwo,
-                new MemberAssignment(new HashSet<>(expectedTopicPartitions))));
+            List<ShareMemberDescription> expectedMemberDescriptions = new 
ArrayList<>();
+            
expectedMemberDescriptions.add(convertToShareMemberDescriptions(memberOne,
+                new ShareMemberAssignment(new 
HashSet<>(expectedTopicPartitions))));
+            
expectedMemberDescriptions.add(convertToShareMemberDescriptions(memberTwo,
+                new ShareMemberAssignment(new 
HashSet<>(expectedTopicPartitions))));
             data.groups().add(new 
ShareGroupDescribeResponseData.DescribedGroup()
                 .setGroupId(GROUP_ID)
                 .setGroupState(GroupState.STABLE.toString())
@@ -8549,13 +8549,12 @@ public class KafkaAdminClientTest {
                                      assignment);
     }
 
-    private static MemberDescription 
convertToMemberDescriptions(ShareGroupDescribeResponseData.Member member,
-                                                                 
MemberAssignment assignment) {
-        return new MemberDescription(member.memberId(),
-                                     Optional.empty(),
-                                     member.clientId(),
-                                     member.clientHost(),
-                                     assignment);
+    private static ShareMemberDescription 
convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member,
+                                                                           
ShareMemberAssignment assignment) {
+        return new ShareMemberDescription(member.memberId(),
+                                          member.clientId(),
+                                          member.clientHost(),
+                                          assignment);
     }
 
     @Test
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index dc1a8d418a1..8e774da0988 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -23,9 +23,9 @@ import org.apache.kafka.clients.admin.GroupListing;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
+import org.apache.kafka.clients.admin.ShareMemberDescription;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.TopicPartition;
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -70,7 +69,7 @@ public class ShareGroupCommand {
     }
 
     public static void run(ShareGroupCommandOptions opts) {
-        try (ShareGroupService shareGroupService = new ShareGroupService(opts, 
Collections.emptyMap())) {
+        try (ShareGroupService shareGroupService = new ShareGroupService(opts, 
Map.of())) {
             if (opts.options.has(opts.listOpt)) {
                 shareGroupService.listGroups();
             } else if (opts.options.has(opts.describeOpt)) {
@@ -128,7 +127,7 @@ public class ShareGroupCommand {
             if (opts.options.has(opts.stateOpt)) {
                 String stateValue = opts.options.valueOf(opts.stateOpt);
                 Set<GroupState> states = (stateValue == null || 
stateValue.isEmpty())
-                    ? Collections.emptySet()
+                    ? Set.of()
                     : groupStatesFromString(stateValue);
                 List<GroupListing> listings = listShareGroupsInStates(states);
 
@@ -201,7 +200,7 @@ public class ShareGroupCommand {
         }
 
         ShareGroupDescription getDescribeGroup(String group) throws 
ExecutionException, InterruptedException {
-            DescribeShareGroupsResult result = 
adminClient.describeShareGroups(Collections.singletonList(group));
+            DescribeShareGroupsResult result = 
adminClient.describeShareGroups(List.of(group));
             Map<String, ShareGroupDescription> descriptionMap = 
result.all().get();
             if (descriptionMap.containsKey(group)) {
                 return descriptionMap.get(group);
@@ -209,9 +208,9 @@ public class ShareGroupCommand {
             return null;
         }
 
-        Map<TopicPartition, Long> getOffsets(Collection<MemberDescription> 
members) throws ExecutionException, InterruptedException {
+        Map<TopicPartition, Long> 
getOffsets(Collection<ShareMemberDescription> members) throws 
ExecutionException, InterruptedException {
             Set<TopicPartition> allTp = new HashSet<>();
-            for (MemberDescription memberDescription : members) {
+            for (ShareMemberDescription memberDescription : members) {
                 allTp.addAll(memberDescription.assignment().topicPartitions());
             }
             // fetch latest and earliest offsets
@@ -269,9 +268,9 @@ public class ShareGroupCommand {
         private void printMembers(ShareGroupDescription description) {
             int groupLen = Math.max(15, description.groupId().length());
             int maxConsumerIdLen = 15, maxHostLen = 15, maxClientIdLen = 15;
-            Collection<MemberDescription> members = description.members();
+            Collection<ShareMemberDescription> members = description.members();
             if (maybePrintEmptyGroupState(description.groupId(), 
description.groupState(), description.members().size())) {
-                for (MemberDescription member : members) {
+                for (ShareMemberDescription member : members) {
                     maxConsumerIdLen = Math.max(maxConsumerIdLen, 
member.consumerId().length());
                     maxHostLen = Math.max(maxHostLen, member.host().length());
                     maxClientIdLen = Math.max(maxClientIdLen, 
member.clientId().length());
@@ -279,7 +278,7 @@ public class ShareGroupCommand {
 
                 String fmt = "%" + -groupLen + "s %" + -maxConsumerIdLen + "s 
%" + -maxHostLen + "s %" + -maxClientIdLen + "s %s\n";
                 System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", 
"CLIENT-ID", "ASSIGNMENT");
-                for (MemberDescription member : members) {
+                for (ShareMemberDescription member : members) {
                     System.out.printf(fmt, description.groupId(), 
member.consumerId(), member.host(), member.clientId(),
                         
member.assignment().topicPartitions().stream().map(part -> part.topic() + ":" + 
part.partition()).collect(Collectors.joining(",")));
                 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 1e293368572..ce132effe09 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -23,10 +23,10 @@ import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.admin.ListGroupsOptions;
 import org.apache.kafka.clients.admin.ListGroupsResult;
 import org.apache.kafka.clients.admin.ListOffsetsResult;
-import org.apache.kafka.clients.admin.MemberAssignment;
-import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.clients.admin.ShareGroupDescription;
+import org.apache.kafka.clients.admin.ShareMemberAssignment;
+import org.apache.kafka.clients.admin.ShareMemberDescription;
 import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -92,8 +93,8 @@ public class ShareGroupCommandTest {
         Map<String, ShareGroupDescription> resultMap = new HashMap<>();
         ShareGroupDescription exp = new ShareGroupDescription(
                 firstGroup,
-                Collections.singletonList(new MemberDescription("memid1", 
"clId1", "host1", new MemberAssignment(
-                        Collections.singleton(new TopicPartition("topic1", 0))
+                List.of(new ShareMemberDescription("memid1", "clId1", "host1", 
new ShareMemberAssignment(
+                        Set.of(new TopicPartition("topic1", 0))
                 ))),
                 GroupState.STABLE,
                 new Node(0, "host1", 9090));
@@ -123,10 +124,10 @@ public class ShareGroupCommandTest {
 
         
when(adminClient.listOffsets(ArgumentMatchers.anyMap())).thenReturn(startOffset,
 endOffset);
 
-        MemberDescription description = new MemberDescription("", "", "",
-                new MemberAssignment(Collections.singleton(new 
TopicPartition("topic1", 0))));
+        ShareMemberDescription description = new ShareMemberDescription("", 
"", "",
+                new ShareMemberAssignment(Set.of(new TopicPartition("topic1", 
0))));
         ShareGroupService service = new ShareGroupService(null, adminClient);
-        Map<TopicPartition, Long> lags = 
service.getOffsets(Collections.singletonList(description));
+        Map<TopicPartition, Long> lags = 
service.getOffsets(List.of(description));
         assertEquals(1, lags.size());
         assertEquals(20, lags.get(new TopicPartition("topic1", 0)));
         service.close();

Reply via email to