This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c8380ae7795 KAFKA-17750: Extend kafka-consumer-groups command line
tool to support new consumer group (part 2) (#18034)
c8380ae7795 is described below
commit c8380ae77950bbd2161cf14cbdeb007562655f2f
Author: PoAn Yang <[email protected]>
AuthorDate: Tue Dec 10 21:02:20 2024 +0800
KAFKA-17750: Extend kafka-consumer-groups command line tool to support new
consumer group (part 2) (#18034)
* Add fields `groupEpoch` and `targetAssignmentEpoch` to
`ConsumerGroupDescription.java`.
* Add fields `memberEpoch` and `upgraded` to `MemberDescription.java`.
* Add an assertion to
`PlaintextAdminIntegrationTest#testDescribeClassicGroups` to make sure a member
in a classic group returns `upgraded` as `Optional.empty`.
* Add a new case `testConsumerGroupWithMemberMigration` to
`PlaintextAdminIntegrationTest` to make sure a migrating member has the correct
`upgraded` value. Add assertions for `groupEpoch`, `targetAssignmentEpoch`, and
`memberEpoch` as well.
Reviewers: David Jacot <[email protected]>
Signed-off-by: PoAn Yang <[email protected]>
---
.../clients/admin/ConsumerGroupDescription.java | 63 ++++++++------
.../kafka/clients/admin/MemberDescription.java | 75 ++++++++++++++--
.../internals/DescribeClassicGroupsHandler.java | 5 +-
.../internals/DescribeConsumerGroupsHandler.java | 17 +++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 23 +++--
.../kafka/clients/admin/MemberDescriptionTest.java | 33 ++++++++
.../DescribeConsumerGroupsHandlerTest.java | 90 ++++++++++++++------
.../kafka/api/PlaintextAdminIntegrationTest.scala | 99 +++++++++++++++++++++-
.../consumer/group/ConsumerGroupServiceTest.java | 13 ++-
9 files changed, 347 insertions(+), 71 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 4cbc5b4b43b..dd1b4b4cb5c 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -42,9 +43,11 @@ public class ConsumerGroupDescription {
private final GroupState groupState;
private final Node coordinator;
private final Set<AclOperation> authorizedOperations;
+ private final Optional<Integer> groupEpoch;
+ private final Optional<Integer> targetAssignmentEpoch;
/**
- * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String,
boolean, Collection, String, GroupState, Node)}.
+ * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String,
boolean, Collection, String, GroupType, GroupState, Node, Set, Optional,
Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
@@ -57,7 +60,7 @@ public class ConsumerGroupDescription {
}
/**
- * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String,
boolean, Collection, String, GroupState, Node, Set)}.
+ * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String,
boolean, Collection, String, GroupType, GroupState, Node, Set, Optional,
Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
@@ -71,7 +74,7 @@ public class ConsumerGroupDescription {
}
/**
- * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String,
boolean, Collection, String, GroupType, GroupState, Node, Set)}.
+ * @deprecated Since 4.0. Use {@link #ConsumerGroupDescription(String,
boolean, Collection, String, GroupType, GroupState, Node, Set, Optional,
Optional)}.
*/
@Deprecated
public ConsumerGroupDescription(String groupId,
@@ -90,25 +93,8 @@ public class ConsumerGroupDescription {
this.groupState = GroupState.parse(state.name());
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
- }
-
- public ConsumerGroupDescription(String groupId,
- boolean isSimpleConsumerGroup,
- Collection<MemberDescription> members,
- String partitionAssignor,
- GroupState groupState,
- Node coordinator) {
- this(groupId, isSimpleConsumerGroup, members, partitionAssignor,
groupState, coordinator, Collections.emptySet());
- }
-
- public ConsumerGroupDescription(String groupId,
- boolean isSimpleConsumerGroup,
- Collection<MemberDescription> members,
- String partitionAssignor,
- GroupState groupState,
- Node coordinator,
- Set<AclOperation> authorizedOperations) {
- this(groupId, isSimpleConsumerGroup, members, partitionAssignor,
GroupType.CLASSIC, groupState, coordinator, authorizedOperations);
+ this.groupEpoch = Optional.empty();
+ this.targetAssignmentEpoch = Optional.empty();
}
public ConsumerGroupDescription(String groupId,
@@ -118,7 +104,9 @@ public class ConsumerGroupDescription {
GroupType type,
GroupState groupState,
Node coordinator,
- Set<AclOperation> authorizedOperations) {
+ Set<AclOperation> authorizedOperations,
+ Optional<Integer> groupEpoch,
+ Optional<Integer> targetAssignmentEpoch) {
this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.members = members == null ? Collections.emptyList() :
List.copyOf(members);
@@ -127,6 +115,8 @@ public class ConsumerGroupDescription {
this.groupState = groupState;
this.coordinator = coordinator;
this.authorizedOperations = authorizedOperations;
+ this.groupEpoch = groupEpoch;
+ this.targetAssignmentEpoch = targetAssignmentEpoch;
}
@Override
@@ -141,12 +131,15 @@ public class ConsumerGroupDescription {
type == that.type &&
groupState == that.groupState &&
Objects.equals(coordinator, that.coordinator) &&
- Objects.equals(authorizedOperations, that.authorizedOperations);
+ Objects.equals(authorizedOperations, that.authorizedOperations) &&
+ Objects.equals(groupEpoch, that.groupEpoch) &&
+ Objects.equals(targetAssignmentEpoch, that.targetAssignmentEpoch);
}
@Override
public int hashCode() {
- return Objects.hash(groupId, isSimpleConsumerGroup, members,
partitionAssignor, type, groupState, coordinator, authorizedOperations);
+ return Objects.hash(groupId, isSimpleConsumerGroup, members,
partitionAssignor, type, groupState, coordinator,
+ authorizedOperations, groupEpoch, targetAssignmentEpoch);
}
/**
@@ -215,6 +208,24 @@ public class ConsumerGroupDescription {
return authorizedOperations;
}
+ /**
+ * The epoch of the consumer group.
+ * The optional is set to an integer if it is a {@link GroupType#CONSUMER}
group, and to empty if it
+ * is a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Integer> groupEpoch() {
+ return groupEpoch;
+ }
+
+ /**
+ * The epoch of the target assignment.
+ * The optional is set to an integer if it is a {@link GroupType#CONSUMER}
group, and to empty if it
+ * is a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Integer> targetAssignmentEpoch() {
+ return targetAssignmentEpoch;
+ }
+
@Override
public String toString() {
return "(groupId=" + groupId +
@@ -225,6 +236,8 @@ public class ConsumerGroupDescription {
", groupState=" + groupState +
", coordinator=" + coordinator +
", authorizedOperations=" + authorizedOperations +
+ ", groupEpoch=" + groupEpoch.orElse(null) +
+ ", targetAssignmentEpoch=" + targetAssignmentEpoch.orElse(null) +
")";
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 5ca7dba86f8..0785f2e6715 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.GroupType;
+
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
@@ -30,13 +32,18 @@ public class MemberDescription {
private final String host;
private final MemberAssignment assignment;
private final Optional<MemberAssignment> targetAssignment;
+ private final Optional<Integer> memberEpoch;
+ private final Optional<Boolean> upgraded;
- public MemberDescription(String memberId,
+ public MemberDescription(
+ String memberId,
Optional<String> groupInstanceId,
String clientId,
String host,
MemberAssignment assignment,
- Optional<MemberAssignment> targetAssignment
+ Optional<MemberAssignment> targetAssignment,
+ Optional<Integer> memberEpoch,
+ Optional<Boolean> upgraded
) {
this.memberId = memberId == null ? "" : memberId;
this.groupInstanceId = groupInstanceId;
@@ -45,8 +52,38 @@ public class MemberDescription {
this.assignment = assignment == null ?
new MemberAssignment(Collections.emptySet()) : assignment;
this.targetAssignment = targetAssignment;
+ this.memberEpoch = memberEpoch;
+ this.upgraded = upgraded;
}
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional,
String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
+ public MemberDescription(
+ String memberId,
+ Optional<String> groupInstanceId,
+ String clientId,
+ String host,
+ MemberAssignment assignment,
+ Optional<MemberAssignment> targetAssignment
+ ) {
+ this(
+ memberId,
+ groupInstanceId,
+ clientId,
+ host,
+ assignment,
+ targetAssignment,
+ Optional.empty(),
+ Optional.empty()
+ );
+ }
+
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional,
String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
public MemberDescription(
String memberId,
Optional<String> groupInstanceId,
@@ -64,6 +101,10 @@ public class MemberDescription {
);
}
+ /**
+ * @deprecated Since 4.0. Use {@link #MemberDescription(String, Optional,
String, String, MemberAssignment, Optional, Optional, Optional)}.
+ */
+ @Deprecated
public MemberDescription(String memberId,
String clientId,
String host,
@@ -81,12 +122,14 @@ public class MemberDescription {
clientId.equals(that.clientId) &&
host.equals(that.host) &&
assignment.equals(that.assignment) &&
- targetAssignment.equals(that.targetAssignment);
+ targetAssignment.equals(that.targetAssignment) &&
+ memberEpoch.equals(that.memberEpoch) &&
+ upgraded.equals(that.upgraded);
}
@Override
public int hashCode() {
- return Objects.hash(memberId, groupInstanceId, clientId, host,
assignment, targetAssignment);
+ return Objects.hash(memberId, groupInstanceId, clientId, host,
assignment, targetAssignment, memberEpoch, upgraded);
}
/**
@@ -131,6 +174,25 @@ public class MemberDescription {
return targetAssignment;
}
+ /**
+ * The epoch of the group member.
+ * The optional is set to an integer if the member is in a {@link
GroupType#CONSUMER} group, and to empty if it
+ * is in a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Integer> memberEpoch() {
+ return memberEpoch;
+ }
+
+ /**
+ * The flag indicating whether a member within a {@link
GroupType#CONSUMER} group uses the
+ * {@link GroupType#CONSUMER} protocol.
+ * The optional is set to true if it does, to false if it does not, and to
empty if it is unknown or if the group
+ * is a {@link GroupType#CLASSIC} group.
+ */
+ public Optional<Boolean> upgraded() {
+ return upgraded;
+ }
+
@Override
public String toString() {
return "(memberId=" + memberId +
@@ -138,6 +200,9 @@ public class MemberDescription {
", clientId=" + clientId +
", host=" + host +
", assignment=" + assignment +
- ", targetAssignment=" + targetAssignment + ")";
+ ", targetAssignment=" + targetAssignment +
+ ", memberEpoch=" + memberEpoch.orElse(null) +
+ ", upgraded=" + upgraded.orElse(null) +
+ ")";
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
index 77c04c5d5f0..686ee43a44b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeClassicGroupsHandler.java
@@ -136,7 +136,10 @@ public class DescribeClassicGroupsHandler extends
AdminApiHandler.Batched<Coordi
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
- new MemberAssignment(partitions)));
+ new MemberAssignment(partitions),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
});
final ClassicGroupDescription classicGroupDescription =
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
index 1d911e2f0c7..457675e9267 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandler.java
@@ -222,7 +222,9 @@ public class DescribeConsumerGroupsHandler implements
AdminApiHandler<Coordinato
groupMember.clientId(),
groupMember.clientHost(),
new
MemberAssignment(convertAssignment(groupMember.assignment())),
- Optional.of(new
MemberAssignment(convertAssignment(groupMember.targetAssignment())))
+ Optional.of(new
MemberAssignment(convertAssignment(groupMember.targetAssignment()))),
+ Optional.of(groupMember.memberEpoch()),
+ groupMember.memberType() == -1 ? Optional.empty() :
Optional.of(groupMember.memberType() == 1)
))
);
@@ -235,7 +237,9 @@ public class DescribeConsumerGroupsHandler implements
AdminApiHandler<Coordinato
GroupType.CONSUMER,
GroupState.parse(describedGroup.groupState()),
coordinator,
- authorizedOperations
+ authorizedOperations,
+ Optional.of(describedGroup.groupEpoch()),
+ Optional.of(describedGroup.assignmentEpoch())
);
completed.put(groupIdKey, consumerGroupDescription);
}
@@ -281,7 +285,10 @@ public class DescribeConsumerGroupsHandler implements
AdminApiHandler<Coordinato
Optional.ofNullable(groupMember.groupInstanceId()),
groupMember.clientId(),
groupMember.clientHost(),
- new MemberAssignment(partitions)));
+ new MemberAssignment(partitions),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
}
final ConsumerGroupDescription consumerGroupDescription =
new ConsumerGroupDescription(groupIdKey.idValue,
protocolType.isEmpty(),
@@ -290,7 +297,9 @@ public class DescribeConsumerGroupsHandler implements
AdminApiHandler<Coordinato
GroupType.CLASSIC,
GroupState.parse(describedGroup.groupState()),
coordinator,
- authorizedOperations);
+ authorizedOperations,
+ Optional.empty(),
+ Optional.empty());
completed.put(groupIdKey, consumerGroupDescription);
} else {
failed.put(groupIdKey, new IllegalArgumentException(
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b0b48e33c67..44f6e1f5a88 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4057,6 +4057,7 @@ public class KafkaAdminClientTest {
.setTopicName("foo")
.setPartitions(singletonList(1))
)))
+ .setMemberType((byte) 1)
)),
new ConsumerGroupDescribeResponseData.DescribedGroup()
.setGroupId("grp2")
@@ -4110,14 +4111,18 @@ public class KafkaAdminClientTest {
),
Optional.of(new MemberAssignment(
Collections.singleton(new TopicPartition("foo", 1))
- ))
+ )),
+ Optional.of(10),
+ Optional.of(true)
)
),
"range",
GroupType.CONSUMER,
GroupState.STABLE,
env.cluster().controller(),
- Collections.emptySet()
+ Collections.emptySet(),
+ Optional.of(10),
+ Optional.of(10)
));
expectedResult.put("grp2", new ConsumerGroupDescription(
"grp2",
@@ -4130,14 +4135,19 @@ public class KafkaAdminClientTest {
"clientHost",
new MemberAssignment(
Collections.singleton(new TopicPartition("bar", 0))
- )
+ ),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()
)
),
"range",
GroupType.CLASSIC,
GroupState.STABLE,
env.cluster().controller(),
- Collections.emptySet()
+ Collections.emptySet(),
+ Optional.empty(),
+ Optional.empty()
));
assertEquals(expectedResult, result.all().get());
@@ -8674,7 +8684,10 @@ public class KafkaAdminClientTest {
Optional.ofNullable(member.groupInstanceId()),
member.clientId(),
member.clientHost(),
- assignment);
+ assignment,
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty());
}
private static ShareMemberDescription
convertToShareMemberDescriptions(ShareGroupDescribeResponseData.Member member,
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
index 0bddc618cfc..16ce11d7361 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/MemberDescriptionTest.java
@@ -99,5 +99,38 @@ public class MemberDescriptionTest {
assertNotEquals(STATIC_MEMBER_DESCRIPTION, newInstanceDescription);
assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(),
newInstanceDescription.hashCode());
+
+ MemberDescription newTargetAssignmentDescription = new
MemberDescription(MEMBER_ID,
+
INSTANCE_ID,
+
CLIENT_ID,
+
HOST,
+
ASSIGNMENT,
+
Optional.of(ASSIGNMENT),
+
Optional.empty(),
+
Optional.empty());
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION,
newTargetAssignmentDescription);
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(),
newTargetAssignmentDescription.hashCode());
+
+ MemberDescription newMemberEpochDescription = new
MemberDescription(MEMBER_ID,
+
INSTANCE_ID,
+
CLIENT_ID,
+
HOST,
+
ASSIGNMENT,
+
Optional.empty(),
+
Optional.of(1),
+
Optional.empty());
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION, newMemberEpochDescription);
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(),
newMemberEpochDescription.hashCode());
+
+ MemberDescription newIsClassicDescription = new
MemberDescription(MEMBER_ID,
+
INSTANCE_ID,
+
CLIENT_ID,
+ HOST,
+
ASSIGNMENT,
+
Optional.empty(),
+
Optional.empty(),
+
Optional.of(false));
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION, newIsClassicDescription);
+ assertNotEquals(STATIC_MEMBER_DESCRIPTION.hashCode(),
newIsClassicDescription.hashCode());
}
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
index cfbf67e2090..20cf0b761e6 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/DescribeConsumerGroupsHandlerTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -54,6 +55,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -152,29 +154,46 @@ public class DescribeConsumerGroupsHandlerTest {
@Test
public void testSuccessfulHandleConsumerGroupResponse() {
DescribeConsumerGroupsHandler handler = new
DescribeConsumerGroupsHandler(false, logContext);
- Collection<MemberDescription> members = singletonList(new
MemberDescription(
- "memberId",
- Optional.of("instanceId"),
- "clientId",
- "host",
- new MemberAssignment(Set.of(
- new TopicPartition("foo", 0),
- new TopicPartition("bar", 1))
+ Collection<MemberDescription> members = List.of(
+ new MemberDescription(
+ "memberId",
+ Optional.of("instanceId"),
+ "clientId",
+ "host",
+ new MemberAssignment(Set.of(
+ new TopicPartition("foo", 0)
+ )),
+ Optional.of(new MemberAssignment(Set.of(
+ new TopicPartition("foo", 1)
+ ))),
+ Optional.of(10),
+ Optional.of(true)
),
- Optional.of(new MemberAssignment(Set.of(
- new TopicPartition("foo", 1),
- new TopicPartition("bar", 2)
- )))
- ));
+ new MemberDescription(
+ "memberId-classic",
+ Optional.of("instanceId-classic"),
+ "clientId-classic",
+ "host",
+ new MemberAssignment(Set.of(
+ new TopicPartition("bar", 0)
+ )),
+ Optional.of(new MemberAssignment(Set.of(
+ new TopicPartition("bar", 1)
+ ))),
+ Optional.of(9),
+ Optional.of(false)
+ ));
ConsumerGroupDescription expected = new ConsumerGroupDescription(
groupId1,
false,
members,
"range",
GroupType.CONSUMER,
- ConsumerGroupState.STABLE,
+ GroupState.STABLE,
coordinator,
- Collections.emptySet()
+ Collections.emptySet(),
+ Optional.of(10),
+ Optional.of(10)
);
AdminApiHandler.ApiResult<CoordinatorKey, ConsumerGroupDescription>
result = handler.handleResponse(
coordinator,
@@ -189,7 +208,7 @@ public class DescribeConsumerGroupsHandlerTest {
.setAssignmentEpoch(10)
.setAssignorName("range")
.setAuthorizedOperations(Utils.to32BitField(emptySet()))
- .setMembers(singletonList(
+ .setMembers(List.of(
new ConsumerGroupDescribeResponseData.Member()
.setMemberId("memberId")
.setInstanceId("instanceId")
@@ -200,27 +219,44 @@ public class DescribeConsumerGroupsHandlerTest {
.setSubscribedTopicNames(singletonList("foo"))
.setSubscribedTopicRegex("regex")
.setAssignment(new
ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(Arrays.asList(
+ .setTopicPartitions(List.of(
new
ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setTopicName("foo")
-
.setPartitions(Collections.singletonList(0)),
+
.setPartitions(Collections.singletonList(0))
+ )))
+ .setTargetAssignment(new
ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List.of(
new
ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
- .setTopicName("bar")
+ .setTopicName("foo")
.setPartitions(Collections.singletonList(1))
)))
- .setTargetAssignment(new
ConsumerGroupDescribeResponseData.Assignment()
- .setTopicPartitions(Arrays.asList(
+ .setMemberType((byte) 1),
+ new ConsumerGroupDescribeResponseData.Member()
+ .setMemberId("memberId-classic")
+ .setInstanceId("instanceId-classic")
+ .setClientHost("host")
+ .setClientId("clientId-classic")
+ .setMemberEpoch(9)
+ .setRackId("rackid")
+
.setSubscribedTopicNames(singletonList("bar"))
+ .setSubscribedTopicRegex("regex")
+ .setAssignment(new
ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List.of(
new
ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
- .setTopicName("foo")
-
.setPartitions(Collections.singletonList(1)),
+ .setTopicName("bar")
+
.setPartitions(Collections.singletonList(0))
+ )))
+ .setTargetAssignment(new
ConsumerGroupDescribeResponseData.Assignment()
+ .setTopicPartitions(List.of(
new
ConsumerGroupDescribeResponseData.TopicPartitions()
.setTopicId(Uuid.randomUuid())
.setTopicName("bar")
-
.setPartitions(Collections.singletonList(2))
+
.setPartitions(Collections.singletonList(1))
)))
+ .setMemberType((byte) 0)
))
))
)
@@ -232,9 +268,13 @@ public class DescribeConsumerGroupsHandlerTest {
public void testSuccessfulHandleClassicGroupResponse() {
Collection<MemberDescription> members = singletonList(new
MemberDescription(
"memberId",
+ Optional.empty(),
"clientId",
"host",
- new MemberAssignment(tps)));
+ new MemberAssignment(tps),
+ Optional.empty(),
+ Optional.empty(),
+ Optional.empty()));
ConsumerGroupDescription expected = new ConsumerGroupDescription(
groupId1,
true,
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 64d9cc94c2d..bd381f0306e 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1921,12 +1921,17 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
// Test that we can get information about the test consumer group.
assertTrue(describeWithFakeGroupResult.describedGroups().containsKey(testGroupId))
var testGroupDescription =
describeWithFakeGroupResult.describedGroups().get(testGroupId).get()
+ assertEquals(groupType == GroupType.CLASSIC,
testGroupDescription.groupEpoch.isEmpty)
+ assertEquals(groupType == GroupType.CLASSIC,
testGroupDescription.targetAssignmentEpoch.isEmpty)
assertEquals(testGroupId, testGroupDescription.groupId())
assertFalse(testGroupDescription.isSimpleConsumerGroup)
assertEquals(groupInstanceSet.size,
testGroupDescription.members().size())
val members = testGroupDescription.members()
- members.asScala.foreach(member => assertEquals(testClientId,
member.clientId()))
+ members.asScala.foreach { member =>
+ assertEquals(testClientId, member.clientId)
+ assertEquals(if (groupType == GroupType.CLASSIC) Optional.empty
else Optional.of(true), member.upgraded)
+ }
val topicPartitionsByTopic =
members.asScala.flatMap(_.assignment().topicPartitions().asScala).groupBy(_.topic())
topicSet.foreach { topic =>
val topicPartitions = topicPartitionsByTopic.getOrElse(topic,
List.empty)
@@ -2058,6 +2063,89 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}
}
+ /**
+ * Test describing a consumer group with a classic and a consumer protocol member.
+ */
+ @ParameterizedTest
+ @ValueSource(strings = Array("kraft"))
+ def testConsumerGroupWithMemberMigration(quorum: String): Unit = {
+ val config = createConfig
+ client = Admin.create(config)
+ var classicConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ var consumerConsumer: Consumer[Array[Byte], Array[Byte]] = null
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups
+ assertEquals(0, list1.all.get.size)
+ assertEquals(0, list1.errors.get.size)
+ assertEquals(0, list1.valid.get.size)
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+
+ client.createTopics(util.Arrays.asList(
+ new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+ )).all.get
+ waitForTopics(client, List(testTopicName), List())
+
+ val producer = createProducer()
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null))
+ producer.send(new ProducerRecord(testTopicName, 1, null, null))
+ producer.flush()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+
+ val testGroupId = "test_group_id"
+ val testClassicClientId = "test_classic_client_id"
+ val testConsumerClientId = "test_consumer_client_id"
+
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, testGroupId)
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testClassicClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name)
+
+ classicConsumer = createConsumer(configOverrides = newConsumerConfig)
+ classicConsumer.subscribe(List(testTopicName).asJava)
+ classicConsumer.poll(JDuration.ofMillis(1000))
+
+ newConsumerConfig.put(ConsumerConfig.CLIENT_ID_CONFIG,
testConsumerClientId)
+ consumerConfig.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name)
+ consumerConsumer = createConsumer(configOverrides = newConsumerConfig)
+ consumerConsumer.subscribe(List(testTopicName).asJava)
+ consumerConsumer.poll(JDuration.ofMillis(1000))
+
+ TestUtils.waitUntilTrue(() => {
+ classicConsumer.poll(JDuration.ofMillis(100))
+ consumerConsumer.poll(JDuration.ofMillis(100))
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ describeConsumerGroupResult.containsKey(testGroupId) &&
+ describeConsumerGroupResult.get(testGroupId).groupState ==
GroupState.STABLE &&
+ describeConsumerGroupResult.get(testGroupId).members.size == 2
+ }, s"Expected to find 2 members in a stable group $testGroupId")
+
+ val describeConsumerGroupResult =
client.describeConsumerGroups(Seq(testGroupId).asJava).all.get
+ val group = describeConsumerGroupResult.get(testGroupId)
+ assertNotNull(group)
+ assertEquals(Optional.of(2), group.groupEpoch)
+ assertEquals(Optional.of(2), group.targetAssignmentEpoch)
+
+ val classicMember = group.members.asScala.find(_.clientId ==
testClassicClientId)
+ assertTrue(classicMember.isDefined)
+ assertEquals(Optional.of(2), classicMember.get.memberEpoch)
+ assertEquals(Optional.of(false), classicMember.get.upgraded)
+
+ val consumerMember = group.members.asScala.find(_.clientId ==
testConsumerClientId)
+ assertTrue(consumerMember.isDefined)
+ assertEquals(Optional.of(2), consumerMember.get.memberEpoch)
+ assertEquals(Optional.of(true), consumerMember.get.upgraded)
+ } finally {
+ Utils.closeQuietly(classicConsumer, "classicConsumer")
+ Utils.closeQuietly(consumerConsumer, "consumerConsumer")
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
+
/**
* Test the consumer group APIs.
*/
@@ -2546,9 +2634,12 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
}, "Expected to find all groups")
val classicConsumers =
client.describeClassicGroups(groupIds.asJavaCollection).all().get()
- assertNotNull(classicConsumers.get(classicGroupId))
- assertEquals(classicGroupId,
classicConsumers.get(classicGroupId).groupId())
- assertEquals("consumer", classicConsumers.get(classicGroupId).protocol())
+ val classicConsumer = classicConsumers.get(classicGroupId)
+ assertNotNull(classicConsumer)
+ assertEquals(classicGroupId, classicConsumer.groupId)
+ assertEquals("consumer", classicConsumer.protocol)
+ assertFalse(classicConsumer.members.isEmpty)
+ classicConsumer.members.forEach(member =>
assertTrue(member.upgraded.isEmpty))
assertNotNull(classicConsumers.get(simpleGroupId))
assertEquals(simpleGroupId,
classicConsumers.get(simpleGroupId).groupId())
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
index dcb8fd05481..7a134ac0c96 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupServiceTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.GroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
@@ -139,8 +140,12 @@ public class ConsumerGroupServiceTest {
true,
Collections.singleton(new MemberDescription("member1",
Optional.of("instance1"), "client1", "host1", new
MemberAssignment(assignedTopicPartitions))),
RangeAssignor.class.getName(),
+ GroupType.CLASSIC,
GroupState.STABLE,
- new Node(1, "localhost", 9092));
+ new Node(1, "localhost", 9092),
+ Set.of(),
+ Optional.empty(),
+ Optional.empty());
Function<Collection<TopicPartition>,
ArgumentMatcher<Map<TopicPartition, OffsetSpec>>> offsetsArgMatcher =
expectedPartitions ->
topicPartitionOffsets -> topicPartitionOffsets != null &&
topicPartitionOffsets.keySet().equals(expectedPartitions);
@@ -233,8 +238,12 @@ public class ConsumerGroupServiceTest {
true,
Collections.singleton(member1),
RangeAssignor.class.getName(),
+ GroupType.CLASSIC,
groupState,
- new Node(1, "localhost", 9092));
+ new Node(1, "localhost", 9092),
+ Set.of(),
+ Optional.empty(),
+ Optional.empty());
KafkaFutureImpl<ConsumerGroupDescription> future = new
KafkaFutureImpl<>();
future.complete(description);
return new
DescribeConsumerGroupsResult(Collections.singletonMap(GROUP, future));