This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6551e87815c KAFKA-18925: Add streams groups support to Admin.listGroups (#19155)
6551e87815c is described below
commit 6551e87815caddee02658534efccba5dc95d2dab
Author: Lucas Brutschy <[email protected]>
AuthorDate: Tue Mar 11 15:48:07 2025 +0100
KAFKA-18925: Add streams groups support to Admin.listGroups (#19155)
Add support so that Admin.listGroups can represent
streams groups and their states.
Reviewers: Bill Bejeck <[email protected]>
---
.../java/org/apache/kafka/common/GroupState.java | 24 +--
.../java/org/apache/kafka/common/GroupType.java | 3 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 184 +++++++++++++++++++++
3 files changed, 200 insertions(+), 11 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupState.java b/clients/src/main/java/org/apache/kafka/common/GroupState.java
index c0bcfb999b0..aa2565abf24 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupState.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupState.java
@@ -32,17 +32,18 @@ import java.util.stream.Collectors;
* The following table shows the correspondence between the group states and
types.
* <table>
* <thead>
- * <tr><th>State</th><th>Classic group</th><th>Consumer
group</th><th>Share group</th></tr>
+ * <tr><th>State</th><th>Classic group</th><th>Consumer
group</th><th>Share group</th><th>Streams group</th></tr>
* </thead>
* <tbody>
- * <tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- *
<tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
- *
<tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td></tr>
- * <tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- * <tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- * <tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
- * <tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td></tr>
- * <tr><td>RECONCILING</td><td></td><td>Yes</td><td></td></tr>
+ *
<tr><td>UNKNOWN</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *
<tr><td>PREPARING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
+ *
<tr><td>COMPLETING_REBALANCE</td><td>Yes</td><td>Yes</td><td></td><td></td></tr>
+ *
<tr><td>STABLE</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *
<tr><td>DEAD</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *
<tr><td>EMPTY</td><td>Yes</td><td>Yes</td><td>Yes</td><td>Yes</td></tr>
+ *
<tr><td>ASSIGNING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
+ *
<tr><td>RECONCILING</td><td></td><td>Yes</td><td></td><td>Yes</td></tr>
+ * <tr><td>NOT_READY</td><td></td><td></td><td></td><td>Yes</td></tr>
* </tbody>
* </table>
*/
@@ -55,7 +56,8 @@ public enum GroupState {
DEAD("Dead"),
EMPTY("Empty"),
ASSIGNING("Assigning"),
- RECONCILING("Reconciling");
+ RECONCILING("Reconciling"),
+ NOT_READY("NotReady");
private static final Map<String, GroupState> NAME_TO_ENUM =
Arrays.stream(values())
.collect(Collectors.toMap(state ->
state.name.toUpperCase(Locale.ROOT), Function.identity()));
@@ -79,6 +81,8 @@ public enum GroupState {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE,
DEAD, EMPTY);
} else if (type == GroupType.CONSUMER) {
return Set.of(PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE,
DEAD, EMPTY, ASSIGNING, RECONCILING);
+ } else if (type == GroupType.STREAMS) {
+ return Set.of(STABLE, DEAD, EMPTY, ASSIGNING, RECONCILING,
NOT_READY);
} else if (type == GroupType.SHARE) {
return Set.of(STABLE, DEAD, EMPTY);
} else {
diff --git a/clients/src/main/java/org/apache/kafka/common/GroupType.java b/clients/src/main/java/org/apache/kafka/common/GroupType.java
index eeb79ea2825..4c3aeac93fb 100644
--- a/clients/src/main/java/org/apache/kafka/common/GroupType.java
+++ b/clients/src/main/java/org/apache/kafka/common/GroupType.java
@@ -26,7 +26,8 @@ public enum GroupType {
UNKNOWN("Unknown"),
CONSUMER("Consumer"),
CLASSIC("Classic"),
- SHARE("Share");
+ SHARE("Share"),
+ STREAMS("Streams");
private static final Map<String, GroupType> NAME_TO_ENUM =
Arrays.stream(values())
.collect(Collectors.toMap(type -> type.name.toLowerCase(Locale.ROOT),
Function.identity()));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 17569ed3956..5e3a884742d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -140,6 +140,7 @@ import
org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupResponseData.MemberResponse;
import org.apache.kafka.common.message.ListClientMetricsResourcesResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
+import org.apache.kafka.common.message.ListGroupsResponseData.ListedGroup;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import
org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsTopicResponse;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
@@ -5991,6 +5992,189 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testListStreamsGroups() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(4, 0),
+ AdminClientConfig.RETRIES_CONFIG, "2")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Empty metadata response should be retried
+ env.kafkaClient().prepareResponse(
+ RequestTestUtils.metadataResponse(
+ Collections.emptyList(),
+ env.cluster().clusterResource().clusterId(),
+ -1,
+ Collections.emptyList()));
+
+ env.kafkaClient().prepareResponse(
+ RequestTestUtils.metadataResponse(
+ env.cluster().nodes(),
+ env.cluster().clusterResource().clusterId(),
+ env.cluster().controller().id(),
+ Collections.emptyList()));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(singletonList(
+ new ListedGroup()
+ .setGroupId("streams-group-1")
+ .setGroupType(GroupType.STREAMS.toString())
+ .setGroupState("Stable")
+ ))),
+ env.cluster().nodeById(0));
+
+ // handle retriable errors
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
+ .setGroups(Collections.emptyList())
+ ),
+ env.cluster().nodeById(1));
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+
.setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
+ .setGroups(Collections.emptyList())
+ ),
+ env.cluster().nodeById(1));
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("streams-group-2")
+ .setGroupType(GroupType.STREAMS.toString())
+ .setGroupState("Stable"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("streams-group-3")
+ .setGroupType(GroupType.STREAMS.toString())
+ .setGroupState("Stable")
+ ))),
+ env.cluster().nodeById(1));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(singletonList(
+ new ListedGroup()
+ .setGroupId("streams-group-4")
+ .setGroupType(GroupType.STREAMS.toString())
+ .setGroupState("Stable")
+ ))),
+ env.cluster().nodeById(2));
+
+ // fatal error
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(
+ new ListGroupsResponseData()
+ .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
+ .setGroups(Collections.emptyList())),
+ env.cluster().nodeById(3));
+
+ final ListGroupsResult result = env.adminClient().listGroups(new
ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+ TestUtils.assertFutureThrows(UnknownServerException.class,
result.all());
+
+ Collection<GroupListing> listings = result.valid().get();
+ assertEquals(4, listings.size());
+
+ Set<String> groupIds = new HashSet<>();
+ for (GroupListing listing : listings) {
+ groupIds.add(listing.groupId());
+ assertTrue(listing.groupState().isPresent());
+ }
+
+ assertEquals(Set.of("streams-group-1", "streams-group-2",
"streams-group-3", "streams-group-4"), groupIds);
+ assertEquals(1, result.errors().get().size());
+ }
+ }
+
+ @Test
+ public void testListStreamsGroupsMetadataFailure() throws Exception {
+ final Cluster cluster = mockCluster(3, 0);
+ final Time time = new MockTime();
+
+ try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time,
cluster,
+ AdminClientConfig.RETRIES_CONFIG, "0")) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Empty metadata causes the request to fail since we have no list of brokers
+ // to send the ListGroups requests to
+ env.kafkaClient().prepareResponse(
+ RequestTestUtils.metadataResponse(
+ Collections.emptyList(),
+ env.cluster().clusterResource().clusterId(),
+ -1,
+ Collections.emptyList()));
+
+ final ListGroupsResult result = env.adminClient().listGroups(new
ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+ TestUtils.assertFutureThrows(KafkaException.class, result.all());
+ }
+ }
+
+ @Test
+ public void testListStreamsGroupsWithStates() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("streams-group-1")
+ .setGroupType(GroupType.STREAMS.toString())
+ .setProtocolType("streams")
+ .setGroupState("Stable"),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("streams-group-2")
+ .setGroupType(GroupType.STREAMS.toString())
+ .setProtocolType("streams")
+ .setGroupState("NotReady")))),
+ env.cluster().nodeById(0));
+
+ final ListGroupsResult result = env.adminClient().listGroups(new
ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+ Collection<GroupListing> listings = result.valid().get();
+
+ assertEquals(2, listings.size());
+ List<GroupListing> expected = new ArrayList<>();
+ expected.add(new GroupListing("streams-group-1",
Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
+ expected.add(new GroupListing("streams-group-2",
Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.NOT_READY)));
+ assertEquals(expected, listings);
+ assertEquals(0, result.errors().get().size());
+ }
+ }
+
+ @Test
+ public void testListStreamsGroupsWithStatesOlderBrokerVersion() {
+ ApiVersion listGroupV4 = new ApiVersion()
+ .setApiKey(ApiKeys.LIST_GROUPS.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 4);
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ // Check we should not be able to list streams groups with broker having version < 5
+ env.kafkaClient().prepareResponseFrom(
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Collections.singletonList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("streams-group-1")))),
+ env.cluster().nodeById(0));
+ ListGroupsResult result = env.adminClient().listGroups(new
ListGroupsOptions().withTypes(Set.of(GroupType.STREAMS)));
+ TestUtils.assertFutureThrows(UnsupportedVersionException.class,
result.all());
+ }
+ }
+
@Test
public void testDescribeShareGroups() throws Exception {
try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {