This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8380ae7795 KAFKA-17750: Extend kafka-consumer-groups command line 
tool to support new consumer group (part 2) (#18034)
c8380ae7795 is described below

commit c8380ae77950bbd2161cf14cbdeb007562655f2f
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Dec 10 21:02:20 2024 +0800

    KAFKA-17750: Extend kafka-consumer-groups command line tool to support new 
consumer group (part 2) (#18034)
    
    * Add fields `groupEpoch` and `targetAssignmentEpoch` to 
`ConsumerGroupDescription.java`.
    * Add fields `memberEpoch` and `upgraded` to `MemberDescription.java`.
    * Add assertion to 
`PlaintextAdminIntegrationTest#testDescribeClassicGroups` to make sure member 
in classic group returns `upgraded` as `Optional.empty`.
    * Add new case `testConsumerGroupWithMemberMigration` to 
`PlaintextAdminIntegrationTest` to make sure migration member has correct 
`upgraded` value. Add assertion for `groupEpoch`, `targetAssignmentEpoch`, 
`memberEpoch` as well.
    
    Reviewers: David Jacot <[email protected]>
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../clients/admin/ConsumerGroupDescription.java    | 63 ++++++++------
 .../kafka/clients/admin/MemberDescription.java     | 75 ++++++++++++++--
 .../internals/DescribeClassicGroupsHandler.java    |  5 +-
 .../internals/DescribeConsumerGroupsHandler.java   | 17 +++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 23 +++--
 .../kafka/clients/admin/MemberDescriptionTest.java | 33 ++++++++
 .../DescribeConsumerGroupsHandlerTest.java         | 90 ++++++++++++++------
 .../kafka/api/PlaintextAdminIntegrationTest.scala  | 99 +++++++++++++++++++++-
 .../consumer/group/ConsumerGroupServiceTest.java   | 13 ++-
 9 files changed, 347 insertions(+), 71 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 4cbc5b4b43b..dd1b4b4cb5c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -27,6 +27,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -42,9 +43,11 @@ public class ConsumerGroupDescription {
     private final GroupState groupState;
     private final Node coordinator;
     private final Set<AclOperation> authorizedOperations;
+    private final Optional<Integer> groupEpoch;
+    private final Optional<Integer> targetAssignmentEpoch;
 
     /**
-     * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, 
boolean, Collection, String, GroupState, Node)}.
+     * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, 
boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, 
Optional)}.
      */
     @Deprecated
     public ConsumerGroupDescription(String groupId,
@@ -57,7 +60,7 @@ public class ConsumerGroupDescription {
     }
 
     /**
-     * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, 
boolean, Collection, String, GroupState, Node, Set)}.
+     * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, 
boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, 
Optional)}.
      */
     @Deprecated
     public ConsumerGroupDescription(String groupId,
@@ -71,7 +74,7 @@ public class ConsumerGroupDescription {
     }
 
     /**
-     * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, 
boolean, Collection, String, GroupType, GroupState, Node, Set)}.
+     * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String, 
boolean, Collection, String, GroupType, GroupState, Node, Set, Optional, 
Optional)}.
      */
     @Deprecated
     public ConsumerGroupDescription(String groupId,
@@ -90,25 +93,8 @@ public class ConsumerGroupDescription {
         this.groupState = GroupState.parse(state.name());
         this.coordinator = coordinator;
         this.authorizedOperations = authorizedOperations;
-    }
-
-    public ConsumerGroupDescription(String groupId,
-                                    boolean isSimpleConsumerGroup,
-                                    Collection<MemberDescription> members,
-                                    String partitionAssignor,
-                                    GroupState groupState,
-                                    Node coordinator) {
-        this(groupId, isSimpleConsumerGroup, members, partitionAssignor, 
groupState, coordinator, Collections.emptySet());
-    }
-
-    public ConsumerGroupDescription(String groupId,
-                                    boolean isSimpleConsumerGroup,
-                                    Collection<MemberDescription> members,
-                                    String partitionAssignor,
-                                    GroupState groupState,
-                                    Node coordinator,
-                                    Set<AclOperation> authorizedOperations) {
-        this(groupId, isSimpleConsumerGroup, members, partitionAssignor, 
GroupType.CLASSIC, groupState, coordinator, authorizedOperations);
+        this.groupEpoch = Optional.empty();
+        this.targetAssignmentEpoch = Optional.empty();
     }
 
     public ConsumerGroupDescription(String groupId,
@@ -118,7 +104,9 @@ public class ConsumerGroupDescription {
                                     GroupType type,
                                     GroupState groupState,
                                     Node coordinator,
-                                    Set<AclOperation> authorizedOperations) {
+                                    Set<AclOperation> authorizedOperations,
+                                    Optional<Integer> groupEpoch,
+                                    Optional<Integer> targetAssignmentEpoch) {
         this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
         this.members = members == null ? Collections.emptyList() : 
List.copyOf(members);
@@ -127,6 +115,8 @@ public class ConsumerGroupDescription {
         this.groupState = groupState;
         this.coordinator = coordinator;
         this.authorizedOperations = authorizedOperations;
+        this.groupEpoch = groupEpoch;
+        this.targetAssignmentEpoch = targetAssignmentEpoch;
     }
 
     @Override
@@ -141,12 +131,15 @@ public class ConsumerGroupDescription {
             type == that.type &&
             groupState == that.groupState &&
             Objects.equals(coordinator, that.coordinator) &&
-            Objects.equals(authorizedOperations, that.authorizedOperations);
+            Objects.equals(authorizedOperations, that.authorizedOperations) &&
+            Objects.equals(groupEpoch, that.groupEpoch) &&
+            Objects.equals(targetAssignmentEpoch, that.targetAssignmentEpoch);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(groupId, isSimpleConsumerGroup, members, 
partitionAssignor, type, groupState, coordinator, authorizedOperations);
+        return Objects.hash(groupId, isSimpleConsumerGroup, members, 
partitionAssignor, type, groupState, coordinator,
+            authorizedOperations, groupEpoch, targetAssignmentEpoch);
     }
 
     /**
@@ -215,6 +208,24 @@ public class ConsumerGroupDescription {
         return authorizedOperations;
     }
 
+    /**
+     * The epoch of the consumer group.
+     * The optional is set to an integer if it is a {@link GroupType#CONSUMER} 
group, and to empty if it
+     * is a {@link GroupType#CLASSIC} group.
+     */
+    public Optional<Integer> groupEpoch() {
+        return groupEpoch;
+    }
+
+    /**
+     * The epoch of the target assignment.
+     * The optional is set to an integer if it is a {@link GroupType#CONSUMER} 
group, and to empty if it
+     * is a {@link GroupType#CLASSIC} group.
+     */
+    public Optional<Integer> targetAssignmentEpoch() {
+        return targetAssignmentEpoch;
+    }
+
     @Override
     public String toString() {
         return "(groupId=" + groupId +
@@ -225,6 +236,8 @@ public class ConsumerGroupDescription {
             ", groupState=" + groupState +
             ", coordinator=" + coordinator +
             ", authorizedOperations=" + authorizedOperations +
+            ", groupEpoch=" + groupEpoch.orElse(null) +
+            ", targetAssignmentEpoch=" + targetAssignmentEpoch.orElse(null) +
             ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 5ca7dba86f8..0785f2e6715 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.GroupType;
+
 import java.util.Collections;
 import java.util.Objects;
 import java.util.Optional;
@@ -30,13 +32,18 @@ public class MemberDescription {
     private final String host;
     private final MemberAssignment assignment;
     private final Optional<MemberAssignment> targetAssignment;
+    private final Optional<Integer> memberEpoch;
+    private final Optional<Boolean> upgraded;
 
-    public MemberDescription(String memberId,
+    public MemberDescription(
+        String memberId,
         Optional<String> groupInstanceId,
         String clientId,
         String host,
         MemberAssignment assignment,
-        Optional<MemberAssignment> targetAssignment
+        Optional<MemberAssignment> targetAssignment,
+        Optional<Integer> memberEpoch,
+        Optional<Boolean> upgraded
     ) {
         this.memberId = memberId == null ? "" : memberId;
         this.groupInstanceId = groupInstanceId;
@@ -45,8 +52,38 @@ public class MemberDescription {
         this.assignment = assignment == null ?
             new MemberAssignment(Collections.emptySet()) : assignment;
         this.targetAssignment = targetAssignment;
+        this.memberEpoch = memberEpoch;
+        this.upgraded = upgraded;
     }
 
+    /**
+     * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, 
String, String, MemberAssignment, Optional, Optional, Optional)}.
+     */
+    @Deprecated
+    public MemberDescription(
+        String memberId,
+        Optional<String> groupInstanceId,
+        String clientId,
+        String host,
+        MemberAssignment assignment,
+        Optional<MemberAssignment> targetAssignment
+    ) {
+        this(
+            memberId,
+            groupInstanceId,
+            clientId,
+            host,
+            assignment,
+            targetAssignment,
+            Optional.empty(),
+            Optional.empty()
+        );
+    }
+
+    /**
+     * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, 
String, String, MemberAssignment, Optional, Optional, Optional)}.
+     */
+    @Deprecated
     public MemberDescription(
         String memberId,
         Optional<String> groupInstanceId,
@@ -64,6 +101,10 @@ public class MemberDescription {
         );
     }
 
+    /**
+     * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional, 
String, String, MemberAssignment, Optional, Optional, Optional)}.
+     */
+    @Deprecated
     public MemberDescription(String memberId,
                              String clientId,
                              String host,
@@ -81,12 +122,14 @@ public class MemberDescription {
             clientId.equals(that.clientId) &&
             host.equals(that.host) &&
             assignment.equals(that.assignment) &&
-            targetAssignment.equals(that.targetAssignment);
+            targetAssignment.equals(that.targetAssignment) &&
+            memberEpoch.equals(that.memberEpoch) &&
+            upgraded.equals(that.upgraded);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(memberId, groupInstanceId, clientId, host, 
assignment, targetAssignment);
+        return Objects.hash(memberId, groupInstanceId, clientId, host, 
assignment, targetAssignment, memberEpoch, upgraded);
     }
 
     /**
@@ -131,6 +174,25 @@ public class MemberDescription {
         return targetAssignment;
     }
 
+    /**
+     * The epoch of the group member.
+     * The optional is set to an integer if the member is in a {@link 
GroupType#CONSUMER} group, and to empty if it
+     * is in a {@link GroupType#CLASSIC} group.
+     */
+    public Optional<Integer> memberEpoch() {
+        return memberEpoch;
+    }
+
+    /**
+     * The flag indicating whether a member within a {@link 
GroupType#CONSUMER} group uses the
+     * {@link GroupType#CONSUMER} protocol.
+     * The optional is set to true if it does, to false if it does not, and to 
empty if it is unknown or if the group
+     * is a {@link GroupType#CLASSIC} group.
+     */
+    public Optional<Boolean> upgraded() {
+        return upgraded;
+    }
+
     @Override
     public String toString() {
         return "(memberId=" + memberId +
@@ -138,6 +200,9 @@ public class MemberDescription {
             ", clientId=" + clientId +
             ", host=" + host +
             ", assignment=" + assignment +
-            ", targetAssignment=" + targetAssignment + ")";
+            ", targetAssignment=" + targetAssignment +
+            ", memberEpoch=" + memberEpoch.orElse(null) +
+            ", upgraded=" + upgraded.orElse(null) +
+            ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
index 77c04c5d5f0..686ee43a44b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
@@ -136,7 +136,10 @@ public class DescribeClassicGroupsHandler extends 
AdminApiHandler.Batched<Coordi
                     Optional.ofNullable(groupMember.groupInstanceId()),
                     groupMember.clientId(),
                     groupMember.clientHost(),
-                    new MemberAssignment(partitions)));
+                    new MemberAssignment(partitions),
+                    Optional.empty(),
+                    Optional.empty(),
+                    Optional.empty()));
             });
 
             final ClassicGroupDescription classicGroupDescription =
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
index 1d911e2f0c7..457675e9267 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
@@ -222,7 +222,9 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
                     groupMember.clientId(),
                     groupMember.clientHost(),
                     new 
MemberAssignment(convertAssignment(groupMember.assignment())),
-                    Optional.of(new 
MemberAssignment(convertAssignment(groupMember.targetAssignment())))
+                    Optional.of(new 
MemberAssignment(convertAssignment(groupMember.targetAssignment()))),
+                    Optional.of(groupMember.memberEpoch()),
+                    groupMember.memberType() == -1 ? Optional.empty() : 
Optional.of(groupMember.memberType() == 1)
                 ))
             );
 
@@ -235,7 +237,9 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
                     GroupType.CONSUMER,
                     GroupState.parse(describedGroup.groupState()),
                     coordinator,
-                    authorizedOperations
+                    authorizedOperations,
+                    Optional.of(describedGroup.groupEpoch()),
+                    Optional.of(describedGroup.assignmentEpoch())
                 );
             completed.put(groupIdKey, consumerGroupDescription);
         }
@@ -281,7 +285,10 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
                         Optional.ofNullable(groupMember.groupInstanceId()),
                         groupMember.clientId(),
                         groupMember.clientHost(),
-                        new MemberAssignment(partitions)));
+                        new MemberAssignment(partitions),
+                        Optional.empty(),
+                        Optional.empty(),
+                        Optional.empty()));
                 }
                 final ConsumerGroupDescription consumerGroupDescription =
                     new ConsumerGroupDescription(groupIdKey.idValue, 
protocolType.isEmpty(),
@@ -290,7 +297,9 @@ public class DescribeConsumerGroupsHandler implements 
AdminApiHandler<Coordinato
                         GroupType.CLASSIC,
                         GroupState.parse(describedGroup.groupState()),
                         coordinator,
-                        authorizedOperations);
+                        authorizedOperations,
+                        Optional.empty(),
+                        Optional.empty());
                 completed.put(groupIdKey, consumerGroupDescription);
             } else {
                 failed.put(groupIdKey, new IllegalArgumentException(
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b0b48e33c67..44f6e1f5a88 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4057,6 +4057,7 @@ public class KafkaAdminClientTest {
                                                 .setTopicName("foo")
                                                 
.setPartitions(singletonList(1))
                                         )))
+                                    .setMemberType((byte) 1)
                             )),
                         new ConsumerGroupDescribeResponseData.DescribedGroup()
                             .setGroupId("grp2")
