Repository: kafka Updated Branches: refs/heads/trunk 69ebf6f7b -> 6cf2cb6f2
KAFKA-4139; Reset findCoordinatorFuture when brokers are unavailable Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma, Jason Gustafson Closes #1831 from rajinisivaram/KAFKA-4139 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6cf2cb6f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6cf2cb6f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6cf2cb6f Branch: refs/heads/trunk Commit: 6cf2cb6f294d0d4766f57e9da660fb4efac7d258 Parents: 69ebf6f Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Thu Sep 8 09:42:22 2016 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Sep 8 09:42:22 2016 -0700 ---------------------------------------------------------------------- .../consumer/internals/AbstractCoordinator.java | 31 ++++++++++---------- .../internals/AbstractCoordinatorTest.java | 17 +++++++++++ 2 files changed, 32 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/6cf2cb6f/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index f2e15ca..0766f3d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -207,8 +207,16 @@ public abstract class AbstractCoordinator implements Closeable { } protected synchronized RequestFuture<Void> lookupCoordinator() { - if (findCoordinatorFuture == null) - findCoordinatorFuture = sendGroupCoordinatorRequest(); + if (findCoordinatorFuture == null) { + // find a node to ask about the coordinator + Node node = this.client.leastLoadedNode(); + if (node == null) { + // TODO: If there are no brokers left, perhaps we should use the bootstrap set + // from configuration? + return RequestFuture.noBrokersAvailable(); + } else + findCoordinatorFuture = sendGroupCoordinatorRequest(node); + } return findCoordinatorFuture; } @@ -496,21 +504,12 @@ public abstract class AbstractCoordinator implements Closeable { * one of the brokers. The returned future should be polled to get the result of the request. * @return A request future which indicates the completion of the metadata request */ - private RequestFuture<Void> sendGroupCoordinatorRequest() { + private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) { // initiate the group metadata request - // find a node to ask about the coordinator - Node node = this.client.leastLoadedNode(); - if (node == null) { - // TODO: If there are no brokers left, perhaps we should use the bootstrap set - // from configuration? - return RequestFuture.noBrokersAvailable(); - } else { - // create a group metadata request - log.debug("Sending coordinator request for group {} to broker {}", groupId, node); - GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId); - return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) - .compose(new GroupCoordinatorResponseHandler()); - } + log.debug("Sending coordinator request for group {} to broker {}", groupId, node); + GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId); + return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest) + .compose(new GroupCoordinatorResponseHandler()); } private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> { http://git-wip-us.apache.org/repos/asf/kafka/blob/6cf2cb6f/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 77f9df5..777b67f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -133,6 +134,22 @@ public class AbstractCoordinatorTest { } } + @Test + public void testLookupCoordinator() throws Exception { + mockClient.setNode(null); + RequestFuture<Void> noBrokersAvailableFuture = coordinator.lookupCoordinator(); + assertTrue("Failed future expected", noBrokersAvailableFuture.failed()); + + mockClient.setNode(node); + RequestFuture<Void> future = coordinator.lookupCoordinator(); + assertFalse("Request not sent", future.isDone()); + assertTrue("New request sent while one is in progress", future == coordinator.lookupCoordinator()); + + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(); + assertTrue("New request not sent after previous completed", future != coordinator.lookupCoordinator()); + } + private Struct groupCoordinatorResponse(Node node, Errors error) { GroupCoordinatorResponse response = new GroupCoordinatorResponse(error.code(), node); return response.toStruct();