http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java deleted file mode 100644 index a51c1c8..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeClusterResults.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.Collection; - -/** - * The results of the describeCluster call. 
- */ [email protected] -public class DescribeClusterResults { - private final KafkaFuture<Collection<Node>> nodes; - private final KafkaFuture<Node> controller; - private final KafkaFuture<String> clusterId; - - DescribeClusterResults(KafkaFuture<Collection<Node>> nodes, - KafkaFuture<Node> controller, - KafkaFuture<String> clusterId) { - this.nodes = nodes; - this.controller = controller; - this.clusterId = clusterId; - } - - /** - * Returns a future which yields a collection of nodes. - */ - public KafkaFuture<Collection<Node>> nodes() { - return nodes; - } - - /** - * Returns a future which yields the current controller id. - * Note that this may yield null, if the controller ID is not yet known. - */ - public KafkaFuture<Node> controller() { - return controller; - } - - /** - * Returns a future which yields the current cluster Id. - * Note that this may yield null, if the cluster version is too old. - */ - public KafkaFuture<String> clusterId() { - return clusterId; - } -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java new file mode 100644 index 0000000..2379724 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResult.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.config.ConfigResource; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + [email protected] +public class DescribeConfigsResult { + + private final Map<ConfigResource, KafkaFuture<Config>> futures; + + DescribeConfigsResult(Map<ConfigResource, KafkaFuture<Config>> futures) { + this.futures = futures; + } + + public Map<ConfigResource, KafkaFuture<Config>> results() { + return futures; + } + + public KafkaFuture<Map<ConfigResource, Config>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). + thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() { + @Override + public Map<ConfigResource, Config> apply(Void v) { + Map<ConfigResource, Config> configs = new HashMap<>(futures.size()); + for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) { + try { + configs.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures + // completed successfully. 
+ throw new RuntimeException(e); + } + } + return configs; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java deleted file mode 100644 index c29872a..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsResults.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; - [email protected] -public class DescribeConfigsResults { - - private final Map<ConfigResource, KafkaFuture<Config>> futures; - - DescribeConfigsResults(Map<ConfigResource, KafkaFuture<Config>> futures) { - this.futures = futures; - } - - public Map<ConfigResource, KafkaFuture<Config>> results() { - return futures; - } - - public KafkaFuture<Map<ConfigResource, Config>> all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.Function<Void, Map<ConfigResource, Config>>() { - @Override - public Map<ConfigResource, Config> apply(Void v) { - Map<ConfigResource, Config> configs = new HashMap<>(futures.size()); - for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : futures.entrySet()) { - try { - configs.put(entry.getKey(), entry.getValue().get()); - } catch (InterruptedException | ExecutionException e) { - // This should be unreachable, because allOf ensured that all the futures - // completed successfully. - throw new RuntimeException(e); - } - } - return configs; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java new file mode 100644 index 0000000..e7cd6b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResult.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +/** + * The results of the describeTopic call. + */ [email protected] +public class DescribeTopicsResult { + private final Map<String, KafkaFuture<TopicDescription>> futures; + + DescribeTopicsResult(Map<String, KafkaFuture<TopicDescription>> futures) { + this.futures = futures; + } + + /** + * Return a map from topic names to futures which can be used to check the status of + * individual topics. + */ + public Map<String, KafkaFuture<TopicDescription>> results() { + return futures; + } + + /** + * Return a future which succeeds only if all the topic descriptions succeed. + */ + public KafkaFuture<Map<String, TopicDescription>> all() { + return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). 
+ thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() { + @Override + public Map<String, TopicDescription> apply(Void v) { + Map<String, TopicDescription> descriptions = new HashMap<>(futures.size()); + for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) { + try { + descriptions.put(entry.getKey(), entry.getValue().get()); + } catch (InterruptedException | ExecutionException e) { + // This should be unreachable, because allOf ensured that all the futures + // completed successfully. + throw new RuntimeException(e); + } + } + return descriptions; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java deleted file mode 100644 index 5c309bb..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeTopicsResults.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ExecutionException; - -/** - * The results of the describeTopic call. - */ [email protected] -public class DescribeTopicsResults { - private final Map<String, KafkaFuture<TopicDescription>> futures; - - DescribeTopicsResults(Map<String, KafkaFuture<TopicDescription>> futures) { - this.futures = futures; - } - - /** - * Return a map from topic names to futures which can be used to check the status of - * individual topics. - */ - public Map<String, KafkaFuture<TopicDescription>> results() { - return futures; - } - - /** - * Return a future which succeeds only if all the topic descriptions succeed. - */ - public KafkaFuture<Map<String, TopicDescription>> all() { - return KafkaFuture.allOf(futures.values().toArray(new KafkaFuture[0])). - thenApply(new KafkaFuture.Function<Void, Map<String, TopicDescription>>() { - @Override - public Map<String, TopicDescription> apply(Void v) { - Map<String, TopicDescription> descriptions = new HashMap<>(futures.size()); - for (Map.Entry<String, KafkaFuture<TopicDescription>> entry : futures.entrySet()) { - try { - descriptions.put(entry.getKey(), entry.getValue().get()); - } catch (InterruptedException | ExecutionException e) { - // This should be unreachable, because allOf ensured that all the futures - // completed successfully. 
- throw new RuntimeException(e); - } - } - return descriptions; - } - }); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 98fc3f3..9fa0cad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -25,14 +25,18 @@ import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResult; -import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults; +import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResult; +import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.BrokerNotAvailableException; import org.apache.kafka.common.errors.DisconnectException; @@ -995,7 +999,7 @@ public class KafkaAdminClient extends AdminClient { } @Override - public CreateTopicResults createTopics(final Collection<NewTopic> newTopics, + public CreateTopicsResult 
createTopics(final Collection<NewTopic> newTopics, final CreateTopicsOptions options) { final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(newTopics.size()); final Map<String, CreateTopicsRequest.TopicDetails> topicsMap = new HashMap<>(newTopics.size()); @@ -1046,11 +1050,11 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(topicFutures.values(), throwable); } }, now); - return new CreateTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures)); + return new CreateTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures)); } @Override - public DeleteTopicResults deleteTopics(final Collection<String> topicNames, + public DeleteTopicsResult deleteTopics(final Collection<String> topicNames, DeleteTopicsOptions options) { final Map<String, KafkaFutureImpl<Void>> topicFutures = new HashMap<>(topicNames.size()); for (String topicName : topicNames) { @@ -1099,11 +1103,11 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(topicFutures.values(), throwable); } }, now); - return new DeleteTopicResults(new HashMap<String, KafkaFuture<Void>>(topicFutures)); + return new DeleteTopicsResult(new HashMap<String, KafkaFuture<Void>>(topicFutures)); } @Override - public ListTopicsResults listTopics(final ListTopicsOptions options) { + public ListTopicsResult listTopics(final ListTopicsOptions options) { final KafkaFutureImpl<Map<String, TopicListing>> topicListingFuture = new KafkaFutureImpl<>(); final long now = time.milliseconds(); runnable.call(new Call("listTopics", calcDeadlineMs(now, options.timeoutMs()), @@ -1132,11 +1136,11 @@ public class KafkaAdminClient extends AdminClient { topicListingFuture.completeExceptionally(throwable); } }, now); - return new ListTopicsResults(topicListingFuture); + return new ListTopicsResult(topicListingFuture); } @Override - public DescribeTopicsResults describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) { + public 
DescribeTopicsResult describeTopics(final Collection<String> topicNames, DescribeTopicsOptions options) { final Map<String, KafkaFutureImpl<TopicDescription>> topicFutures = new HashMap<>(topicNames.size()); final ArrayList<String> topicNamesList = new ArrayList<>(); for (String topicName : topicNames) { @@ -1190,11 +1194,11 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(topicFutures.values(), throwable); } }, now); - return new DescribeTopicsResults(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures)); + return new DescribeTopicsResult(new HashMap<String, KafkaFuture<TopicDescription>>(topicFutures)); } @Override - public DescribeClusterResults describeCluster(DescribeClusterOptions options) { + public DescribeClusterResult describeCluster(DescribeClusterOptions options) { final KafkaFutureImpl<Collection<Node>> describeClusterFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<Node> controllerFuture = new KafkaFutureImpl<>(); final KafkaFutureImpl<String> clusterIdFuture = new KafkaFutureImpl<>(); @@ -1223,11 +1227,11 @@ public class KafkaAdminClient extends AdminClient { } }, now); - return new DescribeClusterResults(describeClusterFuture, controllerFuture, clusterIdFuture); + return new DescribeClusterResult(describeClusterFuture, controllerFuture, clusterIdFuture); } @Override - public ApiVersionsResults apiVersions(Collection<Node> nodes, ApiVersionsOptions options) { + public ApiVersionsResult apiVersions(Collection<Node> nodes, ApiVersionsOptions options) { final long now = time.milliseconds(); final long deadlineMs = calcDeadlineMs(now, options.timeoutMs()); Map<Node, KafkaFuture<NodeApiVersions>> nodeFutures = new HashMap<>(); @@ -1254,12 +1258,12 @@ public class KafkaAdminClient extends AdminClient { } }, now); } - return new ApiVersionsResults(nodeFutures); + return new ApiVersionsResult(nodeFutures); } @Override - public DescribeAclsResults describeAcls(final AclBindingFilter filter, DescribeAclsOptions 
options) { + public DescribeAclsResult describeAcls(final AclBindingFilter filter, DescribeAclsOptions options) { final long now = time.milliseconds(); final KafkaFutureImpl<Collection<AclBinding>> future = new KafkaFutureImpl<>(); runnable.call(new Call("describeAcls", calcDeadlineMs(now, options.timeoutMs()), @@ -1285,11 +1289,11 @@ public class KafkaAdminClient extends AdminClient { future.completeExceptionally(throwable); } }, now); - return new DescribeAclsResults(future); + return new DescribeAclsResult(future); } @Override - public CreateAclsResults createAcls(Collection<AclBinding> acls, CreateAclsOptions options) { + public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) { final long now = time.milliseconds(); final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>(); final List<AclCreation> aclCreations = new ArrayList<>(); @@ -1340,11 +1344,11 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(futures.values(), throwable); } }, now); - return new CreateAclsResults(new HashMap<AclBinding, KafkaFuture<Void>>(futures)); + return new CreateAclsResult(new HashMap<AclBinding, KafkaFuture<Void>>(futures)); } @Override - public DeleteAclsResults deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) { + public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters, DeleteAclsOptions options) { final long now = time.milliseconds(); final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures = new HashMap<>(); final List<AclBindingFilter> filterList = new ArrayList<>(); @@ -1392,11 +1396,11 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(futures.values(), throwable); } }, now); - return new DeleteAclsResults(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures)); + return new DeleteAclsResult(new HashMap<AclBindingFilter, KafkaFuture<FilterResults>>(futures)); } @Override - public DescribeConfigsResults 
describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) { + public DescribeConfigsResult describeConfigs(Collection<ConfigResource> configResources, final DescribeConfigsOptions options) { final Map<ConfigResource, KafkaFutureImpl<Config>> singleRequestFutures = new HashMap<>(); final Collection<Resource> singleRequestResources = new ArrayList<>(configResources.size()); @@ -1487,7 +1491,7 @@ public class KafkaAdminClient extends AdminClient { Map<ConfigResource, KafkaFutureImpl<Config>> allFutures = new HashMap<>(configResources.size()); allFutures.putAll(singleRequestFutures); allFutures.putAll(brokerFutures); - return new DescribeConfigsResults(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures)); + return new DescribeConfigsResult(new HashMap<ConfigResource, KafkaFuture<Config>>(allFutures)); } private Resource configResourceToResource(ConfigResource configResource) { @@ -1506,7 +1510,7 @@ public class KafkaAdminClient extends AdminClient { } @Override - public AlterConfigsResults alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) { + public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) { final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size()); for (ConfigResource configResource : configs.keySet()) { futures.put(configResource, new KafkaFutureImpl<Void>()); @@ -1548,6 +1552,6 @@ public class KafkaAdminClient extends AdminClient { completeAllExceptionally(futures.values(), throwable); } }, now); - return new AlterConfigsResults(new HashMap<ConfigResource, KafkaFuture<Void>>(futures)); + return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java new file mode 100644 index 0000000..7b2fae8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResult.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; +import java.util.Map; + +/** + * The result of the listTopics call. + */ [email protected] +public class ListTopicsResult { + final KafkaFuture<Map<String, TopicListing>> future; + + ListTopicsResult(KafkaFuture<Map<String, TopicListing>> future) { + this.future = future; + } + + /** + * Return a future which yields a map of topic names to TopicListing objects. + */ + public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() { + return future; + } + + /** + * Return a future which yields a collection of TopicListing objects. 
+ */ + public KafkaFuture<Collection<TopicListing>> descriptions() { + return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() { + @Override + public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) { + return namesToDescriptions.values(); + } + }); + } + + /** + * Return a future which yields a collection of topic names. + */ + public KafkaFuture<Collection<String>> names() { + return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() { + @Override + public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) { + return namesToDescriptions.keySet(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java b/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java deleted file mode 100644 index 7e9448d..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ListTopicsResults.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.KafkaFuture; -import org.apache.kafka.common.annotation.InterfaceStability; - -import java.util.Collection; -import java.util.Map; - -/** - * The result of the listTopics call. - */ [email protected] -public class ListTopicsResults { - final KafkaFuture<Map<String, TopicListing>> future; - - ListTopicsResults(KafkaFuture<Map<String, TopicListing>> future) { - this.future = future; - } - - /** - * Return a future which yields a map of topic names to TopicListing objects. - */ - public KafkaFuture<Map<String, TopicListing>> namesToDescriptions() { - return future; - } - - /** - * Return a future which yields a collection of TopicListing objects. - */ - public KafkaFuture<Collection<TopicListing>> descriptions() { - return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<TopicListing>>() { - @Override - public Collection<TopicListing> apply(Map<String, TopicListing> namesToDescriptions) { - return namesToDescriptions.values(); - } - }); - } - - /** - * Return a future which yields a collection of topic names. 
- */ - public KafkaFuture<Collection<String>> names() { - return future.thenApply(new KafkaFuture.Function<Map<String, TopicListing>, Collection<String>>() { - @Override - public Collection<String> apply(Map<String, TopicListing> namesToDescriptions) { - return namesToDescriptions.keySet(); - } - }); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java b/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java deleted file mode 100644 index 9148aac..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/Resource.java +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin; - -import java.util.Objects; - -/** - * Represents a cluster resource with a tuple of (type, name). 
- */ -public class Resource { - private final ResourceType resourceType; - private final String name; - - public Resource(ResourceType resourceType, String name) { - Objects.requireNonNull(resourceType); - this.resourceType = resourceType; - Objects.requireNonNull(name); - this.name = name; - } - - public ResourceType resourceType() { - return resourceType; - } - - public String name() { - return name; - } - - /** - * Create a filter which matches only this Resource. - */ - public ResourceFilter toFilter() { - return new ResourceFilter(resourceType, name); - } - - @Override - public String toString() { - return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; - } - - /** - * Return true if this Resource has any UNKNOWN components. - */ - public boolean unknown() { - return resourceType.unknown(); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof Resource)) - return false; - Resource other = (Resource) o; - return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); - } - - @Override - public int hashCode() { - return Objects.hash(resourceType, name); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java deleted file mode 100644 index 6f453b6..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceFilter.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin; - -import java.util.Objects; - -/** - * A filter which matches Resource objects. - */ -public class ResourceFilter { - private final ResourceType resourceType; - private final String name; - - public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null); - - public ResourceFilter(ResourceType resourceType, String name) { - Objects.requireNonNull(resourceType); - this.resourceType = resourceType; - this.name = name; - } - - public ResourceType resourceType() { - return resourceType; - } - - public String name() { - return name; - } - - @Override - public String toString() { - return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; - } - - /** - * Return true if this ResourceFilter has any UNKNOWN components. 
- */ - public boolean unknown() { - return resourceType.unknown(); - } - - @Override - public boolean equals(Object o) { - if (!(o instanceof ResourceFilter)) - return false; - ResourceFilter other = (ResourceFilter) o; - return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); - } - - @Override - public int hashCode() { - return Objects.hash(resourceType, name); - } - - public boolean matches(Resource other) { - if ((name != null) && (!name.equals(other.name()))) - return false; - if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType()))) - return false; - return true; - } - - public boolean matchesAtMostOne() { - return findIndefiniteField() == null; - } - - public String findIndefiniteField() { - if (resourceType == ResourceType.ANY) - return "Resource type is ANY."; - if (resourceType == ResourceType.UNKNOWN) - return "Resource type is UNKNOWN."; - if (name == null) - return "Resource name is NULL."; - return null; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java b/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java deleted file mode 100644 index ca4fa0a..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/ResourceType.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin; - -import java.util.HashMap; -import java.util.Locale; - -/** - * Represents a type of resource which an ACL can be applied to. - */ -public enum ResourceType { - /** - * Represents any ResourceType which this client cannot understand, - * perhaps because this client is too old. - */ - UNKNOWN((byte) 0), - - /** - * In a filter, matches any ResourceType. - */ - ANY((byte) 1), - - /** - * A Kafka topic. - */ - TOPIC((byte) 2), - - /** - * A consumer group. - */ - GROUP((byte) 3), - - /** - * The cluster as a whole. - */ - CLUSTER((byte) 4), - - /** - * A broker. - */ - BROKER((byte) 5); - - private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>(); - - static { - for (ResourceType resourceType : ResourceType.values()) { - CODE_TO_VALUE.put(resourceType.code, resourceType); - } - } - - /** - * Parse the given string as an ACL resource type. - * - * @param str The string to parse. - * - * @return The ResourceType, or UNKNOWN if the string could not be matched. 
- */ - public static ResourceType fromString(String str) throws IllegalArgumentException { - try { - return ResourceType.valueOf(str.toUpperCase(Locale.ROOT)); - } catch (IllegalArgumentException e) { - return UNKNOWN; - } - } - - public static ResourceType fromCode(byte code) { - ResourceType resourceType = CODE_TO_VALUE.get(code); - if (resourceType == null) { - return UNKNOWN; - } - return resourceType; - } - - private final byte code; - - ResourceType(byte code) { - this.code = code; - } - - public byte code() { - return code; - } - - public boolean unknown() { - return this == UNKNOWN; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java index f13dfff..bf1431e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/TopicDescription.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.common.TopicPartitionInfo; import org.apache.kafka.common.utils.Utils; import java.util.NavigableMap; http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java deleted file mode 100644 index 5241602..0000000 --- a/clients/src/main/java/org/apache/kafka/clients/admin/TopicPartitionInfo.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license 
agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.clients.admin; - -import org.apache.kafka.common.Node; -import org.apache.kafka.common.utils.Utils; - -import java.util.List; - -public class TopicPartitionInfo { - private final int partition; - private final Node leader; - private final List<Node> replicas; - private final List<Node> isr; - - public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) { - this.partition = partition; - this.leader = leader; - this.replicas = replicas; - this.isr = isr; - } - - public int partition() { - return partition; - } - - public Node leader() { - return leader; - } - - public List<Node> replicas() { - return replicas; - } - - public List<Node> isr() { - return isr; - } - - public String toString() { - return "(partition=" + partition + ", leader=" + leader + ", replicas=" + - Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")"; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java new file mode 
100644 index 0000000..70352bd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/TopicPartitionInfo.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common; + +import org.apache.kafka.common.utils.Utils; + +import java.util.List; + +public class TopicPartitionInfo { + private final int partition; + private final Node leader; + private final List<Node> replicas; + private final List<Node> isr; + + public TopicPartitionInfo(int partition, Node leader, List<Node> replicas, List<Node> isr) { + this.partition = partition; + this.leader = leader; + this.replicas = replicas; + this.isr = isr; + } + + public int partition() { + return partition; + } + + public Node leader() { + return leader; + } + + public List<Node> replicas() { + return replicas; + } + + public List<Node> isr() { + return isr; + } + + public String toString() { + return "(partition=" + partition + ", leader=" + leader + ", replicas=" + + Utils.join(replicas, ", ") + ", isr=" + Utils.join(isr, ", ") + ")"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java new file mode 100644 index 0000000..68464b3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntry.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import java.util.Objects; + +/** + * Represents an access control entry. ACEs are a tuple of principal, host, + * operation, and permissionType. 
+ */ +public class AccessControlEntry { + final AccessControlEntryData data; + + public AccessControlEntry(String principal, String host, AclOperation operation, AclPermissionType permissionType) { + Objects.requireNonNull(principal); + Objects.requireNonNull(host); + Objects.requireNonNull(operation); + assert operation != AclOperation.ANY; + Objects.requireNonNull(permissionType); + assert permissionType != AclPermissionType.ANY; + this.data = new AccessControlEntryData(principal, host, operation, permissionType); + } + + public String principal() { + return data.principal(); + } + + public String host() { + return data.host(); + } + + public AclOperation operation() { + return data.operation(); + } + + public AclPermissionType permissionType() { + return data.permissionType(); + } + + /** + * Create a filter which matches only this AccessControlEntry. + */ + public AccessControlEntryFilter toFilter() { + return new AccessControlEntryFilter(data); + } + + @Override + public String toString() { + return data.toString(); + } + + /** + * Return true if this AclResource has any UNKNOWN components. 
+ */ + public boolean unknown() { + return data.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AccessControlEntry)) + return false; + AccessControlEntry other = (AccessControlEntry) o; + return data.equals(other.data); + } + + @Override + public int hashCode() { + return data.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java new file mode 100644 index 0000000..cf69263 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryData.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import java.util.Objects; + +/** + * An internal, private class which contains the data stored in AccessControlEntry and + * AccessControlEntryFilter objects. 
+ */ +class AccessControlEntryData { + private final String principal; + private final String host; + private final AclOperation operation; + private final AclPermissionType permissionType; + + AccessControlEntryData(String principal, String host, AclOperation operation, AclPermissionType permissionType) { + this.principal = principal; + this.host = host; + this.operation = operation; + this.permissionType = permissionType; + } + + String principal() { + return principal; + } + + String host() { + return host; + } + + AclOperation operation() { + return operation; + } + + AclPermissionType permissionType() { + return permissionType; + } + + /** + * Returns a string describing an ANY or UNKNOWN field, or null if there is + * no such field. + */ + public String findIndefiniteField() { + if (principal() == null) + return "Principal is NULL"; + if (host() == null) + return "Host is NULL"; + if (operation() == AclOperation.ANY) + return "Operation is ANY"; + if (operation() == AclOperation.UNKNOWN) + return "Operation is UNKNOWN"; + if (permissionType() == AclPermissionType.ANY) + return "Permission type is ANY"; + if (permissionType() == AclPermissionType.UNKNOWN) + return "Permission type is UNKNOWN"; + return null; + } + + @Override + public String toString() { + return "(principal=" + (principal == null ? "<any>" : principal) + + ", host=" + (host == null ? "<any>" : host) + + ", operation=" + operation + + ", permissionType=" + permissionType + ")"; + } + + /** + * Return true if there are any UNKNOWN components. 
+ */ + boolean unknown() { + return operation.unknown() || permissionType.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AccessControlEntryData)) + return false; + AccessControlEntryData other = (AccessControlEntryData) o; + return Objects.equals(principal, other.principal) && + Objects.equals(host, other.host) && + Objects.equals(operation, other.operation) && + Objects.equals(permissionType, other.permissionType); + } + + @Override + public int hashCode() { + return Objects.hash(principal, host, operation, permissionType); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java new file mode 100644 index 0000000..7817865 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AccessControlEntryFilter.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.acl; + +import java.util.Objects; + +/** + * Represents a filter which matches access control entries. + */ +public class AccessControlEntryFilter { + private final AccessControlEntryData data; + + public static final AccessControlEntryFilter ANY = + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY); + + public AccessControlEntryFilter(String principal, String host, AclOperation operation, AclPermissionType permissionType) { + Objects.requireNonNull(operation); + Objects.requireNonNull(permissionType); + this.data = new AccessControlEntryData(principal, host, operation, permissionType); + } + + /** + * This is a non-public constructor used in AccessControlEntry#toFilter + * + * @param data The access control data. + */ + AccessControlEntryFilter(AccessControlEntryData data) { + this.data = data; + } + + public String principal() { + return data.principal(); + } + + public String host() { + return data.host(); + } + + public AclOperation operation() { + return data.operation(); + } + + public AclPermissionType permissionType() { + return data.permissionType(); + } + + @Override + public String toString() { + return data.toString(); + } + + /** + * Return true if there are any UNKNOWN components. + */ + public boolean unknown() { + return data.unknown(); + } + + /** + * Returns true if this filter matches the given AccessControlEntry. 
+ */ + public boolean matches(AccessControlEntry other) { + if ((principal() != null) && (!data.principal().equals(other.principal()))) + return false; + if ((host() != null) && (!host().equals(other.host()))) + return false; + if ((operation() != AclOperation.ANY) && (!operation().equals(other.operation()))) + return false; + if ((permissionType() != AclPermissionType.ANY) && (!permissionType().equals(other.permissionType()))) + return false; + return true; + } + + /** + * Returns true if this filter could only match one ACE-- in other words, if + * there are no ANY or UNKNOWN fields. + */ + public boolean matchesAtMostOne() { + return findIndefiniteField() == null; + } + + /** + * Returns a string describing an ANY or UNKNOWN field, or null if there is + * no such field. + */ + public String findIndefiniteField() { + return data.findIndefiniteField(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AccessControlEntryFilter)) + return false; + AccessControlEntryFilter other = (AccessControlEntryFilter) o; + return data.equals(other.data); + } + + @Override + public int hashCode() { + return data.hashCode(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java new file mode 100644 index 0000000..91c1c79 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.Resource; + +import java.util.Objects; + +/** + * Represents a binding between a resource and an access control entry. + */ +public class AclBinding { + private final Resource resource; + private final AccessControlEntry entry; + + public AclBinding(Resource resource, AccessControlEntry entry) { + Objects.requireNonNull(resource); + this.resource = resource; + Objects.requireNonNull(entry); + this.entry = entry; + } + + /** + * Return true if this binding has any UNKNOWN components. + */ + public boolean unknown() { + return resource.unknown() || entry.unknown(); + } + + public Resource resource() { + return resource; + } + + public final AccessControlEntry entry() { + return entry; + } + + /** + * Create a filter which matches only this AclBinding. 
+ */ + public AclBindingFilter toFilter() { + return new AclBindingFilter(resource.toFilter(), entry.toFilter()); + } + + @Override + public String toString() { + return "(resource=" + resource + ", entry=" + entry + ")"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AclBinding)) + return false; + AclBinding other = (AclBinding) o; + return resource.equals(other.resource) && entry.equals(other.entry); + } + + @Override + public int hashCode() { + return Objects.hash(resource, entry); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java new file mode 100644 index 0000000..765fac2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.ResourceFilter; +import org.apache.kafka.common.resource.ResourceType; + +import java.util.Objects; + +/** + * A filter which can match AclBinding objects. + */ +public class AclBindingFilter { + private final ResourceFilter resourceFilter; + private final AccessControlEntryFilter entryFilter; + + /** + * A filter which matches any ACL binding. + */ + public static final AclBindingFilter ANY = new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); + + public AclBindingFilter(ResourceFilter resourceFilter, AccessControlEntryFilter entryFilter) { + Objects.requireNonNull(resourceFilter); + this.resourceFilter = resourceFilter; + Objects.requireNonNull(entryFilter); + this.entryFilter = entryFilter; + } + + /** + * Return true if this filter has any UNKNOWN components. + */ + public boolean unknown() { + return resourceFilter.unknown() || entryFilter.unknown(); + } + + public ResourceFilter resourceFilter() { + return resourceFilter; + } + + public final AccessControlEntryFilter entryFilter() { + return entryFilter; + } + + @Override + public String toString() { + return "(resourceFilter=" + resourceFilter + ", entryFilter=" + entryFilter + ")"; + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof AclBindingFilter)) + return false; + AclBindingFilter other = (AclBindingFilter) o; + return resourceFilter.equals(other.resourceFilter) && entryFilter.equals(other.entryFilter); + } + + public boolean matchesAtMostOne() { + return resourceFilter.matchesAtMostOne() && entryFilter.matchesAtMostOne(); + } + + public String findIndefiniteField() { + String indefinite = resourceFilter.findIndefiniteField(); + if (indefinite != null) + return indefinite; + return entryFilter.findIndefiniteField(); + } + + public boolean matches(AclBinding binding) { + return 
resourceFilter.matches(binding.resource()) && entryFilter.matches(binding.entry()); + } + + @Override + public int hashCode() { + return Objects.hash(resourceFilter, entryFilter); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java new file mode 100644 index 0000000..c63320d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclOperation.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Represents an operation which an ACL grants or denies permission to perform. + */ +public enum AclOperation { + /** + * Represents any AclOperation which this client cannot understand, perhaps because this + * client is too old. + */ + UNKNOWN((byte) 0), + + /** + * In a filter, matches any AclOperation. + */ + ANY((byte) 1), + + /** + * ALL operation. 
+ */ + ALL((byte) 2), + + /** + * READ operation. + */ + READ((byte) 3), + + /** + * WRITE operation. + */ + WRITE((byte) 4), + + /** + * CREATE operation. + */ + CREATE((byte) 5), + + /** + * DELETE operation. + */ + DELETE((byte) 6), + + /** + * ALTER operation. + */ + ALTER((byte) 7), + + /** + * DESCRIBE operation. + */ + DESCRIBE((byte) 8), + + /** + * CLUSTER_ACTION operation. + */ + CLUSTER_ACTION((byte) 9), + + /** + * DESCRIBE_CONFIGS operation. + */ + DESCRIBE_CONFIGS((byte) 10), + + /** + * ALTER_CONFIGS operation. + */ + ALTER_CONFIGS((byte) 11), + + /** + * IDEMPOTENT_WRITE operation. + */ + IDEMPOTENT_WRITE((byte) 12); + + private final static HashMap<Byte, AclOperation> CODE_TO_VALUE = new HashMap<>(); + + static { + for (AclOperation operation : AclOperation.values()) { + CODE_TO_VALUE.put(operation.code, operation); + } + } + + /** + * Parse the given string as an ACL operation. + * + * @param str The string to parse. + * + * @return The AclOperation, or UNKNOWN if the string could not be matched. 
+ */ + public static AclOperation fromString(String str) throws IllegalArgumentException { + try { + return AclOperation.valueOf(str.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return UNKNOWN; + } + } + + public static AclOperation fromCode(byte code) { + AclOperation operation = CODE_TO_VALUE.get(code); + if (operation == null) { + return UNKNOWN; + } + return operation; + } + + private final byte code; + + AclOperation(byte code) { + this.code = code; + } + + public byte code() { + return code; + } + + public boolean unknown() { + return this == UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java new file mode 100644 index 0000000..8c77938 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclPermissionType.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Indicates whether an ACL entry allows or forbids the access it describes.
 */
public enum AclPermissionType {
    /**
     * Stands in for any AclPermissionType this client does not recognise,
     * for instance because the client is too old.
     */
    UNKNOWN((byte) 0),

    /**
     * Wildcard for filters: matches every AclPermissionType.
     */
    ANY((byte) 1),

    /**
     * Access is refused.
     */
    DENY((byte) 2),

    /**
     * Access is permitted.
     */
    ALLOW((byte) 3);

    /** Lookup table from numeric code to enum constant; filled once in the static initializer. */
    private static final HashMap<Byte, AclPermissionType> CODE_TO_VALUE = new HashMap<>();

    static {
        AclPermissionType[] all = values();
        for (int i = 0; i < all.length; i++) {
            CODE_TO_VALUE.put(all[i].code, all[i]);
        }
    }

    /** Numeric code associated with this constant. */
    private final byte code;

    AclPermissionType(byte code) {
        this.code = code;
    }

    /**
     * Converts a textual permission name to its constant, ignoring case.
     *
     * @param str The string to parse.
     *
     * @return The matching AclPermissionType, or UNKNOWN when no constant has that name.
     */
    public static AclPermissionType fromString(String str) {
        String wanted = str.toUpperCase(Locale.ROOT);
        for (AclPermissionType candidate : values()) {
            if (candidate.name().equals(wanted)) {
                return candidate;
            }
        }
        return UNKNOWN;
    }

    /**
     * Converts a numeric code to its constant.
     *
     * @param code The code to look up.
     *
     * @return The matching AclPermissionType, or UNKNOWN when the code is not registered.
     */
    public static AclPermissionType fromCode(byte code) {
        AclPermissionType match = CODE_TO_VALUE.get(code);
        return match != null ? match : UNKNOWN;
    }

    /**
     * @return The numeric code of this permission type.
     */
    public byte code() {
        return code;
    }

    /**
     * @return true if this constant is UNKNOWN.
     */
    public boolean unknown() {
        return UNKNOWN == this;
    }
}
b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.config; + +public final class ConfigResource { + + public enum Type { + BROKER, TOPIC, UNKNOWN; + } + + private final Type type; + private final String name; + + public ConfigResource(Type type, String name) { + this.type = type; + this.name = name; + } + + public Type type() { + return type; + } + + public String name() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ConfigResource that = (ConfigResource) o; + + return type == that.type && name.equals(that.name); + } + + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "ConfigResource{type=" + type + ", name='" + name + "'}"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java ---------------------------------------------------------------------- diff --git 
/**
 * Keys that can be used to configure a topic. These keys are useful when creating or reconfiguring a
 * topic using the AdminClient.
 *
 * <p>Each {@code *_CONFIG} constant holds a configuration key and the matching {@code *_DOC} constant
 * holds its human-readable description.
 *
 * <p>The intended pattern is for broker configs to include a {@code log.} prefix. For example, to set the
 * default broker cleanup policy, one would set log.cleanup.policy instead of cleanup.policy. Unfortunately,
 * there are many cases where this pattern is not followed.
 */
// This is a public API, so we should not remove or alter keys without a discussion and a deprecation period.
// Eventually this should replace LogConfig.scala.
public class TopicConfig {
    public static final String SEGMENT_BYTES_CONFIG = "segment.bytes";
    public static final String SEGMENT_BYTES_DOC = "This configuration controls the segment file size for " +
        "the log. Retention and cleaning is always done a file at a time so a larger segment size means " +
        "fewer files but less granular control over retention.";

    public static final String SEGMENT_MS_CONFIG = "segment.ms";
    public static final String SEGMENT_MS_DOC = "This configuration controls the period of time after " +
        "which Kafka will force the log to roll even if the segment file isn't full to ensure that retention " +
        "can delete or compact old data.";

    public static final String SEGMENT_JITTER_MS_CONFIG = "segment.jitter.ms";
    public static final String SEGMENT_JITTER_MS_DOC = "The maximum random jitter subtracted from the scheduled " +
        "segment roll time to avoid thundering herds of segment rolling";

    public static final String SEGMENT_INDEX_BYTES_CONFIG = "segment.index.bytes";
    public static final String SEGMENT_INDEX_BYTES_DOC = "This configuration controls the size of the index that " +
        "maps offsets to file positions. We preallocate this index file and shrink it only after log " +
        "rolls. You generally should not need to change this setting.";

    public static final String FLUSH_MESSAGES_INTERVAL_CONFIG = "flush.messages";
    public static final String FLUSH_MESSAGES_INTERVAL_DOC = "This setting allows specifying an interval at " +
        "which we will force an fsync of data written to the log. For example if this was set to 1 " +
        "we would fsync after every message; if it were 5 we would fsync after every five messages. " +
        "In general we recommend you not set this and use replication for durability and allow the " +
        "operating system's background flush capabilities as it is more efficient. This setting can " +
        "be overridden on a per-topic basis (see <a href=\"#topic-config\">the per-topic configuration section</a>).";

    public static final String FLUSH_MS_CONFIG = "flush.ms";
    public static final String FLUSH_MS_DOC = "This setting allows specifying a time interval at which we will " +
        "force an fsync of data written to the log. For example if this was set to 1000 " +
        "we would fsync after 1000 ms had passed. In general we recommend you not set " +
        "this and use replication for durability and allow the operating system's background " +
        "flush capabilities as it is more efficient.";

    public static final String RETENTION_BYTES_CONFIG = "retention.bytes";
    public static final String RETENTION_BYTES_DOC = "This configuration controls the maximum size a log can grow " +
        "to before we will discard old log segments to free up space if we are using the " +
        "\"delete\" retention policy. By default there is no size limit only a time limit.";

    public static final String RETENTION_MS_CONFIG = "retention.ms";
    public static final String RETENTION_MS_DOC = "This configuration controls the maximum time we will retain a " +
        "log before we will discard old log segments to free up space if we are using the " +
        "\"delete\" retention policy. This represents an SLA on how soon consumers must read " +
        "their data.";

    public static final String MAX_MESSAGE_BYTES_CONFIG = "max.message.bytes";
    // Doc text fix: added the missing article ("the largest").
    public static final String MAX_MESSAGE_BYTES_DOC = "This is the largest message size Kafka will allow to be " +
        "appended. Note that if you increase this size you must also increase your consumer's fetch size so " +
        "they can fetch messages this large.";

    public static final String INDEX_INTERVAL_BYTES_CONFIG = "index.interval.bytes";
    // Doc text fix: possessive "its" instead of "it's".
    public static final String INDEX_INTERVAL_BYTES_DOC = "This setting controls how frequently " +
        "Kafka adds an index entry to its offset index. The default setting ensures that we index a " +
        "message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " +
        "position in the log but makes the index larger. You probably don't need to change this.";

    /**
     * Original, inconsistently named constant, kept so existing references keep compiling.
     *
     * @deprecated use {@link #INDEX_INTERVAL_BYTES_DOC}, which follows the {@code *_DOC} naming of the
     *             other constants in this class.
     */
    @Deprecated
    public static final String INDEX_INTERVAL_BYTES_DOCS = INDEX_INTERVAL_BYTES_DOC;

    public static final String FILE_DELETE_DELAY_MS_CONFIG = "file.delete.delay.ms";
    public static final String FILE_DELETE_DELAY_MS_DOC = "The time to wait before deleting a file from the " +
        "filesystem";

    public static final String DELETE_RETENTION_MS_CONFIG = "delete.retention.ms";
    public static final String DELETE_RETENTION_MS_DOC = "The amount of time to retain delete tombstone markers " +
        "for <a href=\"#compaction\">log compacted</a> topics. This setting also gives a bound " +
        "on the time in which a consumer must complete a read if they begin from offset 0 " +
        "to ensure that they get a valid snapshot of the final stage (otherwise delete " +
        "tombstones may be collected before they complete their scan).";

    public static final String MIN_COMPACTION_LAG_MS_CONFIG = "min.compaction.lag.ms";
    public static final String MIN_COMPACTION_LAG_MS_DOC = "The minimum time a message will remain " +
        "uncompacted in the log. Only applicable for logs that are being compacted.";

    public static final String MIN_CLEANABLE_DIRTY_RATIO_CONFIG = "min.cleanable.dirty.ratio";
    public static final String MIN_CLEANABLE_DIRTY_RATIO_DOC = "This configuration controls how frequently " +
        "the log compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " +
        "compaction</a> is enabled). By default we will avoid cleaning a log where more than " +
        "50% of the log has been compacted. This ratio bounds the maximum space wasted in " +
        "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " +
        "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " +
        "space in the log.";

    public static final String CLEANUP_POLICY_CONFIG = "cleanup.policy";
    public static final String CLEANUP_POLICY_COMPACT = "compact";
    public static final String CLEANUP_POLICY_DELETE = "delete";
    public static final String CLEANUP_POLICY_DOC = "A string that is either \"" + CLEANUP_POLICY_DELETE +
        "\" or \"" + CLEANUP_POLICY_COMPACT + "\". This string designates the retention policy to use on " +
        "old log segments. The default policy (\"delete\") will discard old segments when their retention " +
        "time or size limit has been reached. The \"compact\" setting will enable <a href=\"#compaction\">log " +
        "compaction</a> on the topic.";

    public static final String UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG = "unclean.leader.election.enable";
    public static final String UNCLEAN_LEADER_ELECTION_ENABLE_DOC = "Indicates whether to enable replicas " +
        "not in the ISR set to be elected as leader as a last resort, even though doing so may result in data " +
        "loss.";

    public static final String MIN_IN_SYNC_REPLICAS_CONFIG = "min.insync.replicas";
    public static final String MIN_IN_SYNC_REPLICAS_DOC = "When a producer sets acks to \"all\" (or \"-1\"), " +
        "this configuration specifies the minimum number of replicas that must acknowledge " +
        "a write for the write to be considered successful. If this minimum cannot be met, " +
        "then the producer will raise an exception (either NotEnoughReplicas or " +
        "NotEnoughReplicasAfterAppend).<br>When used together, min.insync.replicas and acks " +
        "allow you to enforce greater durability guarantees. A typical scenario would be to " +
        "create a topic with a replication factor of 3, set min.insync.replicas to 2, and " +
        "produce with acks of \"all\". This will ensure that the producer raises an exception " +
        "if a majority of replicas do not receive a write.";

    public static final String COMPRESSION_TYPE_CONFIG = "compression.type";
    // Doc text fix: 'lz4' is now quoted like the other codec names.
    public static final String COMPRESSION_TYPE_DOC = "Specify the final compression type for a given topic. " +
        "This configuration accepts the standard compression codecs ('gzip', 'snappy', 'lz4'). It additionally " +
        "accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the " +
        "original compression codec set by the producer.";

    public static final String PREALLOCATE_CONFIG = "preallocate";
    public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
        "creating a new log segment.";

    public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";
    // Doc text fix: "smaller than or equal to" instead of "smaller or equal than".
    public static final String MESSAGE_FORMAT_VERSION_DOC = "Specify the message format version the broker " +
        "will use to append messages to the logs. The value should be a valid ApiVersion. Some examples are: " +
        "0.8.2, 0.9.0.0, 0.10.0, check ApiVersion for more details. By setting a particular message format " +
        "version, the user is certifying that all the existing messages on disk are smaller than or equal to the " +
        "specified version. Setting this value incorrectly will cause consumers with older versions to break as " +
        "they will receive messages with a format that they don't understand.";

    public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type";
    public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
        "message create time or log append time. The value should be either `CreateTime` or `LogAppendTime`";

    public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG = "message.timestamp.difference.max.ms";
    public static final String MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC = "The maximum difference allowed between " +
        "the timestamp when a broker receives a message and the timestamp specified in the message. If " +
        "message.timestamp.type=CreateTime, a message will be rejected if the difference in timestamp " +
        "exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime.";
}
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java index 8a9ee19..246b5e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -16,11 +16,11 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.AccessControlEntryFilter; -import org.apache.kafka.clients.admin.AclBindingFilter; -import org.apache.kafka.clients.admin.ResourceFilter; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.resource.ResourceFilter; import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer;
