This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a99da8  KAFKA-6058: KIP-222; Add Consumer Group operations to Admin 
API
6a99da8 is described below

commit 6a99da87abff26a749cee4e765d125bec8b6c424
Author: Jorge Quilcate Otoya <quilcate.jo...@gmail.com>
AuthorDate: Wed Apr 11 14:17:46 2018 -0700

    KAFKA-6058: KIP-222; Add Consumer Group operations to Admin API
    
    KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-222+-+Add+Consumer+Group+operations+to+Admin+API
    
    Author: Jorge Quilcate Otoya <quilcate.jo...@gmail.com>
    Author: Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com>
    Author: Guozhang Wang <wangg...@gmail.com>
    
    Reviewers: Colin Patrick McCabe <co...@cmccabe.xyz>, Guozhang Wang 
<wangg...@gmail.com>
    
    Closes #4454 from jeqo/feature/admin-client-describe-consumer-group
---
 checkstyle/checkstyle.xml                          |   4 +-
 checkstyle/import-control.xml                      |   2 +
 .../apache/kafka/clients/admin/AdminClient.java    |  79 +++++
 .../clients/admin/ConsumerGroupDescription.java    | 104 +++++++
 .../kafka/clients/admin/ConsumerGroupListing.java  |  59 ++++
 .../clients/admin/DeleteConsumerGroupsOptions.java |  31 ++
 .../clients/admin/DeleteConsumerGroupsResult.java  |  41 +++
 .../admin/DescribeConsumerGroupsOptions.java       |  31 ++
 .../admin/DescribeConsumerGroupsResult.java        |  47 +++
 .../kafka/clients/admin/KafkaAdminClient.java      | 336 +++++++++++++++++++++
 .../admin/ListConsumerGroupOffsetsOptions.java     |  53 ++++
 .../admin/ListConsumerGroupOffsetsResult.java      |  48 +++
 .../clients/admin/ListConsumerGroupsOptions.java   |  29 ++
 .../clients/admin/ListConsumerGroupsResult.java    |  44 +++
 .../kafka/clients/admin/MemberAssignment.java      |  65 ++++
 .../kafka/clients/admin/MemberDescription.java     |  98 ++++++
 .../java/org/apache/kafka/common/KafkaFuture.java  |   9 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 192 ++++++++++++
 .../kafka/clients/admin/MockAdminClient.java       |  20 ++
 19 files changed, 1290 insertions(+), 2 deletions(-)

diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index ccab85c..ad85450 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -105,7 +105,7 @@
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
-      <property name="max" value="20"/>
+      <property name="max" value="25"/>
     </module>
     <module name="BooleanExpressionComplexity">
       <!-- default is 3 -->
@@ -114,7 +114,7 @@
 
     <module name="ClassFanOutComplexity">
       <!-- default is 20 -->
-      <property name="max" value="40"/>
+      <property name="max" value="50"/>
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 000acc3..192d735 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -164,6 +164,8 @@
 
     <subpackage name="admin">
       <allow pkg="org.apache.kafka.clients.admin" />
