Repository: kafka Updated Branches: refs/heads/trunk bcf5da0ba -> 0f60617fa
KAFKA-5275; AdminClient API consistency Author: Ismael Juma <[email protected]> Reviewers: Colin P. Mccabe <[email protected]>, Jason Gustafson <[email protected]> Closes #3339 from ijuma/kafka-5275-admin-client-api-consistency Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0f60617f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0f60617f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0f60617f Branch: refs/heads/trunk Commit: 0f60617fab5fc6805f522d0b9a213a7f600fab12 Parents: bcf5da0 Author: Ismael Juma <[email protected]> Authored: Thu Jun 15 02:05:41 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu Jun 15 02:05:41 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/admin/AdminClientConfig.java | 9 +++ .../clients/admin/AlterConfigsOptions.java | 2 +- .../kafka/clients/admin/AlterConfigsResult.java | 2 +- .../kafka/clients/admin/CreateAclsResult.java | 2 +- .../clients/admin/CreateTopicsOptions.java | 2 +- .../kafka/clients/admin/CreateTopicsResult.java | 2 +- .../kafka/clients/admin/DeleteAclsResult.java | 32 +++++------ .../kafka/clients/admin/DeleteTopicsResult.java | 2 +- .../kafka/clients/admin/DescribeAclsResult.java | 2 +- .../clients/admin/DescribeConfigsResult.java | 2 +- .../clients/admin/DescribeTopicsResult.java | 2 +- .../kafka/clients/admin/KafkaAdminClient.java | 60 ++++++++++---------- .../kafka/clients/admin/ListTopicsOptions.java | 2 +- .../kafka/clients/admin/ListTopicsResult.java | 13 +++-- .../apache/kafka/clients/admin/NewTopic.java | 2 +- .../kafka/clients/admin/TopicDescription.java | 19 ++++--- .../kafka/clients/admin/TopicListing.java | 2 +- .../apache/kafka/common/TopicPartitionInfo.java | 7 ++- .../kafka/common/acl/AccessControlEntry.java | 10 ++-- .../common/acl/AccessControlEntryData.java | 4 +- .../common/acl/AccessControlEntryFilter.java | 4 +- .../org/apache/kafka/common/acl/AclBinding.java | 2 +- .../kafka/common/acl/AclBindingFilter.java | 2 +- .../apache/kafka/common/acl/AclOperation.java | 2 +- .../kafka/common/acl/AclPermissionType.java | 2 +- .../apache/kafka/common/resource/Resource.java | 4 +- .../kafka/common/resource/ResourceFilter.java | 2 +- .../kafka/common/resource/ResourceType.java | 2 +- .../clients/admin/KafkaAdminClientTest.java | 28 ++++----- .../kafka/common/acl/AclOperationTest.java | 2 +- .../kafka/common/acl/AclPermissionTypeTest.java | 2 +- .../kafka/common/resource/ResourceTypeTest.java | 2 +- .../apache/kafka/connect/util/TopicAdmin.java | 2 +- .../storage/KafkaConfigBackingStoreTest.java | 2 +- .../storage/KafkaOffsetBackingStoreTest.java | 2 +- .../kafka/api/AdminClientIntegrationTest.scala | 30 +++++----- ...AdminClientWithPoliciesIntegrationTest.scala | 20 +++---- .../api/SaslSslAdminClientIntegrationTest.scala | 38 ++++++------- .../kafka/tools/ClientCompatibilityTest.java | 6 +- 39 files changed, 173 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index c106823..ed51e67 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -98,6 +98,9 @@ public class AdminClientConfig extends AbstractConfig { private static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC; private static final String METRICS_RECORDING_LEVEL_DOC = CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC; + public static final String RETRIES_CONFIG = "retries"; + private static final String RETRIES_DOC = "The maximum number of times to retry a call before failing it."; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -136,6 +139,12 @@ public class AdminClientConfig extends AbstractConfig { 5 * 60 * 1000, Importance.MEDIUM, CONNECTIONS_MAX_IDLE_MS_DOC) + .define(RETRIES_CONFIG, + Type.INT, + 5, + atLeast(0), + Importance.LOW, + RETRIES_DOC) .define(METRICS_SAMPLE_WINDOW_MS_CONFIG, Type.LONG, 30000, http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java index 31f130f..c5665c0 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsOptions.java @@ -52,7 +52,7 @@ public class AlterConfigsOptions { /** * Return true if the request should be validated without altering the configs. */ - public boolean isValidateOnly() { + public boolean shouldValidateOnly() { return validateOnly; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java index 5baacf7..df6c1c2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AlterConfigsResult.java @@ -40,7 +40,7 @@ public class AlterConfigsResult { /** * Return a map from resources to futures which can be used to check the status of the operation on each resource. */ - public Map<ConfigResource, KafkaFuture<Void>> results() { + public Map<ConfigResource, KafkaFuture<Void>> values() { return futures; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java index e575184..2917f17 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateAclsResult.java @@ -41,7 +41,7 @@ public class CreateAclsResult { * Return a map from ACL bindings to futures which can be used to check the status of the creation of each ACL * binding. */ - public Map<AclBinding, KafkaFuture<Void>> results() { + public Map<AclBinding, KafkaFuture<Void>> values() { return futures; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java index bc24014..cb23a8d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsOptions.java @@ -59,7 +59,7 @@ public class CreateTopicsOptions { /** * Return true if the request should be validated without creating the topic. */ - public boolean validateOnly() { + public boolean shouldValidateOnly() { return validateOnly; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java index 3731fad..404cb918 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java @@ -39,7 +39,7 @@ public class CreateTopicsResult { * Return a map from topic names to futures, which can be used to check the status of individual * topic creations. */ - public Map<String, KafkaFuture<Void>> results() { + public Map<String, KafkaFuture<Void>> values() { return futures; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java index 16a505d..90bc297 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteAclsResult.java @@ -38,22 +38,22 @@ import java.util.Map; public class DeleteAclsResult { /** - * A class containing either the deleted ACL or an exception if the delete failed. + * A class containing either the deleted ACL binding or an exception if the delete failed. */ public static class FilterResult { - private final AclBinding acl; + private final AclBinding binding; private final ApiException exception; - FilterResult(AclBinding acl, ApiException exception) { - this.acl = acl; + FilterResult(AclBinding binding, ApiException exception) { + this.binding = binding; this.exception = exception; } /** - * Return the deleted ACL or null if there was an error. + * Return the deleted ACL binding or null if there was an error. */ - public AclBinding acl() { - return acl; + public AclBinding binding() { + return binding; } /** @@ -68,17 +68,17 @@ public class DeleteAclsResult { * A class containing the results of the delete ACLs operation. */ public static class FilterResults { - private final List<FilterResult> acls; + private final List<FilterResult> values; - FilterResults(List<FilterResult> acls) { - this.acls = acls; + FilterResults(List<FilterResult> values) { + this.values = values; } /** - * Return a list of delete ACLs results. + * Return a list of delete ACLs results for a given filter. */ - public List<FilterResult> acls() { - return acls; + public List<FilterResult> values() { + return values; } } @@ -92,7 +92,7 @@ public class DeleteAclsResult { * Return a map from acl filters to futures which can be used to check the status of the deletions by each * filter. */ - public Map<AclBindingFilter, KafkaFuture<FilterResults>> results() { + public Map<AclBindingFilter, KafkaFuture<FilterResults>> values() { return futures; } @@ -115,11 +115,11 @@ public class DeleteAclsResult { // have failed if any Future failed. throw new KafkaException("DeleteAclsResult#all: internal error", e); } - for (FilterResult result : results.acls()) { + for (FilterResult result : results.values()) { if (result.exception() != null) { throw result.exception(); } - acls.add(result.acl()); + acls.add(result.binding()); } } return acls; http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java index 2fd6648..9148a76 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DeleteTopicsResult.java @@ -40,7 +40,7 @@ public class DeleteTopicsResult { * Return a map from topic names to futures which can be used to check the status of * individual deletions. */ - public Map<String, KafkaFuture<Void>> results() { + public Map<String, KafkaFuture<Void>> values() { return futures; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java index 7c49b28..e09bf43 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeAclsResult.java @@ -40,7 +40,7 @@ public class DescribeAclsResult { /** * Return a future containing the ACLs requested. */ - public KafkaFuture<Collection<AclBinding>> all() { + public KafkaFuture<Collection<AclBinding>> values() { return future; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java index c5d4c26..478bf05 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java @@ -44,7 +44,7 @@ public class DescribeConfigsResult { * Return a map from resources to futures which can be used to check the status of the configuration for each * resource. */ - public Map<ConfigResource, KafkaFuture<Config>> results() { + public Map<ConfigResource, KafkaFuture<Config>> values() { return futures; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java index 4e8d433..18f5f9d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java @@ -42,7 +42,7 @@ public class DescribeTopicsResult { * Return a map from topic names to futures which can be used to check the status of * individual topics. */ - public Map<String, KafkaFuture<TopicDescription>> results() { + public Map<String, KafkaFuture<TopicDescription>> values() { return futures; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index d2927ea..e92b1d3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -89,13 +89,13 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -113,11 +113,6 @@ public class KafkaAdminClient extends AdminClient { private static final Logger log = LoggerFactory.getLogger(KafkaAdminClient.class); /** - * The maximum number of times to retry a call before failing it. - */ - private static final int MAX_CALL_RETRIES = 5; - - /** * The next integer to use to name a KafkaAdminClient which the user hasn't specified an explicit name for. */ private static final AtomicInteger ADMIN_CLIENT_ID_SEQUENCE = new AtomicInteger(1); @@ -183,6 +178,8 @@ public class KafkaAdminClient extends AdminClient { */ private final TimeoutProcessorFactory timeoutProcessorFactory; + private final int maxRetries; + /** * Get or create a list value from a map. * @@ -357,6 +354,7 @@ public class KafkaAdminClient extends AdminClient { this.thread = new KafkaThread(threadName, runnable, false); this.timeoutProcessorFactory = (timeoutProcessorFactory == null) ? new TimeoutProcessorFactory() : timeoutProcessorFactory; + this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG); config.logUnused(); log.debug("Created Kafka admin client {}", this.clientId); thread.start(); @@ -421,22 +419,6 @@ public class KafkaAdminClient extends AdminClient { } /** - * Provides a constant node which is known at construction time. - */ - private static class ConstantNodeProvider implements NodeProvider { - private final Node node; - - ConstantNodeProvider(Node node) { - this.node = node; - } - - @Override - public Node provide() { - return node; - } - } - - /** * Provides the controller node. */ private class ControllerNodeProvider implements NodeProvider { @@ -506,7 +488,7 @@ public class KafkaAdminClient extends AdminClient { return; } // If we are out of retries, fail. - if (tries > MAX_CALL_RETRIES) { + if (tries > maxRetries) { if (log.isDebugEnabled()) { log.debug("{} failed after {} attempt(s)", this, tries, new Exception(prettyPrintException(throwable))); @@ -1031,7 +1013,7 @@ public class KafkaAdminClient extends AdminClient { @Override public AbstractRequest.Builder createRequest(int timeoutMs) { - return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.validateOnly()); + return new CreateTopicsRequest.Builder(topicsMap, timeoutMs, options.shouldValidateOnly()); } @Override @@ -1141,7 +1123,7 @@ public class KafkaAdminClient extends AdminClient { Map<String, TopicListing> topicListing = new HashMap<>(); for (String topicName : cluster.topics()) { boolean internal = cluster.internalTopics().contains(topicName); - if (!internal || options.listInternal()) + if (!internal || options.shouldListInternal()) topicListing.put(topicName, new TopicListing(topicName, internal)); } topicListingFuture.complete(topicListing); @@ -1197,19 +1179,31 @@ public class KafkaAdminClient extends AdminClient { continue; } boolean isInternal = cluster.internalTopics().contains(topicName); - TreeMap<Integer, TopicPartitionInfo> partitions = new TreeMap<>(); List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topicName); + List<TopicPartitionInfo> partitions = new ArrayList<>(partitionInfos.size()); for (PartitionInfo partitionInfo : partitionInfos) { TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo( - partitionInfo.partition(), partitionInfo.leader(), Arrays.asList(partitionInfo.replicas()), + partitionInfo.partition(), leader(partitionInfo), Arrays.asList(partitionInfo.replicas()), Arrays.asList(partitionInfo.inSyncReplicas())); - partitions.put(partitionInfo.partition(), topicPartitionInfo); + partitions.add(topicPartitionInfo); } + Collections.sort(partitions, new Comparator<TopicPartitionInfo>() { + @Override + public int compare(TopicPartitionInfo tp1, TopicPartitionInfo tp2) { + return Integer.compare(tp1.partition(), tp2.partition()); + } + }); TopicDescription topicDescription = new TopicDescription(topicName, isInternal, partitions); future.complete(topicDescription); } } + private Node leader(PartitionInfo partitionInfo) { + if (partitionInfo.leader() == null || partitionInfo.leader().id() == Node.noNode().id()) + return null; + return partitionInfo.leader(); + } + @Override boolean handleUnsupportedVersionException(UnsupportedVersionException exception) { if (supportsDisablingTopicCreation) { @@ -1247,10 +1241,16 @@ public class KafkaAdminClient extends AdminClient { void handleResponse(AbstractResponse abstractResponse) { MetadataResponse response = (MetadataResponse) abstractResponse; describeClusterFuture.complete(response.brokers()); - controllerFuture.complete(response.controller()); + controllerFuture.complete(controller(response)); clusterIdFuture.complete(response.clusterId()); } + private Node controller(MetadataResponse response) { + if (response.controller() == null || response.controller().id() == MetadataResponse.NO_CONTROLLER_ID) + return null; + return response.controller(); + } + @Override void handleFailure(Throwable throwable) { describeClusterFuture.completeExceptionally(throwable); @@ -1530,7 +1530,7 @@ public class KafkaAdminClient extends AdminClient { @Override public AbstractRequest.Builder createRequest(int timeoutMs) { - return new AlterConfigsRequest.Builder(requestMap, options.isValidateOnly()); + return new AlterConfigsRequest.Builder(requestMap, options.shouldValidateOnly()); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java index 4068e88..81d834f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsOptions.java @@ -61,7 +61,7 @@ public class ListTopicsOptions { /** * Return true if we should list internal topics. */ - public boolean listInternal() { + public boolean shouldListInternal() { return listInternal; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java index 97987ae..e54b3de 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.annotation.InterfaceStability; import java.util.Collection; import java.util.Map; +import java.util.Set; /** * The result of the {@link AdminClient#listTopics()} call. @@ -39,14 +40,14 @@ public class ListTopicsResult { /** * Return a future which yields a map of topic names to TopicListing objects. */ - public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() { + public KafkaFuture<Map<String, TopicListing>> namesToListings() { return future; } /** * Return a future which yields a collection of TopicListing objects. */ - public KafkaFuture<Collection<TopicListing>> descriptions() { + public KafkaFuture<Collection<TopicListing>> listings() { return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() { @Override public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) { @@ -58,11 +59,11 @@ public class ListTopicsResult { /** * Return a future which yields a collection of topic names. */ - public KafkaFuture<Collection<String>> names() { - return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() { + public KafkaFuture<Set<String>> names() { + return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Set<String>>() { @Override - public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) { - return namesToDescriptions.keySet(); + public Set<String> apply(Map<String, TopicListing> namesToListings) { + return namesToListings.keySet(); } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java index 314337c..ff09579 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/NewTopic.java @@ -68,7 +68,7 @@ public class NewTopic { /** * The number of partitions for the new topic or -1 if a replica assignment has been specified. */ - public int partitions() { + public int numPartitions() { return numPartitions; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index 88ec1a0..c220892 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -20,7 +20,7 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.utils.Utils; -import java.util.NavigableMap; +import java.util.List; /** * A detailed description of a single topic in the cluster. @@ -28,17 +28,17 @@ import java.util.NavigableMap; public class TopicDescription { private final String name; private final boolean internal; - private final NavigableMap<Integer, TopicPartitionInfo> partitions; + private final List<TopicPartitionInfo> partitions; /** * Create an instance with the specified parameters. * * @param name The topic name * @param internal Whether the topic is internal to Kafka - * @param partitions A map from partition id to its leadership and replica information + * @param partitions A list of partitions where the index represents the partition id and the element contains + * leadership and replica information for that partition. */ - public TopicDescription(String name, boolean internal, - NavigableMap<Integer, TopicPartitionInfo> partitions) { + public TopicDescription(String name, boolean internal, List<TopicPartitionInfo> partitions) { this.name = name; this.internal = internal; this.partitions = partitions; @@ -55,20 +55,21 @@ public class TopicDescription { * Whether the topic is internal to Kafka. An example of an internal topic is the offsets and group management topic: * __consumer_offsets. */ - public boolean internal() { + public boolean isInternal() { return internal; } /** - * A map from partition id to the leadership and replica information for that partition. + * A list of partitions where the index represents the partition id and the element contains leadership and replica + * information for that partition. */ - public NavigableMap<Integer, TopicPartitionInfo> partitions() { + public List<TopicPartitionInfo> partitions() { return partitions; } @Override public String toString() { return "(name=" + name + ", internal=" + internal + ", partitions=" + - Utils.mkString(partitions, "[", "]", "=", ",") + ")"; + Utils.join(partitions, ",") + ")"; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java index 738c2ef..e5124be 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicListing.java @@ -46,7 +46,7 @@ public class TopicListing { * Whether the topic is internal to Kafka. An example of an internal topic is the offsets and group management topic: * __consumer_offsets. */ - public boolean internal() { + public boolean isInternal() { return internal; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java index 7656cd2..be69318 100644 --- a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java @@ -19,6 +19,7 @@ package org.apache.kafka.common; import org.apache.kafka.common.utils.Utils; +import java.util.Collections; import java.util.List; /** @@ -42,8 +43,8 @@ public class TopicPartitionInfo { public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) { this.partition = partition; this.leader = leader; - this.replicas = replicas; - this.isr = isr; + this.replicas = Collections.unmodifiableList(replicas); + this.isr = Collections.unmodifiableList(isr); } /** @@ -54,7 +55,7 @@ public class TopicPartitionInfo { } /** - * Return the leader of the partition or {@link Node#noNode()} if there is none. + * Return the leader of the partition or null if there is none. */ public Node leader() { return leader; http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java index 1796762..d5e05df 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java @@ -42,9 +42,11 @@ public class AccessControlEntry { Objects.requireNonNull(principal); Objects.requireNonNull(host); Objects.requireNonNull(operation); - assert operation != AclOperation.ANY; + if (operation == AclOperation.ANY) + throw new IllegalArgumentException("operation must not be ANY"); Objects.requireNonNull(permissionType); - assert permissionType != AclPermissionType.ANY; + if (permissionType == AclPermissionType.ANY) + throw new IllegalArgumentException("permissionType must not be ANY"); this.data = new AccessControlEntryData(principal, host, operation, permissionType); } @@ -91,8 +93,8 @@ public class AccessControlEntry { /** * Return true if this AclResource has any UNKNOWN components. */ - public boolean unknown() { - return data.unknown(); + public boolean isUnknown() { + return data.isUnknown(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java index cf69263..ad7660d 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java @@ -83,8 +83,8 @@ class AccessControlEntryData { /** * Return true if there are any UNKNOWN components. */ - boolean unknown() { - return operation.unknown() || permissionType.unknown(); + boolean isUnknown() { + return operation.isUnknown() || permissionType.isUnknown(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java index 2d8e9fb..a95303e 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java @@ -95,8 +95,8 @@ public class AccessControlEntryFilter { /** * Return true if there are any UNKNOWN components. */ - public boolean unknown() { - return data.unknown(); + public boolean isUnknown() { + return data.isUnknown(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java index fd2a756..ea58434 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java @@ -49,7 +49,7 @@ public class AclBinding { * Return true if this binding has any UNKNOWN components. */ public boolean unknown() { - return resource.unknown() || entry.unknown(); + return resource.isUnknown() || entry.isUnknown(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java index ad4a811..807b730 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java @@ -57,7 +57,7 @@ public class AclBindingFilter { * Return true if this filter has any UNKNOWN components. */ public boolean unknown() { - return resourceFilter.unknown() || entryFilter.unknown(); + return resourceFilter.unknown() || entryFilter.isUnknown(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java index c5d5b1a..3da18cf 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -158,7 +158,7 @@ public enum AclOperation { /** * Return true if this operation is UNKNOWN. */ - public boolean unknown() { + public boolean isUnknown() { return this == UNKNOWN; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java index d963135..c5b077c 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java @@ -100,7 +100,7 @@ public enum AclPermissionType { /** * Return true if this permission type is UNKNOWN. */ - public boolean unknown() { + public boolean isUnknown() { return this == UNKNOWN; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/Resource.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java index 484c207..f41f41a 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java @@ -83,8 +83,8 @@ public class Resource { /** * Return true if this Resource has any UNKNOWN components. */ - public boolean unknown() { - return resourceType.unknown(); + public boolean isUnknown() { + return resourceType.isUnknown(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java index 18042fa..5032660 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java @@ -71,7 +71,7 @@ public class ResourceFilter { * Return true if this ResourceFilter has any UNKNOWN components. */ public boolean unknown() { - return resourceType.unknown(); + return resourceType.isUnknown(); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java index 0874f91..d83382d 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java @@ -110,7 +110,7 @@ public enum ResourceType { /** * Return whether this resource type is UNKNOWN. */ - public boolean unknown() { + public boolean isUnknown() { return this == UNKNOWN; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index dfab018..cd6ed6b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -220,17 +220,17 @@ public class KafkaAdminClientTest { add(ACL1); add(ACL2); }})); - assertCollectionIs(env.adminClient().describeAcls(FILTER1).all().get(), ACL1, ACL2); + assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2); // Test a call where we get back no results. env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null, Collections.<AclBinding>emptySet())); - assertTrue(env.adminClient().describeAcls(FILTER2).all().get().isEmpty()); + assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty()); // Test a call where we get back an error. env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet())); - assertFutureError(env.adminClient().describeAcls(FILTER2).all(), SecurityDisabledException.class); + assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class); } } @@ -251,8 +251,8 @@ public class KafkaAdminClientTest { add(ACL1); add(ACL2); }}); - assertCollectionIs(results.results().keySet(), ACL1, ACL2); - for (KafkaFuture<Void> future : results.results().values()) { + assertCollectionIs(results.values().keySet(), ACL1, ACL2); + for (KafkaFuture<Void> future : results.values().values()) { future.get(); } results.all().get(); @@ -267,9 +267,9 @@ public class KafkaAdminClientTest { add(ACL1); add(ACL2); }}); - assertCollectionIs(results.results().keySet(), ACL1, ACL2); - assertFutureError(results.results().get(ACL1), SecurityDisabledException.class); - results.results().get(ACL2).get(); + assertCollectionIs(results.values().keySet(), ACL1, ACL2); + assertFutureError(results.values().get(ACL1), SecurityDisabledException.class); + results.values().get(ACL2).get(); assertFutureError(results.all(), SecurityDisabledException.class); } } @@ -295,12 +295,12 @@ public class KafkaAdminClientTest { add(FILTER1); add(FILTER2); }}); - Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.results(); + Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values(); FilterResults filter1Results = filterResults.get(FILTER1).get(); - assertEquals(null, filter1Results.acls().get(0).exception()); - assertEquals(ACL1, filter1Results.acls().get(0).acl()); - assertEquals(null, filter1Results.acls().get(1).exception()); - assertEquals(ACL2, filter1Results.acls().get(1).acl()); + assertEquals(null, filter1Results.values().get(0).exception()); + assertEquals(ACL1, filter1Results.values().get(0).binding()); + assertEquals(null, filter1Results.values().get(1).exception()); + assertEquals(ACL2, filter1Results.values().get(1).binding()); assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class); assertFutureError(results.all(), SecurityDisabledException.class); @@ -318,7 +318,7 @@ public class KafkaAdminClientTest { add(FILTER1); add(FILTER2); }}); - assertTrue(results.results().get(FILTER2).get().acls().isEmpty()); + assertTrue(results.values().get(FILTER2).get().values().isEmpty()); assertFutureError(results.all(), SecurityDisabledException.class); // Test a call where there are no errors. http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java index ba09499..a9b27d8 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java @@ -55,7 +55,7 @@ public class AclOperationTest { public void testIsUnknown() throws Exception { for (AclOperationTestInfo info : INFOS) { assertEquals(info.operation + " was supposed to have unknown == " + info.unknown, - info.unknown, info.operation.unknown()); + info.unknown, info.operation.isUnknown()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java index 15b9068..3f018d7 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java @@ -47,7 +47,7 @@ public class AclPermissionTypeTest { public void testIsUnknown() throws Exception { for (AclPermissionTypeTestInfo info : INFOS) { assertEquals(info.ty + " was supposed to have unknown == " + info.unknown, - info.unknown, info.ty.unknown()); + info.unknown, info.ty.isUnknown()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java index 9adade1..d5f13bc 100644 --- a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java @@ -48,7 +48,7 @@ public class ResourceTypeTest { public void testIsUnknown() throws Exception { for (AclResourceTypeTestInfo info : INFOS) { assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown, - info.unknown, info.resourceType.unknown()); + info.unknown, info.resourceType.isUnknown()); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java index a3b4218..adc3378 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java @@ -216,7 +216,7 @@ public class TopicAdmin implements AutoCloseable { // Attempt to create any missing topics CreateTopicsOptions args = new CreateTopicsOptions().validateOnly(false); - Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).results(); + Map<String, KafkaFuture<Void>> newResults = admin.createTopics(topicsByName.values(), args).values(); // Iterate over each future so that we can handle individual failures like when some topics already exist Set<String> newlyCreatedTopicNames = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java index 9da574d..07d192b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java @@ -169,7 +169,7 @@ public class KafkaConfigBackingStoreTest { assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); assertEquals(TOPIC, capturedNewTopic.getValue().name()); - assertEquals(1, capturedNewTopic.getValue().partitions()); + assertEquals(1, capturedNewTopic.getValue().numPartitions()); assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor()); configStorage.start(); configStorage.stop(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java index 70d7f40..8cd2f0b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java @@ -130,7 +130,7 @@ public class KafkaOffsetBackingStoreTest { assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); assertEquals(TOPIC, capturedNewTopic.getValue().name()); - assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().partitions()); + assertEquals(TOPIC_PARTITIONS, capturedNewTopic.getValue().numPartitions()); assertEquals(TOPIC_REPLICATION_FACTOR, capturedNewTopic.getValue().replicationFactor()); store.start(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 5e94ffe..4c74bca 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -133,7 +133,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { client.createTopics(newTopics.asJava).all.get() waitForTopics(client, topics, List()) - val results = client.createTopics(newTopics.asJava).results() + val results = client.createTopics(newTopics.asJava).values() assertTrue(results.containsKey("mytopic")) assertFutureExceptionTypeEquals(results.get("mytopic"), classOf[TopicExistsException]) assertTrue(results.containsKey("mytopic2")) @@ -143,7 +143,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { assertEquals(topics.toSet, topicToDescription.keySet.asScala) val topic0 = topicToDescription.get("mytopic") - assertEquals(false, topic0.internal) + assertEquals(false, topic0.isInternal) assertEquals("mytopic", topic0.name) assertEquals(2, topic0.partitions.size) val topic0Partition0 = topic0.partitions.get(0) @@ -158,7 +158,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { assertEquals(Seq(2, 0), topic0Partition1.replicas.asScala.map(_.id)) val topic1 = topicToDescription.get("mytopic2") - assertEquals(false, topic1.internal) + assertEquals(false, topic1.isInternal) assertEquals("mytopic2", topic1.name) assertEquals(3, topic1.partitions.size) for (partitionId <- 0 until 3) { @@ -192,7 +192,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { waitForTopics(client, Seq(existingTopic), List()) val nonExistingTopic = "non-existing" - val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).results + val results = client.describeTopics(Seq(nonExistingTopic, existingTopic).asJava).values assertEquals(existingTopic, results.get(existingTopic).get.name) intercept[ExecutionException](results.get(nonExistingTopic).get).getCause.isInstanceOf[UnknownTopicOrPartitionException] assertEquals(None, zkUtils.getTopicPartitionCount(nonExistingTopic)) @@ -303,7 +303,7 @@ class AdminClientIntegrationTest extends KafkaServerTestHarness with Logging { @Test def testAclOperations(): Unit = { client = AdminClient.create(createConfig()) - assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).all(), classOf[SecurityDisabledException]) + assertFutureExceptionTypeEquals(client.describeAcls(AclBindingFilter.ANY).values(), classOf[SecurityDisabledException]) assertFutureExceptionTypeEquals(client.createAcls(Collections.singleton(ACL1)).all(), classOf[SecurityDisabledException]) assertFutureExceptionTypeEquals(client.deleteAcls(Collections.singleton(ACL1.toFilter())).all(), @@ -422,7 +422,7 @@ object AdminClientIntegrationTest { topicResource2 -> new Config(topicConfigEntries2) ).asJava) - assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet) + assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet) alterResult.all.get // Verify that topics were updated correctly @@ -454,7 +454,7 @@ object AdminClientIntegrationTest { topicResource2 -> new Config(topicConfigEntries2) ).asJava, new AlterConfigsOptions().validateOnly(true)) - assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.results.keySet) + assertEquals(Set(topicResource1, topicResource2).asJava, alterResult.values.keySet) alterResult.all.get // Verify that topics were not updated due to validateOnly = true @@ -495,10 +495,10 @@ object AdminClientIntegrationTest { brokerResource -> new Config(brokerConfigEntries) ).asJava) - assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet) - assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) - alterResult.results.get(topicResource2).get - assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) + assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) + alterResult.values.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that first and third resources were not updated and second was updated var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) @@ -523,10 +523,10 @@ object AdminClientIntegrationTest { brokerResource -> new Config(brokerConfigEntries) ).asJava, new AlterConfigsOptions().validateOnly(true)) - assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.results.keySet) - assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) - alterResult.results.get(topicResource2).get - assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) + assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[InvalidRequestException]) + alterResult.values.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that no resources are updated since validate_only = true describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, brokerResource).asJava) http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala index 7d3c54c..8e8e825 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientWithPoliciesIntegrationTest.scala @@ -128,11 +128,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with brokerResource -> new Config(brokerConfigEntries) ).asJava) - assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet) - assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) - alterResult.results.get(topicResource2).get - assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) - assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet) + assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) + alterResult.values.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that the second resource was updated and the others were not var describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava) @@ -158,11 +158,11 @@ class AdminClientWithPoliciesIntegrationTest extends KafkaServerTestHarness with topicResource3 -> new Config(topicConfigEntries3) ).asJava, new AlterConfigsOptions().validateOnly(true)) - assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.results.keySet) - assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) - alterResult.results.get(topicResource2).get - assertTrue(intercept[ExecutionException](alterResult.results.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) - assertTrue(intercept[ExecutionException](alterResult.results.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) + assertEquals(Set(topicResource1, topicResource2, topicResource3, brokerResource).asJava, alterResult.values.keySet) + assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource1).get).getCause.isInstanceOf[PolicyViolationException]) + alterResult.values.get(topicResource2).get + assertTrue(intercept[ExecutionException](alterResult.values.get(topicResource3).get).getCause.isInstanceOf[InvalidRequestException]) + assertTrue(intercept[ExecutionException](alterResult.values.get(brokerResource).get).getCause.isInstanceOf[InvalidRequestException]) // Verify that no resources are updated since validate_only = true describeResult = client.describeConfigs(Seq(topicResource1, topicResource2, topicResource3, brokerResource).asJava) http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index d0e0806..b4e09b3 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -96,25 +96,25 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with @Test override def testAclOperations(): Unit = { client = AdminClient.create(createConfig()) - assertEquals(6, client.describeAcls(AclBindingFilter.ANY).all.get().size) + assertEquals(6, client.describeAcls(AclBindingFilter.ANY).values.get().size) val results = client.createAcls(List(acl2, acl3).asJava) - assertEquals(Set(acl2, acl3), results.results.keySet().asScala) - results.results.values().asScala.foreach(value => value.get) + assertEquals(Set(acl2, acl3), results.values.keySet().asScala) + results.values.values().asScala.foreach(value => value.get) val aclUnknown = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.UNKNOWN, AclPermissionType.ALLOW)) val results2 = client.createAcls(List(aclUnknown).asJava) - assertEquals(Set(aclUnknown), results2.results.keySet().asScala) + assertEquals(Set(aclUnknown), results2.values.keySet().asScala) assertFutureExceptionTypeEquals(results2.all, classOf[InvalidRequestException]) - val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).results + val results3 = client.deleteAcls(List(ACL1.toFilter, acl2.toFilter, acl3.toFilter).asJava).values assertEquals(Set(ACL1.toFilter, acl2.toFilter, acl3.toFilter), results3.keySet.asScala) - assertEquals(0, results3.get(ACL1.toFilter).get.acls.size()) - assertEquals(Set(acl2), results3.get(acl2.toFilter).get.acls.asScala.map(_.acl).toSet) - assertEquals(Set(acl3), results3.get(acl3.toFilter).get.acls.asScala.map(_.acl).toSet) + assertEquals(0, results3.get(ACL1.toFilter).get.values.size()) + assertEquals(Set(acl2), results3.get(acl2.toFilter).get.values.asScala.map(_.binding).toSet) + assertEquals(Set(acl3), results3.get(acl3.toFilter).get.values.asScala.map(_.binding).toSet) } def waitForDescribeAcls(client: AdminClient, filter: AclBindingFilter, acls: Set[AclBinding]): Unit = { TestUtils.waitUntilTrue(() => { - val results = client.describeAcls(filter).all.get() + val results = client.describeAcls(filter).values.get() acls == results.asScala.toSet }, "timed out waiting for ACLs") } @@ -123,7 +123,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with def testAclOperations2(): Unit = { client = AdminClient.create(createConfig()) val results = client.createAcls(List(acl2, acl2).asJava) - assertEquals(Set(acl2, acl2), results.results.keySet().asScala) + assertEquals(Set(acl2, acl2), results.values.keySet().asScala) results.all.get() waitForDescribeAcls(client, acl2.toFilter, Set(acl2)) @@ -133,9 +133,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with waitForDescribeAcls(client, filterA, Set()) val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions()) - assertEquals(Set(filterA, filterB), results2.results.keySet().asScala) - assertEquals(Set(), results2.results.get(filterA).get.acls.asScala.map(_.acl).toSet) - assertEquals(Set(acl2), results2.results.get(filterB).get.acls.asScala.map(_.acl).toSet) + assertEquals(Set(filterA, filterB), results2.values.keySet().asScala) + assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet) + assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet) waitForDescribeAcls(client, filterB, Set()) } @@ -148,9 +148,9 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with val emptyResourceNameAcl = new AclBinding(new Resource(ResourceType.TOPIC, ""), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) val results = client.createAcls(List(clusterAcl, emptyResourceNameAcl).asJava, new CreateAclsOptions()) - assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.results.keySet().asScala) - assertFutureExceptionTypeEquals(results.results.get(clusterAcl), classOf[InvalidRequestException]) - assertFutureExceptionTypeEquals(results.results.get(emptyResourceNameAcl), classOf[InvalidRequestException]) + assertEquals(Set(clusterAcl, emptyResourceNameAcl), results.values.keySet().asScala) + assertFutureExceptionTypeEquals(results.values.get(clusterAcl), classOf[InvalidRequestException]) + assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } private def verifyCauseIsClusterAuth(e: Throwable): Unit = { @@ -196,7 +196,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with verifyCauseIsClusterAuth(e) true case Success(_) => - assertEquals(Set(fooAcl), result.results.get(fooAcl.toFilter).get.acls.asScala.map(_.acl).toSet) + assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet) true } } @@ -212,14 +212,14 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with new AccessControlEntry("User:*", "*", AclOperation.ALL, AclPermissionType.ALLOW)) val results = client.describeAcls(userAcl.toFilter) if (expectAuth) { - Try(results.all.get) match { + Try(results.values.get) match { case Failure(e) => verifyCauseIsClusterAuth(e) false case Success(acls) => Set(userAcl).equals(acls.asScala.toSet) } } else { - Try(results.all.get) match { + Try(results.values.get) match { case Failure(e) => verifyCauseIsClusterAuth(e) true http://git-wip-us.apache.org/repos/asf/kafka/blob/0f60617f/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java ---------------------------------------------------------------------- diff --git a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java index 431b53b..b9288d7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java +++ b/tools/src/main/java/org/apache/kafka/tools/ClientCompatibilityTest.java @@ -287,13 +287,13 @@ public class ClientCompatibilityTest { } }); while (true) { - Collection<TopicListing> listings = client.listTopics().descriptions().get(); + Collection<TopicListing> listings = client.listTopics().listings().get(); if (!testConfig.createTopicsSupported) break; boolean foundNewTopic = false; for (TopicListing listing : listings) { if (listing.name().equals("newtopic")) { - if (listing.internal()) + if (listing.isInternal()) throw new KafkaException("Did not expect newtopic to be an internal topic."); foundNewTopic = true; } @@ -308,7 +308,7 @@ public class ClientCompatibilityTest { @Override public void invoke() throws Throwable { try { - client.describeAcls(AclBindingFilter.ANY).all().get(); + client.describeAcls(AclBindingFilter.ANY).values().get(); } catch (ExecutionException e) { if (e.getCause() instanceof SecurityDisabledException) return;