@@ -4110,14 +4111,18 @@ public class KafkaAdminClientTest {
                         ),
                         Optional.of(new MemberAssignment(
                             Collections.singleton(new TopicPartition("foo", 1))
-                        ))
+                        )),
+                        Optional.of(10),
+                        Optional.of(true)
                     )
                 ),
                 "range",
                 GroupType.CONSUMER,
                 GroupState.STABLE,
                 env.cluster().controller(),
-                Collections.emptySet()
+                Collections.emptySet(),
+                Optional.of(10),
+                Optional.of(10)
             ));
             expectedResult.put("grp2", new ConsumerGroupDescription(
                 "grp2",
@@ -4130,14 +4135,19 @@ public class KafkaAdminClientTest {
                         "clientHost",
                         new MemberAssignment(
                             Collections.singleton(new TopicPartition("bar", 0))
-                        )
+                        ),
+                        Optional.empty(),
+                        Optional.empty(),
+                        Optional.empty()
                     )
                 ),
                 "range",
                 GroupType.CLASSIC,
                 GroupState.STABLE,
                 env.cluster().controller(),
-                Collections.emptySet()
+                Collections.emptySet(),
+                Optional.empty(),
+                Optional.empty()
             ));
 
             assertEquals(expectedResult, result.all().get());
