This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6be908a  MINOR: Refactor AdminClient ListConsumerGroups API (#4884)
6be908a is described below

commit 6be908a8296456adee254b405605acff55fd47a5
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Wed Apr 25 17:49:02 2018 -0700

    MINOR: Refactor AdminClient ListConsumerGroups API (#4884)
    
    The current Iterator-based ListConsumerGroups API is synchronous.  The API
    should be asynchronous to fit in with the other AdminClient APIs.  Also fix
    some error handling corner cases.
    
    Reviewers: Guozhang Wang <[email protected]>, Jason Gustafson <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      | 160 +++++++++++----------
 .../clients/admin/ListConsumerGroupsResult.java    | 109 +++++++-------
 .../kafka/clients/admin/KafkaAdminClientTest.java  |  52 ++++---
 3 files changed, 166 insertions(+), 155 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index fa3f943..d8c0bad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -49,6 +49,7 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InvalidGroupIdException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
@@ -2342,16 +2343,56 @@ public class KafkaAdminClient extends AdminClient {
         return new DescribeConsumerGroupsResult(new HashMap<String, KafkaFuture<ConsumerGroupDescription>>(futures));
     }
 
+    private final static class ListConsumerGroupsResults {
+        private final List<Throwable> errors;
+        private final HashMap<String, ConsumerGroupListing> listings;
+        private final HashSet<Node> remaining;
+        private final KafkaFutureImpl<Collection<Object>> future;
+
+        ListConsumerGroupsResults(Collection<Throwable> errors, Collection<Node> leaders,
+                                  KafkaFutureImpl<Collection<Object>> future) {
+            this.errors = new ArrayList<>(errors);
+            this.listings = new HashMap<>();
+            this.remaining = new HashSet<>(leaders);
+            this.future = future;
+            tryComplete();
+        }
+
+        synchronized void addError(Throwable throwable, Node node) {
+            ApiError error = ApiError.fromThrowable(throwable);
+            if (error.message() == null || error.message().isEmpty()) {
+                errors.add(error.error().exception(
+                    "Error listing groups on " + node));
+            } else {
+                errors.add(error.error().exception(
+                    "Error listing groups on " + node + ": " + error.message()));
+            }
+        }
+
+        synchronized void addListing(ConsumerGroupListing listing) {
+            listings.put(listing.groupId(), listing);
+        }
+
+        synchronized void tryComplete(Node leader) {
+            remaining.remove(leader);
+            tryComplete();
+        }
+
+        private synchronized void tryComplete() {
+            if (remaining.isEmpty()) {
+                ArrayList<Object> results = new ArrayList<Object>(listings.values());
+                results.addAll(errors);
+                future.complete(results);
+            }
+        }
+    };
+
     @Override
     public ListConsumerGroupsResult listConsumerGroups(ListConsumerGroupsOptions options) {
-        final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap = new HashMap<>();
-        final KafkaFutureImpl<Collection<ConsumerGroupListing>> flattenFuture = new KafkaFutureImpl<>();
-        final KafkaFutureImpl<Void> listFuture = new KafkaFutureImpl<>();
-
+        final KafkaFutureImpl<Collection<Object>> all = new KafkaFutureImpl<>();
         final long nowMetadata = time.milliseconds();
         final long deadline = calcDeadlineMs(nowMetadata, options.timeoutMs());
-
-        runnable.call(new Call("listNodes", deadline, new LeastLoadedNodeProvider()) {
+        runnable.call(new Call("findGroupsMetadata", deadline, new LeastLoadedNodeProvider()) {
             @Override
             AbstractRequest.Builder createRequest(int timeoutMs) {
                 return new MetadataRequest.Builder(Collections.singletonList(Topic.GROUP_METADATA_TOPIC_NAME), true);
@@ -2360,68 +2401,38 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 MetadataResponse metadataResponse = (MetadataResponse) abstractResponse;
-
+                final List<Throwable> metadataExceptions = new ArrayList<>();
+                final HashSet<Node> leaders = new HashSet<>();
                 for (final MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
-                    if (metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+                    if (metadata.error() != Errors.NONE) {
+                        metadataExceptions.add(metadata.error().exception("Unable to locate " +
+                            Topic.GROUP_METADATA_TOPIC_NAME));
+                    } else if (!metadata.topic().equals(Topic.GROUP_METADATA_TOPIC_NAME)) {
+                        metadataExceptions.add(new UnknownServerException("Server returned unrequested " +
+                            "information about unexpected topic " + metadata.topic()));
+                    } else {
                         for (final MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
                             final Node leader = partitionMetadata.leader();
                             if (partitionMetadata.error() != Errors.NONE) {
                                 // TODO: KAFKA-6789, retry based on the error code
-                                KafkaFutureImpl<Collection<ConsumerGroupListing>> future = new KafkaFutureImpl<>();
-                                future.completeExceptionally(partitionMetadata.error().exception());
-                                // if it is the leader not found error, then the leader might be NoNode; if there are more than
-                                // one such error, we will only have one entry in the map. For now it is okay since we are not
-                                // guaranteeing to return the full list of consumers still.
-                                futuresMap.put(leader, future);
+                                metadataExceptions.add(partitionMetadata.error().exception("Unable to find " +
+                                    "leader for partition " + partitionMetadata.partition() + " of " +
+                                    Topic.GROUP_METADATA_TOPIC_NAME));
+                            } else if (leader == null || leader.equals(Node.noNode())) {
+                                metadataExceptions.add(new LeaderNotAvailableException("Unable to find leader " +
+                                    "for partition " + partitionMetadata.partition() + " of " +
+                                    Topic.GROUP_METADATA_TOPIC_NAME));
                             } else {
-                                futuresMap.put(leader, new KafkaFutureImpl<Collection<ConsumerGroupListing>>());
+                                leaders.add(leader);
                             }
                         }
-                        listFuture.complete(null);
-                    } else {
-                        if (metadata.error() != Errors.NONE)
-                            listFuture.completeExceptionally(metadata.error().exception());
-                        else
-                            listFuture.completeExceptionally(new IllegalStateException("Unexpected topic metadata for "
-                                    + metadata.topic() + " is returned; cannot find the brokers to query consumer listings."));
                     }
                 }
-
-                // we have to flatten the future here instead in the result, because we need to wait until the map of nodes
-                // are known from the listNode request.
-                flattenFuture.copyWith(
-                        KafkaFuture.allOf(futuresMap.values().toArray(new KafkaFuture[0])),
-                        new KafkaFuture.BaseFunction<Void, Collection<ConsumerGroupListing>>() {
-                            @Override
-                            public Collection<ConsumerGroupListing> apply(Void v) {
-                                List<ConsumerGroupListing> listings = new ArrayList<>();
-                                for (Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
-                                    Collection<ConsumerGroupListing> results;
-                                    try {
-                                        results = entry.getValue().get();
-                                        listings.addAll(results);
-                                    } catch (Throwable e) {
-                                        // This should be unreachable, because allOf ensured that all the futures
-                                        // completed successfully.
-                                        throw new RuntimeException(e);
-                                    }
-                                }
-                                return listings;
-                            }
-                        });
-
-                for (final Map.Entry<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> entry : futuresMap.entrySet()) {
-                    // skip sending the request for those futures who have already failed
-                    if (entry.getValue().isCompletedExceptionally())
-                        continue;
-
+                final ListConsumerGroupsResults results =
+                    new ListConsumerGroupsResults(metadataExceptions, leaders, all);
+                for (final Node node : leaders) {
                     final long nowList = time.milliseconds();
-
-                    final int brokerId = entry.getKey().id();
-                    final KafkaFutureImpl<Collection<ConsumerGroupListing>> future = entry.getValue();
-
-                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(brokerId)) {
-
+                    runnable.call(new Call("listConsumerGroups", deadline, new ConstantNodeIdProvider(node.id())) {
                         @Override
                         AbstractRequest.Builder createRequest(int timeoutMs) {
                             return new ListGroupsRequest.Builder();
@@ -2430,39 +2441,42 @@ public class KafkaAdminClient extends AdminClient {
                         @Override
                         void handleResponse(AbstractResponse abstractResponse) {
                             final ListGroupsResponse response = (ListGroupsResponse) abstractResponse;
-
-                            if (response.error() != Errors.NONE) {
-                                future.completeExceptionally(response.error().exception());
-                            } else {
-                                final List<ConsumerGroupListing> groupsListing = new ArrayList<>();
-                                for (ListGroupsResponse.Group group : response.groups()) {
-                                    if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) || group.protocolType().isEmpty()) {
-                                        final String groupId = group.groupId();
-                                        final String protocolType = group.protocolType();
-                                        final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
-                                        groupsListing.add(groupListing);
+                            synchronized (results) {
+                                if (response.error() != Errors.NONE) {
+                                    results.addError(response.error().exception(), node);
+                                } else {
+                                    for (ListGroupsResponse.Group group : response.groups()) {
+                                        if (group.protocolType().equals(ConsumerProtocol.PROTOCOL_TYPE) ||
+                                            group.protocolType().isEmpty()) {
+                                            final String groupId = group.groupId();
+                                            final String protocolType = group.protocolType();
+                                            final ConsumerGroupListing groupListing = new ConsumerGroupListing(groupId, protocolType.isEmpty());
+                                            results.addListing(groupListing);
+                                        }
                                     }
                                 }
-                                future.complete(groupsListing);
+                                results.tryComplete(node);
                             }
                         }
 
                         @Override
                         void handleFailure(Throwable throwable) {
-                            future.completeExceptionally(throwable);
+                            synchronized (results) {
+                                results.addError(throwable, node);
+                                results.tryComplete(node);
+                            }
                         }
                     }, nowList);
-
                 }
             }
 
             @Override
             void handleFailure(Throwable throwable) {
-                listFuture.completeExceptionally(throwable);
+                all.complete(Collections.<Object>singletonList(throwable));
             }
         }, nowMetadata);
 
-        return new ListConsumerGroupsResult(listFuture, flattenFuture, futuresMap);
+        return new ListConsumerGroupsResult(all);
     }
 
     @Override
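
For readers following the KafkaAdminClient change above, here is a simplified,
standalone sketch of the aggregation pattern the new ListConsumerGroupsResults
helper applies: a single future completes only once every broker has either
returned listings or reported an error.  PerNodeAggregator, the Integer node
ids, the String listings, and CompletableFuture are stand-ins for the internal
KafkaFutureImpl-based code in the patch, not actual Kafka classes:

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    final class PerNodeAggregator {
        private final List<Throwable> errors = new ArrayList<>();
        private final List<String> listings = new ArrayList<>();     // stand-in for ConsumerGroupListing
        private final Set<Integer> remainingNodes;
        private final CompletableFuture<Collection<Object>> future = new CompletableFuture<>();

        PerNodeAggregator(Collection<Throwable> metadataErrors, Collection<Integer> nodeIds) {
            this.errors.addAll(metadataErrors);
            this.remainingNodes = new HashSet<>(nodeIds);
            tryComplete();  // completes immediately if there were no leaders to query
        }

        synchronized void onSuccess(int nodeId, Collection<String> nodeListings) {
            listings.addAll(nodeListings);
            remainingNodes.remove(nodeId);
            tryComplete();
        }

        synchronized void onFailure(int nodeId, Throwable error) {
            errors.add(error);
            remainingNodes.remove(nodeId);
            tryComplete();
        }

        private synchronized void tryComplete() {
            if (remainingNodes.isEmpty()) {
                List<Object> all = new ArrayList<Object>(listings);
                all.addAll(errors);    // listings and errors travel in one collection,
                future.complete(all);  // to be split apart again by ListConsumerGroupsResult
            }
        }

        CompletableFuture<Collection<Object>> future() {
            return future;
        }
    }
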
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
index c3f1236..0ac8529 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsResult.java
@@ -18,14 +18,11 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.KafkaFuture;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
-import org.apache.kafka.common.utils.AbstractIterator;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
 
 /**
  * The result of the {@link AdminClient#listConsumerGroups()} call.
@@ -34,70 +31,72 @@ import java.util.Map;
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsResult {
-    private final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap;
-    private final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture;
-    private final KafkaFuture<Void> listFuture;
+    private final KafkaFutureImpl<Collection<ConsumerGroupListing>> all;
+    private final KafkaFutureImpl<Collection<ConsumerGroupListing>> valid;
+    private final KafkaFutureImpl<Collection<Throwable>> errors;
 
-    ListConsumerGroupsResult(final KafkaFuture<Void> listFuture,
-                             final KafkaFuture<Collection<ConsumerGroupListing>> flattenFuture,
-                             final Map<Node, KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresMap) {
-        this.flattenFuture = flattenFuture;
-        this.listFuture = listFuture;
-        this.futuresMap = futuresMap;
-    }
-
-    private class FutureConsumerGroupListingIterator extends AbstractIterator<KafkaFuture<ConsumerGroupListing>> {
-        private Iterator<KafkaFutureImpl<Collection<ConsumerGroupListing>>> futuresIter;
-        private Iterator<ConsumerGroupListing> innerIter;
-
-        @Override
-        protected KafkaFuture<ConsumerGroupListing> makeNext() {
-            if (futuresIter == null) {
-                try {
-                    listFuture.get();
-                } catch (Exception e) {
-                    // the list future has failed, there will be no listings to show at all
-                    return allDone();
-                }
-
-                futuresIter = futuresMap.values().iterator();
-            }
-
-            while (innerIter == null || !innerIter.hasNext()) {
-                if (futuresIter.hasNext()) {
-                    KafkaFuture<Collection<ConsumerGroupListing>> collectionFuture = futuresIter.next();
-                    try {
-                        Collection<ConsumerGroupListing> collection = collectionFuture.get();
-                        innerIter = collection.iterator();
-                    } catch (Exception e) {
-                        KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
-                        future.completeExceptionally(e);
-                        return future;
+    ListConsumerGroupsResult(KafkaFutureImpl<Collection<Object>> future) {
+        this.all = new KafkaFutureImpl<>();
+        this.valid = new KafkaFutureImpl<>();
+        this.errors = new KafkaFutureImpl<>();
+        future.thenApply(new KafkaFuture.BaseFunction<Collection<Object>, Void>() {
+            @Override
+            public Void apply(Collection<Object> results) {
+                ArrayList<Throwable> curErrors = new ArrayList<>();
+                ArrayList<ConsumerGroupListing> curValid = new ArrayList<>();
+                for (Object resultObject : results) {
+                    if (resultObject instanceof Throwable) {
+                        curErrors.add((Throwable) resultObject);
+                    } else {
+                        curValid.add((ConsumerGroupListing) resultObject);
                     }
+                }
+                if (!curErrors.isEmpty()) {
+                    all.completeExceptionally(curErrors.get(0));
                 } else {
-                    return allDone();
+                    all.complete(curValid);
                 }
+                valid.complete(curValid);
+                errors.complete(curErrors);
+                return null;
             }
+        });
+    }
 
-            KafkaFutureImpl<ConsumerGroupListing> future = new KafkaFutureImpl<>();
-            future.complete(innerIter.next());
-            return future;
-        }
+    /**
+     * Returns a future that yields either an exception, or the full set of consumer group
+     * listings.
+     *
+     * In the event of a failure, the future yields nothing but the first exception which
+     * occurred.
+     */
+    public KafkaFutureImpl<Collection<ConsumerGroupListing>> all() {
+        return all;
     }
 
     /**
-     * Return an iterator of futures for ConsumerGroupListing objects; the returned future will throw exception
-     * if we cannot get a complete collection of consumer listings.
+     * Returns a future which yields just the valid listings.
+     *
+     * This future never fails with an error, no matter what happens.  Errors are completely
+     * ignored.  If nothing can be fetched, an empty collection is yielded.
+     * If there is an error, but some results can be returned, this future will yield
+     * those partial results.  When using this future, it is a good idea to also check
+     * the errors future so that errors can be displayed and handled.
      */
-    public Iterator<KafkaFuture<ConsumerGroupListing>> iterator() {
-        return new FutureConsumerGroupListingIterator();
+    public KafkaFutureImpl<Collection<ConsumerGroupListing>> valid() {
+        return valid;
     }
 
     /**
-     * Return a future which yields a full collection of ConsumerGroupListing objects; will throw exception
-     * if we cannot get a complete collection of consumer listings.
+     * Returns a future which yields just the errors which occurred.
+     *
+     * If this future yields a non-empty collection, it is very likely that elements are
+     * missing from the valid() set.
+     *
+     * This future itself never fails with an error.  In the event of an error, this future
+     * will successfully yield a collection containing at least one exception.
      */
-    public KafkaFuture<Collection<ConsumerGroupListing>> all() {
-        return flattenFuture;
+    public KafkaFutureImpl<Collection<Throwable>> errors() {
+        return errors;
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index d2789b6..0debed3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.NotLeaderForPartitionException;
@@ -80,7 +81,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -648,9 +648,8 @@ public class KafkaAdminClientTest {
         }
     }
 
-    //Ignoring test to be fixed on follow-up PR
     @Test
-    public void testListConsumerGroups() {
+    public void testListConsumerGroups() throws Exception {
         final HashMap<Integer, Node> nodes = new HashMap<>();
         Node node0 = new Node(0, "localhost", 8121);
         Node node1 = new Node(1, "localhost", 8122);
@@ -685,7 +684,8 @@ public class KafkaAdminClientTest {
                             env.cluster().nodes(),
                             env.cluster().clusterResource().clusterId(),
                             env.cluster().controller().id(),
-                            Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE, Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
+                            Collections.singletonList(new MetadataResponse.TopicMetadata(Errors.NONE,
+                                Topic.GROUP_METADATA_TOPIC_NAME, true, partitionMetadata))));
 
             env.kafkaClient().prepareResponseFrom(
                     new ListGroupsResponse(
@@ -713,31 +713,29 @@ public class KafkaAdminClientTest {
                     node2);
 
             final ListConsumerGroupsResult result = env.adminClient().listConsumerGroups();
-
-            try {
-                Collection<ConsumerGroupListing> listing = result.all().get();
-                fail("Expected to throw exception");
-            } catch (Exception e) {
-                // this is good
-            }
-
-            Iterator<KafkaFuture<ConsumerGroupListing>> iterator = result.iterator();
-            int numListing = 0;
-            int numFailure = 0;
-
-            while (iterator.hasNext()) {
-                KafkaFuture<ConsumerGroupListing> future = iterator.next();
-                try {
-                    ConsumerGroupListing listing = future.get();
-                    numListing++;
-                    assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
-                } catch (Exception e) {
-                    numFailure++;
-                }
+            assertFutureError(result.all(), CoordinatorNotAvailableException.class);
+            Collection<ConsumerGroupListing> listings = result.valid().get();
+            assertEquals(2, listings.size());
+            for (ConsumerGroupListing listing : listings) {
+                assertTrue(listing.groupId().equals("group-1") || listing.groupId().equals("group-2"));
             }
+            assertEquals(1, result.errors().get().size());
 
-            assertEquals(2, numListing);
-            assertEquals(1, numFailure);
+            // Test handling the error where we are unable to get metadata for the __consumer_offsets topic.
+            env.kafkaClient().prepareResponse(
+                new MetadataResponse(
+                    env.cluster().nodes(),
+                    env.cluster().clusterResource().clusterId(),
+                    env.cluster().controller().id(),
+                    Collections.singletonList(new MetadataResponse.TopicMetadata(
+                        Errors.UNKNOWN_TOPIC_OR_PARTITION, Topic.GROUP_METADATA_TOPIC_NAME,
+                        true, Collections.<MetadataResponse.PartitionMetadata>emptyList()))));
+            final ListConsumerGroupsResult result2 = env.adminClient().listConsumerGroups();
+            Collection<Throwable> errors = result2.errors().get();
+            assertEquals(1, errors.size());
+            assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.forException(errors.iterator().next()));
+            assertTrue(result2.valid().get().isEmpty());
+            assertFutureError(result2.all(), UnknownTopicOrPartitionException.class);
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
[email protected].
