Repository: kafka Updated Branches: refs/heads/0.11.0 f925627e2 -> 1b64a4e63
http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 341021b..796e200 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -16,13 +16,13 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.AccessControlEntry; -import org.apache.kafka.clients.admin.AclBinding; -import org.apache.kafka.clients.admin.Resource; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index 8d4eba6..6573b6e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -16,12 +16,12 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.AccessControlEntryFilter; -import org.apache.kafka.clients.admin.AclBinding; -import org.apache.kafka.clients.admin.AclBindingFilter; -import org.apache.kafka.clients.admin.ResourceFilter; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.resource.ResourceFilter; import java.nio.ByteBuffer; import java.util.Collections; http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java index 127493b..cf21aa6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java @@ -17,12 +17,12 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.AccessControlEntry; -import org.apache.kafka.clients.admin.AclBinding; -import org.apache.kafka.clients.admin.Resource; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.resource.Resource; import java.nio.ByteBuffer; import java.util.ArrayList; http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java index f2ce55f..fa23559 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -16,14 +16,14 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.AccessControlEntry; -import org.apache.kafka.clients.admin.AccessControlEntryFilter; -import org.apache.kafka.clients.admin.AclOperation; -import org.apache.kafka.clients.admin.AclPermissionType; -import org.apache.kafka.clients.admin.Resource; -import org.apache.kafka.clients.admin.ResourceFilter; -import org.apache.kafka.clients.admin.ResourceType; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourceFilter; +import org.apache.kafka.common.resource.ResourceType; class RequestUtils { static Resource resourceFromStructFields(Struct struct) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/resource/Resource.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java new file mode 100644 index 0000000..2883a03 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.resource; + +import java.util.Objects; + +/** + * Represents a cluster resource with a tuple of (type, name). + */ +public class Resource { + private final ResourceType resourceType; + private final String name; + + public Resource(ResourceType resourceType, String name) { + Objects.requireNonNull(resourceType); + this.resourceType = resourceType; + Objects.requireNonNull(name); + this.name = name; + } + + public ResourceType resourceType() { + return resourceType; + } + + public String name() { + return name; + } + + /** + * Create a filter which matches only this Resource. + */ + public ResourceFilter toFilter() { + return new ResourceFilter(resourceType, name); + } + + @Override + public String toString() { + return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; + } + + /** + * Return true if this Resource has any UNKNOWN components. + */ + public boolean unknown() { + return resourceType.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Resource)) + return false; + Resource other = (Resource) o; + return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, name); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java new file mode 100644 index 0000000..572b7dc --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.resource; + +import java.util.Objects; + +/** + * A filter which matches Resource objects. + */ +public class ResourceFilter { + private final ResourceType resourceType; + private final String name; + + public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null); + + public ResourceFilter(ResourceType resourceType, String name) { + Objects.requireNonNull(resourceType); + this.resourceType = resourceType; + this.name = name; + } + + public ResourceType resourceType() { + return resourceType; + } + + public String name() { + return name; + } + + @Override + public String toString() { + return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")"; + } + + /** + * Return true if this ResourceFilter has any UNKNOWN components. + */ + public boolean unknown() { + return resourceType.unknown(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ResourceFilter)) + return false; + ResourceFilter other = (ResourceFilter) o; + return resourceType.equals(other.resourceType) && Objects.equals(name, other.name); + } + + @Override + public int hashCode() { + return Objects.hash(resourceType, name); + } + + public boolean matches(Resource other) { + if ((name != null) && (!name.equals(other.name()))) + return false; + if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType()))) + return false; + return true; + } + + public boolean matchesAtMostOne() { + return findIndefiniteField() == null; + } + + public String findIndefiniteField() { + if (resourceType == ResourceType.ANY) + return "Resource type is ANY."; + if (resourceType == ResourceType.UNKNOWN) + return "Resource type is UNKNOWN."; + if (name == null) + return "Resource name is NULL."; + return null; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java new file mode 100644 index 0000000..a1b7b2b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.resource; + +import java.util.HashMap; +import java.util.Locale; + +/** + * Represents a type of resource which an ACL can be applied to. + */ +public enum ResourceType { + /** + * Represents any ResourceType which this client cannot understand, + * perhaps because this client is too old. + */ + UNKNOWN((byte) 0), + + /** + * In a filter, matches any ResourceType. + */ + ANY((byte) 1), + + /** + * A Kafka topic. + */ + TOPIC((byte) 2), + + /** + * A consumer group. + */ + GROUP((byte) 3), + + /** + * The cluster as a whole. + */ + CLUSTER((byte) 4), + + /** + * A broker. + */ + BROKER((byte) 5); + + private final static HashMap<Byte, ResourceType> CODE_TO_VALUE = new HashMap<>(); + + static { + for (ResourceType resourceType : ResourceType.values()) { + CODE_TO_VALUE.put(resourceType.code, resourceType); + } + } + + /** + * Parse the given string as an ACL resource type. + * + * @param str The string to parse. + * + * @return The ResourceType, or UNKNOWN if the string could not be matched. + */ + public static ResourceType fromString(String str) throws IllegalArgumentException { + try { + return ResourceType.valueOf(str.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + return UNKNOWN; + } + } + + public static ResourceType fromCode(byte code) { + ResourceType resourceType = CODE_TO_VALUE.get(code); + if (resourceType == null) { + return UNKNOWN; + } + return resourceType; + } + + private final byte code; + + ResourceType(byte code) { + this.code = code; + } + + public byte code() { + return code; + } + + public boolean unknown() { + return this == UNKNOWN; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java deleted file mode 100644 index 34cedb6..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -public class AclBindingTest { - private static final AclBinding ACL1 = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic"), - new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); - - private static final AclBinding ACL2 = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic"), - new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW)); - - private static final AclBinding ACL3 = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic2"), - new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); - - private static final AclBinding UNKNOWN_ACL = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic2"), - new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY)); - - private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter( - new ResourceFilter(ResourceType.ANY, null), - new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); - - private static final AclBindingFilter ANY_DENY = new AclBindingFilter( - new ResourceFilter(ResourceType.ANY, null), - new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY)); - - private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter( - new ResourceFilter(ResourceType.TOPIC, "mytopic"), - new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); - - @Test - public void testMatching() throws Exception { - assertTrue(ACL1.equals(ACL1)); - final AclBinding acl1Copy = new AclBinding( - new Resource(ResourceType.TOPIC, "mytopic"), - new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); - assertTrue(ACL1.equals(acl1Copy)); - assertTrue(acl1Copy.equals(ACL1)); - assertTrue(ACL2.equals(ACL2)); - assertFalse(ACL1.equals(ACL2)); - assertFalse(ACL2.equals(ACL1)); - assertTrue(AclBindingFilter.ANY.matches(ACL1)); - assertFalse(AclBindingFilter.ANY.equals(ACL1)); - assertTrue(AclBindingFilter.ANY.matches(ACL2)); - assertFalse(AclBindingFilter.ANY.equals(ACL2)); - assertTrue(AclBindingFilter.ANY.matches(ACL3)); - assertFalse(AclBindingFilter.ANY.equals(ACL3)); - assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY)); - assertTrue(ANY_ANONYMOUS.matches(ACL1)); - assertFalse(ANY_ANONYMOUS.equals(ACL1)); - assertFalse(ANY_ANONYMOUS.matches(ACL2)); - assertFalse(ANY_ANONYMOUS.equals(ACL2)); - assertTrue(ANY_ANONYMOUS.matches(ACL3)); - assertFalse(ANY_ANONYMOUS.equals(ACL3)); - assertFalse(ANY_DENY.matches(ACL1)); - assertFalse(ANY_DENY.matches(ACL2)); - assertTrue(ANY_DENY.matches(ACL3)); - assertTrue(ANY_MYTOPIC.matches(ACL1)); - assertTrue(ANY_MYTOPIC.matches(ACL2)); - assertFalse(ANY_MYTOPIC.matches(ACL3)); - assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL)); - assertTrue(ANY_DENY.matches(UNKNOWN_ACL)); - assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL)); - assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL)); - } - - @Test - public void testUnknowns() throws Exception { - assertFalse(ACL1.unknown()); - assertFalse(ACL2.unknown()); - assertFalse(ACL3.unknown()); - assertFalse(ANY_ANONYMOUS.unknown()); - assertFalse(ANY_DENY.unknown()); - assertFalse(ANY_MYTOPIC.unknown()); - assertTrue(UNKNOWN_ACL.unknown()); - } - - @Test - public void testMatchesAtMostOne() throws Exception { - assertEquals(null, ACL1.toFilter().findIndefiniteField()); - assertEquals(null, ACL2.toFilter().findIndefiniteField()); - assertEquals(null, ACL3.toFilter().findIndefiniteField()); - assertFalse(ANY_ANONYMOUS.matchesAtMostOne()); - assertFalse(ANY_DENY.matchesAtMostOne()); - assertFalse(ANY_MYTOPIC.matchesAtMostOne()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java deleted file mode 100644 index 0e3441f..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class AclOperationTest { - private static class AclOperationTestInfo { - private final AclOperation operation; - private final int code; - private final String name; - private final boolean unknown; - - AclOperationTestInfo(AclOperation operation, int code, String name, boolean unknown) { - this.operation = operation; - this.code = code; - this.name = name; - this.unknown = unknown; - } - } - - private static final AclOperationTestInfo[] INFOS = { - new AclOperationTestInfo(AclOperation.UNKNOWN, 0, "unknown", true), - new AclOperationTestInfo(AclOperation.ANY, 1, "any", false), - new AclOperationTestInfo(AclOperation.ALL, 2, "all", false), - new AclOperationTestInfo(AclOperation.READ, 3, "read", false), - new AclOperationTestInfo(AclOperation.WRITE, 4, "write", false), - new AclOperationTestInfo(AclOperation.CREATE, 5, "create", false), - new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false), - new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false), - new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false), - new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false), - new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false), - new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false), - new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false) - }; - - @Test - public void testIsUnknown() throws Exception { - for (AclOperationTestInfo info : INFOS) { - assertEquals(info.operation + " was supposed to have unknown == " + info.unknown, - info.unknown, info.operation.unknown()); - } - } - - @Test - public void testCode() throws Exception { - for (AclOperationTestInfo info : INFOS) { - assertEquals(info.operation + " was supposed to have code == " + info.code, - info.code, info.operation.code()); - assertEquals("AclOperation.fromCode(" + info.code + ") was supposed to be " + info.operation, - info.operation, AclOperation.fromCode((byte) info.code)); - } - assertEquals(AclOperation.UNKNOWN, AclOperation.fromCode((byte) 120)); - } - - @Test - public void testName() throws Exception { - for (AclOperationTestInfo info : INFOS) { - assertEquals("AclOperation.fromString(" + info.name + ") was supposed to be " + info.operation, - info.operation, AclOperation.fromString(info.name)); - } - assertEquals(AclOperation.UNKNOWN, AclOperation.fromString("something")); - } - - @Test - public void testExhaustive() throws Exception { - assertEquals(INFOS.length, AclOperation.values().length); - for (int i = 0; i < INFOS.length; i++) { - assertEquals(INFOS[i].operation, AclOperation.values()[i]); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java deleted file mode 100644 index aa6deca..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class AclPermissionTypeTest { - private static class AclPermissionTypeTestInfo { - private final AclPermissionType ty; - private final int code; - private final String name; - private final boolean unknown; - - AclPermissionTypeTestInfo(AclPermissionType ty, int code, String name, boolean unknown) { - this.ty = ty; - this.code = code; - this.name = name; - this.unknown = unknown; - } - } - - private static final AclPermissionTypeTestInfo[] INFOS = { - new AclPermissionTypeTestInfo(AclPermissionType.UNKNOWN, 0, "unknown", true), - new AclPermissionTypeTestInfo(AclPermissionType.ANY, 1, "any", false), - new AclPermissionTypeTestInfo(AclPermissionType.DENY, 2, "deny", false), - new AclPermissionTypeTestInfo(AclPermissionType.ALLOW, 3, "allow", false) - }; - - @Test - public void testIsUnknown() throws Exception { - for (AclPermissionTypeTestInfo info : INFOS) { - assertEquals(info.ty + " was supposed to have unknown == " + info.unknown, - info.unknown, info.ty.unknown()); - } - } - - @Test - public void testCode() throws Exception { - for (AclPermissionTypeTestInfo info : INFOS) { - assertEquals(info.ty + " was supposed to have code == " + info.code, - info.code, info.ty.code()); - assertEquals("AclPermissionType.fromCode(" + info.code + ") was supposed to be " + info.ty, - info.ty, AclPermissionType.fromCode((byte) info.code)); - } - assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromCode((byte) 120)); - } - - @Test - public void testName() throws Exception { - for (AclPermissionTypeTestInfo info : INFOS) { - assertEquals("AclPermissionType.fromString(" + info.name + ") was supposed to be " + info.ty, - info.ty, AclPermissionType.fromString(info.name)); - } - assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromString("something")); - } - - @Test - public void testExhaustive() throws Exception { - assertEquals(INFOS.length, AclPermissionType.values().length); - for (int i = 0; i < INFOS.length; i++) { - assertEquals(INFOS[i].ty, AclPermissionType.values()[i]); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 2ef654d..6f9e6af 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -17,11 +17,17 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.NodeApiVersions; -import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults; +import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; @@ -33,6 +39,9 @@ import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; import org.apache.kafka.common.requests.DescribeAclsResponse; +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourceFilter; +import org.apache.kafka.common.resource.ResourceType; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -234,7 +243,7 @@ public class KafkaAdminClientTest { add(new AclCreationResponse(null)); add(new AclCreationResponse(null)); }})); - CreateAclsResults results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{ + CreateAclsResult results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{ add(ACL1); add(ACL2); }}); @@ -278,7 +287,7 @@ public class KafkaAdminClientTest { add(new AclFilterResponse(new SecurityDisabledException("No security"), Collections.<AclDeletionResult>emptySet())); }})); - DeleteAclsResults results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{ + DeleteAclsResult results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{ add(FILTER1); add(FILTER2); }}); http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java deleted file mode 100644 index af72de2..0000000 --- a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.admin; - -import org.junit.Test; - -import static org.junit.Assert.assertEquals; - -public class ResourceTypeTest { - private static class AclResourceTypeTestInfo { - private final ResourceType resourceType; - private final int code; - private final String name; - private final boolean unknown; - - AclResourceTypeTestInfo(ResourceType resourceType, int code, String name, boolean unknown) { - this.resourceType = resourceType; - this.code = code; - this.name = name; - this.unknown = unknown; - } - } - - private static final AclResourceTypeTestInfo[] INFOS = { - new AclResourceTypeTestInfo(ResourceType.UNKNOWN, 0, "unknown", true), - new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false), - new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false), - new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false), - new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false), - new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false) - }; - - @Test - public void testIsUnknown() throws Exception { - for (AclResourceTypeTestInfo info : INFOS) { - assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown, - info.unknown, info.resourceType.unknown()); - } - } - - @Test - public void testCode() throws Exception { - for (AclResourceTypeTestInfo info : INFOS) { - assertEquals(info.resourceType + " was supposed to have code == " + info.code, - info.code, info.resourceType.code()); - assertEquals("AclResourceType.fromCode(" + info.code + ") was supposed to be " + - info.resourceType, info.resourceType, ResourceType.fromCode((byte) info.code)); - } - assertEquals(ResourceType.UNKNOWN, ResourceType.fromCode((byte) 120)); - } - - @Test - public void testName() throws Exception { - for (AclResourceTypeTestInfo info : INFOS) { - assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " + - info.resourceType, info.resourceType, ResourceType.fromString(info.name)); - } - assertEquals(ResourceType.UNKNOWN, ResourceType.fromString("something")); - } - - @Test - public void testExhaustive() throws Exception { - assertEquals(INFOS.length, ResourceType.values().length); - for (int i = 0; i < INFOS.length; i++) { - assertEquals(INFOS[i].resourceType, ResourceType.values()[i]); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java new file mode 100644 index 0000000..e0a0598 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.acl; + +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourceFilter; +import org.apache.kafka.common.resource.ResourceType; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class AclBindingTest { + private static final AclBinding ACL1 = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); + + private static final AclBinding ACL2 = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW)); + + private static final AclBinding ACL3 = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic2"), + new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); + + private static final AclBinding UNKNOWN_ACL = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic2"), + new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY)); + + private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); + + private static final AclBindingFilter ANY_DENY = new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY)); + + private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter( + new ResourceFilter(ResourceType.TOPIC, "mytopic"), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); + + @Test + public void testMatching() throws Exception { + assertTrue(ACL1.equals(ACL1)); + final AclBinding acl1Copy = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); + assertTrue(ACL1.equals(acl1Copy)); + assertTrue(acl1Copy.equals(ACL1)); + assertTrue(ACL2.equals(ACL2)); + assertFalse(ACL1.equals(ACL2)); + assertFalse(ACL2.equals(ACL1)); + assertTrue(AclBindingFilter.ANY.matches(ACL1)); + assertFalse(AclBindingFilter.ANY.equals(ACL1)); + assertTrue(AclBindingFilter.ANY.matches(ACL2)); + assertFalse(AclBindingFilter.ANY.equals(ACL2)); + assertTrue(AclBindingFilter.ANY.matches(ACL3)); + assertFalse(AclBindingFilter.ANY.equals(ACL3)); + assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY)); + assertTrue(ANY_ANONYMOUS.matches(ACL1)); + assertFalse(ANY_ANONYMOUS.equals(ACL1)); + assertFalse(ANY_ANONYMOUS.matches(ACL2)); + assertFalse(ANY_ANONYMOUS.equals(ACL2)); + assertTrue(ANY_ANONYMOUS.matches(ACL3)); + assertFalse(ANY_ANONYMOUS.equals(ACL3)); + assertFalse(ANY_DENY.matches(ACL1)); + assertFalse(ANY_DENY.matches(ACL2)); + assertTrue(ANY_DENY.matches(ACL3)); + assertTrue(ANY_MYTOPIC.matches(ACL1)); + assertTrue(ANY_MYTOPIC.matches(ACL2)); + assertFalse(ANY_MYTOPIC.matches(ACL3)); + assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL)); + assertTrue(ANY_DENY.matches(UNKNOWN_ACL)); + assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL)); + assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL)); + } + + @Test + public void testUnknowns() throws Exception { + assertFalse(ACL1.unknown()); + assertFalse(ACL2.unknown()); + assertFalse(ACL3.unknown()); + assertFalse(ANY_ANONYMOUS.unknown()); + assertFalse(ANY_DENY.unknown()); + assertFalse(ANY_MYTOPIC.unknown()); + assertTrue(UNKNOWN_ACL.unknown()); + } + + @Test + public void testMatchesAtMostOne() throws Exception { + assertEquals(null, ACL1.toFilter().findIndefiniteField()); + assertEquals(null, ACL2.toFilter().findIndefiniteField()); + assertEquals(null, ACL3.toFilter().findIndefiniteField()); + assertFalse(ANY_ANONYMOUS.matchesAtMostOne()); + assertFalse(ANY_DENY.matchesAtMostOne()); + assertFalse(ANY_MYTOPIC.matchesAtMostOne()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java new file mode 100644 index 0000000..5f5a87c --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclOperationTest.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.acl; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class AclOperationTest { + private static class AclOperationTestInfo { + private final AclOperation operation; + private final int code; + private final String name; + private final boolean unknown; + + AclOperationTestInfo(AclOperation operation, int code, String name, boolean unknown) { + this.operation = operation; + this.code = code; + this.name = name; + this.unknown = unknown; + } + } + + private static final AclOperationTestInfo[] INFOS = { + new AclOperationTestInfo(AclOperation.UNKNOWN, 0, "unknown", true), + new AclOperationTestInfo(AclOperation.ANY, 1, "any", false), + new AclOperationTestInfo(AclOperation.ALL, 2, "all", false), + new AclOperationTestInfo(AclOperation.READ, 3, "read", false), + new AclOperationTestInfo(AclOperation.WRITE, 4, "write", false), + new AclOperationTestInfo(AclOperation.CREATE, 5, "create", false), + new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false), + new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false), + new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false), + new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false), + new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false), + new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false), + new AclOperationTestInfo(AclOperation.IDEMPOTENT_WRITE, 12, "idempotent_write", false) + }; + + @Test + public void testIsUnknown() throws Exception { + for (AclOperationTestInfo info : INFOS) { + assertEquals(info.operation + " was supposed to have unknown == " + info.unknown, + info.unknown, info.operation.unknown()); + } + } + + @Test + public void testCode() throws Exception { + for (AclOperationTestInfo info : INFOS) { + assertEquals(info.operation + " was supposed to have code == " + info.code, + info.code, info.operation.code()); + assertEquals("AclOperation.fromCode(" + info.code + ") was supposed to be " + info.operation, + info.operation, AclOperation.fromCode((byte) info.code)); + } + assertEquals(AclOperation.UNKNOWN, AclOperation.fromCode((byte) 120)); + } + + @Test + public void testName() throws Exception { + for (AclOperationTestInfo info : INFOS) { + assertEquals("AclOperation.fromString(" + info.name + ") was supposed to be " + info.operation, + info.operation, AclOperation.fromString(info.name)); + } + assertEquals(AclOperation.UNKNOWN, AclOperation.fromString("something")); + } + + @Test + public void testExhaustive() throws Exception { + assertEquals(INFOS.length, AclOperation.values().length); + for (int i = 0; i < INFOS.length; i++) { + assertEquals(INFOS[i].operation, AclOperation.values()[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java new file mode 100644 index 0000000..8e7fdc7 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclPermissionTypeTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.acl; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class AclPermissionTypeTest { + private static class AclPermissionTypeTestInfo { + private final AclPermissionType ty; + private final int code; + private final String name; + private final boolean unknown; + + AclPermissionTypeTestInfo(AclPermissionType ty, int code, String name, boolean unknown) { + this.ty = ty; + this.code = code; + this.name = name; + this.unknown = unknown; + } + } + + private static final AclPermissionTypeTestInfo[] INFOS = { + new AclPermissionTypeTestInfo(AclPermissionType.UNKNOWN, 0, "unknown", true), + new AclPermissionTypeTestInfo(AclPermissionType.ANY, 1, "any", false), + new AclPermissionTypeTestInfo(AclPermissionType.DENY, 2, "deny", false), + new AclPermissionTypeTestInfo(AclPermissionType.ALLOW, 3, "allow", false) + }; + + @Test + public void testIsUnknown() throws Exception { + for (AclPermissionTypeTestInfo info : INFOS) { + assertEquals(info.ty + " was supposed to have unknown == " + info.unknown, + info.unknown, info.ty.unknown()); + } + } + + @Test + public void testCode() throws Exception { + for (AclPermissionTypeTestInfo info : INFOS) { + assertEquals(info.ty + " was supposed to have code == " + info.code, + info.code, info.ty.code()); + assertEquals("AclPermissionType.fromCode(" + info.code + ") was supposed to be " + info.ty, + info.ty, AclPermissionType.fromCode((byte) info.code)); + } + assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromCode((byte) 120)); + } + + @Test + public void testName() throws Exception { + for (AclPermissionTypeTestInfo info : INFOS) { + assertEquals("AclPermissionType.fromString(" + info.name + ") was supposed to be " + info.ty, + info.ty, AclPermissionType.fromString(info.name)); + } + assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromString("something")); + } + + @Test + public void testExhaustive() throws Exception { + assertEquals(INFOS.length, AclPermissionType.values().length); + for (int i = 0; i < INFOS.length; i++) { + assertEquals(INFOS[i].ty, AclPermissionType.values()[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index e0f48bf..56f0215 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -16,15 +16,12 @@ */ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.AccessControlEntry; -import org.apache.kafka.clients.admin.AccessControlEntryFilter; -import org.apache.kafka.clients.admin.AclBinding; -import org.apache.kafka.clients.admin.AclBindingFilter; -import org.apache.kafka.clients.admin.AclOperation; -import org.apache.kafka.clients.admin.AclPermissionType; -import org.apache.kafka.clients.admin.Resource; -import org.apache.kafka.clients.admin.ResourceFilter; -import org.apache.kafka.clients.admin.ResourceType; +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AccessControlEntryFilter; +import org.apache.kafka.common.acl.AclBinding; +import org.apache.kafka.common.acl.AclBindingFilter; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.InvalidRequestException; @@ -50,6 +47,9 @@ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; +import org.apache.kafka.common.resource.Resource; +import org.apache.kafka.common.resource.ResourceFilter; +import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.utils.Utils; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java new file mode 100644 index 0000000..4dc4cac --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceTypeTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.resource; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ResourceTypeTest { + private static class AclResourceTypeTestInfo { + private final ResourceType resourceType; + private final int code; + private final String name; + private final boolean unknown; + + AclResourceTypeTestInfo(ResourceType resourceType, int code, String name, boolean unknown) { + this.resourceType = resourceType; + this.code = code; + this.name = name; + this.unknown = unknown; + } + } + + private static final AclResourceTypeTestInfo[] INFOS = { + new AclResourceTypeTestInfo(ResourceType.UNKNOWN, 0, "unknown", true), + new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false), + new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false), + new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false), + new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false), + new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false) + }; + + @Test + public void testIsUnknown() throws Exception { + for (AclResourceTypeTestInfo info : INFOS) { + assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown, + info.unknown, info.resourceType.unknown()); + } + } + + @Test + public void testCode() throws Exception { + for (AclResourceTypeTestInfo info : INFOS) { + assertEquals(info.resourceType + " was supposed to have code == " + info.code, + info.code, info.resourceType.code()); + assertEquals("AclResourceType.fromCode(" + info.code + ") was supposed to be " + + info.resourceType, info.resourceType, ResourceType.fromCode((byte) info.code)); + } + assertEquals(ResourceType.UNKNOWN, ResourceType.fromCode((byte) 120)); + } + + @Test + public void testName() throws Exception { + for (AclResourceTypeTestInfo info : INFOS) { + assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " + + info.resourceType, info.resourceType, ResourceType.fromString(info.name)); + } + assertEquals(ResourceType.UNKNOWN, ResourceType.fromString("something")); + } + + @Test + public void testExhaustive() throws Exception { + assertEquals(INFOS.length, ResourceType.values().length); + for (int i = 0; i < INFOS.length; i++) { + assertEquals(INFOS[i].resourceType, ResourceType.values()[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 6a329d8..ad50aab 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -24,7 +24,7 @@ import kafka.api.ApiVersion import kafka.message.{BrokerCompressionCodec, Message} import kafka.server.{KafkaConfig, ThrottledReplicaListValidator} import org.apache.kafka.common.errors.InvalidConfigurationException -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig} import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.utils.Utils @@ -100,107 +100,66 @@ object LogConfig { println(configDef.toHtmlTable) } - val Delete = "delete" - val Compact = "compact" + val SegmentBytesProp = TopicConfig.SEGMENT_BYTES_CONFIG + val SegmentMsProp = TopicConfig.SEGMENT_MS_CONFIG + val SegmentJitterMsProp = TopicConfig.SEGMENT_JITTER_MS_CONFIG + val SegmentIndexBytesProp = TopicConfig.SEGMENT_INDEX_BYTES_CONFIG + val FlushMessagesProp = TopicConfig.FLUSH_MESSAGES_INTERVAL_CONFIG + val FlushMsProp = TopicConfig.FLUSH_MS_CONFIG + val RetentionBytesProp = TopicConfig.RETENTION_BYTES_CONFIG + val RetentionMsProp = TopicConfig.RETENTION_MS_CONFIG + val MaxMessageBytesProp = TopicConfig.MAX_MESSAGE_BYTES_CONFIG + val IndexIntervalBytesProp = TopicConfig.INDEX_INTERVAL_BYTES_CONFIG + val DeleteRetentionMsProp = TopicConfig.DELETE_RETENTION_MS_CONFIG + val MinCompactionLagMsProp = TopicConfig.MIN_COMPACTION_LAG_MS_CONFIG + val FileDeleteDelayMsProp = TopicConfig.FILE_DELETE_DELAY_MS_CONFIG + val MinCleanableDirtyRatioProp = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG + val CleanupPolicyProp = TopicConfig.CLEANUP_POLICY_CONFIG + val Delete = TopicConfig.CLEANUP_POLICY_DELETE + val Compact = TopicConfig.CLEANUP_POLICY_COMPACT + val UncleanLeaderElectionEnableProp = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG + val MinInSyncReplicasProp = TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG + val CompressionTypeProp = TopicConfig.COMPRESSION_TYPE_CONFIG + val PreAllocateEnableProp = TopicConfig.PREALLOCATE_CONFIG + val MessageFormatVersionProp = TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG + val MessageTimestampTypeProp = TopicConfig.MESSAGE_TIMESTAMP_TYPE_CONFIG + val MessageTimestampDifferenceMaxMsProp = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG - val SegmentBytesProp = "segment.bytes" - val SegmentMsProp = "segment.ms" - val SegmentJitterMsProp = "segment.jitter.ms" - val SegmentIndexBytesProp = "segment.index.bytes" - val FlushMessagesProp = "flush.messages" - val FlushMsProp = "flush.ms" - val RetentionBytesProp = "retention.bytes" - val RetentionMsProp = "retention.ms" - val MaxMessageBytesProp = "max.message.bytes" - val IndexIntervalBytesProp = "index.interval.bytes" - val DeleteRetentionMsProp = "delete.retention.ms" - val MinCompactionLagMsProp = "min.compaction.lag.ms" - val FileDeleteDelayMsProp = "file.delete.delay.ms" - val MinCleanableDirtyRatioProp = "min.cleanable.dirty.ratio" - val CleanupPolicyProp = "cleanup.policy" - val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" - val MinInSyncReplicasProp = "min.insync.replicas" - val CompressionTypeProp = "compression.type" - val PreAllocateEnableProp = "preallocate" - val MessageFormatVersionProp = "message.format.version" - val MessageTimestampTypeProp = "message.timestamp.type" - val MessageTimestampDifferenceMaxMsProp = "message.timestamp.difference.max.ms" + // Leave these out of TopicConfig for now as they are replication quota configs val LeaderReplicationThrottledReplicasProp = "leader.replication.throttled.replicas" val FollowerReplicationThrottledReplicasProp = "follower.replication.throttled.replicas" - val SegmentSizeDoc = "This configuration controls the segment file size for " + - "the log. Retention and cleaning is always done a file at a time so a larger " + - "segment size means fewer files but less granular control over retention." - val SegmentMsDoc = "This configuration controls the period of time after " + - "which Kafka will force the log to roll even if the segment file isn't full " + - "to ensure that retention can delete or compact old data." - val SegmentJitterMsDoc = "The maximum random jitter subtracted from the scheduled segment roll time to avoid" + - " thundering herds of segment rolling" - val FlushIntervalDoc = "This setting allows specifying an interval at which we " + - "will force an fsync of data written to the log. For example if this was set to 1 " + - "we would fsync after every message; if it were 5 we would fsync after every five " + - "messages. In general we recommend you not set this and use replication for " + - "durability and allow the operating system's background flush capabilities as it " + - "is more efficient. This setting can be overridden on a per-topic basis (see <a " + - "href=\"#topic-config\">the per-topic configuration section</a>)." - val FlushMsDoc = "This setting allows specifying a time interval at which we will " + - "force an fsync of data written to the log. For example if this was set to 1000 " + - "we would fsync after 1000 ms had passed. In general we recommend you not set " + - "this and use replication for durability and allow the operating system's background " + - "flush capabilities as it is more efficient." - val RetentionSizeDoc = "This configuration controls the maximum size a log can grow " + - "to before we will discard old log segments to free up space if we are using the " + - "\"delete\" retention policy. By default there is no size limit only a time limit." - val RetentionMsDoc = "This configuration controls the maximum time we will retain a " + - "log before we will discard old log segments to free up space if we are using the " + - "\"delete\" retention policy. This represents an SLA on how soon consumers must read " + - "their data." - val MaxIndexSizeDoc = "This configuration controls the size of the index that maps " + - "offsets to file positions. We preallocate this index file and shrink it only after log " + - "rolls. You generally should not need to change this setting." - val MaxMessageSizeDoc = "This is largest message size Kafka will allow to be appended. Note that if you increase" + - " this size you must also increase your consumer's fetch size so they can fetch messages this large." - val IndexIntervalDoc = "This setting controls how frequently Kafka adds an index " + - "entry to it's offset index. The default setting ensures that we index a message " + - "roughly every 4096 bytes. More indexing allows reads to jump closer to the exact " + - "position in the log but makes the index larger. You probably don't need to change " + - "this." - val FileDeleteDelayMsDoc = "The time to wait before deleting a file from the filesystem" - val DeleteRetentionMsDoc = "The amount of time to retain delete tombstone markers " + - "for <a href=\"#compaction\">log compacted</a> topics. This setting also gives a bound " + - "on the time in which a consumer must complete a read if they begin from offset 0 " + - "to ensure that they get a valid snapshot of the final stage (otherwise delete " + - "tombstones may be collected before they complete their scan)." - val MinCompactionLagMsDoc = "The minimum time a message will remain uncompacted in the log. " + - "Only applicable for logs that are being compacted." - val MinCleanableRatioDoc = "This configuration controls how frequently the log " + - "compactor will attempt to clean the log (assuming <a href=\"#compaction\">log " + - "compaction</a> is enabled). By default we will avoid cleaning a log where more than " + - "50% of the log has been compacted. This ratio bounds the maximum space wasted in " + - "the log by duplicates (at 50% at most 50% of the log could be duplicates). A " + - "higher ratio will mean fewer, more efficient cleanings but will mean more wasted " + - "space in the log." - val CompactDoc = "A string that is either \"delete\" or \"compact\". This string " + - "designates the retention policy to use on old log segments. The default policy " + - "(\"delete\") will discard old segments when their retention time or size limit has " + - "been reached. The \"compact\" setting will enable <a href=\"#compaction\">log " + - "compaction</a> on the topic." - val UncleanLeaderElectionEnableDoc = "Indicates whether to enable replicas not in the ISR set to be elected as" + - " leader as a last resort, even though doing so may result in data loss" - val MinInSyncReplicasDoc = KafkaConfig.MinInSyncReplicasDoc - val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the " + - "standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to " + - "no compression; and 'producer' which means retain the original compression codec set by the producer." - val PreAllocateEnableDoc ="Should pre allocate file when create new segment?" - val MessageFormatVersionDoc = KafkaConfig.LogMessageFormatVersionDoc - val MessageTimestampTypeDoc = KafkaConfig.LogMessageTimestampTypeDoc - val MessageTimestampDifferenceMaxMsDoc = "The maximum difference allowed between the timestamp when a broker receives " + - "a message and the timestamp specified in the message. If message.timestamp.type=CreateTime, a message will be rejected " + - "if the difference in timestamp exceeds this threshold. This configuration is ignored if message.timestamp.type=LogAppendTime." - val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on the leader side. The list should describe a set of " + - "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle all replicas for this topic." - val FollowerReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on the follower side. The list should describe a set of " + - "replicas in the form [PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle all replicas for this topic." + val SegmentSizeDoc = TopicConfig.SEGMENT_BYTES_DOC + val SegmentMsDoc = TopicConfig.SEGMENT_MS_DOC + val SegmentJitterMsDoc = TopicConfig.SEGMENT_JITTER_MS_DOC + val MaxIndexSizeDoc = TopicConfig.SEGMENT_INDEX_BYTES_DOC + val FlushIntervalDoc = TopicConfig.FLUSH_MESSAGES_INTERVAL_DOC + val FlushMsDoc = TopicConfig.FLUSH_MS_DOC + val RetentionSizeDoc = TopicConfig.RETENTION_BYTES_DOC + val RetentionMsDoc = TopicConfig.RETENTION_MS_DOC + val MaxMessageSizeDoc = TopicConfig.MAX_MESSAGE_BYTES_DOC + val IndexIntervalDoc = TopicConfig.INDEX_INTERVAL_BYTES_DOCS + val FileDeleteDelayMsDoc = TopicConfig.FILE_DELETE_DELAY_MS_DOC + val DeleteRetentionMsDoc = TopicConfig.DELETE_RETENTION_MS_DOC + val MinCompactionLagMsDoc = TopicConfig.MIN_COMPACTION_LAG_MS_DOC + val MinCleanableRatioDoc = TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_DOC + val CompactDoc = TopicConfig.CLEANUP_POLICY_DOC + val UncleanLeaderElectionEnableDoc = TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_DOC + val MinInSyncReplicasDoc = TopicConfig.MIN_IN_SYNC_REPLICAS_DOC + val CompressionTypeDoc = TopicConfig.COMPRESSION_TYPE_DOC + val PreAllocateEnableDoc = TopicConfig.PREALLOCATE_DOC + val MessageFormatVersionDoc = TopicConfig.MESSAGE_FORMAT_VERSION_DOC + val MessageTimestampTypeDoc = TopicConfig.MESSAGE_TIMESTAMP_TYPE_DOC + val MessageTimestampDifferenceMaxMsDoc = TopicConfig.MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC + + val LeaderReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " + + "the leader side. The list should describe a set of replicas in the form " + + "[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " + + "all replicas for this topic." + val FollowerReplicationThrottledReplicasDoc = "A list of replicas for which log replication should be throttled on " + + "the follower side. The list should describe a set of " + "replicas in the form " + + "[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:... or alternatively the wildcard '*' can be used to throttle " + + "all replicas for this topic." private class LogConfigDef extends ConfigDef { http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/security/auth/Operation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala index 420c3eb..d3a25b5 100644 --- a/core/src/main/scala/kafka/security/auth/Operation.scala +++ b/core/src/main/scala/kafka/security/auth/Operation.scala @@ -17,7 +17,7 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} -import org.apache.kafka.clients.admin.AclOperation +import org.apache.kafka.common.acl.AclOperation import scala.util.{Failure, Success, Try} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/security/auth/PermissionType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala index c4209e5..ec99ae4 100644 --- a/core/src/main/scala/kafka/security/auth/PermissionType.scala +++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala @@ -17,7 +17,7 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} -import org.apache.kafka.clients.admin.AclPermissionType +import org.apache.kafka.common.acl.AclPermissionType import scala.util.{Failure, Success, Try} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d7f6773..b780823 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -51,7 +51,8 @@ import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.requests.SaslHandshakeResponse import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.clients.admin.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import scala.collection._ import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala index c52594b..065759f 100644 --- a/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/KafkaAdminClientIntegrationTest.scala @@ -28,10 +28,14 @@ import org.apache.kafka.clients.admin._ import kafka.utils.{Logging, TestUtils} import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.common.KafkaFuture +import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TopicExistsException} import org.apache.kafka.common.errors.{InvalidRequestException, SecurityDisabledException, TimeoutException, TopicExistsException} import org.apache.kafka.common.protocol.ApiKeys import org.junit.{After, Before, Rule, Test} import org.apache.kafka.common.requests.MetadataResponse +import org.apache.kafka.common.resource.{Resource, ResourceType} import org.junit.rules.Timeout import org.junit.Assert._ http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index cb43b09..d27b0bf 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -18,8 +18,10 @@ import kafka.security.auth.SimpleAclAuthorizer import org.apache.kafka.common.protocol.SecurityProtocol import kafka.server.KafkaConfig import kafka.utils.{JaasTestUtils, TestUtils} -import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, AdminClient, CreateAclsOptions, DeleteAclsOptions, Resource, ResourceFilter, ResourceType} +import org.apache.kafka.clients.admin.{AdminClient, CreateAclsOptions, DeleteAclsOptions} +import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.errors.InvalidRequestException +import org.apache.kafka.common.resource.{Resource, ResourceFilter, ResourceType} import org.junit.Assert.assertEquals import org.junit.{After, Assert, Before, Test} http://git-wip-us.apache.org/repos/asf/kafka/blob/1b64a4e6/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index fa2e55b..b261cb2 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,7 +24,8 @@ import kafka.log.LogConfig import kafka.network.RequestChannel.Session import kafka.security.auth._ import kafka.utils.TestUtils -import org.apache.kafka.clients.admin.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType, ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} import org.apache.kafka.common.network.{Authenticator, ListenerName, TransportLayer}
