Repository: kafka
Updated Branches:
  refs/heads/trunk 69ebf6f7b -> 6cf2cb6f2


KAFKA-4139; Reset findCoordinatorFuture when brokers are unavailable

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Ismael Juma, Jason Gustafson

Closes #1831 from rajinisivaram/KAFKA-4139


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6cf2cb6f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6cf2cb6f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6cf2cb6f

Branch: refs/heads/trunk
Commit: 6cf2cb6f294d0d4766f57e9da660fb4efac7d258
Parents: 69ebf6f
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu Sep 8 09:42:22 2016 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 8 09:42:22 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/AbstractCoordinator.java | 31 ++++++++++----------
 .../internals/AbstractCoordinatorTest.java      | 17 +++++++++++
 2 files changed, 32 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6cf2cb6f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index f2e15ca..0766f3d 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -207,8 +207,16 @@ public abstract class AbstractCoordinator implements 
Closeable {
     }
 
     protected synchronized RequestFuture<Void> lookupCoordinator() {
-        if (findCoordinatorFuture == null)
-            findCoordinatorFuture = sendGroupCoordinatorRequest();
+        if (findCoordinatorFuture == null) {
+            // find a node to ask about the coordinator
+            Node node = this.client.leastLoadedNode();
+            if (node == null) {
+                // TODO: If there are no brokers left, perhaps we should use 
the bootstrap set
+                // from configuration?
+                return RequestFuture.noBrokersAvailable();
+            } else
+                findCoordinatorFuture = sendGroupCoordinatorRequest(node);
+        }
         return findCoordinatorFuture;
     }
 
@@ -496,21 +504,12 @@ public abstract class AbstractCoordinator implements 
Closeable {
      * one of the brokers. The returned future should be polled to get the 
result of the request.
      * @return A request future which indicates the completion of the metadata 
request
      */
-    private RequestFuture<Void> sendGroupCoordinatorRequest() {
+    private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
         // initiate the group metadata request
-        // find a node to ask about the coordinator
-        Node node = this.client.leastLoadedNode();
-        if (node == null) {
-            // TODO: If there are no brokers left, perhaps we should use the 
bootstrap set
-            // from configuration?
-            return RequestFuture.noBrokersAvailable();
-        } else {
-            // create a group  metadata request
-            log.debug("Sending coordinator request for group {} to broker {}", 
groupId, node);
-            GroupCoordinatorRequest metadataRequest = new 
GroupCoordinatorRequest(this.groupId);
-            return client.send(node, ApiKeys.GROUP_COORDINATOR, 
metadataRequest)
-                    .compose(new GroupCoordinatorResponseHandler());
-        }
+        log.debug("Sending coordinator request for group {} to broker {}", 
groupId, node);
+        GroupCoordinatorRequest metadataRequest = new 
GroupCoordinatorRequest(this.groupId);
+        return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
+                     .compose(new GroupCoordinatorResponseHandler());
     }
 
     private class GroupCoordinatorResponseHandler extends 
RequestFutureAdapter<ClientResponse, Void> {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6cf2cb6f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index 77f9df5..777b67f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -133,6 +134,22 @@ public class AbstractCoordinatorTest {
         }
     }
 
+    @Test
+    public void testLookupCoordinator() throws Exception {
+        mockClient.setNode(null);
+        RequestFuture<Void> noBrokersAvailableFuture = 
coordinator.lookupCoordinator();
+        assertTrue("Failed future expected", 
noBrokersAvailableFuture.failed());
+
+        mockClient.setNode(node);
+        RequestFuture<Void> future = coordinator.lookupCoordinator();
+        assertFalse("Request not sent", future.isDone());
+        assertTrue("New request sent while one is in progress", future == 
coordinator.lookupCoordinator());
+
+        mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+        assertTrue("New request not sent after previous completed", future != 
coordinator.lookupCoordinator());
+    }
+
     private Struct groupCoordinatorResponse(Node node, Errors error) {
         GroupCoordinatorResponse response = new 
GroupCoordinatorResponse(error.code(), node);
         return response.toStruct();

Reply via email to