[ 
https://issues.apache.org/jira/browse/KAFKA-6868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16482631#comment-16482631
 ] 

ASF GitHub Bot commented on KAFKA-6868:
---------------------------------------

hachikuji closed pull request #4980: KAFKA-6868: Fix buffer underflow and 
expose group state in the consumer groups API
URL: https://github.com/apache/kafka/pull/4980
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
index 0bfa8a782d5..bc3857d7aea 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -17,55 +17,56 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.ConsumerGroupState;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.List;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Objects;
 
 /**
  * A detailed description of a single consumer group in the cluster.
  */
 public class ConsumerGroupDescription {
-
     private final String groupId;
     private final boolean isSimpleConsumerGroup;
-    private final List<MemberDescription> members;
+    private final Collection<MemberDescription> members;
     private final String partitionAssignor;
+    private final ConsumerGroupState state;
+    private final Node coordinator;
 
-    /**
-     * Creates an instance with the specified parameters.
-     *
-     * @param groupId               The consumer group id
-     * @param isSimpleConsumerGroup If Consumer Group is simple
-     * @param members               The consumer group members
-     * @param partitionAssignor     The consumer group partition assignor
-     */
-    public ConsumerGroupDescription(String groupId, boolean 
isSimpleConsumerGroup, List<MemberDescription> members, String 
partitionAssignor) {
-        this.groupId = groupId;
+    ConsumerGroupDescription(String groupId,
+            boolean isSimpleConsumerGroup,
+            Collection<MemberDescription> members,
+            String partitionAssignor,
+            ConsumerGroupState state,
+            Node coordinator) {
+        this.groupId = groupId == null ? "" : groupId;
         this.isSimpleConsumerGroup = isSimpleConsumerGroup;
-        this.members = members;
-        this.partitionAssignor = partitionAssignor;
+        this.members = members == null ? 
Collections.<MemberDescription>emptyList() :
+            Collections.unmodifiableList(new ArrayList<>(members));
+        this.partitionAssignor = partitionAssignor == null ? "" : 
partitionAssignor;
+        this.state = state;
+        this.coordinator = coordinator;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-
         ConsumerGroupDescription that = (ConsumerGroupDescription) o;
-
-        if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
-        if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != 
null) return false;
-        if (members != null ? !members.equals(that.members) : that.members != 
null) return false;
-        return partitionAssignor != null ? 
partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == 
null;
+        return isSimpleConsumerGroup == that.isSimpleConsumerGroup &&
+            groupId.equals(that.groupId) &&
+            members.equals(that.members) &&
+            partitionAssignor.equals(that.partitionAssignor) &&
+            state.equals(that.state);
     }
 
     @Override
     public int hashCode() {
-        int result = groupId != null ? groupId.hashCode() : 0;
-        result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
-        result = 31 * result + (members != null ? members.hashCode() : 0);
-        result = 31 * result + (partitionAssignor != null ? 
partitionAssignor.hashCode() : 0);
-        return result;
+        return Objects.hash(isSimpleConsumerGroup, groupId, members, 
partitionAssignor, state);
     }
 
     /**
@@ -85,7 +86,7 @@ public boolean isSimpleConsumerGroup() {
     /**
      * A list of the members of the consumer group.
      */
-    public List<MemberDescription> members() {
+    public Collection<MemberDescription> members() {
         return members;
     }
 
@@ -96,9 +97,28 @@ public String partitionAssignor() {
         return partitionAssignor;
     }
 
+    /**
+     * The consumer group state, or UNKNOWN if the state is too new for us to 
parse.
+     */
+    public ConsumerGroupState state() {
+        return state;
+    }
+
+    /**
+     * The consumer group coordinator, or null if the coordinator is not known.
+     */
+    public Node coordinator() {
+        return coordinator;
+    }
+
     @Override
     public String toString() {
-        return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + 
isSimpleConsumerGroup + ", members=" +
-            Utils.join(members, ",") + ", partitionAssignor=" + 
partitionAssignor + ")";
+        return "(groupId=" + groupId +
+            ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+            ", members=" + Utils.join(members, ",") +
+            ", partitionAssignor=" + partitionAssignor +
+            ", state=" + state +
+            ", coordinator=" + coordinator +
+            ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
index ac2189cc6dc..8f0ebad4f7a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -21,7 +21,9 @@
 import org.apache.kafka.common.annotation.InterfaceStability;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
 
 
 /**
@@ -39,16 +41,32 @@ public DescribeConsumerGroupsResult(final Map<String, 
KafkaFuture<ConsumerGroupD
     }
 
     /**
-     * Return a map from group id to futures which can be used to check the 
description of a consumer group.
+     * Return a map from group id to futures which yield group descriptions.
      */
     public Map<String, KafkaFuture<ConsumerGroupDescription>> 
describedGroups() {
         return futures;
     }
 
     /**
-     * Return a future which succeeds only if all the consumer group 
description succeed.
+     * Return a future which yields all ConsumerGroupDescription objects, if 
all the describes succeed.
      */
-    public KafkaFuture<Void> all() {
-        return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0]));
+    public KafkaFuture<Map<String, ConsumerGroupDescription>> all() {
+        return KafkaFuture.allOf(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+            new KafkaFuture.BaseFunction<Void, Map<String, 
ConsumerGroupDescription>>() {
+                @Override
+                public Map<String, ConsumerGroupDescription> apply(Void v) {
+                    try {
+                        Map<String, ConsumerGroupDescription> descriptions = 
new HashMap<>(futures.size());
+                        for (Map.Entry<String, 
KafkaFuture<ConsumerGroupDescription>> entry : futures.entrySet()) {
+                            descriptions.put(entry.getKey(), 
entry.getValue().get());
+                        }
+                        return descriptions;
+                    } catch (InterruptedException | ExecutionException e) {
+                        // This should be unreachable, since the 
KafkaFuture#allOf already ensured
+                        // that all of the futures completed successfully.
+                        throw new RuntimeException(e);
+                    }
+                }
+            });
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index c9e0e186316..5f4eefe8279 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -31,6 +31,7 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.ConsumerGroupState;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
@@ -120,11 +121,9 @@
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -2347,14 +2346,21 @@ public DescribeConsumerGroupsResult 
describeConsumerGroups(final Collection<Stri
 
                 @Override
                 void handleResponse(AbstractResponse abstractResponse) {
-                    final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
+                    final FindCoordinatorResponse fcResponse = 
(FindCoordinatorResponse) abstractResponse;
+                    Errors error = fcResponse.error();
+                    if (error == Errors.COORDINATOR_NOT_AVAILABLE) {
+                        // Retry COORDINATOR_NOT_AVAILABLE, in case the error 
is temporary.
+                        throw error.exception();
+                    } else if (error != Errors.NONE) {
+                        // All other errors are immediate failures.
+                        KafkaFutureImpl<ConsumerGroupDescription> future = 
futures.get(groupId);
+                        future.completeExceptionally(error.exception());
+                        return;
+                    }
 
                     final long nowDescribeConsumerGroups = time.milliseconds();
-
-                    final int nodeId = response.node().id();
-
+                    final int nodeId = fcResponse.node().id();
                     runnable.call(new Call("describeConsumerGroups", deadline, 
new ConstantNodeIdProvider(nodeId)) {
-
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
                             return new 
DescribeGroupsRequest.Builder(Collections.singletonList(groupId));
@@ -2375,24 +2381,29 @@ void handleResponse(AbstractResponse abstractResponse) {
                                 final String protocolType = 
groupMetadata.protocolType();
                                 if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
                                     final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
-                                    final List<MemberDescription> consumers = 
new ArrayList<>(members.size());
+                                    final List<MemberDescription> 
memberDescriptions = new ArrayList<>(members.size());
 
                                     for (DescribeGroupsResponse.GroupMember 
groupMember : members) {
-                                        final PartitionAssignor.Assignment 
assignment =
-                                                
ConsumerProtocol.deserializeAssignment(
-                                                        
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
-
+                                        Set<TopicPartition> partitions = 
Collections.emptySet();
+                                        if 
(groupMember.memberAssignment().remaining() > 0) {
+                                            final PartitionAssignor.Assignment 
assignment = ConsumerProtocol.
+                                                
deserializeAssignment(groupMember.memberAssignment().duplicate());
+                                            partitions = new 
HashSet<>(assignment.partitions());
+                                        }
                                         final MemberDescription 
memberDescription =
-                                                new MemberDescription(
-                                                        groupMember.memberId(),
-                                                        groupMember.clientId(),
-                                                        
groupMember.clientHost(),
-                                                        new 
MemberAssignment(assignment.partitions()));
-                                        consumers.add(memberDescription);
+                                            new 
MemberDescription(groupMember.memberId(),
+                                                groupMember.clientId(),
+                                                groupMember.clientHost(),
+                                                new 
MemberAssignment(partitions));
+                                        
memberDescriptions.add(memberDescription);
                                     }
-                                    final String protocol = 
groupMetadata.protocol();
                                     final ConsumerGroupDescription 
consumerGroupDescription =
-                                            new 
ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+                                            new 
ConsumerGroupDescription(groupId,
+                                                protocolType.isEmpty(),
+                                                memberDescriptions,
+                                                groupMetadata.protocol(),
+                                                
ConsumerGroupState.parse(groupMetadata.state()),
+                                                fcResponse.node());
                                     future.complete(consumerGroupDescription);
                                 }
                             }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
index bd958132b7c..6c180ad574d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -19,21 +19,24 @@
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 
-import java.util.List;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 /**
  * A description of the assignments of a specific group member.
  */
 public class MemberAssignment {
-    private final List<TopicPartition> topicPartitions;
+    private final Set<TopicPartition> topicPartitions;
 
     /**
      * Creates an instance with the specified parameters.
      *
      * @param topicPartitions List of topic partitions
      */
-    public MemberAssignment(List<TopicPartition> topicPartitions) {
-        this.topicPartitions = topicPartitions;
+    MemberAssignment(Set<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions == null ? 
Collections.<TopicPartition>emptySet() :
+            Collections.unmodifiableSet(new HashSet<>(topicPartitions));
     }
 
     @Override
@@ -54,7 +57,7 @@ public int hashCode() {
     /**
      * The topic partitions assigned to a group member.
      */
-    public List<TopicPartition> topicPartitions() {
+    public Set<TopicPartition> topicPartitions() {
         return topicPartitions;
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
index 2ba19634208..895abadfdb8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -17,49 +17,42 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.Objects;
+
 /**
  * A detailed description of a single group instance in the cluster.
  */
 public class MemberDescription {
-
     private final String memberId;
     private final String clientId;
     private final String host;
     private final MemberAssignment assignment;
 
-    /**
-     * Creates an instance with the specified parameters.
-     *
-     * @param memberId The consumer id
-     * @param clientId   The client id
-     * @param host       The host
-     * @param assignment The assignment
-     */
-    public MemberDescription(String memberId, String clientId, String host, 
MemberAssignment assignment) {
-        this.memberId = memberId;
-        this.clientId = clientId;
-        this.host = host;
-        this.assignment = assignment;
+    MemberDescription(String memberId, String clientId, String host, 
MemberAssignment assignment) {
+        this.memberId = memberId == null ? "" : memberId;
+        this.clientId = clientId == null ? "" : clientId;
+        this.host = host == null ? "" : host;
+        this.assignment = assignment == null ?
+            new MemberAssignment(Collections.<TopicPartition>emptySet()) : 
assignment;
     }
 
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
-
         MemberDescription that = (MemberDescription) o;
-
-        if (memberId != null ? !memberId.equals(that.memberId) : that.memberId 
!= null) return false;
-        if (clientId != null ? !clientId.equals(that.clientId) : that.clientId 
!= null) return false;
-        return assignment != null ? assignment.equals(that.assignment) : 
that.assignment == null;
+        return memberId.equals(that.memberId) &&
+            clientId.equals(that.clientId) &&
+            host.equals(that.host) &&
+            assignment.equals(that.assignment);
     }
 
     @Override
     public int hashCode() {
-        int result = memberId != null ? memberId.hashCode() : 0;
-        result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
-        result = 31 * result + (assignment != null ? assignment.hashCode() : 
0);
-        return result;
+        return Objects.hash(memberId, clientId, host, assignment);
     }
 
     /**
@@ -92,7 +85,9 @@ public MemberAssignment assignment() {
 
     @Override
     public String toString() {
-        return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" 
+ host + ", assignment=" +
-            assignment + ")";
+        return "(memberId=" + memberId +
+            ", clientId=" + clientId +
+            ", host=" + host +
+            ", assignment=" + assignment + ")";
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java 
b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
new file mode 100644
index 00000000000..7f3d4f0883b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ConsumerGroupState.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+import java.util.HashMap;
+
+/**
+ * The consumer group state.
+ */
+public enum ConsumerGroupState {
+    UNKNOWN("Unknown"),
+    PREPARING_REBALANCE("PreparingRebalance"),
+    COMPLETING_REBALANCE("CompletingRebalance"),
+    STABLE("Stable"),
+    DEAD("Dead"),
+    EMPTY("Empty");
+
+    private final static HashMap<String, ConsumerGroupState> NAME_TO_ENUM;
+
+    static {
+        NAME_TO_ENUM = new HashMap<>();
+        for (ConsumerGroupState state : ConsumerGroupState.values()) {
+            NAME_TO_ENUM.put(state.name, state);
+        }
+    }
+
+    private final String name;
+
+    ConsumerGroupState(String name) {
+        this.name = name;
+    }
+
+
+    /**
+     * Parse a string into a consumer group state.
+     */
+    public static ConsumerGroupState parse(String name) {
+        ConsumerGroupState state = NAME_TO_ENUM.get(name);
+        return state == null ? UNKNOWN : state;
+    }
+
+    @Override
+    public String toString() {
+        return name;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
index 1ff30f14fc0..a4d509d3a20 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupIdNotFoundException.java
@@ -17,15 +17,7 @@
 package org.apache.kafka.common.errors;
 
 public class GroupIdNotFoundException extends ApiException {
-    private final String groupId;
-
-    public GroupIdNotFoundException(String groupId) {
-        super("The group id " + groupId + " was not found");
-        this.groupId = groupId;
-    }
-
-    public String groupId() {
-        return groupId;
+    public GroupIdNotFoundException(String message) {
+        super(message);
     }
-
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
index 264e613719c..e15b3e6d57f 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/GroupNotEmptyException.java
@@ -17,15 +17,7 @@
 package org.apache.kafka.common.errors;
 
 public class GroupNotEmptyException extends ApiException {
-    private final String groupId;
-
-    public GroupNotEmptyException(String groupId) {
-        super("The group " + groupId + " is not empty");
-        this.groupId = groupId;
-    }
-
-    public String groupId() {
-        return groupId;
+    public GroupNotEmptyException(String message) {
+        super(message);
     }
-
 }
diff --git a/core/src/main/scala/kafka/tools/StreamsResetter.java 
b/core/src/main/scala/kafka/tools/StreamsResetter.java
index d7c4e435e58..3c045c69eb2 100644
--- a/core/src/main/scala/kafka/tools/StreamsResetter.java
+++ b/core/src/main/scala/kafka/tools/StreamsResetter.java
@@ -157,7 +157,8 @@ private void validateNoActiveConsumers(final String groupId,
                                            final AdminClient adminClient) 
throws ExecutionException, InterruptedException {
         final DescribeConsumerGroupsResult describeResult = 
adminClient.describeConsumerGroups(Arrays.asList(groupId),
                 (new DescribeConsumerGroupsOptions()).timeoutMs(10 * 1000));
-        final List<MemberDescription> members = 
describeResult.describedGroups().get(groupId).get().members();
+        final List<MemberDescription> members =
+            new 
ArrayList<MemberDescription>(describeResult.describedGroups().get(groupId).get().members());
         if (!members.isEmpty()) {
             throw new IllegalStateException("Consumer group '" + groupId + "' 
is still active "
                     + "and has following members: " + members + ". "
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index b31c09d78d3..e7dd1084040 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -31,10 +31,10 @@ import org.apache.kafka.clients.admin._
 import kafka.utils.{Logging, TestUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.admin.NewTopic
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaFuture, TopicPartition, 
TopicPartitionReplica}
+import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, 
TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.acl._
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.errors._
@@ -49,6 +49,7 @@ import scala.collection.JavaConverters._
 import java.lang.{Long => JLong}
 
 import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.internals.Topic
 import org.scalatest.Assertions.intercept
 
 import scala.concurrent.duration.Duration
@@ -98,6 +99,7 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
       config.setProperty(KafkaConfig.InterBrokerListenerNameProp, 
listenerName.value)
       config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, 
s"${listenerName.value}:${securityProtocol.name}")
       config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true")
+      config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
       // We set this in order to test that we don't expose sensitive data via 
describe configs. This will already be
       // set for subclasses with security enabled and we don't want to 
overwrite it.
       if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp))
@@ -959,6 +961,120 @@ class AdminClientIntegrationTest extends 
IntegrationTestHarness with Logging {
     client.close()
     assertEquals(1, factory.failuresInjected)
   }
+
+  /**
+    * Test the consumer group APIs.
+    */
+  @Test
+  def testConsumerGroups(): Unit = {
+    val config = createConfig()
+    val client = AdminClient.create(config)
+    try {
+      // Verify that initially there are no consumer groups to list.
+      val list1 = client.listConsumerGroups()
+      assertTrue(0 == list1.all().get().size())
+      assertTrue(0 == list1.errors().get().size())
+      assertTrue(0 == list1.valid().get().size())
+      val testTopicName = "test_topic"
+      val testNumPartitions = 2
+      client.createTopics(Collections.singleton(
+        new NewTopic(testTopicName, testNumPartitions, 1))).all().get()
+      val producer = createNewProducer
+      try {
+        producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
+      } finally {
+        Utils.closeQuietly(producer, "producer")
+      }
+      val testGroupId = "test_group_id"
+      val testClientId = "test_client_id"
+      val fakeGroupId = "fake_group_id"
+      val newConsumerConfig = new Properties(consumerConfig)
+      newConsumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, 
testGroupId)
+      newConsumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
testClientId)
+      val consumer = TestUtils.createNewConsumer(brokerList,
+        securityProtocol = this.securityProtocol,
+        trustStoreFile = this.trustStoreFile,
+        saslProperties = this.clientSaslProperties,
+        props = Some(newConsumerConfig))
+      try {
+        // Start a consumer in a thread that will subscribe to a new group.
+        val consumerThread = new Thread {
+          override def run {
+            consumer.subscribe(Collections.singleton(testTopicName))
+            while (true) {
+              consumer.poll(5000)
+              consumer.commitSync()
+            }
+          }
+        }
+        try {
+          consumerThread.start
+          // Test that we can list the new group.
+          TestUtils.waitUntilTrue(() => {
+            val matching = client.listConsumerGroups().all().get().asScala.
+              filter(listing => listing.groupId().equals(testGroupId))
+            !matching.isEmpty
+          }, s"Expected to be able to list $testGroupId")
+
+          val result = client.describeConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
+          assertEquals(2, result.describedGroups().size())
+
+          // Test that we can get information about the test consumer group.
+          assertTrue(result.describedGroups().containsKey(testGroupId))
+          val testGroupDescription = 
result.describedGroups().get(testGroupId).get()
+          assertEquals(testGroupId, testGroupDescription.groupId())
+          assertFalse(testGroupDescription.isSimpleConsumerGroup())
+          assertEquals(1, testGroupDescription.members().size())
+          val member = testGroupDescription.members().iterator().next()
+          assertEquals(testClientId, member.clientId())
+          val topicPartitions = member.assignment().topicPartitions()
+          assertEquals(testNumPartitions, topicPartitions.size())
+          assertEquals(testNumPartitions, topicPartitions.asScala.
+            count(tp => tp.topic().equals(testTopicName)))
+
+          // Test that the fake group is listed as dead.
+          assertTrue(result.describedGroups().containsKey(fakeGroupId))
+          val fakeGroupDescription = 
result.describedGroups().get(fakeGroupId).get()
+          assertEquals(fakeGroupId, fakeGroupDescription.groupId())
+          assertEquals(0, fakeGroupDescription.members().size())
+          assertEquals("", fakeGroupDescription.partitionAssignor())
+          assertEquals(ConsumerGroupState.DEAD, fakeGroupDescription.state())
+
+          // Test that all() returns 2 results
+          assertEquals(2, result.all().get().size())
+
+          // Test listConsumerGroupOffsets
+          val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+          TestUtils.waitUntilTrue(() => {
+            val parts = 
client.listConsumerGroupOffsets(testGroupId).partitionsToOffsetAndMetadata().get()
+            val part = new TopicPartition(testTopicName, 0)
+            parts.containsKey(part) && (parts.get(part).offset() == 1)
+          }, s"Expected the offset for partition 0 to eventually become 1.")
+
+          // Test consumer group deletion
+          val deleteResult = client.deleteConsumerGroups(Seq(testGroupId, 
fakeGroupId).asJava)
+          assertEquals(2, deleteResult.deletedGroups().size())
+
+          // Deleting the fake group ID should get GroupIdNotFoundException.
+          assertTrue(deleteResult.deletedGroups().containsKey(fakeGroupId))
+          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(fakeGroupId),
+            classOf[GroupIdNotFoundException])
+
+          // Deleting the real group ID should get GroupNotEmptyException
+          assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
+          
assertFutureExceptionTypeEquals(deleteResult.deletedGroups().get(testGroupId),
+            classOf[GroupNotEmptyException])
+        } finally {
+          consumerThread.interrupt()
+          consumerThread.join()
+        }
+      } finally {
+        Utils.closeQuietly(consumer, "consumer")
+      }
+    } finally {
+      Utils.closeQuietly(client, "adminClient")
+    }
+  }
 }
 
 object AdminClientIntegrationTest {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> BufferUnderflowException in client when querying consumer group information
> ---------------------------------------------------------------------------
>
>                 Key: KAFKA-6868
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6868
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 2.0.0
>            Reporter: Xavier Léauté
>            Assignee: Colin P. McCabe
>            Priority: Blocker
>             Fix For: 2.0.0
>
>
> Exceptions get thrown when describing consumer group or querying group 
> offsets from a 1.0 cluster
> Stacktrace is a result of calling 
> {{AdminClient.describeConsumerGroups(Collection<String> 
> groupIds).describedGroups().entrySet()}} followed by 
> {{KafkaFuture<ConsumerGroupDescription>.whenComplete()}}
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.protocol.types.SchemaException: Error reading field 
> 'version': java.nio.BufferUnderflowException
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:104)
>       at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:274)
>       at
> [snip]
> Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error 
> reading field 'version': java.nio.BufferUnderflowException
>       at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:76)
>       at 
> org.apache.kafka.clients.consumer.internals.ConsumerProtocol.deserializeAssignment(ConsumerProtocol.java:105)
>       at 
> org.apache.kafka.clients.admin.KafkaAdminClient$21$1.handleResponse(KafkaAdminClient.java:2307)
>       at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.handleResponses(KafkaAdminClient.java:960)
>       at 
> org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1045)
>       ... 1 more
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to