This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 96c68096a26 KAFKA-15462: Add Group Type Filter for List Group to the
Admin Client (#15150)
96c68096a26 is described below
commit 96c68096a26ea5e7c2333308dfbaef47cb1eac72
Author: Ritika Reddy <[email protected]>
AuthorDate: Thu Feb 29 00:38:42 2024 -0800
KAFKA-15462: Add Group Type Filter for List Group to the Admin Client
(#15150)
In KIP-848, we introduce the notion of Group Types based on the protocol
type that the members in the consumer group use. As of now we support two types
of groups:
* Classic: Members use the classic consumer group protocol (the existing one).
* Consumer: Members use the consumer group protocol introduced in KIP-848.
Currently List Groups allows users to list all the consumer groups
available. KIP-518 introduced filtering the consumer groups by the state that
they are in. We now want to allow users to filter consumer groups by type.
This patch includes the changes to the admin client and related files. It
also includes changes to parameterize the tests to include permutations of the
old GC and the new GC with the different protocol types.
Reviewers: David Jacot <[email protected]>
---
checkstyle/suppressions.xml | 1 +
.../kafka/clients/admin/ConsumerGroupListing.java | 64 ++--
.../kafka/clients/admin/KafkaAdminClient.java | 20 +-
.../clients/admin/ListConsumerGroupsOptions.java | 25 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 124 ++++++-
.../scala/kafka/admin/ConsumerGroupCommand.scala | 107 ++++--
.../integration/kafka/api/BaseConsumerTest.scala | 19 +-
.../kafka/admin/ConsumerGroupCommandTest.scala | 7 +-
.../org/apache/kafka/tools/ToolsTestUtils.java | 2 +
.../consumer/group/ConsumerGroupCommandTest.java | 13 +-
.../consumer/group/ListConsumerGroupTest.java | 386 +++++++++++++++++++--
11 files changed, 669 insertions(+), 99 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7486ef9a80d..c65cd675a9e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -46,6 +46,7 @@
<!-- server tests -->
<suppress checks="MethodLength|JavaNCSS|NPath"
files="DescribeTopicPartitionsRequestHandlerTest.java"/>
+ <suppress checks="CyclomaticComplexity"
files="ListConsumerGroupTest.java"/>
<!-- Clients -->
<suppress id="dontUseSystemExit"
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
index 0abc3e01ca9..01c23796d41 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
@@ -21,6 +21,7 @@ import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
/**
* A listing of a consumer group in the cluster.
@@ -29,6 +30,7 @@ public class ConsumerGroupListing {
private final String groupId;
private final boolean isSimpleConsumerGroup;
private final Optional<ConsumerGroupState> state;
+ private final Optional<GroupType> type;
/**
* Create an instance with the specified parameters.
@@ -37,7 +39,7 @@ public class ConsumerGroupListing {
* @param isSimpleConsumerGroup If consumer group is simple or not.
*/
public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup)
{
- this(groupId, isSimpleConsumerGroup, Optional.empty());
+ this(groupId, isSimpleConsumerGroup, Optional.empty(),
Optional.empty());
}
/**
@@ -48,9 +50,27 @@ public class ConsumerGroupListing {
* @param state The state of the consumer group
*/
public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup,
Optional<ConsumerGroupState> state) {
+ this(groupId, isSimpleConsumerGroup, state, Optional.empty());
+ }
+
+ /**
+ * Create an instance with the specified parameters.
+ *
+ * @param groupId Group Id.
+ * @param isSimpleConsumerGroup If consumer group is simple or not.
+ * @param state The state of the consumer group.
+ * @param type The type of the consumer group.
+ */
+ public ConsumerGroupListing(
+ String groupId,
+ boolean isSimpleConsumerGroup,
+ Optional<ConsumerGroupState> state,
+ Optional<GroupType> type
+ ) {
this.groupId = groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
this.state = Objects.requireNonNull(state);
+ this.type = Objects.requireNonNull(type);
}
/**
@@ -74,42 +94,38 @@ public class ConsumerGroupListing {
return state;
}
+ /**
+ * The type of the consumer group.
+ *
+ * @return An Optional containing the type, if available.
+ */
+ public Optional<GroupType> type() {
+ return type;
+ }
+
@Override
public String toString() {
return "(" +
"groupId='" + groupId + '\'' +
", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
", state=" + state +
+ ", type=" + type +
')';
}
@Override
public int hashCode() {
- return Objects.hash(groupId, isSimpleConsumerGroup, state);
+ return Objects.hash(groupId, isSimpleConsumerGroup(), state, type);
}
@Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- ConsumerGroupListing other = (ConsumerGroupListing) obj;
- if (groupId == null) {
- if (other.groupId != null)
- return false;
- } else if (!groupId.equals(other.groupId))
- return false;
- if (isSimpleConsumerGroup != other.isSimpleConsumerGroup)
- return false;
- if (state == null) {
- if (other.state != null)
- return false;
- } else if (!state.equals(other.state))
- return false;
- return true;
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof ConsumerGroupListing)) return false;
+ ConsumerGroupListing that = (ConsumerGroupListing) o;
+ return isSimpleConsumerGroup() == that.isSimpleConsumerGroup() &&
+ Objects.equals(groupId, that.groupId) &&
+ Objects.equals(state, that.state) &&
+ Objects.equals(type, that.type);
}
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 85c82e25144..d98ad8ac04e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -58,6 +58,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
@@ -3382,7 +3383,14 @@ public class KafkaAdminClient extends AdminClient {
.stream()
.map(ConsumerGroupState::toString)
.collect(Collectors.toList());
- return new ListGroupsRequest.Builder(new
ListGroupsRequestData().setStatesFilter(states));
+ List<String> groupTypes = options.types()
+ .stream()
+ .map(GroupType::toString)
+ .collect(Collectors.toList());
+ return new ListGroupsRequest.Builder(new
ListGroupsRequestData()
+ .setStatesFilter(states)
+ .setTypesFilter(groupTypes)
+ );
}
private void
maybeAddConsumerGroup(ListGroupsResponseData.ListedGroup group) {
@@ -3392,7 +3400,15 @@ public class KafkaAdminClient extends AdminClient {
final Optional<ConsumerGroupState> state =
group.groupState().equals("")
? Optional.empty()
:
Optional.of(ConsumerGroupState.parse(group.groupState()));
- final ConsumerGroupListing groupListing = new
ConsumerGroupListing(groupId, protocolType.isEmpty(), state);
+ final Optional<GroupType> type =
group.groupType().equals("")
+ ? Optional.empty()
+ :
Optional.of(GroupType.parse(group.groupType()));
+ final ConsumerGroupListing groupListing = new
ConsumerGroupListing(
+ groupId,
+ protocolType.isEmpty(),
+ state,
+ type
+ );
results.addListing(groupListing);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
index 9f1f38dd4a8..c240da159ff 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
@@ -34,20 +35,38 @@ public class ListConsumerGroupsOptions extends
AbstractOptions<ListConsumerGroup
private Set<ConsumerGroupState> states = Collections.emptySet();
+ private Set<GroupType> types = Collections.emptySet();
+
/**
- * If states is set, only groups in these states will be returned by
listConsumerGroups()
+ * If states is set, only groups in these states will be returned by
listConsumerGroups().
* Otherwise, all groups are returned.
* This operation is supported by brokers with version 2.6.0 or later.
*/
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
- this.states = (states == null) ? Collections.emptySet() : new
HashSet<>(states);
+ this.states = (states == null || states.isEmpty()) ?
Collections.emptySet() : new HashSet<>(states);
return this;
}
/**
- * Returns the list of States that are requested or empty if no states
have been specified
+ * If types is set, only groups of these types will be returned by
listConsumerGroups().
+ * Otherwise, all groups are returned.
+ */
+ public ListConsumerGroupsOptions withTypes(Set<GroupType> types) {
+ this.types = (types == null || types.isEmpty()) ?
Collections.emptySet() : new HashSet<>(types);
+ return this;
+ }
+
+ /**
+ * Returns the list of States that are requested or empty if no states
have been specified.
*/
public Set<ConsumerGroupState> states() {
return states;
}
+
+ /**
+ * Returns the set of group types that are requested, or an empty set if no types
have been specified.
+ */
+ public Set<GroupType> types() {
+ return types;
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index b8b3d54ef43..43d391a220e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -243,6 +243,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
@@ -2811,6 +2812,68 @@ public class KafkaAdminClientTest {
}
}
+ /**
+ * Verifies that listConsumerGroups sends the state and type filters from
+ * ListConsumerGroupsOptions in the ListGroups request, and that the group type
+ * returned in the response is exposed on each ConsumerGroupListing.
+ */
+ @Test
+ public void testListConsumerGroupsWithTypes() throws Exception {
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+ env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+ // Test with a specific state filter but no type filter in list
consumer group options.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
+
expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()),
Collections.emptySet()),
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable")
+ .setGroupType(GroupType.CLASSIC.toString())))),
+ env.cluster().nodeById(0));
+
+ final ListConsumerGroupsOptions options = new
ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
+ final ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups(options);
+ Collection<ConsumerGroupListing> listings = result.valid().get();
+
+ assertEquals(1, listings.size());
+ List<ConsumerGroupListing> expected = new ArrayList<>();
+ expected.add(new ConsumerGroupListing("group-1", false,
Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CLASSIC)));
+ assertEquals(expected, listings);
+ assertEquals(0, result.errors().get().size());
+
+ // Test with a specific type filter but no state filter in list
consumer group options.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ env.kafkaClient().prepareResponseFrom(
+ expectListGroupsRequestWithFilters(Collections.emptySet(),
singleton(GroupType.CONSUMER.toString())),
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Arrays.asList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+ .setGroupState("Stable")
+ .setGroupType(GroupType.CONSUMER.toString()),
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-2")
+ .setGroupState("Empty")
+ .setGroupType(GroupType.CONSUMER.toString())))),
+ env.cluster().nodeById(0));
+
+ final ListConsumerGroupsOptions options2 = new
ListConsumerGroupsOptions().withTypes(singleton(GroupType.CONSUMER));
+ final ListConsumerGroupsResult result2 =
env.adminClient().listConsumerGroups(options2);
+ Collection<ConsumerGroupListing> listings2 = result2.valid().get();
+
+ assertEquals(2, listings2.size());
+ List<ConsumerGroupListing> expected2 = new ArrayList<>();
+ expected2.add(new ConsumerGroupListing("group-2", true,
Optional.of(ConsumerGroupState.EMPTY), Optional.of(GroupType.CONSUMER)));
+ expected2.add(new ConsumerGroupListing("group-1", false,
Optional.of(ConsumerGroupState.STABLE), Optional.of(GroupType.CONSUMER)));
+ assertEquals(expected2, listings2);
+ // Check the errors of the second (type-filtered) call, not the first one again.
+ assertEquals(0, result2.errors().get().size());
+ }
+ }
+
@Test
public void testListConsumerGroupsWithStatesOlderBrokerVersion() throws
Exception {
ApiVersion listGroupV3 = new ApiVersion()
@@ -2835,7 +2898,7 @@ public class KafkaAdminClientTest {
ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups(options);
Collection<ConsumerGroupListing> listing = result.all().get();
assertEquals(1, listing.size());
- List<ConsumerGroupListing> expected =
Collections.singletonList(new ConsumerGroupListing("group-1", false,
Optional.empty()));
+ List<ConsumerGroupListing> expected =
Collections.singletonList(new ConsumerGroupListing("group-1", false));
assertEquals(expected, listing);
// But we cannot set a state filter with older broker
@@ -2849,6 +2912,65 @@ public class KafkaAdminClientTest {
}
}
+ @Test
+ public void testListConsumerGroupsWithTypesOlderBrokerVersion() throws
Exception {
+ ApiVersion listGroupV4 = new ApiVersion()
+ .setApiKey(ApiKeys.LIST_GROUPS.id)
+ .setMinVersion((short) 0)
+ .setMaxVersion((short) 4);
+ try (AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(mockCluster(1, 0))) {
+
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(listGroupV4)));
+
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+
+ // Check if we can list groups with older broker if we specify
states and don't specify types.
+ env.kafkaClient().prepareResponseFrom(
+
expectListGroupsRequestWithFilters(singleton(ConsumerGroupState.STABLE.toString()),
Collections.emptySet()),
+ new ListGroupsResponse(new ListGroupsResponseData()
+ .setErrorCode(Errors.NONE.code())
+ .setGroups(Collections.singletonList(
+ new ListGroupsResponseData.ListedGroup()
+ .setGroupId("group-1")
+ .setProtocolType(ConsumerProtocol.PROTOCOL_TYPE)
+
.setGroupState(ConsumerGroupState.STABLE.toString())))),
+ env.cluster().nodeById(0));
+
+ ListConsumerGroupsOptions options = new
ListConsumerGroupsOptions().inStates(singleton(ConsumerGroupState.STABLE));
+ ListConsumerGroupsResult result =
env.adminClient().listConsumerGroups(options);
+
+ Collection<ConsumerGroupListing> listing = result.all().get();
+ assertEquals(1, listing.size());
+ List<ConsumerGroupListing> expected = Collections.singletonList(
+ new ConsumerGroupListing("group-1", false,
Optional.of(ConsumerGroupState.STABLE))
+ );
+ assertEquals(expected, listing);
+
+ // Check that we cannot set a type filter with an older broker.
+
env.kafkaClient().prepareResponse(prepareMetadataResponse(env.cluster(),
Errors.NONE));
+ env.kafkaClient().prepareUnsupportedVersionResponse(request ->
+ request instanceof ListGroupsRequest && !((ListGroupsRequest)
request).data().typesFilter().isEmpty()
+ );
+
+ options = new
ListConsumerGroupsOptions().withTypes(singleton(GroupType.CLASSIC));
+ result = env.adminClient().listConsumerGroups(options);
+ TestUtils.assertFutureThrows(result.all(),
UnsupportedVersionException.class);
+ }
+ }
+
+ private MockClient.RequestMatcher expectListGroupsRequestWithFilters(
+ Set<String> expectedStates,
+ Set<String> expectedTypes
+ ) {
+ return body -> {
+ if (body instanceof ListGroupsRequest) {
+ ListGroupsRequest request = (ListGroupsRequest) body;
+ return Objects.equals(new
HashSet<>(request.data().statesFilter()), expectedStates)
+ && Objects.equals(new
HashSet<>(request.data().typesFilter()), expectedTypes);
+ }
+ return false;
+ };
+ }
+
@Test
public void testOffsetCommitNumRetries() throws Exception {
final Cluster cluster = mockCluster(3, 0);
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 4187274a22d..160b9a70aae 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
+import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaException,
Node, TopicPartition}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import scala.jdk.CollectionConverters._
@@ -41,7 +41,6 @@ import org.apache.kafka.common.protocol.Errors
import scala.collection.immutable.TreeMap
import scala.reflect.ClassTag
-import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.common.requests.ListOffsetsResponse
object ConsumerGroupCommand extends Logging {
@@ -104,6 +103,15 @@ object ConsumerGroupCommand extends Logging {
parsedStates
}
+ // Parses a comma-separated list of group type names (e.g. "classic,consumer")
+ // into a set of GroupType. Throws IllegalArgumentException if any entry is
+ // parsed as GroupType.UNKNOWN.
+ def consumerGroupTypesFromString(input: String): Set[GroupType] = {
+ // Lower-case with Locale.ROOT so parsing does not depend on the JVM default
+ // locale (under a Turkish locale "CLASSIC".toLowerCase yields a dotless i).
+ val parsedTypes = input.toLowerCase(java.util.Locale.ROOT).split(',').map(s =>
GroupType.parse(s.trim)).toSet
+ if (parsedTypes.contains(GroupType.UNKNOWN)) {
+ val validTypes = GroupType.values().filter(_ != GroupType.UNKNOWN)
+ throw new IllegalArgumentException(s"Invalid types list '$input'. Valid
types are: ${validTypes.mkString(", ")}")
+ }
+ parsedTypes
+ }
+
val MISSING_COLUMN_VALUE = "-"
private def printError(msg: String, e: Option[Throwable] = None): Unit = {
@@ -135,7 +143,7 @@ object ConsumerGroupCommand extends Logging {
private[admin] case class MemberAssignmentState(group: String, consumerId:
String, host: String, clientId: String, groupInstanceId: String,
numPartitions: Int, assignment:
List[TopicPartition])
- case class GroupState(group: String, coordinator: Node, assignmentStrategy:
String, state: String, numMembers: Int)
+ private[admin] case class GroupState(group: String, coordinator: Node,
assignmentStrategy: String, state: String, numMembers: Int)
private[admin] sealed trait CsvRecord
private[admin] case class CsvRecordWithGroup(group: String, topic: String,
partition: Int, offset: Long) extends CsvRecord
@@ -189,16 +197,65 @@ object ConsumerGroupCommand extends Logging {
}
def listGroups(): Unit = {
- if (opts.options.has(opts.stateOpt)) {
- val stateValue = opts.options.valueOf(opts.stateOpt)
- val states = if (stateValue == null || stateValue.isEmpty)
- Set[ConsumerGroupState]()
- else
- consumerGroupStatesFromString(stateValue)
- val listings = listConsumerGroupsWithState(states)
- printGroupStates(listings.map(e => (e.groupId, e.state.get.toString)))
- } else
+ val includeType = opts.options.has(opts.typeOpt)
+ val includeState = opts.options.has(opts.stateOpt)
+
+ if (includeType || includeState) {
+ val types = typeValues()
+ val states = stateValues()
+ val listings = listConsumerGroupsWithFilters(types, states)
+
+ printGroupInfo(listings, includeType, includeState)
+
+ } else {
listConsumerGroups().foreach(println(_))
+ }
+ }
+
+ private def stateValues(): Set[ConsumerGroupState] = {
+ val stateValue = opts.options.valueOf(opts.stateOpt)
+ if (stateValue == null || stateValue.isEmpty)
+ Set[ConsumerGroupState]()
+ else
+ consumerGroupStatesFromString(stateValue)
+ }
+
+ private def typeValues(): Set[GroupType] = {
+ val typeValue = opts.options.valueOf(opts.typeOpt)
+ if (typeValue == null || typeValue.isEmpty)
+ Set[GroupType]()
+ else
+ consumerGroupTypesFromString(typeValue)
+ }
+
+ private def printGroupInfo(groups: List[ConsumerGroupListing],
includeType: Boolean, includeState: Boolean): Unit = {
+ def groupId(groupListing: ConsumerGroupListing): String =
groupListing.groupId
+ def groupType(groupListing: ConsumerGroupListing): String =
groupListing.`type`().orElse(GroupType.UNKNOWN).toString
+ def groupState(groupListing: ConsumerGroupListing): String =
groupListing.state.orElse(ConsumerGroupState.UNKNOWN).toString
+
+ val maxGroupLen = groups.foldLeft(15)((maxLen, groupListing) =>
Math.max(maxLen, groupId(groupListing).length)) + 10
+ var format = s"%-${maxGroupLen}s"
+ var header = List("GROUP")
+ var extractors: List[ConsumerGroupListing => String] = List(groupId)
+
+ if (includeType) {
+ header = header :+ "TYPE"
+ extractors = extractors :+ groupType _
+ format += " %-20s"
+ }
+
+ if (includeState) {
+ header = header :+ "STATE"
+ extractors = extractors :+ groupState _
+ format += " %-20s"
+ }
+
+ println(format.format(header: _*))
+
+ groups.foreach { groupListing =>
+ val info = extractors.map(extractor => extractor(groupListing))
+ println(format.format(info: _*))
+ }
}
def listConsumerGroups(): List[String] = {
@@ -207,26 +264,15 @@ object ConsumerGroupCommand extends Logging {
listings.map(_.groupId).toList
}
- def listConsumerGroupsWithState(states: Set[ConsumerGroupState]):
List[ConsumerGroupListing] = {
+ def listConsumerGroupsWithFilters(types: Set[GroupType], states:
Set[ConsumerGroupState]): List[ConsumerGroupListing] = {
val listConsumerGroupsOptions = withTimeoutMs(new
ListConsumerGroupsOptions())
- listConsumerGroupsOptions.inStates(states.asJava)
+ listConsumerGroupsOptions
+ .inStates(states.asJava)
+ .withTypes(types.asJava)
val result = adminClient.listConsumerGroups(listConsumerGroupsOptions)
result.all.get.asScala.toList
}
- private def printGroupStates(groupsAndStates: List[(String, String)]):
Unit = {
- // find proper columns width
- var maxGroupLen = 15
- for ((groupId, _) <- groupsAndStates) {
- maxGroupLen = Math.max(maxGroupLen, groupId.length)
- }
- val format = s"%${-maxGroupLen}s %s"
- println(format.format("GROUP", "STATE"))
- for ((groupId, state) <- groupsAndStates) {
- println(format.format(groupId, state))
- }
- }
-
private def shouldPrintMemberState(group: String, state: Option[String],
numRows: Option[Int]): Boolean = {
// numRows contains the number of data rows, if any, compiled from the
API call in the caller method.
// if it's undefined or 0, there is no relevant group information to
display.
@@ -1024,6 +1070,9 @@ object ConsumerGroupCommand extends Logging {
"When specified with '--list', it displays the state of all groups. It
can also be used to list groups with specific states." + nl +
"Example: --bootstrap-server localhost:9092 --list --state stable,empty"
+ nl +
"This option may be used with '--describe', '--list' and
'--bootstrap-server' options only."
+ private val TypeDoc = "When specified with '--list', it displays the types
of all the groups. It can also be used to list groups with specific types." +
nl +
+ "Example: --bootstrap-server localhost:9092 --list --type
classic,consumer" + nl +
+ "This option may be used with the '--list' option only."
private val DeleteOffsetsDoc = "Delete offsets of consumer group. Supports
one consumer group at the time, and multiple topics."
val bootstrapServerOpt: OptionSpec[String] =
parser.accepts("bootstrap-server", BootstrapServerDoc)
@@ -1090,6 +1139,10 @@ object ConsumerGroupCommand extends Logging {
.availableIf(describeOpt, listOpt)
.withOptionalArg()
.ofType(classOf[String])
+ val typeOpt: OptionSpec[String] = parser.accepts("type", TypeDoc)
+ .availableIf(listOpt)
+ .withOptionalArg()
+ .ofType(classOf[String])
options = parser.parse(args : _*)
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index bb3259baf98..20159830943 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import java.util
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._
@@ -117,11 +118,12 @@ object BaseConsumerTest {
// * KRaft with the new group coordinator enabled and the classic group
protocol
// * KRaft with the new group coordinator enabled and the consumer group
protocol
def getTestQuorumAndGroupProtocolParametersAll() :
java.util.stream.Stream[Arguments] = {
- java.util.stream.Stream.of(
+ util.Arrays.stream(Array(
Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic"),
Arguments.of("kraft+kip848", "classic"),
- Arguments.of("kraft+kip848", "consumer"))
+ Arguments.of("kraft+kip848", "consumer")
+ ))
}
// In Scala 2.12, it is necessary to disambiguate the
java.util.stream.Stream.of() method call
@@ -138,10 +140,19 @@ object BaseConsumerTest {
// * KRaft and the classic group protocol
// * KRaft with the new group coordinator enabled and the classic group
protocol
def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() :
java.util.stream.Stream[Arguments] = {
- java.util.stream.Stream.of(
+ util.Arrays.stream(Array(
Arguments.of("zk", "classic"),
Arguments.of("kraft", "classic"),
- Arguments.of("kraft+kip848", "classic"))
+ Arguments.of("kraft+kip848", "classic")
+ ))
+ }
+
+ // For tests that only work with the consumer group protocol, we want to
test the following combination:
+ // * KRaft with the new group coordinator enabled and the consumer group
protocol
+ def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly():
java.util.stream.Stream[Arguments] = {
+ util.Arrays.stream(Array(
+ Arguments.of("kraft+kip848", "consumer")
+ ))
}
val updateProducerCount = new AtomicInteger()
diff --git
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index 18c7a0a8f81..f682df1f1dc 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -19,7 +19,7 @@ package kafka.admin
import java.time.Duration
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
-import java.util.{Collections, Properties}
+import java.util.{Collections, Properties, stream}
import kafka.admin.ConsumerGroupCommand.{ConsumerGroupCommandOptions,
ConsumerGroupService}
import kafka.api.BaseConsumerTest
import kafka.integration.KafkaServerTestHarness
@@ -31,6 +31,7 @@ import org.apache.kafka.common.{PartitionInfo, TopicPartition}
import org.apache.kafka.common.errors.WakeupException
import org.apache.kafka.common.serialization.StringDeserializer
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.provider.Arguments
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ArrayBuffer
@@ -122,7 +123,9 @@ class ConsumerGroupCommandTest extends
KafkaServerTestHarness {
}
object ConsumerGroupCommandTest {
- def getTestQuorumAndGroupProtocolParametersAll() =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+ def getTestQuorumAndGroupProtocolParametersAll(): stream.Stream[Arguments]
= BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll()
+ def getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly():
stream.Stream[Arguments] =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly()
+ def getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly():
stream.Stream[Arguments] =
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly()
abstract class AbstractConsumerRunnable(broker: String, groupId: String,
customPropsOpt: Option[Properties] = None,
syncCommit: Boolean = false) extends
Runnable {
diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
index fdc732ea29a..83fa31bf5e7 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java
@@ -45,6 +45,8 @@ import java.util.stream.Collectors;
public class ToolsTestUtils {
/** @see TestInfoUtils#TestWithParameterizedQuorumName() */
public static final String TEST_WITH_PARAMETERIZED_QUORUM_NAME =
"{displayName}.{argumentsWithNames}";
+ /** @see TestInfoUtils#TestWithParameterizedQuorumAndGroupProtocolNames()
*/
+ public static final String
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES =
"{displayName}.quorum={0}.groupProtocol={1}";
private static int randomPort = 0;
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
index b78054cb4ad..bde3af37a1d 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTest.java
@@ -58,6 +58,7 @@ import java.util.stream.Stream;
public class ConsumerGroupCommandTest extends
kafka.integration.KafkaServerTestHarness {
public static final String TOPIC = "foo";
public static final String GROUP = "test.group";
+ public static final String PROTOCOL_GROUP = "protocol-group";
List<ConsumerGroupCommand.ConsumerGroupService> consumerGroupService = new
ArrayList<>();
List<AbstractConsumerGroupExecutor> consumerGroupExecutors = new
ArrayList<>();
@@ -154,8 +155,8 @@ public class ConsumerGroupCommandTest extends
kafka.integration.KafkaServerTestH
return addConsumerGroupExecutor(numConsumers, TOPIC, GROUP,
RangeAssignor.class.getName(), remoteAssignor, Optional.empty(), false,
groupProtocol);
}
- ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String
topic, String group) {
- return addConsumerGroupExecutor(numConsumers, topic, group,
RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false,
GroupProtocol.CLASSIC.name);
+ ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String
group, String groupProtocol) {
+ return addConsumerGroupExecutor(numConsumers, TOPIC, group,
RangeAssignor.class.getName(), Optional.empty(), Optional.empty(), false,
groupProtocol);
}
ConsumerGroupExecutor addConsumerGroupExecutor(int numConsumers, String
topic, String group, String groupProtocol) {
@@ -342,6 +343,14 @@ public class ConsumerGroupCommandTest extends
kafka.integration.KafkaServerTestH
return BaseConsumerTest.getTestQuorumAndGroupProtocolParametersAll();
}
+ public static Stream<Arguments>
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly() {
+ return
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly();
+ }
+
+ public static Stream<Arguments>
getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly() {
+ return
BaseConsumerTest.getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly();
+ }
+
@SuppressWarnings({"deprecation"})
static <T> Seq<T> seq(Collection<T> seq) {
return
JavaConverters.asScalaIteratorConverter(seq.iterator()).asScala().toSeq();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
index 894f00df5e7..ba5ebd254fc 100644
---
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ListConsumerGroupTest.java
@@ -20,83 +20,258 @@ import joptsimple.OptionException;
import kafka.admin.ConsumerGroupCommand;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.GroupType;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.MethodSource;
import java.util.Arrays;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
-import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Properties;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static
org.apache.kafka.tools.ToolsTestUtils.TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES;
import static org.apache.kafka.common.utils.Utils.mkSet;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class ListConsumerGroupTest extends ConsumerGroupCommandTest {
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroups(String quorum) throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithoutFilters(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup));
+
+ scala.collection.Set<String> expectedGroups = set(Arrays.asList(GROUP,
simpleGroup, PROTOCOL_GROUP));
final AtomicReference<scala.collection.Set> foundGroups = new
AtomicReference<>();
+
TestUtils.waitForCondition(() -> {
foundGroups.set(service.listConsumerGroups().toSet());
return Objects.equals(expectedGroups, foundGroups.get());
}, "Expected --list to show groups " + expectedGroups + ", but found "
+ foundGroups.get() + ".");
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
+ @Test
public void testListWithUnrecognizedNewConsumerOption() {
String[] cgcArgs = new String[]{"--new-consumer",
"--bootstrap-server", bootstrapServers(listenerName()), "--list"};
assertThrows(OptionException.class, () ->
getConsumerGroupService(cgcArgs));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListConsumerGroupsWithStates() throws Exception {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+ @MethodSource("getTestQuorumAndGroupProtocolParametersAll")
+ public void testListConsumerGroupsWithStates(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
- addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, groupProtocol);
String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"};
ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
- scala.collection.Set<ConsumerGroupListing> expectedListing =
set(Arrays.asList(
- new ConsumerGroupListing(simpleGroup, true,
Optional.of(ConsumerGroupState.EMPTY)),
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ Set<ConsumerGroupListing> expectedListing = mkSet(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ );
- final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Arrays.asList(ConsumerGroupState.values()))).toSet());
- return Objects.equals(expectedListing, foundListing.get());
- }, "Expected to show groups " + expectedListing + ", but found " +
foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ EnumSet.allOf(ConsumerGroupState.class),
+ expectedListing
+ );
- scala.collection.Set<ConsumerGroupListing> expectedListingStable =
set(Collections.singleton(
- new ConsumerGroupListing(GROUP, false,
Optional.of(ConsumerGroupState.STABLE))));
+ expectedListing = mkSet(
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.parse(groupProtocol))
+ )
+ );
- foundListing.set(null);
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.STABLE),
+ expectedListing
+ );
- TestUtils.waitForCondition(() -> {
-
foundListing.set(service.listConsumerGroupsWithState(set(Collections.singleton(ConsumerGroupState.STABLE))).toSet());
- return Objects.equals(expectedListingStable, foundListing.get());
- }, "Expected to show groups " + expectedListingStable + ", but found "
+ foundListing.get());
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ mkSet(ConsumerGroupState.PREPARING_REBALANCE),
+ Collections.emptySet()
+ );
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testConsumerGroupStatesFromString(String quorum) {
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesClassicProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ Set<ConsumerGroupListing> expectedListing = mkSet(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ );
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // Old Group Coordinator returns empty listings if the type is not
Classic.
+ // New Group Coordinator returns groups according to the filter.
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CONSUMER),
+ Collections.emptySet(),
+ Collections.emptySet()
+ );
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CLASSIC),
+ Collections.emptySet(),
+ expectedListing
+ );
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+ public void testListConsumerGroupsWithTypesConsumerProtocol(String quorum,
String groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+ String[] cgcArgs = new String[]{"--bootstrap-server",
bootstrapServers(listenerName()), "--list"};
+ ConsumerGroupCommand.ConsumerGroupService service =
getConsumerGroupService(cgcArgs);
+
+ // No filters explicitly mentioned. Expectation is that all groups are
returned.
+ Set<ConsumerGroupListing> expectedListing = mkSet(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ PROTOCOL_GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CONSUMER)
+ )
+ );
+
+ assertGroupListing(
+ service,
+ Collections.emptySet(),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ // When group type is mentioned:
+ // New Group Coordinator returns groups according to the filter.
+ expectedListing = mkSet(
+ new ConsumerGroupListing(
+ PROTOCOL_GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CONSUMER)
+ )
+ );
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CONSUMER),
+ Collections.emptySet(),
+ expectedListing
+ );
+
+ expectedListing = mkSet(
+ new ConsumerGroupListing(
+ simpleGroup,
+ true,
+ Optional.of(ConsumerGroupState.EMPTY),
+ Optional.of(GroupType.CLASSIC)
+ ),
+ new ConsumerGroupListing(
+ GROUP,
+ false,
+ Optional.of(ConsumerGroupState.STABLE),
+ Optional.of(GroupType.CLASSIC)
+ )
+ );
+
+ assertGroupListing(
+ service,
+ mkSet(GroupType.CLASSIC),
+ Collections.emptySet(),
+ expectedListing
+ );
+ }
+
+ @Test
+ public void testConsumerGroupStatesFromString() {
scala.collection.Set<ConsumerGroupState> result =
ConsumerGroupCommand.consumerGroupStatesFromString("Stable");
assertEquals(set(Collections.singleton(ConsumerGroupState.STABLE)),
result);
@@ -107,7 +282,7 @@ public class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
assertEquals(set(Arrays.asList(ConsumerGroupState.DEAD,
ConsumerGroupState.COMPLETING_REBALANCE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("stable");
- assertEquals(set(Arrays.asList(ConsumerGroupState.STABLE)), result);
+
assertEquals(set(Collections.singletonList(ConsumerGroupState.STABLE)), result);
result = ConsumerGroupCommand.consumerGroupStatesFromString("stable,
assigning");
assertEquals(set(Arrays.asList(ConsumerGroupState.STABLE,
ConsumerGroupState.ASSIGNING)), result);
@@ -122,10 +297,31 @@ public class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
assertThrows(IllegalArgumentException.class, () ->
ConsumerGroupCommand.consumerGroupStatesFromString(" , ,"));
}
- @ParameterizedTest
- @ValueSource(strings = {"zk", "kraft"})
- public void testListGroupCommand(String quorum) throws Exception {
+ @Test
+ public void testConsumerGroupTypesFromString() {
+ scala.collection.Set<GroupType> result =
ConsumerGroupCommand.consumerGroupTypesFromString("consumer");
+ assertEquals(set(Collections.singleton(GroupType.CONSUMER)), result);
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("consumer,
classic");
+ assertEquals(set(Arrays.asList(GroupType.CONSUMER,
GroupType.CLASSIC)), result);
+
+ result = ConsumerGroupCommand.consumerGroupTypesFromString("Consumer,
Classic");
+ assertEquals(set(Arrays.asList(GroupType.CONSUMER,
GroupType.CLASSIC)), result);
+
+ assertThrows(IllegalArgumentException.class, () ->
ConsumerGroupCommand.consumerGroupTypesFromString("bad, wrong"));
+
+ assertThrows(IllegalArgumentException.class, () ->
ConsumerGroupCommand.consumerGroupTypesFromString(" bad, generic"));
+
+ assertThrows(IllegalArgumentException.class, () ->
ConsumerGroupCommand.consumerGroupTypesFromString(" , ,"));
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly")
+ public void testListGroupCommandClassicProtocol(String quorum, String
groupProtocol) throws Exception {
String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
addSimpleGroupExecutor(simpleGroup);
addConsumerGroupExecutor(1);
@@ -147,6 +343,24 @@ public class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
)
);
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type"),
+ Arrays.asList("GROUP", "TYPE"),
+ mkSet(
+ Arrays.asList(GROUP, "Classic"),
+ Arrays.asList(simpleGroup, "Classic")
+ )
+ );
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type", "--state"),
+ Arrays.asList("GROUP", "TYPE", "STATE"),
+ mkSet(
+ Arrays.asList(GROUP, "Classic", "Stable"),
+ Arrays.asList(simpleGroup, "Classic", "Empty")
+ )
+ );
+
validateListOutput(
Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state", "Stable"),
Arrays.asList("GROUP", "STATE"),
@@ -155,6 +369,7 @@ public class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
)
);
+ // Check case-insensitivity in state filter.
validateListOutput(
Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state", "stable"),
Arrays.asList("GROUP", "STATE"),
@@ -162,6 +377,109 @@ public class ListConsumerGroupTest extends
ConsumerGroupCommandTest {
Arrays.asList(GROUP, "Stable")
)
);
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type", "Classic"),
+ Arrays.asList("GROUP", "TYPE"),
+ mkSet(
+ Arrays.asList(GROUP, "Classic"),
+ Arrays.asList(simpleGroup, "Classic")
+ )
+ );
+
+ // Check case-insensitivity in type filter.
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type", "classic"),
+ Arrays.asList("GROUP", "TYPE"),
+ mkSet(
+ Arrays.asList(GROUP, "Classic"),
+ Arrays.asList(simpleGroup, "Classic")
+ )
+ );
+ }
+
+ @ParameterizedTest(name =
TEST_WITH_PARAMETERIZED_QUORUM_AND_GROUP_PROTOCOL_NAMES)
+
@MethodSource("getTestQuorumAndGroupProtocolParametersConsumerGroupProtocolOnly")
+ public void testListGroupCommandConsumerProtocol(String quorum, String
groupProtocol) throws Exception {
+ String simpleGroup = "simple-group";
+
+ createOffsetsTopic(listenerName(), new Properties());
+
+ addSimpleGroupExecutor(simpleGroup);
+ addConsumerGroupExecutor(1, PROTOCOL_GROUP, groupProtocol);
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list"),
+ Collections.emptyList(),
+ mkSet(
+ Collections.singletonList(PROTOCOL_GROUP),
+ Collections.singletonList(simpleGroup)
+ )
+ );
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--state"),
+ Arrays.asList("GROUP", "STATE"),
+ mkSet(
+ Arrays.asList(PROTOCOL_GROUP, "Stable"),
+ Arrays.asList(simpleGroup, "Empty")
+ )
+ );
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type"),
+ Arrays.asList("GROUP", "TYPE"),
+ mkSet(
+ Arrays.asList(PROTOCOL_GROUP, "Consumer"),
+ Arrays.asList(simpleGroup, "Classic")
+ )
+ );
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type", "--state"),
+ Arrays.asList("GROUP", "TYPE", "STATE"),
+ mkSet(
+ Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable"),
+ Arrays.asList(simpleGroup, "Classic", "Empty")
+ )
+ );
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type", "consumer"),
+ Arrays.asList("GROUP", "TYPE"),
+ mkSet(
+ Arrays.asList(PROTOCOL_GROUP, "Consumer")
+ )
+ );
+
+ validateListOutput(
+ Arrays.asList("--bootstrap-server",
bootstrapServers(listenerName()), "--list", "--type", "consumer", "--state",
"Stable"),
+ Arrays.asList("GROUP", "TYPE", "STATE"),
+ mkSet(
+ Arrays.asList(PROTOCOL_GROUP, "Consumer", "Stable")
+ )
+ );
+ }
+
+ /**
+ * Validates the consumer group listings returned against expected values
using specified filters.
+ *
+ * @param service The service to list consumer groups.
+ * @param typeFilterSet Filters for group types, empty for no filter.
+ * @param stateFilterSet Filters for group states, empty for no filter.
+ * @param expectedListing Expected consumer group listings.
+ */
+ private static void assertGroupListing(
+ ConsumerGroupCommand.ConsumerGroupService service,
+ Set<GroupType> typeFilterSet,
+ Set<ConsumerGroupState> stateFilterSet,
+ Set<ConsumerGroupListing> expectedListing
+ ) throws Exception {
+ final AtomicReference<scala.collection.Set> foundListing = new
AtomicReference<>();
+ TestUtils.waitForCondition(() -> {
+
foundListing.set(service.listConsumerGroupsWithFilters(set(typeFilterSet),
set(stateFilterSet)).toSet());
+ return Objects.equals(set(expectedListing), foundListing.get());
+ }, () -> "Expected to show groups " + expectedListing + ", but found "
+ foundListing.get() + ".");
}
/**