@@ -8674,7 +8684,10 @@ public class KafkaAdminClientTest {
                                      
Optional.ofNullable(member.groupInstanceId()),
                                      member.clientId(),
                                      member.clientHost(),
-                                     assignment);
+                                     assignment,
+                                     Optional.empty(),
+                                     Optional.empty(),
+                                     Optional.empty());
     }
 
     private static ShareMemberDescription 
convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member,
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
index 0bddc618cfc..16ce11d7361 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
@@ -99,5 +99,38 @@ public class MemberDescriptionTest {
 
         assertNotEquals(STATIC_MEMBER_DESCRIPTION, newInstanceDescription);
         assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), 
newInstanceDescription.hashCode());
+
+        MemberDescription newTargetAssignmentDescription = new 
MemberDescription(MEMBER_ID,
+                                                                               
  INSTANCE_ID,
+                                                                               
  CLIENT_ID,
+                                                                               
  HOST,
+                                                                               
  ASSIGNMENT,
+                                                                               
  Optional.of(ASSIGNMENT),
+                                                                               
  Optional.empty(),
+                                                                               
  Optional.empty());
+        assertNotEquals(STATIC_MEMBER_DESCRIPTION, 
newTargetAssignmentDescription);
+        assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), 
newTargetAssignmentDescription.hashCode());
+
+        MemberDescription newMemberEpochDescription = new 
MemberDescription(MEMBER_ID,
+                                                                            
INSTANCE_ID,
+                                                                            
CLIENT_ID,
+                                                                            
HOST,
+                                                                            
ASSIGNMENT,
+                                                                            
Optional.empty(),
+                                                                            
Optional.of(1),
+                                                                            
Optional.empty());
+        assertNotEquals(STATIC_MEMBER_DESCRIPTION, newMemberEpochDescription);
+        assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), 
newMemberEpochDescription.hashCode());
+
+        MemberDescription newIsClassicDescription = new 
MemberDescription(MEMBER_ID,
+                                                                          
INSTANCE_ID,
+                                                                          
CLIENT_ID,
+                                                                          HOST,
+                                                                          
ASSIGNMENT,
+                                                                          
Optional.empty(),
+                                                                          
Optional.empty(),
+                                                                          
Optional.of(false));
+        assertNotEquals(STATIC_MEMBER_DESCRIPTION, newIsClassicDescription);
+        assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(), 
newIsClassicDescription.hashCode());
     }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