+      <allow pkg="org.apache.kafka.clients.consumer.internals" />
+      <allow pkg="org.apache.kafka.clients.consumer" />
     </subpackage>
   </subpackage>
 
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
index 53b77ce..0171b61 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java
@@ -689,4 +689,83 @@ public abstract class AdminClient implements AutoCloseable 
{
      */
     public abstract DescribeDelegationTokenResult 
describeDelegationToken(DescribeDelegationTokenOptions options);
 
+    /**
+     * Describe some group IDs in the cluster.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @param options  The options to use when describing the groups.
+     * @return The DescribeConsumerGroupResult.
+     */
+    public abstract DescribeConsumerGroupsResult 
describeConsumerGroups(Collection<String> groupIds,
+                                                                        
DescribeConsumerGroupsOptions options);
+
+    /**
+     * Describe some group IDs in the cluster, with the default options.
+     * <p>
+     * This is a convenience method for
+     * #{@link AdminClient#describeConsumerGroups(Collection, 
DescribeConsumerGroupsOptions)} with
+     * default options. See the overload for more details.
+     *
+     * @param groupIds The IDs of the groups to describe.
+     * @return The DescribeConsumerGroupResult.
+     */
+    public DescribeConsumerGroupsResult 
describeConsumerGroups(Collection<String> groupIds) {
+        return describeConsumerGroups(groupIds, new 
DescribeConsumerGroupsOptions());
+    }
+
+    /**
+     * List the consumer groups available in the cluster.
+     *
+     * @param options           The options to use when listing the consumer 
groups.
+     * @return The ListGroupsResult.
+     */
+    public abstract ListConsumerGroupsResult 
listConsumerGroups(ListConsumerGroupsOptions options);
+
+    /**
+     * List the consumer groups available in the cluster with the default 
options.
+     *
+     * This is a convenience method for #{@link 
AdminClient#listConsumerGroups(ListConsumerGroupsOptions)} with default options.
+     * See the overload for more details.
+     *
+     * @return The ListGroupsResult.
+     */
+    public ListConsumerGroupsResult listConsumerGroups() {
+        return listConsumerGroups(new ListConsumerGroupsOptions());
+    }
+
+    /**
+     * List the consumer group offsets available in the cluster.
+     *
+     * @param options           The options to use when listing the consumer 
group offsets.
+     * @return The ListGroupOffsetsResult
+     */
+    public abstract ListConsumerGroupOffsetsResult 
listConsumerGroupOffsets(String groupId, ListConsumerGroupOffsetsOptions 
options);
+
+    /**
+     * List the consumer group offsets available in the cluster with the 
default options.
+     *
+     * This is a convenience method for #{@link 
AdminClient#listConsumerGroupOffsets(String, ListConsumerGroupOffsetsOptions)} 
with default options.
+     *
+     * @return The ListGroupOffsetsResult.
+     */
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId) {
+        return listConsumerGroupOffsets(groupId, new 
ListConsumerGroupOffsetsOptions());
+    }
+
+    /**
+     * Delete consumer groups from the cluster.
+     *
+     * @param options           The options to use when deleting a consumer 
group.
+     * @return The DeletConsumerGroupResult.
+     */
+    public abstract DeleteConsumerGroupsResult 
deleteConsumerGroups(Collection<String> groupIds, DeleteConsumerGroupsOptions 
options);
+
+    /**
+     * Delete consumer groups from the cluster with the default options.
+     *
+     * @return The DeleteConsumerGroupResult.
+     */
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> 
groupIds) {
+        return deleteConsumerGroups(groupIds, new 
DeleteConsumerGroupsOptions());
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
new file mode 100644
index 0000000..0bfa8a7
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupDescription.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+/**
+ * A detailed description of a single consumer group in the cluster.
+ */
+public class ConsumerGroupDescription {
+
+    private final String groupId;
+    private final boolean isSimpleConsumerGroup;
+    private final List<MemberDescription> members;
+    private final String partitionAssignor;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param groupId               The consumer group id
+     * @param isSimpleConsumerGroup If Consumer Group is simple
+     * @param members               The consumer group members
+     * @param partitionAssignor     The consumer group partition assignor
+     */
+    public ConsumerGroupDescription(String groupId, boolean 
isSimpleConsumerGroup, List<MemberDescription> members, String 
partitionAssignor) {
+        this.groupId = groupId;
+        this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+        this.members = members;
+        this.partitionAssignor = partitionAssignor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        ConsumerGroupDescription that = (ConsumerGroupDescription) o;
+
+        if (isSimpleConsumerGroup != that.isSimpleConsumerGroup) return false;
+        if (groupId != null ? !groupId.equals(that.groupId) : that.groupId != 
null) return false;
+        if (members != null ? !members.equals(that.members) : that.members != 
null) return false;
+        return partitionAssignor != null ? 
partitionAssignor.equals(that.partitionAssignor) : that.partitionAssignor == 
null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = groupId != null ? groupId.hashCode() : 0;
+        result = 31 * result + (isSimpleConsumerGroup ? 1 : 0);
+        result = 31 * result + (members != null ? members.hashCode() : 0);
+        result = 31 * result + (partitionAssignor != null ? 
partitionAssignor.hashCode() : 0);
+        return result;
+    }
+
+    /**
+     * The id of the consumer group.
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * If consumer group is simple or not.
+     */
+    public boolean isSimpleConsumerGroup() {
+        return isSimpleConsumerGroup;
+    }
+
+    /**
+     * A list of the members of the consumer group.
+     */
+    public List<MemberDescription> members() {
+        return members;
+    }
+
+    /**
+     * The consumer group partition assignor.
+     */
+    public String partitionAssignor() {
+        return partitionAssignor;
+    }
+
+    @Override
+    public String toString() {
+        return "(groupId=" + groupId + ", isSimpleConsumerGroup=" + 
isSimpleConsumerGroup + ", members=" +
+            Utils.join(members, ",") + ", partitionAssignor=" + 
partitionAssignor + ")";
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
new file mode 100644
index 0000000..46da962
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A listing of a consumer group in the cluster.
+ */
+public class ConsumerGroupListing {
+    private final String groupId;
+    private final boolean isSimpleConsumerGroup;
+
+    /**
+     * Create an instance with the specified parameters.
+     *
+     * @param groupId Group Id
+     * @param isSimpleConsumerGroup If consumer group is simple or not.
+     */
+    public ConsumerGroupListing(String groupId, boolean isSimpleConsumerGroup) 
{
+        this.groupId = groupId;
+        this.isSimpleConsumerGroup = isSimpleConsumerGroup;
+    }
+
+    /**
+     * Consumer Group Id
+     */
+    public String groupId() {
+        return groupId;
+    }
+
+    /**
+     * If Consumer Group is simple or not.
+     */
+    public boolean isSimpleConsumerGroup() {
+        return isSimpleConsumerGroup;
+    }
+
+    @Override
+    public String toString() {
+        return "(" +
+            "groupId='" + groupId + '\'' +
+            ", isSimpleConsumerGroup=" + isSimpleConsumerGroup +
+            ')';
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
new file mode 100644
index 0000000..cd505f4
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for the {@link AdminClient#deleteConsumerGroups(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteConsumerGroupsOptions extends 
AbstractOptions<DeleteConsumerGroupsOptions> {
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
new file mode 100644
index 0000000..b4bce26
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteConsumerGroupsResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * The result of the {@link AdminClient#deleteConsumerGroups(Collection)} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DeleteConsumerGroupsResult {
+    final KafkaFuture<Map<String, KafkaFuture<Void>>> futures;
+
+    DeleteConsumerGroupsResult(KafkaFuture<Map<String, KafkaFuture<Void>>> 
futures) {
+        this.futures = futures;
+    }
+
+    public KafkaFuture<Map<String, KafkaFuture<Void>>> deletedGroups() {
+        return futures;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
new file mode 100644
index 0000000..7daff1a
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsOptions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * Options for {@link AdminClient#describeConsumerGroups(Collection, 
DescribeConsumerGroupsOptions)}.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeConsumerGroupsOptions extends 
AbstractOptions<DescribeConsumerGroupsOptions> {
+}
\ No newline at end of file
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
new file mode 100644
index 0000000..adde031
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConsumerGroupsResult.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+import java.util.Map;
+
+
+/**
+ * The result of the {@link 
KafkaAdminClient#describeConsumerGroups(Collection, 
DescribeConsumerGroupsOptions)}} call.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class DescribeConsumerGroupsResult {
+
+    private final KafkaFuture<Map<String, 
KafkaFuture<ConsumerGroupDescription>>> futures;
+
+    public DescribeConsumerGroupsResult(KafkaFuture<Map<String, 
KafkaFuture<ConsumerGroupDescription>>> futures) {
+        this.futures = futures;
+    }
+
+    /**
+     * Return a map from group name to futures which can be used to check the 
description of a consumer group.
+     */
+    public KafkaFuture<Map<String, KafkaFuture<ConsumerGroupDescription>>> 
describedGroups() {
+        return futures;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 3ac0e28..50bcfd3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -27,6 +27,9 @@ import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
 import 
org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult.ReplicaLogDirInfo;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.KafkaFuture;
@@ -79,6 +82,8 @@ import org.apache.kafka.common.requests.DeleteAclsRequest;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteGroupsRequest;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsRequest;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -97,6 +102,14 @@ import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
 import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.requests.Resource;
 import org.apache.kafka.common.requests.ResourceType;
 import org.apache.kafka.common.security.token.delegation.DelegationToken;
@@ -105,9 +118,11 @@ import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -2209,4 +2224,325 @@ public class KafkaAdminClient extends AdminClient {
 
         return new DescribeDelegationTokenResult(tokensFuture);
     }
+
+    @Override
+    public DescribeConsumerGroupsResult describeConsumerGroups(final 
Collection<String> groupIds,
+                                                               final 
DescribeConsumerGroupsOptions options) {
+        final KafkaFutureImpl<Map<String, 
KafkaFuture<ConsumerGroupDescription>>> resultFutures = new KafkaFutureImpl<>();
+        final Map<String, KafkaFutureImpl<ConsumerGroupDescription>> 
consumerGroupFutures = new HashMap<>(groupIds.size());
+        final ArrayList<String> groupIdList = new ArrayList<>();
+        for (String groupId : groupIds) {
+            if (!consumerGroupFutures.containsKey(groupId)) {
+                consumerGroupFutures.put(groupId, new 
KafkaFutureImpl<ConsumerGroupDescription>());
+                groupIdList.add(groupId);
+            }
+        }
+
+        for (final String groupId : groupIdList) {
+
+            final long nowFindCoordinator = time.milliseconds();
+            final long deadline = calcDeadlineMs(nowFindCoordinator, 
options.timeoutMs());
+
+            runnable.call(new Call("findCoordinator", deadline, new 
LeastLoadedNodeProvider()) {
+                @Override
+                AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new 
FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, 
groupId);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
+
+                    final long nowDescribeConsumerGroups = time.milliseconds();
+
+                    final int nodeId = response.node().id();
+
+                    runnable.call(new Call("describeConsumerGroups", deadline, 
new ConstantNodeIdProvider(nodeId)) {
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new 
DescribeGroupsRequest.Builder(groupIdList);
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) 
{
+                            final DescribeGroupsResponse response = 
(DescribeGroupsResponse) abstractResponse;
+                            // Handle server responses for particular groupId.
+                            for (Map.Entry<String, 
KafkaFutureImpl<ConsumerGroupDescription>> entry : 
consumerGroupFutures.entrySet()) {
+                                final String groupId = entry.getKey();
+                                final 
KafkaFutureImpl<ConsumerGroupDescription> future = entry.getValue();
+                                final DescribeGroupsResponse.GroupMetadata 
groupMetadata = response.groups().get(groupId);
+                                final Errors groupError = 
groupMetadata.error();
+                                if (groupError != Errors.NONE) {
+                                    
future.completeExceptionally(groupError.exception());
+                                    continue;
+                                }
+
+                                final String protocolType = 
groupMetadata.protocolType();
+                                if 
(protocolType.equals(ConsumerProtocol.PROTOCOL_TYPE) || protocolType.isEmpty()) 
{
+                                    final 
List<DescribeGroupsResponse.GroupMember> members = groupMetadata.members();
+                                    final List<MemberDescription> consumers = 
new ArrayList<>(members.size());
+
+                                    for (DescribeGroupsResponse.GroupMember 
groupMember : members) {
+                                        final PartitionAssignor.Assignment 
assignment =
+                                                
ConsumerProtocol.deserializeAssignment(
+                                                        
ByteBuffer.wrap(Utils.readBytes(groupMember.memberAssignment())));
+
+                                        final MemberDescription 
memberDescription =
+                                                new MemberDescription(
+                                                        groupMember.memberId(),
+                                                        groupMember.clientId(),
+                                                        
groupMember.clientHost(),
+                                                        new 
MemberAssignment(assignment.partitions()));
+                                        consumers.add(memberDescription);
+                                    }
+                                    final String protocol = 
groupMetadata.protocol();
+                                    final ConsumerGroupDescription 
consumerGroupDescription =
+                                            new 
ConsumerGroupDescription(groupId, protocolType.isEmpty(), consumers, protocol);
+                                    future.complete(consumerGroupDescription);
+                                }
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            
completeAllExceptionally(consumerGroupFutures.values(), throwable);
+                        }
+                    }, nowDescribeConsumerGroups);
+
+                    resultFutures.complete(new HashMap<String, 
KafkaFuture<ConsumerGroupDescription>>(consumerGroupFutures));
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    resultFutures.completeExceptionally(throwable);
+                }
+            }, nowFindCoordinator);
+        }
+
+        return new DescribeConsumerGroupsResult(resultFutures);
+    }
+
+    @Override
+    public ListConsumerGroupsResult 
listConsumerGroups(ListConsumerGroupsOptions options) {
+        //final KafkaFutureImpl<Map<Node, 
KafkaFuture<Collection<ConsumerGroupListing>>>> nodeAndConsumerGroupListing = 
new KafkaFutureImpl<>();
+        final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new 
KafkaFutureImpl<Collection<ConsumerGroupListing>>();
+
+        final long nowMetadata = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
+
+        runnable.call(new Call("listNodes", deadline, new 
LeastLoadedNodeProvider()) {
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new 
MetadataRequest.Builder(Collections.<String>emptyList(), true);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                MetadataResponse metadataResponse = (MetadataResponse) 
abstractResponse;
+
+                final Map<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> futures = new HashMap<>();
+
+                for (final Node node : metadataResponse.brokers()) {
+                    futures.put(node, new 
KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                }
+
+                future.combine(futures.values().toArray(new 
KafkaFuture[0])).thenApply(
+                        new 
KafkaFuture.BaseFunction<Collection<ConsumerGroupListing>, 
Collection<ConsumerGroupListing>>() {
+                            @Override
+                            public Collection<ConsumerGroupListing> 
apply(Collection<ConsumerGroupListing> v) {
+                                List<ConsumerGroupListing> listings = new 
ArrayList<>();
+                                for (Map.Entry<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
+                                    Collection<ConsumerGroupListing> results;
+                                    try {
+                                        results = entry.getValue().get();
+                                    } catch (Throwable e) {
+                                        // This should be unreachable, since 
the future returned by KafkaFuture#allOf should
+                                        // have failed if any Future failed.
+                                        throw new 
KafkaException("ListConsumerGroupsResult#listings(): internal error", e);
+                                    }
+                                    listings.addAll(results);
+                                }
+                                return listings;
+                            }
+                        });
+
+
+                for (final Map.Entry<Node, 
KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futures.entrySet()) {
+                    final long nowList = time.milliseconds();
+
+                    final int brokerId = entry.getKey().id();
+
+                    runnable.call(new Call("listConsumerGroups", deadline, new 
ConstantNodeIdProvider(brokerId)) {
+
+                        private final 
KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new ListGroupsRequest.Builder();
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) 
{
+                            final ListGroupsResponse response = 
(ListGroupsResponse) abstractResponse;
+                            final List<ConsumerGroupListing> groupsListing = 
new ArrayList<>();
+                            for (ListGroupsResponse.Group group : 
response.groups()) {
+                                if 
(group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || 
group.protocolType().isEmpty()) {
+                                    final String groupId = group.groupId();
+                                    final String protocolType = 
group.protocolType();
+                                    final ConsumerGroupListing groupListing = 
new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                    groupsListing.add(groupListing);
+                                }
+                            }
+                            future.complete(groupsListing);
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            completeAllExceptionally(futures.values(), 
throwable);
+                        }
+                    }, nowList);
+
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                future.completeExceptionally(throwable);
+            }
+        }, nowMetadata);
+
+        return new ListConsumerGroupsResult(future);
+    }
+
+    @Override
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(final 
String groupId, final ListConsumerGroupOffsetsOptions options) {
+        final KafkaFutureImpl<Map<TopicPartition, OffsetAndMetadata>> 
groupOffsetListingFuture = new KafkaFutureImpl<>();
+
+        final long nowFindCoordinator = time.milliseconds();
+        final long deadline = calcDeadlineMs(nowFindCoordinator, 
options.timeoutMs());
+
+        runnable.call(new Call("findCoordinator", deadline, new 
LeastLoadedNodeProvider()) {
+            @Override
+            AbstractRequest.Builder createRequest(int timeoutMs) {
+                return new 
FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, 
groupId);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
+
+                final long nowListConsumerGroupOffsets = time.milliseconds();
+
+                final int nodeId = response.node().id();
+
+                runnable.call(new Call("listConsumerGroupOffsets", deadline, 
new ConstantNodeIdProvider(nodeId)) {
+                    @Override
+                    AbstractRequest.Builder createRequest(int timeoutMs) {
+                        return new OffsetFetchRequest.Builder(groupId, 
options.topicPartitions());
+                    }
+
+                    @Override
+                    void handleResponse(AbstractResponse abstractResponse) {
+                        final OffsetFetchResponse response = 
(OffsetFetchResponse) abstractResponse;
+                        final Map<TopicPartition, OffsetAndMetadata> 
groupOffsetsListing = new HashMap<>();
+                        for (Map.Entry<TopicPartition, 
OffsetFetchResponse.PartitionData> entry :
+                                response.responseData().entrySet()) {
+                            final TopicPartition topicPartition = 
entry.getKey();
+                            final Long offset = entry.getValue().offset;
+                            final String metadata = entry.getValue().metadata;
+                            groupOffsetsListing.put(topicPartition, new 
OffsetAndMetadata(offset, metadata));
+                        }
+                        groupOffsetListingFuture.complete(groupOffsetsListing);
+                    }
+
+                    @Override
+                    void handleFailure(Throwable throwable) {
+                        
groupOffsetListingFuture.completeExceptionally(throwable);
+                    }
+                }, nowListConsumerGroupOffsets);
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                groupOffsetListingFuture.completeExceptionally(throwable);
+            }
+        }, nowFindCoordinator);
+
+        return new ListConsumerGroupOffsetsResult(groupOffsetListingFuture);
+    }
+
+    @Override
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> 
groupIds, DeleteConsumerGroupsOptions options) {
+        final KafkaFutureImpl<Map<String, KafkaFuture<Void>>> 
deleteConsumerGroupsFuture = new KafkaFutureImpl<>();
+        final Map<String, KafkaFutureImpl<Void>> deleteConsumerGroupFutures = 
new HashMap<>(groupIds.size());
+        final Set<String> groupIdList = new HashSet<>();
+        for (String groupId : groupIds) {
+            if (!deleteConsumerGroupFutures.containsKey(groupId)) {
+                deleteConsumerGroupFutures.put(groupId, new 
KafkaFutureImpl<Void>());
+                groupIdList.add(groupId);
+            }
+        }
+
+        for (final String groupId : groupIdList) {
+
+            final long nowFindCoordinator = time.milliseconds();
+            final long deadline = calcDeadlineMs(nowFindCoordinator, 
options.timeoutMs());
+
+            runnable.call(new Call("findCoordinator", deadline, new 
LeastLoadedNodeProvider()) {
+                @Override
+                AbstractRequest.Builder createRequest(int timeoutMs) {
+                    return new 
FindCoordinatorRequest.Builder(FindCoordinatorRequest.CoordinatorType.GROUP, 
groupId);
+                }
+
+                @Override
+                void handleResponse(AbstractResponse abstractResponse) {
+                    final FindCoordinatorResponse response = 
(FindCoordinatorResponse) abstractResponse;
+
+                    final long nowDeleteConsumerGroups = time.milliseconds();
+
+                    final int nodeId = response.node().id();
+
+                    runnable.call(new Call("deleteConsumerGroups", deadline, 
new ConstantNodeIdProvider(nodeId)) {
+
+                        @Override
+                        AbstractRequest.Builder createRequest(int timeoutMs) {
+                            return new 
DeleteGroupsRequest.Builder(Collections.singleton(groupId));
+                        }
+
+                        @Override
+                        void handleResponse(AbstractResponse abstractResponse) 
{
+                            final DeleteGroupsResponse response = 
(DeleteGroupsResponse) abstractResponse;
+                            // Handle server responses for particular groupId.
+                            for (Map.Entry<String, KafkaFutureImpl<Void>> 
entry : deleteConsumerGroupFutures.entrySet()) {
+                                final String groupId = entry.getKey();
+                                final KafkaFutureImpl<Void> future = 
entry.getValue();
+                                final Errors groupError = 
response.get(groupId);
+                                if (groupError != Errors.NONE) {
+                                    
future.completeExceptionally(groupError.exception());
+                                    continue;
+                                }
+
+                                future.complete(null);
+                            }
+                        }
+
+                        @Override
+                        void handleFailure(Throwable throwable) {
+                            
completeAllExceptionally(deleteConsumerGroupFutures.values(), throwable);
+                        }
+                    }, nowDeleteConsumerGroups);
+
+                    deleteConsumerGroupsFuture.complete(new HashMap<String, 
KafkaFuture<Void>>(deleteConsumerGroupFutures));
+                }
+
+                @Override
+                void handleFailure(Throwable throwable) {
+                    
deleteConsumerGroupsFuture.completeExceptionally(throwable);
+                }
+            }, nowFindCoordinator);
+        }
+
+        return new DeleteConsumerGroupsResult(deleteConsumerGroupsFuture);
+    }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
new file mode 100644
index 0000000..c6434eb
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsOptions.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.List;
+
+/**
+ * Options for {@link AdminClient#listConsumerGroupOffsets(String)}.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsOptions extends 
AbstractOptions<ListConsumerGroupOffsetsOptions> {
+
+    private List<TopicPartition> topicPartitions = null;
+
+    /**
+     * Set the topic partitions to list as part of the result.
+     * {@code null} includes all topic partitions.
+     *
+     * @param topicPartitions List of topic partitions to include
+     * @return This ListGroupOffsetsOptions
+     */
+    public ListConsumerGroupOffsetsOptions 
topicPartitions(List<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+        return this;
+    }
+
+    /**
+     * Returns a list of topic partitions to add as part of the result.
+     */
+    public List<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
new file mode 100644
index 0000000..23657b5
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupOffsetsResult.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * The result of the {@link AdminClient#listConsumerGroupOffsets(String)} call.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupOffsetsResult {
+
+    final KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> future;
+
+    ListConsumerGroupOffsetsResult(KafkaFuture<Map<TopicPartition, 
OffsetAndMetadata>> future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a map of topic partitions to 
OffsetAndMetadata objects.
+     */
+    public KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> 
partitionsToOffsetAndMetadata() {
+        return future;
+    }
+
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
new file mode 100644
index 0000000..86ca171
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * Options for {@link AdminClient#listConsumerGroups()}.
+ *
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupsOptions extends 
AbstractOptions<ListConsumerGroupsOptions> {
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
new file mode 100644
index 0000000..c725371
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collection;
+
+/**
+ * The result of the {@link AdminClient#listConsumerGroups()} call.
+ * <p>
+ * The API of this class is evolving, see {@link AdminClient} for details.
+ */
+@InterfaceStability.Evolving
+public class ListConsumerGroupsResult {
+    private final KafkaFuture<Collection<ConsumerGroupListing>> future;
+
+    ListConsumerGroupsResult(KafkaFuture<Collection<ConsumerGroupListing>> 
future) {
+        this.future = future;
+    }
+
+    /**
+     * Return a future which yields a collection of ConsumerGroupListing 
objects.
+     */
+    public KafkaFuture<Collection<ConsumerGroupListing>> listings() {
+        return future;
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
new file mode 100644
index 0000000..bd95813
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/MemberAssignment.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+
+import java.util.List;
+
+/**
+ * A description of the assignments of a specific group member.
+ */
+public class MemberAssignment {
+    private final List<TopicPartition> topicPartitions;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param topicPartitions List of topic partitions
+     */
+    public MemberAssignment(List<TopicPartition> topicPartitions) {
+        this.topicPartitions = topicPartitions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MemberAssignment that = (MemberAssignment) o;
+
+        return topicPartitions != null ? 
topicPartitions.equals(that.topicPartitions) : that.topicPartitions == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return topicPartitions != null ? topicPartitions.hashCode() : 0;
+    }
+
+    /**
+     * The topic partitions assigned to a group member.
+     */
+    public List<TopicPartition> topicPartitions() {
+        return topicPartitions;
+    }
+
+    @Override
+    public String toString() {
+        return "(topicPartitions=" + Utils.join(topicPartitions, ",") + ")";
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
new file mode 100644
index 0000000..2ba1963
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/MemberDescription.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+/**
+ * A detailed description of a single group instance in the cluster.
+ */
+public class MemberDescription {
+
+    private final String memberId;
+    private final String clientId;
+    private final String host;
+    private final MemberAssignment assignment;
+
+    /**
+     * Creates an instance with the specified parameters.
+     *
+     * @param memberId The consumer id
+     * @param clientId   The client id
+     * @param host       The host
+     * @param assignment The assignment
+     */
+    public MemberDescription(String memberId, String clientId, String host, 
MemberAssignment assignment) {
+        this.memberId = memberId;
+        this.clientId = clientId;
+        this.host = host;
+        this.assignment = assignment;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        MemberDescription that = (MemberDescription) o;
+
+        if (memberId != null ? !memberId.equals(that.memberId) : that.memberId 
!= null) return false;
+        if (clientId != null ? !clientId.equals(that.clientId) : that.clientId 
!= null) return false;
+        return assignment != null ? assignment.equals(that.assignment) : 
that.assignment == null;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = memberId != null ? memberId.hashCode() : 0;
+        result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
+        result = 31 * result + (assignment != null ? assignment.hashCode() : 
0);
+        return result;
+    }
+
+    /**
+     * The consumer id of the group member.
+     */
+    public String consumerId() {
+        return memberId;
+    }
+
+    /**
+     * The client id of the group member.
+     */
+    public String clientId() {
+        return clientId;
+    }
+
+    /**
+     * The host where the group member is running.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * The assignment of the group member.
+     */
+    public MemberAssignment assignment() {
+        return assignment;
+    }
+
+    @Override
+    public String toString() {
+        return "(memberId=" + memberId + ", clientId=" + clientId + ", host=" 
+ host + ", assignment=" +
+            assignment + ")";
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java 
b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
index 9cd2e01..4af996c 100644
--- a/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
+++ b/clients/src/main/java/org/apache/kafka/common/KafkaFuture.java
@@ -106,6 +106,15 @@ public abstract class KafkaFuture<T> implements Future<T> {
         return allOfFuture;
     }
 
+    public KafkaFuture<T> combine(KafkaFuture<?>... futures) {
+        AllOfAdapter<Object> allOfWaiter = new AllOfAdapter<>(futures.length, 
this);
+        for (KafkaFuture<?> future : futures) {
+            future.addWaiter(allOfWaiter);
+        }
+
+        return this;
+    }
+
     /**
      * Returns a new KafkaFuture that, when this future completes normally, is 
executed with this
      * futures's result as the argument to the supplied function.
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 0d4dee6..a242ed6 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -18,6 +18,9 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.Node;
@@ -47,10 +50,15 @@ import 
org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.requests.DeleteGroupsResponse;
 import org.apache.kafka.common.requests.DeleteRecordsResponse;
 import org.apache.kafka.common.requests.DescribeAclsResponse;
 import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.ListGroupsResponse;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
@@ -64,6 +72,7 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -81,6 +90,7 @@ import static 
org.apache.kafka.common.requests.ResourceType.BROKER;
 import static org.apache.kafka.common.requests.ResourceType.TOPIC;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -636,6 +646,188 @@ public class KafkaAdminClientTest {
         }
     }
 
+    //Ignoring test to be fixed on follow-up PR
+    @Ignore
+    @Test
+    public void testListConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(
+                new MetadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    new ArrayList<MetadataResponse.TopicMetadata>()));
+
+            env.kafkaClient().prepareResponse(
+                new ListGroupsResponse(
+                    Errors.NONE,
+                    Arrays.asList(
+                        new ListGroupsResponse.Group("group-1", 
ConsumerProtocol.PROTOCOL_TYPE),
+                        new ListGroupsResponse.Group("group-connect-1", 
"connector")
+                    )));
+
+            final ListConsumerGroupsResult result = 
env.adminClient().listConsumerGroups();
+            final List<ConsumerGroupListing> consumerGroups = new 
ArrayList<>();
+
+            final KafkaFuture<Collection<ConsumerGroupListing>> listings = 
result.listings();
+            consumerGroups.addAll(listings.get());
+            assertEquals(1, consumerGroups.size());
+        }
+    }
+
+    @Test
+    public void testDescribeConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(new 
FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            final Map<String, DescribeGroupsResponse.GroupMetadata> 
groupMetadataMap = new HashMap<>();
+            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 
0);
+            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 
1);
+            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 
2);
+
+            final List<TopicPartition> topicPartitions = new ArrayList<>();
+            topicPartitions.add(0, myTopicPartition0);
+            topicPartitions.add(1, myTopicPartition1);
+            topicPartitions.add(2, myTopicPartition2);
+
+            final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
PartitionAssignor.Assignment(topicPartitions));
+
+            groupMetadataMap.put(
+                "group-0",
+                new DescribeGroupsResponse.GroupMetadata(
+                    Errors.NONE,
+                    "",
+                    ConsumerProtocol.PROTOCOL_TYPE,
+                    "",
+                    Arrays.asList(
+                        new DescribeGroupsResponse.GroupMember("0", 
"clientId0", "clientHost", null, memberAssignment),
+                        new DescribeGroupsResponse.GroupMember("1", 
"clientId1", "clientHost", null, memberAssignment))));
+            groupMetadataMap.put(
+                "group-connect-0",
+                new DescribeGroupsResponse.GroupMetadata(
+                    Errors.NONE,
+                    "",
+                    "connect",
+                    "",
+                    Arrays.asList(
+                        new DescribeGroupsResponse.GroupMember("0", 
"clientId0", "clientHost", null, memberAssignment),
+                        new DescribeGroupsResponse.GroupMember("1", 
"clientId1", "clientHost", null, memberAssignment))));
+
+            env.kafkaClient().prepareResponse(new 
DescribeGroupsResponse(groupMetadataMap));
+
+            final DescribeConsumerGroupsResult result = 
env.adminClient().describeConsumerGroups(Collections.singletonList("group-0"));
+            final KafkaFuture<ConsumerGroupDescription> groupDescriptionFuture 
= result.describedGroups().get().get("group-0");
+            final ConsumerGroupDescription groupDescription = 
groupDescriptionFuture.get();
+
+            assertEquals(1, result.describedGroups().get().size());
+            assertEquals("group-0", groupDescription.groupId());
+            assertEquals(2, groupDescription.members().size());
+        }
+    }
+
+    @Test
+    public void testDescribeConsumerGroupOffsets() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(new 
FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            TopicPartition myTopicPartition0 = new TopicPartition("my_topic", 
0);
+            TopicPartition myTopicPartition1 = new TopicPartition("my_topic", 
1);
+            TopicPartition myTopicPartition2 = new TopicPartition("my_topic", 
2);
+
+            final Map<TopicPartition, OffsetFetchResponse.PartitionData> 
responseData = new HashMap<>();
+            responseData.put(myTopicPartition0, new 
OffsetFetchResponse.PartitionData(10, "", Errors.NONE));
+            responseData.put(myTopicPartition1, new 
OffsetFetchResponse.PartitionData(0, "", Errors.NONE));
+            responseData.put(myTopicPartition2, new 
OffsetFetchResponse.PartitionData(20, "", Errors.NONE));
+            env.kafkaClient().prepareResponse(new 
OffsetFetchResponse(Errors.NONE, responseData));
+
+            final ListConsumerGroupOffsetsResult result = 
env.adminClient().listConsumerGroupOffsets("group-0");
+
+            assertEquals(3, 
result.partitionsToOffsetAndMetadata().get().size());
+            final TopicPartition topicPartition = 
result.partitionsToOffsetAndMetadata().get().keySet().iterator().next();
+            assertEquals("my_topic", topicPartition.topic());
+            final OffsetAndMetadata offsetAndMetadata = 
result.partitionsToOffsetAndMetadata().get().values().iterator().next();
+            assertEquals(10, offsetAndMetadata.offset());
+        }
+    }
+
+    @Test
+    public void testDeleteConsumerGroups() throws Exception {
+        final HashMap<Integer, Node> nodes = new HashMap<>();
+        nodes.put(0, new Node(0, "localhost", 8121));
+
+        final Cluster cluster =
+            new Cluster(
+                "mockClusterId",
+                nodes.values(),
+                Collections.<PartitionInfo>emptyList(),
+                Collections.<String>emptySet(),
+                Collections.<String>emptySet(), nodes.get(0));
+
+        final List<String> groupIds = Collections.singletonList("group-0");
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(cluster)) 
{
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareMetadataUpdate(env.cluster(), 
Collections.<String>emptySet());
+            env.kafkaClient().setNode(env.cluster().controller());
+
+            env.kafkaClient().prepareResponse(new 
FindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+
+            final Map<String, Errors> response = new HashMap<>();
+            response.put("group-0", Errors.NONE);
+            env.kafkaClient().prepareResponse(new 
DeleteGroupsResponse(response));
+
+            final DeleteConsumerGroupsResult result = 
env.adminClient().deleteConsumerGroups(groupIds);
+
+            final Map<String, KafkaFuture<Void>> results = 
result.deletedGroups().get();
+            assertNull(results.get("group-0").get());
+        }
+    }
+
     private static <T> void assertCollectionIs(Collection<T> collection, T... 
elements) {
         for (T element : elements) {
             assertTrue("Did not find " + element, 
collection.contains(element));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index 0f5df38..2fc7048 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -297,6 +297,26 @@ public class MockAdminClient extends AdminClient {
     }
 
     @Override
+    public DescribeConsumerGroupsResult 
describeConsumerGroups(Collection<String> groupIds, 
DescribeConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ListConsumerGroupsResult 
listConsumerGroups(ListConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(String 
groupId, ListConsumerGroupOffsetsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public DeleteConsumerGroupsResult deleteConsumerGroups(Collection<String> 
groupIds, DeleteConsumerGroupsOptions options) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
     public CreateAclsResult createAcls(Collection<AclBinding> acls, 
CreateAclsOptions options) {
         throw new UnsupportedOperationException("Not implemented yet");
     }

-- 
To stop receiving notification emails like this one, please contact
guozh...@apache.org.

Reply via email to