Repository: kafka Updated Branches: refs/heads/trunk 59b918ec2 -> 78ace3725
KAFKA-5176; AdminClient: add controller and clusterId methods to DescribeClusterResults Author: Colin P. Mccabe <[email protected]> Reviewers: dan norwood <[email protected]>, Ismael Juma <[email protected]> Closes #2977 from cmccabe/KAFKA-5176 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/78ace372 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/78ace372 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/78ace372 Branch: refs/heads/trunk Commit: 78ace3725151dc056e83a8ec389141fc8809d4c0 Parents: 59b918e Author: Colin P. Mccabe <[email protected]> Authored: Tue May 9 02:10:24 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Tue May 9 02:10:31 2017 +0100 ---------------------------------------------------------------------- .../clients/admin/DescribeClusterResults.java | 30 +++++++++++++++++--- .../kafka/clients/admin/KafkaAdminClient.java | 9 +++++- .../api/KafkaAdminClientIntegrationTest.scala | 8 +++++- 3 files changed, 41 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java index 5ee834b..a51c1c8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java @@ -28,16 +28,38 @@ import java.util.Collection; */ @InterfaceStability.Unstable public class DescribeClusterResults { - private final KafkaFuture<Collection<Node>> future; + private final KafkaFuture<Collection<Node>> nodes; + private final KafkaFuture<Node> controller; + private final KafkaFuture<String> clusterId; - DescribeClusterResults(KafkaFuture<Collection<Node>> future) { - this.future = future; + DescribeClusterResults(KafkaFuture<Collection<Node>> nodes, + KafkaFuture<Node> controller, + KafkaFuture<String> clusterId) { + this.nodes = nodes; + this.controller = controller; + this.clusterId = clusterId; } /** * Returns a future which yields a collection of nodes. */ public KafkaFuture<Collection<Node>> nodes() { - return future; + return nodes; + } + + /** + * Returns a future which yields the current controller id. + * Note that this may yield null, if the controller ID is not yet known. + */ + public KafkaFuture<Node> controller() { + return controller; + } + + /** + * Returns a future which yields the current cluster Id. + * Note that this may yield null, if the cluster version is too old. + */ + public KafkaFuture<String> clusterId() { + return clusterId; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index ad921f8..ec10232 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1011,6 +1011,8 @@ public class KafkaAdminClient extends AdminClient { @Override public DescribeClusterResults describeCluster(DescribeClusterOptions options) { final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>(); + final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("listNodes", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @@ -1024,14 +1026,19 @@ public class KafkaAdminClient extends AdminClient { void handleResponse(AbstractResponse abstractResponse) { MetadataResponse response = (MetadataResponse) abstractResponse; describeClusterFuture.complete(response.brokers()); + controllerFuture.complete(response.controller()); + clusterIdFuture.complete(response.clusterId()); } @Override void handleFailure(Throwable throwable) { describeClusterFuture.completeExceptionally(throwable); + controllerFuture.completeExceptionally(throwable); + clusterIdFuture.completeExceptionally(throwable); } }, now); - return new DescribeClusterResults(describeClusterFuture); + + return new DescribeClusterResults(describeClusterFuture, controllerFuture, clusterIdFuture); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/78ace372/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index 455ab61..07eb673 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -30,6 +30,7 @@ import org.apache.kafka.common.KafkaFuture import org.apache.kafka.common.errors.TopicExistsException import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} +import org.apache.kafka.common.requests.MetadataResponse import org.junit.rules.Timeout import org.junit.Assert._ @@ -135,9 +136,14 @@ class KafkaAdminClientIntegrationTest extends KafkaServerTestHarness with Loggin } @Test - def testGetAllBrokerVersions(): Unit = { + def testGetAllBrokerVersionsAndDescribeCluster(): Unit = { client = AdminClient.create(createConfig()) val nodes = client.describeCluster().nodes().get() + val clusterId = client.describeCluster().clusterId().get() + assertEquals(servers.head.apis.clusterId, clusterId) + val controller = client.describeCluster().controller().get() + assertEquals(servers.head.apis.metadataCache.getControllerId. + getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id()) val nodesToVersions = client.apiVersions(nodes).all().get() val brokers = brokerList.split(",") assert(brokers.size == nodesToVersions.size())