index cfbf67e2090..20cf0b761e6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.MemberDescription;
 import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupState;
 import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -54,6 +55,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
@@ -152,29 +154,46 @@ public class DescribeConsumerGroupsHandlerTest {
     @Test
     public void testSuccessfulHandleConsumerGroupResponse() {
         DescribeConsumerGroupsHandler handler = new 
DescribeConsumerGroupsHandler(false, logContext);
-        Collection<MemberDescription> members = singletonList(new 
MemberDescription(
-            "memberId",
-            Optional.of("instanceId"),
-            "clientId",
-            "host",
-            new MemberAssignment(Set.of(
-                new TopicPartition("foo", 0),
-                new TopicPartition("bar",  1))
+        Collection<MemberDescription> members = List.of(
+            new MemberDescription(
+                "memberId",
+                Optional.of("instanceId"),
+                "clientId",
+                "host",
+                new MemberAssignment(Set.of(
+                    new TopicPartition("foo", 0)
+                )),
+                Optional.of(new MemberAssignment(Set.of(
+                    new TopicPartition("foo", 1)
+                ))),
+                Optional.of(10),
+                Optional.of(true)
             ),
-            Optional.of(new MemberAssignment(Set.of(
-                new TopicPartition("foo", 1),
-                new TopicPartition("bar",  2)
-            )))
-        ));
+            new MemberDescription(
+                "memberId-classic",
+                Optional.of("instanceId-classic"),
+                "clientId-classic",
+                "host",
+                new MemberAssignment(Set.of(
+                    new TopicPartition("bar", 0)
+                )),
+                Optional.of(new MemberAssignment(Set.of(
+                    new TopicPartition("bar", 1)
+                ))),
+                Optional.of(9),
+                Optional.of(false)
+            ));
         ConsumerGroupDescription expected = new ConsumerGroupDescription(
             groupId1,
             false,
             members,
             "range",
             GroupType.CONSUMER,
-            ConsumerGroupState.STABLE,
+            GroupState.STABLE,
             coordinator,
-            Collections.emptySet()
+            Collections.emptySet(),
+            Optional.of(10),
+            Optional.of(10)
         );
         AdminApiHandler.ApiResult<CoordinatorKey, ConsumerGroupDescription> 
result = handler.handleResponse(
             coordinator,
@@ -189,7 +208,7 @@ public class DescribeConsumerGroupsHandlerTest {
                             .setAssignmentEpoch(10)
                             .setAssignorName("range")
                             
.setAuthorizedOperations(Utils.to32BitField(emptySet()))
-                            .setMembers(singletonList(
+                            .setMembers(List.of(
                                 new ConsumerGroupDescribeResponseData.Member()
                                     .setMemberId("memberId")
                                     .setInstanceId("instanceId")
@@ -200,27 +219,44 @@ public class DescribeConsumerGroupsHandlerTest {
                                     
.setSubscribedTopicNames(singletonList("foo"))
                                     .setSubscribedTopicRegex("regex")
                                     .setAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
-                                        .setTopicPartitions(Arrays.asList(
+                                        .setTopicPartitions(List.of(
                                             new 
ConsumerGroupDescribeResponseData.TopicPartitions()
                                                 .setTopicId(Uuid.randomUuid())
                                                 .setTopicName("foo")
-                                                
.setPartitions(Collections.singletonList(0)),
+                                                
.setPartitions(Collections.singletonList(0))
+                                        )))
+                                    .setTargetAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
+                                        .setTopicPartitions(List.of(
                                             new 
ConsumerGroupDescribeResponseData.TopicPartitions()
                                                 .setTopicId(Uuid.randomUuid())
-                                                .setTopicName("bar")
+                                                .setTopicName("foo")
                                                 
.setPartitions(Collections.singletonList(1))
                                         )))
-                                    .setTargetAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
-                                        .setTopicPartitions(Arrays.asList(
+                                    .setMemberType((byte) 1),
+                                new ConsumerGroupDescribeResponseData.Member()
+                                    .setMemberId("memberId-classic")
+                                    .setInstanceId("instanceId-classic")
+                                    .setClientHost("host")
+                                    .setClientId("clientId-classic")
+                                    .setMemberEpoch(9)
+                                    .setRackId("rackid")
+                                    
.setSubscribedTopicNames(singletonList("bar"))
+                                    .setSubscribedTopicRegex("regex")
+                                    .setAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
+                                        .setTopicPartitions(List.of(
                                             new 
ConsumerGroupDescribeResponseData.TopicPartitions()
                                                 .setTopicId(Uuid.randomUuid())
-                                                .setTopicName("foo")
-                                                
.setPartitions(Collections.singletonList(1)),
+                                                .setTopicName("bar")
+                                                
.setPartitions(Collections.singletonList(0))
+                                        )))
+                                    .setTargetAssignment(new 
ConsumerGroupDescribeResponseData.Assignment()
+                                        .setTopicPartitions(List.of(
                                             new 
ConsumerGroupDescribeResponseData.TopicPartitions()
                                                 .setTopicId(Uuid.randomUuid())
                                                 .setTopicName("bar")
-                                                
.setPartitions(Collections.singletonList(2))
+                                                
.setPartitions(Collections.singletonList(1))
                                         )))
+                                    .setMemberType((byte) 0)
                             ))
                     ))
             )
@@ -232,9 +268,13 @@ public class DescribeConsumerGroupsHandlerTest {
     public void testSuccessfulHandleClassicGroupResponse() {
         Collection<MemberDescription> members = singletonList(new 
MemberDescription(
                 "memberId",
+                Optional.empty(),
                 "clientId",
                 "host",
-                new MemberAssignment(tps)));
+                new MemberAssignment(tps),
+                Optional.empty(),
+                Optional.empty(),
+                Optional.empty()));
         ConsumerGroupDescription expected = new ConsumerGroupDescription(
                 groupId1,
                 true,
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 64d9cc94c2d..bd381f0306e 100644
--- 
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1921,12 +1921,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
           // Test that we can get information about the test consumer group.
           
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
           var testGroupDescription = 
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+          assertEquals(groupType == GroupType.CLASSIC, 
testGroupDescription.groupEpoch.isEmpty)
+          assertEquals(groupType == GroupType.CLASSIC, 
testGroupDescription.targetAssignmentEpoch.isEmpty)
 
           assertEquals(testGroupId, testGroupDescription.groupId())
           assertFalse(testGroupDescription.isSimpleConsumerGroup)
           assertEquals(groupInstanceSet.size, 
testGroupDescription.members().size())
           val members = testGroupDescription.members()
-          members.asScala.foreach(member => assertEquals(testClientId, 
member.clientId()))
+          members.asScala.foreach { member =>
+            assertEquals(testClientId, member.clientId)
+            assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty 
else Optional.of(true), member.upgraded)
+          }
           val topicPartitionsByTopic = 
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
           topicSet.foreach { topic =>
             val topicPartitions = topicPartitionsByTopic.getOrElse(topic, 
List.empty)
@@ -2058,6 +2063,89 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
     }
   }
 
+  /**
+   * Test the consumer group APIs.
+   */
+  @ParameterizedTest
+  @ValueSource(strings = Array("kraft"))
+  def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+    val config = createConfig
+    client = Admin.create(config)
+    var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+    var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+    try {
+      // Verify that initially there are no consumer groups to list.
+      val list1 = client.listConsumerGroups
+      assertEquals(0, list1.all.get.size)
+      assertEquals(0, list1.errors.get.size)
+      assertEquals(0, list1.valid.get.size)
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+      )).all.get
+      waitForTopics(client, List(testTopicName), List())
+
+      val producer = createProducer()
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null))
+        producer.send(new ProducerRecord(testTopicName, 1, null, null))
+        producer.flush()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+
+      val testGroupId = "test_group_id"
+      val testClassicClientId = "test_classic_client_id"
+      val testConsumerClientId = "test_consumer_client_id"
+
+      val newConsumerConfig = new Properties(consumerConfig)
+      newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+      newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, 
testClassicClientId)
+      consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name)
+
+      classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+      classicConsumer.subscribe(List(testTopicName).asJava)
+      classicConsumer.poll(JDuration.ofMillis(1000))
+
+      newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG, 
testConsumerClientId)
+      consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name)
+      consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+      consumerConsumer.subscribe(List(testTopicName).asJava)
+      consumerConsumer.poll(JDuration.ofMillis(1000))
+
+      TestUtils.waitUntilTrue(() => {
+        classicConsumer.poll(JDuration.ofMillis(100))
+        consumerConsumer.poll(JDuration.ofMillis(100))
+        val describeConsumerGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+        describeConsumerGroupResult.containsKey(testGroupId) &&
+          describeConsumerGroupResult.get(testGroupId).groupState == 
GroupState.STABLE &&
+          describeConsumerGroupResult.get(testGroupId).members.size == 2
+      }, s"Expected to find 2 members in a stable group $testGroupId")
+
+      val describeConsumerGroupResult = 
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+      val group = describeConsumerGroupResult.get(testGroupId)
+      assertNotNull(group)
+      assertEquals(Optional.of(2), group.groupEpoch)
+      assertEquals(Optional.of(2), group.targetAssignmentEpoch)
+
+      val classicMember = group.members.asScala.find(_.clientId == 
testClassicClientId)
+      assertTrue(classicMember.isDefined)
+      assertEquals(Optional.of(2), classicMember.get.memberEpoch)
+      assertEquals(Optional.of(false), classicMember.get.upgraded)
+
+      val consumerMember = group.members.asScala.find(_.clientId == 
testConsumerClientId)
+      assertTrue(consumerMember.isDefined)
+      assertEquals(Optional.of(2), consumerMember.get.memberEpoch)
+      assertEquals(Optional.of(true), consumerMember.get.upgraded)
+    } finally {
+      Utils.closeQuietly(classicConsumer, "classicConsumer")
+      Utils.closeQuietly(consumerConsumer, "consumerConsumer")
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
+
   /**
    * Test the consumer group APIs.
    */
@@ -2546,9 +2634,12 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
       }, "Expected to find all groups")
 
       val classicConsumers = 
