This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16ad358 KAFKA-6868; Fix buffer underflow and expose group state in
the consumer groups API (#4980)
16ad358 is described below
commit 16ad358d64a138fc4b455379745ae1550a93d57b
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Mon May 21 08:37:35 2018 -0700
KAFKA-6868; Fix buffer underflow and expose group state in the consumer
groups API (#4980)
* The consumer groups API should expose group state and coordinator
information. This information is needed by administrative tools and scripts
that access consume groups.
* The partition assignment will be empty when the group is rebalancing. Fix
an issue where the adminclient attempted to deserialize this empty buffer.
* Remove nulls from the API and make all collections immutable.
* DescribeConsumerGroupsResult#all should return a result as expected,
rather than Void
* Fix exception text for GroupIdNotFoundException, GroupNotEmptyException.
It was being filled in as "The group id The group id does not exist was not
found" and similar.
Reviewers: Attila Sasvari <[email protected]>, Andras Beni
<[email protected]>, Dong Lin <[email protected]>, Jason Gustafson
<[email protected]>
---
.../clients/admin/ConsumerGroupDescription.java | 78 +++++++++-----
.../admin/DescribeConsumerGroupsResult.java | 26 ++++-
.../kafka/clients/admin/KafkaAdminClient.java | 51 +++++----
.../kafka/clients/admin/MemberAssignment.java | 13 ++-
.../kafka/clients/admin/MemberDescription.java | 45 ++++----
.../apache/kafka/common/ConsumerGroupState.java | 61 +++++++++++
.../common/errors/GroupIdNotFoundException.java | 12 +--
.../common/errors/GroupNotEmptyException.java | 12 +--
.../main/scala/kafka/tools/StreamsResetter.java | 3 +-
.../kafka/api/AdminClientIntegrationTest.scala | 120 ++++++++++++++++++++-
10 files changed, 315 insertions(+), 106 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 0bfa8a7..bc3857d 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -17,55 +17,56 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
import org.apache.kafka.common.utils.Utils;
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
/**
* A detailed description of a single consumer group in the cluster.
*/
public class ConsumerGroupDescription {
-
private final String groupId;
private final boolean isSimpleConsumerGroup;
- private final List<MemberDescription> members;
+ private final Collection<MemberDescription> members;
private final String partitionAssignor;
+ private final ConsumerGroupState state;
+ private final Node coordinator;
- /**
- * Creates an instance with the specified parameters.
- *
- * @param groupId The consumer group id
- * @param isSimpleConsumerGroup If Consumer Group is simple
- * @param members The consumer group members
- * @param partitionAssignor The consumer group partition assignor
- */
- public ConsumerGroupDescription(String groupId, boolean
isSimpleConsumerGroup, List<MemberDescription> members, String
partitionAssignor) {
- this.groupId = groupId;
+ ConsumerGroupDescription(String groupId,
+ boolean isSimpleConsumerGroup,
+ Collection<MemberDescription> members,
+ String partitionAssignor,
+ ConsumerGroupState state,
+ Node coordinator) {
+ this.groupId = groupId == null ? "" : groupId;
this.isSimpleConsumerGroup = isSimpleConsumerGroup;
- this.members = members;
- this.partitionAssignor = partitionAssignor;
+ this.members = members == null ?
Collections.<MemberDescription>emptyList() :
+ Collections.unmodifiableList(new ArrayList<>(members));
+ this.partitionAssignor = partitionAssignor == null ? "" :
partitionAssignor;
+ this.state = state;
+ this.coordinator = coordinator;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
ConsumerGroupDescription that = (ConsumerGroupDescription) o;
-
- if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
- if (groupId != null ? !groupId.equals(that.groupId) : that.groupId !=
null) return false;
- if (members != null ? !members.equals(that.members) : that.members !=
null) return false;
- return partitionAssignor != null ?
partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor ==
null;
+ return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
+ groupId.equals(that.groupId) &&
+ members.equals(that.members) &&
+ partitionAssignor.equals(that.partitionAssignor) &&
+ state.equals(that.state);
}
@Override
public int hashCode() {
- int result = groupId != null ? groupId.hashCode() : 0;
- result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
- result = 31 * result + (members != null ? members.hashCode() : 0);
- result = 31 * result + (partitionAssignor != null ?
partitionAssignor.hashCode() : 0);
- return result;
+ return Objects.hash(isSimpleConsumerGroup, groupId, members,
partitionAssignor, state);
}
/**
@@ -85,7 +86,7 @@ public class ConsumerGroupDescription {
/**
* A list of the members of the consumer group.
*/
- public List<MemberDescription> members() {
+ public Collection<MemberDescription> members() {
return members;
}
@@ -96,9 +97,28 @@ public class ConsumerGroupDescription {
return partitionAssignor;
}
+ /**
+ * The consumer group state, or UNKNOWN if the state is too new for us to
parse.
+ */
+ public ConsumerGroupState state() {
+ return state;
+ }
+
+ /**
+ * The consumer group coordinator, or null if the coordinator is not known.
+ */
+ public Node coordinator() {
+ return coordinator;
+ }
+
@Override
public String toString() {
- return "(groupId=" + groupId + ", isSimpleConsumerGroup=" +
isSimpleConsumerGroup + ", members=" +
- Utils.join(members, ",") + ", partitionAssignor=" +
partitionAssignor + ")";
+ return "(groupId=" + groupId +
+ ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+ ", members=" + Utils.join(members, ",") +
+ ", partitionAssignor=" + partitionAssignor +
+ ", state=" + state +
+ ", coordinator=" + coordinator +
+ ")";
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index ac2189c..8f0ebad 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -21,7 +21,9 @@ import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.annotation.InterfaceStability;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
/**
@@ -39,16 +41,32 @@ public class DescribeConsumerGroupsResult {
}
/**
- * Return a map from group id to futures which can be used to check the
description of a consumer group.
+ * Return a map from group id to futures which yield group descriptions.
*/
public Map<String, KafkaFuture<ConsumerGroupDescription>>
describedGroups() {
return futures;
}
/**
- * Return a future which succeeds only if all the consumer group
description succeed.
+ * Return a future which yields all ConsumerGroupDescription objects, if
all the describes succeed.
*/
- public KafkaFuture<Void> all() {
- return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+ public KafkaFuture<Map<String, ConsumerGroupDescription>> all() {
+ return KafkaFuture.allOf(futures.values().toArray(new
KafkaFuture[0])).thenApply(
+ new KafkaFuture.BaseFunction<Void, Map<String,
ConsumerGroupDescription>>() {
+ @Override
+ public Map<String, ConsumerGroupDescription> apply(Void v) {
+ try {
+ Map<String, ConsumerGroupDescription> descriptions =
new HashMap<>(futures.size());
+ for (Map.Entry<String,
KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) {
+ descriptions.put(entry.getKey(),
entry.getValue().get());
+ }
+ return descriptions;
+ } catch (InterruptedException | ExecutionException e) {
+ // This should be unreachable, since the
KafkaFuture#allOf already ensured
+ // that all of the futures completed successfully.
+ throw new RuntimeException(e);
+ }
+ }
+ });
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c9e0e18..5f4eefe 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
@@ -120,11 +121,9 @@ import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -2347,14 +2346,21 @@ public class KafkaAdminClient extends AdminClient {
@Override
void handleResponse(AbstractResponse abstractResponse) {
- final FindCoordinatorResponse response =
(FindCoordinatorResponse) abstractResponse;
+ final FindCoordinatorResponse fcResponse =
(FindCoordinatorResponse) abstractResponse;
+ Errors error = fcResponse.error();
+ if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+ // Retry COORDINATOR_NOT_AVAILABLE, in case the error
is temporary.
+ throw error.exception();
+ } else if (error != Errors.NONE) {
+ // All other errors are immediate failures.
+ KafkaFutureImpl<ConsumerGroupDescription> future =
futures.get(groupId);
+ future.completeExceptionally(error.exception());
+ return;
+ }
final long nowDescribeConsumerGroups = time.milliseconds();
-
- final int nodeId = response.node().id();
-
+ final int nodeId = fcResponse.node().id();
runnable.call(new Call("describeConsumerGroups", deadline,
new ConstantNodeIdProvider(nodeId)) {
-
@Override
AbstractRequest.Builder createRequest(int timeoutMs) {
return new
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
@@ -2375,24 +2381,29 @@ public class KafkaAdminClient extends AdminClient {
final String protocolType =
groupMetadata.protocolType();
if
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty())
{
final
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
- final List<MemberDescription> consumers =
new ArrayList<>(members.size());
+ final List<MemberDescription>
memberDescriptions = new ArrayList<>(members.size());
for (DescribeGroupsResponse.GroupMember
groupMember : members) {
- final PartitionAssignor.Assignment
assignment =
-
ConsumerProtocol.deserializeAssignment(
-
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
-
+ Set<TopicPartition> partitions =
Collections.emptySet();
+ if
(groupMember.memberAssignment().remaining() > 0) {
+ final PartitionAssignor.Assignment
assignment = ConsumerProtocol.
+
deserializeAssignment(groupMember.memberAssignment().duplicate());
+ partitions = new
HashSet<>(assignment.partitions());
+ }
final MemberDescription
memberDescription =
- new MemberDescription(
- groupMember.memberId(),
- groupMember.clientId(),
-
groupMember.clientHost(),
- new
MemberAssignment(assignment.partitions()));
- consumers.add(memberDescription);
+ new
MemberDescription(groupMember.memberId(),
+ groupMember.clientId(),
+ groupMember.clientHost(),
+ new
MemberAssignment(partitions));
+
memberDescriptions.add(memberDescription);
}
- final String protocol =
groupMetadata.protocol();
final ConsumerGroupDescription
consumerGroupDescription =
- new
ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+ new
ConsumerGroupDescription(groupId,
+ protocolType.isEmpty(),
+ memberDescriptions,
+ groupMetadata.protocol(),
+
ConsumerGroupState.parse(groupMetadata.state()),
+ fcResponse.node());
future.complete(consumerGroupDescription);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
index bd95813..6c180ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -19,21 +19,24 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
-import java.util.List;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
/**
* A description of the assignments of a specific group member.
*/
public class MemberAssignment {
- private final List<TopicPartition> topicPartitions;
+ private final Set<TopicPartition> topicPartitions;
/**
* Creates an instance with the specified parameters.
*
* @param topicPartitions List of topic partitions
*/
- public MemberAssignment(List<TopicPartition> topicPartitions) {
- this.topicPartitions = topicPartitions;
+ MemberAssignment(Set<TopicPartition> topicPartitions) {
+ this.topicPartitions = topicPartitions == null ?
Collections.<TopicPartition>emptySet() :
+ Collections.unmodifiableSet(new HashSet<>(topicPartitions));
}
@Override
@@ -54,7 +57,7 @@ public class MemberAssignment {
/**
* The topic partitions assigned to a group member.
*/
- public List<TopicPartition> topicPartitions() {
+ public Set<TopicPartition> topicPartitions() {
return topicPartitions;
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 2ba1963..895abad 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -17,49 +17,42 @@
package org.apache.kafka.clients.admin;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+
/**
* A detailed description of a single group instance in the cluster.
*/
public class MemberDescription {
-
private final String memberId;
private final String clientId;
private final String host;
private final MemberAssignment assignment;
- /**
- * Creates an instance with the specified parameters.
- *
- * @param memberId The consumer id
- * @param clientId The client id
- * @param host The host
- * @param assignment The assignment
- */
- public MemberDescription(String memberId, String clientId, String host,
MemberAssignment assignment) {
- this.memberId = memberId;
- this.clientId = clientId;
- this.host = host;
- this.assignment = assignment;
+ MemberDescription(String memberId, String clientId, String host,
MemberAssignment assignment) {
+ this.memberId = memberId == null ? "" : memberId;
+ this.clientId = clientId == null ? "" : clientId;
+ this.host = host == null ? "" : host;
+ this.assignment = assignment == null ?
+ new MemberAssignment(Collections.<TopicPartition>emptySet()) :
assignment;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
-
MemberDescription that = (MemberDescription) o;
-
- if (memberId != null ? !memberId.equals(that.memberId) : that.memberId
!= null) return false;
- if (clientId != null ? !clientId.equals(that.clientId) : that.clientId
!= null) return false;
- return assignment != null ? assignment.equals(that.assignment) :
that.assignment == null;
+ return memberId.equals(that.memberId) &&
+ clientId.equals(that.clientId) &&
+ host.equals(that.host) &&
+ assignment.equals(that.assignment);
}
@Override
public int hashCode() {
- int result = memberId != null ? memberId.hashCode() : 0;
- result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
- result = 31 * result + (assignment != null ? assignment.hashCode() :
0);
- return result;
+ return Objects.hash(memberId, clientId, host, assignment);
}
/**
@@ -92,7 +85,9 @@ public class MemberDescription {
@Override
public String toString() {
- return "(memberId=" + memberId + ", clientId=" + clientId + ", host="
+ host + ", assignment=" +
- assignment + ")";
+ return "(memberId=" + memberId +
+ ", clientId=" + clientId +
+ ", host=" + host +
+ ", assignment=" + assignment + ")";
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
new file mode 100644
index 0000000..7f3d4f0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.HashMap;
+
+/**
+ * The consumer group state.
+ */
+public enum ConsumerGroupState {
+ UNKNOWN("Unknown"),
+ PREPARING_REBALANCE("PreparingRebalance"),
+ COMPLETING_REBALANCE("CompletingRebalance"),
+ STABLE("Stable"),
+ DEAD("Dead"),
+ EMPTY("Empty");
+
+ private final static HashMap<String, ConsumerGroupState> NAME_TO_ENUM;
+
+ static {
+ NAME_TO_ENUM = new HashMap<>();
+ for (ConsumerGroupState state : ConsumerGroupState.values()) {
+ NAME_TO_ENUM.put(state.name, state);
+ }
+ }
+
+ private final String name;
+
+ ConsumerGroupState(String name) {
+ this.name = name;
+ }
+
+
+ /**
+ * Parse a string into a consumer group state.
+ */
+ public static ConsumerGroupState parse(String name) {
+ ConsumerGroupState state = NAME_TO_ENUM.get(name);
+ return state == null ? UNKNOWN : state;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
index 1ff30f1..a4d509d 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
@@ -17,15 +17,7 @@
package org.apache.kafka.common.errors;
public class GroupIdNotFoundException extends ApiException {
- private final String groupId;
-
- public GroupIdNotFoundException(String groupId) {
- super("The group id " + groupId + " was not found");
- this.groupId = groupId;
- }
-
- public String groupId() {
- return groupId;
+ public GroupIdNotFoundException(String message) {
+ super(message);
}
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
index 264e613..e15b3e6 100644
---
a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
+++
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
@@ -17,15 +17,7 @@
package org.apache.kafka.common.errors;
public class GroupNotEmptyException extends ApiException {
- private final String groupId;
-
- public GroupNotEmptyException(String groupId) {
- super("The group " + groupId + " is not empty");
- this.groupId = groupId;
- }
-
- public String groupId() {
- return groupId;
+ public GroupNotEmptyException(String message) {
+ super(message);
}
-
}
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index d7c4e43..3c045c6 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -157,7 +157,8 @@ public class StreamsResetter {
final AdminClient adminClient)
throws ExecutionException, InterruptedException {
final DescribeConsumerGroupsResult describeResult =
adminClient.describeConsumerGroups(Arrays.asList(groupId),
(new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
- final List<MemberDescription> members =
describeResult.describedGroups().get(groupId).get().members();
+ final List<MemberDescription> members =
+ new
ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members());
if (!members.isEmpty()) {
throw new IllegalStateException("Consumer group '" + groupId + "'
is still active "
+ "and has following members: " + members + ". "
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index b31c09d..e7dd108 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -31,10 +31,10 @@ import org.apache.kafka.clients.admin._
import kafka.utils.{Logging, TestUtils}
import kafka.utils.Implicits._
import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaFuture, TopicPartition,
TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture,
TopicPartition, TopicPartitionReplica}
import org.apache.kafka.common.acl._
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors._
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
import java.lang.{Long => JLong}
import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.internals.Topic
import org.scalatest.Assertions.intercept
import scala.concurrent.duration.Duration
@@ -98,6 +99,7 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
config.setProperty(KafkaConfig.InterBrokerListenerNameProp,
listenerName.value)
config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp,
s"${listenerName.value}:${securityProtocol.name}")
config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+ config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
// We set this in order to test that we don't expose sensitive data via
describe configs. This will already be
// set for subclasses with security enabled and we don't want to
overwrite it.
if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
@@ -959,6 +961,120 @@ class AdminClientIntegrationTest extends
IntegrationTestHarness with Logging {
client.close()
assertEquals(1, factory.failuresInjected)
}
+
+ /**
+ * Test the consumer group APIs.
+ */
+ @Test
+ def testConsumerGroups(): Unit = {
+ val config = createConfig()
+ val client = AdminClient.create(config)
+ try {
+ // Verify that initially there are no consumer groups to list.
+ val list1 = client.listConsumerGroups()
+ assertTrue(0 == list1.all().get().size())
+ assertTrue(0 == list1.errors().get().size())
+ assertTrue(0 == list1.valid().get().size())
+ val testTopicName = "test_topic"
+ val testNumPartitions = 2
+ client.createTopics(Collections.singleton(
+ new NewTopic(testTopicName, testNumPartitions, 1))).all().get()
+ val producer = createNewProducer
+ try {
+ producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+ } finally {
+ Utils.closeQuietly(producer, "producer")
+ }
+ val testGroupId = "test_group_id"
+ val testClientId = "test_client_id"
+ val fakeGroupId = "fake_group_id"
+ val newConsumerConfig = new Properties(consumerConfig)
+ newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
testGroupId)
+ newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,
testClientId)
+ val consumer = TestUtils.createNewConsumer(brokerList,
+ securityProtocol = this.securityProtocol,
+ trustStoreFile = this.trustStoreFile,
+ saslProperties = this.clientSaslProperties,
+ props = Some(newConsumerConfig))
+ try {
+ // Start a consumer in a thread that will subscribe to a new group.
+ val consumerThread = new Thread {
+ override def run {
+ consumer.subscribe(Collections.singleton(testTopicName))
+ while (true) {
+ consumer.poll(5000)
+ consumer.commitSync()
+ }
+ }
+ }
+ try {
+ consumerThread.start
+ // Test that we can list the new group.
+ TestUtils.waitUntilTrue(() => {
+ val matching = client.listConsumerGroups().all().get().asScala.
+ filter(listing => listing.groupId().equals(testGroupId))
+ !matching.isEmpty
+ }, s"Expected to be able to list $testGroupId")
+
+ val result = client.describeConsumerGroups(Seq(testGroupId,
fakeGroupId).asJava)
+ assertEquals(2, result.describedGroups().size())
+
+ // Test that we can get information about the test consumer group.
+ assertTrue(result.describedGroups().containsKey(testGroupId))
+ val testGroupDescription =
result.describedGroups().get(testGroupId).get()
+ assertEquals(testGroupId, testGroupDescription.groupId())
+ assertFalse(testGroupDescription.isSimpleConsumerGroup())
+ assertEquals(1, testGroupDescription.members().size())
+ val member = testGroupDescription.members().iterator().next()
+ assertEquals(testClientId, member.clientId())
+ val topicPartitions = member.assignment().topicPartitions()
+ assertEquals(testNumPartitions, topicPartitions.size())
+ assertEquals(testNumPartitions, topicPartitions.asScala.
+ count(tp => tp.topic().equals(testTopicName)))
+
+ // Test that the fake group is listed as dead.
+ assertTrue(result.describedGroups().containsKey(fakeGroupId))
+ val fakeGroupDescription =
result.describedGroups().get(fakeGroupId).get()
+ assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+ assertEquals(0, fakeGroupDescription.members().size())
+ assertEquals("", fakeGroupDescription.partitionAssignor())
+ assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
+
+ // Test that all() returns 2 results
+ assertEquals(2, result.all().get().size())
+
+ // Test listConsumerGroupOffsets
+ val parts =
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+ TestUtils.waitUntilTrue(() => {
+ val parts =
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+ val part = new TopicPartition(testTopicName, 0)
+ parts.containsKey(part) && (parts.get(part).offset() == 1)
+ }, s"Expected the offset for partition 0 to eventually become 1.")
+
+ // Test consumer group deletion
+ val deleteResult = client.deleteConsumerGroups(Seq(testGroupId,
fakeGroupId).asJava)
+ assertEquals(2, deleteResult.deletedGroups().size())
+
+ // Deleting the fake group ID should get GroupIdNotFoundException.
+ assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
+
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+ classOf[GroupIdNotFoundException])
+
+ // Deleting the real group ID should get GroupNotEmptyException
+ assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
+
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+ classOf[GroupNotEmptyException])
+ } finally {
+ consumerThread.interrupt()
+ consumerThread.join()
+ }
+ } finally {
+ Utils.closeQuietly(consumer, "consumer")
+ }
+ } finally {
+ Utils.closeQuietly(client, "adminClient")
+ }
+ }
}
object AdminClientIntegrationTest {
--
To stop receiving notification emails like this one, please contact
[email protected].