client.describeClassicGroups(groupIds.asJavaCollection).all().get()
-      assertNotNull(classicConsumers.get(classicGroupId))
-      assertEquals(classicGroupId, 
classicConsumers.get(classicGroupId).groupId())
-      assertEquals("consumer", classicConsumers.get(classicGroupId).protocol())
+      val classicConsumer = classicConsumers.get(classicGroupId)
+      assertNotNull(classicConsumer)
+      assertEquals(classicGroupId, classicConsumer.groupId)
+      assertEquals("consumer", classicConsumer.protocol)
+      assertFalse(classicConsumer.members.isEmpty)
+      classicConsumer.members.forEach(member => 
assertTrue(member.upgraded.isEmpty))
 
       assertNotNull(classicConsumers.get(simpleGroupId))
       assertEquals(simpleGroupId, 
classicConsumers.get(simpleGroupId).groupId())
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index dcb8fd05481..7a134ac0c96 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -139,8 +140,12 @@ public class ConsumerGroupServiceTest {
                 true,
                 Collections.singleton(new MemberDescription("member1", 
Optional.of("instance1"), "client1", "host1", new 
MemberAssignment(assignedTopicPartitions))),
                 RangeAssignor.class.getName(),
+                GroupType.CLASSIC,
                 GroupState.STABLE,
-                new Node(1, "localhost", 9092));
+                new Node(1, "localhost", 9092),
+                Set.of(),
+                Optional.empty(),
+                Optional.empty());
 
         Function<Collection<TopicPartition>, 
ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher = 
expectedPartitions ->
                 topicPartitionOffsets -> topicPartitionOffsets != null && 
topicPartitionOffsets.keySet().equals(expectedPartitions);
@@ -233,8 +238,12 @@ public class ConsumerGroupServiceTest {
                 true,
                 Collections.singleton(member1),
                 RangeAssignor.class.getName(),
+                GroupType.CLASSIC,
                 groupState,
-                new Node(1, "localhost", 9092));
+                new Node(1, "localhost", 9092),
+                Set.of(),
+                Optional.empty(),
+                Optional.empty());
         KafkaFutureImpl<ConsumerGroupDescription> future = new 
KafkaFutureImpl<>();
         future.complete(description);
         return new 
DescribeConsumerGroupsResult(Collections.singletonMap(GROUP, future));


Reply via email to