http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java new file mode 100644 index 0000000..f792bbd --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.kafka.common.requests;

import org.apache.kafka.clients.admin.AccessControlEntry;
import org.apache.kafka.clients.admin.AclBinding;
import org.apache.kafka.clients.admin.Resource;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Request for the {@link ApiKeys#CREATE_ACLS} API. Wraps a list of {@link AclCreation} entries
 * and converts them to and from the wire-level {@link Struct}.
 */
public class CreateAclsRequest extends AbstractRequest {
    /** Name of the array field that holds one sub-struct per creation entry. */
    private static final String CREATIONS = "creations";

    /**
     * A single entry of the request, holding the {@link AclBinding} it refers to.
     */
    public static class AclCreation {
        private final AclBinding acl;

        public AclCreation(AclBinding acl) {
            this.acl = acl;
        }

        /**
         * Builds an entry from the resource fields and the access-control-entry fields of
         * {@code struct}.
         */
        static AclCreation fromStruct(Struct struct) {
            Resource resource = RequestUtils.resourceFromStructFields(struct);
            AccessControlEntry entry = RequestUtils.aceFromStructFields(struct);
            AclBinding binding = new AclBinding(resource, entry);
            return new AclCreation(binding);
        }

        /** @return the binding held by this entry */
        public AclBinding acl() {
            return acl;
        }

        /** Writes the resource fields, then the access-control-entry fields, into {@code struct}. */
        void setStructFields(Struct struct) {
            RequestUtils.resourceSetStructFields(acl.resource(), struct);
            RequestUtils.aceSetStructFields(acl.entry(), struct);
        }

        @Override
        public String toString() {
            return "(acl=" + acl + ")";
        }
    }

    /**
     * Builder that remembers the entries and creates the request for a given version.
     */
    public static class Builder extends AbstractRequest.Builder<CreateAclsRequest> {
        private final List<AclCreation> creations;

        public Builder(List<AclCreation> creations) {
            super(ApiKeys.CREATE_ACLS);
            this.creations = creations;
        }

        @Override
        public CreateAclsRequest build(short version) {
            return new CreateAclsRequest(version, creations);
        }

        @Override
        public String toString() {
            return "(type=CreateAclsRequest, creations=" + Utils.join(creations, ", ") + ")";
        }
    }

    private final List<AclCreation> aclCreations;

    CreateAclsRequest(short version, List<AclCreation> aclCreations) {
        super(version);
        this.aclCreations = aclCreations;
    }

    /**
     * Reads every element of the {@code creations} array of {@code struct} into an
     * {@link AclCreation}.
     */
    public CreateAclsRequest(Struct struct, short version) {
        super(version);
        List<AclCreation> parsed = new ArrayList<>();
        for (Object element : struct.getArray(CREATIONS)) {
            parsed.add(AclCreation.fromStruct((Struct) element));
        }
        this.aclCreations = parsed;
    }

    /**
     * Serializes the entries into a struct that uses the request schema of this request's version.
     */
    @Override
    protected Struct toStruct() {
        Struct root = new Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
        List<Struct> entries = new ArrayList<>();
        for (AclCreation creation : aclCreations) {
            Struct entry = root.instance(CREATIONS);
            creation.setStructFields(entry);
            entries.add(entry);
        }
        root.set(CREATIONS, entries.toArray());
        return root;
    }

    /** @return the entries carried by this request */
    public List<AclCreation> aclCreations() {
        return aclCreations;
    }

    /**
     * Builds a response that reports {@code throwable} once per entry of this request.
     *
     * @throws IllegalArgumentException if this request's version is not 0
     */
    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
        short versionId = version();
        if (versionId != 0) {
            throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                    versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_ACLS.latestVersion()));
        }
        List<CreateAclsResponse.AclCreationResponse> failures = new ArrayList<>();
        // One separate response object per requested creation, all carrying the same throwable.
        for (int remaining = aclCreations.size(); remaining > 0; remaining--) {
            failures.add(new CreateAclsResponse.AclCreationResponse(throwable));
        }
        return new CreateAclsResponse(throttleTimeMs, failures);
    }

    /** Parses {@code buffer} with the request schema of {@code version}. */
    public static CreateAclsRequest parse(ByteBuffer buffer, short version) {
        Struct parsed = ApiKeys.CREATE_ACLS.parseRequest(version, buffer);
        return new CreateAclsRequest(parsed, version);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java new file mode 100644 index 0000000..885981a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kafka.common.requests;

import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/**
 * Response for the {@link ApiKeys#CREATE_ACLS} API: a throttle time plus a list of
 * {@link AclCreationResponse} entries.
 */
public class CreateAclsResponse extends AbstractResponse {
    private static final String THROTTLE_TIME_MS = "throttle_time_ms";
    private static final String CREATION_RESPONSES = "creation_responses";
    private static final String ERROR_CODE = "error_code";
    private static final String ERROR_MESSAGE = "error_message";

    /**
     * Outcome of one entry; {@link #throwable()} is {@code null} when no error was recorded.
     */
    public static class AclCreationResponse {
        private final Throwable throwable;

        public AclCreationResponse(Throwable throwable) {
            this.throwable = throwable;
        }

        /** @return the recorded error, or {@code null} */
        public Throwable throwable() {
            return throwable;
        }

        @Override
        public String toString() {
            return "(" + throwable + ")";
        }
    }

    private final int throttleTimeMs;

    private final List<AclCreationResponse> aclCreationResponses;

    public CreateAclsResponse(int throttleTimeMs, List<AclCreationResponse> aclCreationResponses) {
        this.throttleTimeMs = throttleTimeMs;
        this.aclCreationResponses = aclCreationResponses;
    }

    /**
     * Reads the throttle time and every element of the {@code creation_responses} array.
     * A non-zero error code is turned into an exception through {@link Errors#forCode(short)};
     * a zero code yields an entry with a {@code null} throwable.
     */
    public CreateAclsResponse(Struct struct) {
        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
        List<AclCreationResponse> parsed = new ArrayList<>();
        for (Object element : struct.getArray(CREATION_RESPONSES)) {
            Struct entry = (Struct) element;
            short code = entry.getShort(ERROR_CODE);
            // The message field is read for every entry, even when the code is 0 and it goes unused.
            String message = entry.getString(ERROR_MESSAGE);
            Throwable failure = null;
            if (code != 0) {
                failure = Errors.forCode(code).exception(message);
            }
            parsed.add(new AclCreationResponse(failure));
        }
        this.aclCreationResponses = parsed;
    }

    /**
     * Serializes this response with the response schema of {@code version}. The error message
     * field is only written for entries that carry a throwable.
     */
    @Override
    protected Struct toStruct(short version) {
        Struct root = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
        root.set(THROTTLE_TIME_MS, throttleTimeMs);
        List<Struct> entries = new ArrayList<>();
        for (AclCreationResponse response : aclCreationResponses) {
            Struct entry = root.instance(CREATION_RESPONSES);
            if (response.throwable() != null) {
                Errors error = Errors.forException(response.throwable());
                entry.set(ERROR_CODE, error.code());
                entry.set(ERROR_MESSAGE, response.throwable().getMessage());
            } else {
                entry.set(ERROR_CODE, (short) 0);
            }
            entries.add(entry);
        }
        root.set(CREATION_RESPONSES, entries.toArray());
        return root;
    }

    public int throttleTimeMs() {
        return throttleTimeMs;
    }

    public List<AclCreationResponse> aclCreationResponses() {
        return aclCreationResponses;
    }

    /** Reads {@code buffer} with the response schema of {@code version}. */
    public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
        Struct parsed = ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer);
        return new CreateAclsResponse(parsed);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java new file mode 100644 index 0000000..8a9ee19 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.kafka.common.requests;

import org.apache.kafka.clients.admin.AccessControlEntryFilter;
import org.apache.kafka.clients.admin.AclBindingFilter;
import org.apache.kafka.clients.admin.ResourceFilter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;

/**
 * Request for the {@link ApiKeys#DELETE_ACLS} API. Wraps a list of {@link AclBindingFilter}s and
 * converts them to and from the wire-level {@link Struct}.
 */
public class DeleteAclsRequest extends AbstractRequest {
    /** Name of the array field that holds one sub-struct per filter. */
    private static final String FILTERS = "filters";

    /**
     * Builder that remembers the filters and creates the request for a given version.
     */
    public static class Builder extends AbstractRequest.Builder<DeleteAclsRequest> {
        private final List<AclBindingFilter> filters;

        public Builder(List<AclBindingFilter> filters) {
            super(DELETE_ACLS);
            this.filters = filters;
        }

        @Override
        public DeleteAclsRequest build(short version) {
            return new DeleteAclsRequest(version, filters);
        }

        @Override
        public String toString() {
            return "(type=DeleteAclsRequest, filters=" + Utils.join(filters, ", ") + ")";
        }
    }

    private final List<AclBindingFilter> filters;

    DeleteAclsRequest(short version, List<AclBindingFilter> filters) {
        super(version);
        this.filters = filters;
    }

    /**
     * Reads every element of the {@code filters} array of {@code struct} into an
     * {@link AclBindingFilter}.
     */
    public DeleteAclsRequest(Struct struct, short version) {
        super(version);
        List<AclBindingFilter> parsed = new ArrayList<>();
        for (Object element : struct.getArray(FILTERS)) {
            Struct entry = (Struct) element;
            ResourceFilter resourcePart = RequestUtils.resourceFilterFromStructFields(entry);
            AccessControlEntryFilter entryPart = RequestUtils.aceFilterFromStructFields(entry);
            parsed.add(new AclBindingFilter(resourcePart, entryPart));
        }
        this.filters = parsed;
    }

    /** @return the filters carried by this request */
    public List<AclBindingFilter> filters() {
        return filters;
    }

    /**
     * Serializes the filters into a struct that uses the request schema of this request's version.
     */
    @Override
    protected Struct toStruct() {
        Struct root = new Struct(DELETE_ACLS.requestSchema(version()));
        List<Struct> entries = new ArrayList<>();
        for (AclBindingFilter filter : filters) {
            Struct entry = root.instance(FILTERS);
            RequestUtils.resourceFilterSetStructFields(filter.resourceFilter(), entry);
            RequestUtils.aceFilterSetStructFields(filter.entryFilter(), entry);
            entries.add(entry);
        }
        root.set(FILTERS, entries.toArray());
        return root;
    }

    /**
     * Builds a response that reports {@code throwable}, with an empty set of deletions, once per
     * filter of this request.
     *
     * @throws IllegalArgumentException if this request's version is not 0
     */
    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
        short versionId = version();
        if (versionId != 0) {
            throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_ACLS.latestVersion()));
        }
        List<DeleteAclsResponse.AclFilterResponse> failures = new ArrayList<>();
        // One separate response object per requested filter, all carrying the same throwable.
        for (int remaining = filters.size(); remaining > 0; remaining--) {
            failures.add(new DeleteAclsResponse.AclFilterResponse(
                throwable, Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
        }
        return new DeleteAclsResponse(throttleTimeMs, failures);
    }

    /** Parses {@code buffer} with the request schema of {@code version}. */
    public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {
        Struct parsed = DELETE_ACLS.parseRequest(version, buffer);
        return new DeleteAclsRequest(parsed, version);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java new file mode 100644 index 0000000..6fffc0f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kafka.common.requests;

import org.apache.kafka.clients.admin.AccessControlEntry;
import org.apache.kafka.clients.admin.AclBinding;
import org.apache.kafka.clients.admin.Resource;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/**
 * Response for the {@link ApiKeys#DELETE_ACLS} API: a throttle time plus one
 * {@link AclFilterResponse} per element of the {@code filter_responses} array.
 */
public class DeleteAclsResponse extends AbstractResponse {
    // Not referenced inside this class; kept because it is a public member.
    public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class);
    private static final String THROTTLE_TIME_MS = "throttle_time_ms";
    private static final String FILTER_RESPONSES = "filter_responses";
    private static final String ERROR_CODE = "error_code";
    private static final String ERROR_MESSAGE = "error_message";
    private static final String MATCHING_ACLS = "matching_acls";

    /**
     * Result for one matched ACL binding; {@link #exception()} is {@code null} when no error
     * was recorded for it.
     */
    public static class AclDeletionResult {
        private final ApiException exception;
        private final AclBinding acl;

        public AclDeletionResult(ApiException exception, AclBinding acl) {
            this.exception = exception;
            this.acl = acl;
        }

        /** @return the recorded error, or {@code null} */
        public ApiException exception() {
            return exception;
        }

        /** @return the binding this result refers to */
        public AclBinding acl() {
            return acl;
        }

        @Override
        public String toString() {
            return "(apiException=" + exception + ", acl=" + acl + ")";
        }
    }

    /**
     * Result for one filter: either a filter-level throwable, or the per-binding results.
     */
    public static class AclFilterResponse {
        private final Throwable throwable;
        private final Collection<AclDeletionResult> deletions;

        public AclFilterResponse(Throwable throwable, Collection<AclDeletionResult> deletions) {
            this.throwable = throwable;
            this.deletions = deletions;
        }

        /** @return the filter-level error, or {@code null} */
        public Throwable throwable() {
            return throwable;
        }

        /** @return the per-binding results recorded for this filter */
        public Collection<AclDeletionResult> deletions() {
            return deletions;
        }

        @Override
        public String toString() {
            return "(throwable=" + throwable + ", deletions=" + Utils.join(deletions, ",") + ")";
        }
    }

    private final int throttleTimeMs;

    private final List<AclFilterResponse> responses;

    public DeleteAclsResponse(int throttleTimeMs, List<AclFilterResponse> responses) {
        this.throttleTimeMs = throttleTimeMs;
        this.responses = responses;
    }

    /**
     * Reads the throttle time and every element of the {@code filter_responses} array.
     * An element with a non-zero error code becomes a failed {@link AclFilterResponse} with an
     * empty set of deletions; its {@code matching_acls} array is not read. Otherwise each
     * element of {@code matching_acls} becomes an {@link AclDeletionResult}.
     */
    public DeleteAclsResponse(Struct struct) {
        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
        this.responses = new ArrayList<>();
        for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
            Struct responseStruct = (Struct) responseStructObj;
            short responseErrorCode = responseStruct.getShort(ERROR_CODE);
            String responseErrorMessage = responseStruct.getString(ERROR_MESSAGE);
            if (responseErrorCode != 0) {
                this.responses.add(new AclFilterResponse(
                    Errors.forCode(responseErrorCode).exception(responseErrorMessage),
                    Collections.<AclDeletionResult>emptySet()));
            } else {
                this.responses.add(new AclFilterResponse(null, readDeletions(responseStruct)));
            }
        }
    }

    /** Reads every element of the {@code matching_acls} array of {@code responseStruct}. */
    private static List<AclDeletionResult> readDeletions(Struct responseStruct) {
        List<AclDeletionResult> deletions = new ArrayList<>();
        for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) {
            Struct matchingAclStruct = (Struct) matchingAclStructObj;
            short matchErrorCode = matchingAclStruct.getShort(ERROR_CODE);
            ApiException exception = null;
            if (matchErrorCode != 0) {
                // The message field is only read when the code signals an error.
                Errors errors = Errors.forCode(matchErrorCode);
                String matchErrorMessage = matchingAclStruct.getString(ERROR_MESSAGE);
                exception = errors.exception(matchErrorMessage);
            }
            AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct);
            Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct);
            deletions.add(new AclDeletionResult(exception, new AclBinding(resource, entry)));
        }
        return deletions;
    }

    /**
     * Serializes this response with the response schema of {@code version}. A filter response
     * that carries a throwable is written with its error code, message and an empty
     * {@code matching_acls} array; its deletions are not written.
     */
    @Override
    protected Struct toStruct(short version) {
        Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
        List<Struct> responseStructs = new ArrayList<>();
        for (AclFilterResponse response : responses) {
            Struct responseStruct = struct.instance(FILTER_RESPONSES);
            if (response.throwable() != null) {
                Errors error = Errors.forException(response.throwable());
                responseStruct.set(ERROR_CODE, error.code());
                responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage());
                responseStruct.set(MATCHING_ACLS, new Struct[0]);
            } else {
                responseStruct.set(ERROR_CODE, (short) 0);
                List<Struct> deletionStructs = new ArrayList<>();
                for (AclDeletionResult deletion : response.deletions()) {
                    Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
                    // Go through the accessor throughout instead of mixing it with field access.
                    ApiException deletionError = deletion.exception();
                    if (deletionError != null) {
                        Errors error = Errors.forException(deletionError);
                        deletionStruct.set(ERROR_CODE, error.code());
                        deletionStruct.set(ERROR_MESSAGE, deletionError.getMessage());
                    } else {
                        deletionStruct.set(ERROR_CODE, (short) 0);
                    }
                    RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
                    RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct);
                    deletionStructs.add(deletionStruct);
                }
                responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0]));
            }
            responseStructs.add(responseStruct);
        }
        struct.set(FILTER_RESPONSES, responseStructs.toArray());
        return struct;
    }

    public int throttleTimeMs() {
        return throttleTimeMs;
    }

    public List<AclFilterResponse> responses() {
        return responses;
    }

    /** Reads {@code buffer} with the response schema of {@code version}. */
    public static DeleteAclsResponse parse(ByteBuffer buffer, short version) {
        return new DeleteAclsResponse(ApiKeys.DELETE_ACLS.responseSchema(version).read(buffer));
    }

    @Override
    public String toString() {
        return "(responses=" + Utils.join(responses, ",") + ")";
    }

}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java new file mode 100644 index 0000000..8d4eba6 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kafka.common.requests;

import org.apache.kafka.clients.admin.AccessControlEntryFilter;
import org.apache.kafka.clients.admin.AclBinding;
import org.apache.kafka.clients.admin.AclBindingFilter;
import org.apache.kafka.clients.admin.ResourceFilter;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
import java.util.Collections;

/**
 * Request for the {@link ApiKeys#DESCRIBE_ACLS} API. Carries a single {@link AclBindingFilter}
 * and converts it to and from the wire-level {@link Struct}.
 */
public class DescribeAclsRequest extends AbstractRequest {
    /**
     * Builder that remembers the filter and creates the request for a given version.
     */
    public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> {
        private final AclBindingFilter filter;

        public Builder(AclBindingFilter filter) {
            super(ApiKeys.DESCRIBE_ACLS);
            this.filter = filter;
        }

        @Override
        public DescribeAclsRequest build(short version) {
            return new DescribeAclsRequest(filter, version);
        }

        @Override
        public String toString() {
            return "(type=DescribeAclsRequest, filter=" + filter + ")";
        }
    }

    private final AclBindingFilter filter;

    DescribeAclsRequest(AclBindingFilter filter, short version) {
        super(version);
        this.filter = filter;
    }

    /**
     * Reads the resource-filter fields and the entry-filter fields directly from {@code struct}.
     */
    public DescribeAclsRequest(Struct struct, short version) {
        super(version);
        ResourceFilter resourcePart = RequestUtils.resourceFilterFromStructFields(struct);
        AccessControlEntryFilter entryPart = RequestUtils.aceFilterFromStructFields(struct);
        this.filter = new AclBindingFilter(resourcePart, entryPart);
    }

    /**
     * Serializes the filter into a struct that uses the request schema of this request's version.
     */
    @Override
    protected Struct toStruct() {
        Struct root = new Struct(ApiKeys.DESCRIBE_ACLS.requestSchema(version()));
        RequestUtils.resourceFilterSetStructFields(filter.resourceFilter(), root);
        RequestUtils.aceFilterSetStructFields(filter.entryFilter(), root);
        return root;
    }

    /**
     * Builds a response that reports {@code throwable} together with an empty set of bindings.
     *
     * @throws IllegalArgumentException if this request's version is not 0
     */
    @Override
    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable) {
        short versionId = version();
        if (versionId != 0) {
            throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
                    versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion()));
        }
        return new DescribeAclsResponse(throttleTimeMs, throwable,
                Collections.<AclBinding>emptySet());
    }

    /** Parses {@code buffer} with the request schema of {@code version}. */
    public static DescribeAclsRequest parse(ByteBuffer buffer, short version) {
        Struct parsed = ApiKeys.DESCRIBE_ACLS.parseRequest(version, buffer);
        return new DescribeAclsRequest(parsed, version);
    }

    /** @return the filter carried by this request */
    public AclBindingFilter filter() {
        return filter;
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java new file mode 100644 index 0000000..0de4865 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.kafka.common.requests;

import org.apache.kafka.clients.admin.AccessControlEntry;
import org.apache.kafka.clients.admin.AclBinding;
import org.apache.kafka.clients.admin.Resource;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Response for the {@link ApiKeys#DESCRIBE_ACLS} API: a throttle time, an optional throwable and
 * a collection of {@link AclBinding}s. On the wire the bindings are grouped per resource.
 */
public class DescribeAclsResponse extends AbstractResponse {
    private static final String THROTTLE_TIME_MS = "throttle_time_ms";
    private static final String ERROR_CODE = "error_code";
    private static final String ERROR_MESSAGE = "error_message";
    private static final String RESOURCES = "resources";
    private static final String ACLS = "acls";

    private final int throttleTimeMs;
    private final Throwable throwable;
    private final Collection<AclBinding> acls;

    public DescribeAclsResponse(int throttleTimeMs, Throwable throwable, Collection<AclBinding> acls) {
        this.throttleTimeMs = throttleTimeMs;
        this.throwable = throwable;
        this.acls = acls;
    }

    /**
     * Reads a response from {@code struct}. When the error code is not {@link Errors#NONE} the
     * throwable is built from the error message and the bindings are an empty set; the
     * {@code resources} array is not read. Otherwise every entry of every resource becomes one
     * {@link AclBinding}.
     */
    public DescribeAclsResponse(Struct struct) {
        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
        Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
        if (error == Errors.NONE) {
            this.throwable = null;
            List<AclBinding> bindings = new ArrayList<>();
            for (Object resourceElement : struct.getArray(RESOURCES)) {
                Struct resourceStruct = (Struct) resourceElement;
                Resource resource = RequestUtils.resourceFromStructFields(resourceStruct);
                for (Object aclElement : resourceStruct.getArray(ACLS)) {
                    AccessControlEntry entry = RequestUtils.aceFromStructFields((Struct) aclElement);
                    bindings.add(new AclBinding(resource, entry));
                }
            }
            this.acls = bindings;
        } else {
            this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
            this.acls = Collections.emptySet();
        }
    }

    /**
     * Serializes this response with the response schema of {@code version}. With a throwable
     * present, the error code, the message and an empty {@code resources} array are written.
     * Otherwise code 0 and a {@code null} message are written and the bindings are emitted
     * grouped by resource.
     */
    @Override
    protected Struct toStruct(short version) {
        Struct root = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
        root.set(THROTTLE_TIME_MS, throttleTimeMs);
        if (throwable != null) {
            Errors error = Errors.forException(throwable);
            root.set(ERROR_CODE, error.code());
            root.set(ERROR_MESSAGE, throwable.getMessage());
            root.set(RESOURCES, new Struct[0]);
            return root;
        }
        root.set(ERROR_CODE, (short) 0);
        root.set(ERROR_MESSAGE, null);

        List<Struct> resourceStructs = new ArrayList<>();
        for (Map.Entry<Resource, List<AccessControlEntry>> group : entriesByResource().entrySet()) {
            Struct resourceStruct = root.instance(RESOURCES);
            RequestUtils.resourceSetStructFields(group.getKey(), resourceStruct);
            List<Struct> aclStructs = new ArrayList<>();
            for (AccessControlEntry entry : group.getValue()) {
                Struct aclStruct = resourceStruct.instance(ACLS);
                RequestUtils.aceSetStructFields(entry, aclStruct);
                aclStructs.add(aclStruct);
            }
            resourceStruct.set(ACLS, aclStructs.toArray());
            resourceStructs.add(resourceStruct);
        }
        root.set(RESOURCES, resourceStructs.toArray());
        return root;
    }

    /** Collects the entries of all bindings into one list per resource. */
    private Map<Resource, List<AccessControlEntry>> entriesByResource() {
        Map<Resource, List<AccessControlEntry>> grouped = new HashMap<>();
        for (AclBinding binding : acls) {
            List<AccessControlEntry> bucket = grouped.get(binding.resource());
            if (bucket == null) {
                bucket = new ArrayList<>();
                grouped.put(binding.resource(), bucket);
            }
            bucket.add(binding.entry());
        }
        return grouped;
    }

    public int throttleTimeMs() {
        return throttleTimeMs;
    }

    /** @return the recorded error, or {@code null} */
    public Throwable throwable() {
        return throwable;
    }

    public Collection<AclBinding> acls() {
        return acls;
    }

    /** Reads {@code buffer} with the response schema of {@code version}. */
    public static DescribeAclsResponse parse(ByteBuffer buffer, short version) {
        Struct parsed = ApiKeys.DESCRIBE_ACLS.responseSchema(version).read(buffer);
        return new DescribeAclsResponse(parsed);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java new file mode 100644 index 0000000..f2ce55f --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.kafka.common.requests;

import org.apache.kafka.clients.admin.AccessControlEntry;
import org.apache.kafka.clients.admin.AccessControlEntryFilter;
import org.apache.kafka.clients.admin.AclOperation;
import org.apache.kafka.clients.admin.AclPermissionType;
import org.apache.kafka.clients.admin.Resource;
import org.apache.kafka.clients.admin.ResourceFilter;
import org.apache.kafka.clients.admin.ResourceType;
import org.apache.kafka.common.protocol.types.Struct;

/**
 * Static helpers that copy resource and access-control-entry values (and their filter
 * counterparts) between objects and the fields of a {@link Struct}.
 *
 * Each reader/writer pair uses the same field-name constants, so the two directions cannot
 * drift apart.
 */
final class RequestUtils {
    private static final String RESOURCE_TYPE = "resource_type";
    private static final String RESOURCE_NAME = "resource_name";
    private static final String PRINCIPAL = "principal";
    private static final String HOST = "host";
    private static final String OPERATION = "operation";
    private static final String PERMISSION_TYPE = "permission_type";

    /** Static-only utility class; not meant to be instantiated. */
    private RequestUtils() {
    }

    /** Builds a {@link Resource} from the {@code resource_type} and {@code resource_name} fields. */
    static Resource resourceFromStructFields(Struct struct) {
        byte resourceType = struct.getByte(RESOURCE_TYPE);
        String name = struct.getString(RESOURCE_NAME);
        return new Resource(ResourceType.fromCode(resourceType), name);
    }

    /** Writes the type code and name of {@code resource} into {@code struct}. */
    static void resourceSetStructFields(Resource resource, Struct struct) {
        struct.set(RESOURCE_TYPE, resource.resourceType().code());
        struct.set(RESOURCE_NAME, resource.name());
    }

    /** Builds a {@link ResourceFilter} from the {@code resource_type} and {@code resource_name} fields. */
    static ResourceFilter resourceFilterFromStructFields(Struct struct) {
        byte resourceType = struct.getByte(RESOURCE_TYPE);
        String name = struct.getString(RESOURCE_NAME);
        return new ResourceFilter(ResourceType.fromCode(resourceType), name);
    }

    /** Writes the type code and name of {@code resourceFilter} into {@code struct}. */
    static void resourceFilterSetStructFields(ResourceFilter resourceFilter, Struct struct) {
        struct.set(RESOURCE_TYPE, resourceFilter.resourceType().code());
        struct.set(RESOURCE_NAME, resourceFilter.name());
    }

    /**
     * Builds an {@link AccessControlEntry} from the {@code principal}, {@code host},
     * {@code operation} and {@code permission_type} fields.
     */
    static AccessControlEntry aceFromStructFields(Struct struct) {
        String principal = struct.getString(PRINCIPAL);
        String host = struct.getString(HOST);
        byte operation = struct.getByte(OPERATION);
        byte permissionType = struct.getByte(PERMISSION_TYPE);
        return new AccessControlEntry(principal, host, AclOperation.fromCode(operation),
            AclPermissionType.fromCode(permissionType));
    }

    /** Writes the principal, host, operation code and permission-type code of {@code data}. */
    static void aceSetStructFields(AccessControlEntry data, Struct struct) {
        struct.set(PRINCIPAL, data.principal());
        struct.set(HOST, data.host());
        struct.set(OPERATION, data.operation().code());
        struct.set(PERMISSION_TYPE, data.permissionType().code());
    }

    /**
     * Builds an {@link AccessControlEntryFilter} from the {@code principal}, {@code host},
     * {@code operation} and {@code permission_type} fields.
     */
    static AccessControlEntryFilter aceFilterFromStructFields(Struct struct) {
        String principal = struct.getString(PRINCIPAL);
        String host = struct.getString(HOST);
        byte operation = struct.getByte(OPERATION);
        byte permissionType = struct.getByte(PERMISSION_TYPE);
        return new AccessControlEntryFilter(principal, host, AclOperation.fromCode(operation),
            AclPermissionType.fromCode(permissionType));
    }

    /** Writes the principal, host, operation code and permission-type code of {@code filter}. */
    static void aceFilterSetStructFields(AccessControlEntryFilter filter, Struct struct) {
        struct.set(PRINCIPAL, filter.principal());
        struct.set(HOST, filter.host());
        struct.set(OPERATION, filter.operation().code());
        struct.set(PERMISSION_TYPE, filter.permissionType().code());
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java new file mode 100644 index 0000000..34cedb6 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclBindingTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.admin; + +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +/** + * Tests equality, filter matching and unknown detection for {@link AclBinding} and {@link AclBindingFilter}. + */ +public class AclBindingTest { + private static final AclBinding ACL1 = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); + + private static final AclBinding ACL2 = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:*", "", AclOperation.READ, AclPermissionType.ALLOW)); + + private static final AclBinding ACL3 = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic2"), + new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.DENY)); + + private static final AclBinding UNKNOWN_ACL = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic2"), + new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY)); + + private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); + + private static final AclBindingFilter ANY_DENY = new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY)); + + private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter( + new 
ResourceFilter(ResourceType.TOPIC, "mytopic"), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY)); + + @Test + public void testMatching() throws Exception { + assertTrue(ACL1.equals(ACL1)); + final AclBinding acl1Copy = new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "", AclOperation.ALL, AclPermissionType.ALLOW)); + assertTrue(ACL1.equals(acl1Copy)); + assertTrue(acl1Copy.equals(ACL1)); + assertTrue(ACL2.equals(ACL2)); + assertFalse(ACL1.equals(ACL2)); + assertFalse(ACL2.equals(ACL1)); + assertTrue(AclBindingFilter.ANY.matches(ACL1)); + assertFalse(AclBindingFilter.ANY.equals(ACL1)); + assertTrue(AclBindingFilter.ANY.matches(ACL2)); + assertFalse(AclBindingFilter.ANY.equals(ACL2)); + assertTrue(AclBindingFilter.ANY.matches(ACL3)); + assertFalse(AclBindingFilter.ANY.equals(ACL3)); + assertTrue(AclBindingFilter.ANY.equals(AclBindingFilter.ANY)); + assertTrue(ANY_ANONYMOUS.matches(ACL1)); + assertFalse(ANY_ANONYMOUS.equals(ACL1)); + assertFalse(ANY_ANONYMOUS.matches(ACL2)); + assertFalse(ANY_ANONYMOUS.equals(ACL2)); + assertTrue(ANY_ANONYMOUS.matches(ACL3)); + assertFalse(ANY_ANONYMOUS.equals(ACL3)); + assertFalse(ANY_DENY.matches(ACL1)); + assertFalse(ANY_DENY.matches(ACL2)); + assertTrue(ANY_DENY.matches(ACL3)); + assertTrue(ANY_MYTOPIC.matches(ACL1)); + assertTrue(ANY_MYTOPIC.matches(ACL2)); + assertFalse(ANY_MYTOPIC.matches(ACL3)); + assertTrue(ANY_ANONYMOUS.matches(UNKNOWN_ACL)); + assertTrue(ANY_DENY.matches(UNKNOWN_ACL)); + assertTrue(UNKNOWN_ACL.equals(UNKNOWN_ACL)); + assertFalse(ANY_MYTOPIC.matches(UNKNOWN_ACL)); + } + + @Test + public void testUnknowns() throws Exception { + assertFalse(ACL1.unknown()); + assertFalse(ACL2.unknown()); + assertFalse(ACL3.unknown()); + assertFalse(ANY_ANONYMOUS.unknown()); + assertFalse(ANY_DENY.unknown()); + assertFalse(ANY_MYTOPIC.unknown()); + assertTrue(UNKNOWN_ACL.unknown()); + } + + @Test + public void testMatchesAtMostOne() 
throws Exception { + assertNull(ACL1.toFilter().findIndefiniteField()); + assertNull(ACL2.toFilter().findIndefiniteField()); + assertNull(ACL3.toFilter().findIndefiniteField()); + assertFalse(ANY_ANONYMOUS.matchesAtMostOne()); + assertFalse(ANY_DENY.matchesAtMostOne()); + assertFalse(ANY_MYTOPIC.matchesAtMostOne()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java new file mode 100644 index 0000000..2d7c546 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Checks the code, name and unknown flag of every {@link AclOperation} constant against a fixed table. + */ +public class AclOperationTest { + /** One expected row: an operation with the code, name and unknown flag it is supposed to have. */ + private static class AclOperationTestInfo { + private final AclOperation operation; + private final int code; + private final String name; + private final boolean unknown; + + AclOperationTestInfo(AclOperation operation, int code, String name, boolean unknown) { + this.operation = operation; + this.code = code; + this.name = name; + this.unknown = unknown; + } + } + + /** Expected rows, listed in the same order that testExhaustive compares against AclOperation.values(). */ + private static final AclOperationTestInfo[] INFOS = { + new AclOperationTestInfo(AclOperation.UNKNOWN, 0, "unknown", true), + new AclOperationTestInfo(AclOperation.ANY, 1, "any", false), + new AclOperationTestInfo(AclOperation.ALL, 2, "all", false), + new AclOperationTestInfo(AclOperation.READ, 3, "read", false), + new AclOperationTestInfo(AclOperation.WRITE, 4, "write", false), + new AclOperationTestInfo(AclOperation.CREATE, 5, "create", false), + new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false), + new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false), + new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false), + new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false) + }; + + @Test + public void testIsUnknown() throws Exception { + for (AclOperationTestInfo info : INFOS) { + assertEquals(info.operation + " was supposed to have unknown == " + info.unknown, + info.unknown, info.operation.unknown()); + } + } + + @Test + public void testCode() throws Exception { + for (AclOperationTestInfo info : INFOS) { + assertEquals(info.operation + " was supposed to have code == " + info.code, + info.code, info.operation.code()); + assertEquals("AclOperation.fromCode(" + info.code + ") was supposed to be " + info.operation, + info.operation, AclOperation.fromCode((byte) info.code)); + } + /* A code that is not in the table is expected to map to UNKNOWN. */ + assertEquals(AclOperation.UNKNOWN, AclOperation.fromCode((byte) 120)); + } + + @Test + public void testName() 
throws Exception { + for (AclOperationTestInfo info : INFOS) { + assertEquals("AclOperation.fromString(" + info.name + ") was supposed to be " + info.operation, + info.operation, AclOperation.fromString(info.name)); + } + /* A name that is not in the table is expected to map to UNKNOWN. */ + assertEquals(AclOperation.UNKNOWN, AclOperation.fromString("something")); + } + + /** Fails when AclOperation.values() and INFOS differ in length or order, e.g. after a constant is added without a row. */ + @Test + public void testExhaustive() throws Exception { + assertEquals(INFOS.length, AclOperation.values().length); + for (int i = 0; i < INFOS.length; i++) { + assertEquals(INFOS[i].operation, AclOperation.values()[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java new file mode 100644 index 0000000..aa6deca --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclPermissionTypeTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Checks the code, name and unknown flag of every {@link AclPermissionType} constant against a fixed table. + */ +public class AclPermissionTypeTest { + private static class AclPermissionTypeTestInfo { + private final AclPermissionType ty; + private final int code; + private final String name; + private final boolean unknown; + + AclPermissionTypeTestInfo(AclPermissionType ty, int code, String name, boolean unknown) { + this.ty = ty; + this.code = code; + this.name = name; + this.unknown = unknown; + } + } + + private static final AclPermissionTypeTestInfo[] INFOS = { + new AclPermissionTypeTestInfo(AclPermissionType.UNKNOWN, 0, "unknown", true), + new AclPermissionTypeTestInfo(AclPermissionType.ANY, 1, "any", false), + new AclPermissionTypeTestInfo(AclPermissionType.DENY, 2, "deny", false), + new AclPermissionTypeTestInfo(AclPermissionType.ALLOW, 3, "allow", false) + }; + + @Test + public void testIsUnknown() throws Exception { + for (AclPermissionTypeTestInfo info : INFOS) { + assertEquals(info.ty + " was supposed to have unknown == " + info.unknown, + info.unknown, info.ty.unknown()); + } + } + + @Test + public void testCode() throws Exception { + for (AclPermissionTypeTestInfo info : INFOS) { + assertEquals(info.ty + " was supposed to have code == " + info.code, + info.code, info.ty.code()); + assertEquals("AclPermissionType.fromCode(" + info.code + ") was supposed to be " + info.ty, + info.ty, AclPermissionType.fromCode((byte) info.code)); + } + assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromCode((byte) 120)); + } + + @Test + public void testName() throws Exception { + for (AclPermissionTypeTestInfo info : INFOS) { + assertEquals("AclPermissionType.fromString(" + info.name + ") was supposed to be " + info.ty, + info.ty, AclPermissionType.fromString(info.name)); + } + assertEquals(AclPermissionType.UNKNOWN, AclPermissionType.fromString("something")); + } + + @Test + public void testExhaustive() throws Exception { + 
assertEquals(INFOS.length, AclPermissionType.values().length); + for (int i = 0; i < INFOS.length; i++) { + assertEquals(INFOS[i].ty, AclPermissionType.values()[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 36cb8e8..6d01b0a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -19,17 +19,27 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; +import org.apache.kafka.clients.admin.DeleteAclsResults.FilterResults; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.CreateAclsResponse; +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; import org.apache.kafka.common.requests.CreateTopicsResponse.Error; import org.apache.kafka.common.requests.CreateTopicsResponse; +import org.apache.kafka.common.requests.DeleteAclsResponse; +import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; +import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; +import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.utils.Time; +import java.util.ArrayList; import 
java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -203,4 +213,157 @@ public class KafkaAdminClientTest { future.get(); } } + + private static final AclBinding ACL1 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)); + private static final AclBinding ACL2 = new AclBinding(new Resource(ResourceType.TOPIC, "mytopic4"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)); + private static final AclBindingFilter FILTER1 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY)); + private static final AclBindingFilter FILTER2 = new AclBindingFilter(new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY)); + + @Test + public void testDescribeAcls() throws Exception { + try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) { + ctx.mockClient.setNodeApiVersions(NodeApiVersions.create()); + ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet()); + ctx.mockClient.setNode(ctx.nodes.get(0)); + + // Test a call where we get back ACL1 and ACL2. + ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, null, + new ArrayList<AclBinding>() {{ + add(ACL1); + add(ACL2); + }})); + assertCollectionIs(ctx.client.describeAcls(FILTER1).all().get(), ACL1, ACL2); + + // Test a call where we get back no results. + ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, null, + Collections.<AclBinding>emptySet())); + assertTrue(ctx.client.describeAcls(FILTER2).all().get().isEmpty()); + + // Test a call where we get back an error. 
+ ctx.mockClient.prepareResponse(new DescribeAclsResponse(0, + new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet())); + assertFutureError(ctx.client.describeAcls(FILTER2).all(), SecurityDisabledException.class); + } + } + + @Test + public void testCreateAcls() throws Exception { + try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) { + ctx.mockClient.setNodeApiVersions(NodeApiVersions.create()); + ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet()); + ctx.mockClient.setNode(ctx.nodes.get(0)); + + // Test a call where we successfully create two ACLs. + ctx.mockClient.prepareResponse(new CreateAclsResponse(0, + new ArrayList<AclCreationResponse>() {{ + add(new AclCreationResponse(null)); + add(new AclCreationResponse(null)); + }})); + CreateAclsResults results = ctx.client.createAcls(new ArrayList<AclBinding>() {{ + add(ACL1); + add(ACL2); + }}); + assertCollectionIs(results.results().keySet(), ACL1, ACL2); + for (KafkaFuture<Void> future : results.results().values()) { + future.get(); + } + results.all().get(); + + // Test a call where we fail to create one ACL. 
+ ctx.mockClient.prepareResponse(new CreateAclsResponse(0, + new ArrayList<AclCreationResponse>() {{ + add(new AclCreationResponse(new SecurityDisabledException("Security is disabled"))); + add(new AclCreationResponse(null)); + }})); + results = ctx.client.createAcls(new ArrayList<AclBinding>() {{ + add(ACL1); + add(ACL2); + }}); + assertCollectionIs(results.results().keySet(), ACL1, ACL2); + assertFutureError(results.results().get(ACL1), SecurityDisabledException.class); + results.results().get(ACL2).get(); + assertFutureError(results.all(), SecurityDisabledException.class); + } + } + + @Test + public void testDeleteAcls() throws Exception { + try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) { + ctx.mockClient.setNodeApiVersions(NodeApiVersions.create()); + ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet()); + ctx.mockClient.setNode(ctx.nodes.get(0)); + + // Test a call where one filter has an error. + ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{ + add(new AclFilterResponse(null, + new ArrayList<AclDeletionResult>() {{ + add(new AclDeletionResult(null, ACL1)); + add(new AclDeletionResult(null, ACL2)); + }})); + add(new AclFilterResponse(new SecurityDisabledException("No security"), + Collections.<AclDeletionResult>emptySet())); + }})); + DeleteAclsResults results = ctx.client.deleteAcls(new ArrayList<AclBindingFilter>() {{ + add(FILTER1); + add(FILTER2); + }}); + Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.results(); + FilterResults filter1Results = filterResults.get(FILTER1).get(); + assertEquals(null, filter1Results.acls().get(0).exception()); + assertEquals(ACL1, filter1Results.acls().get(0).acl()); + assertEquals(null, filter1Results.acls().get(1).exception()); + assertEquals(ACL2, filter1Results.acls().get(1).acl()); + assertTrue(filterResults.get(FILTER2).isCompletedExceptionally()); + 
assertFutureError(filterResults.get(FILTER2), SecurityDisabledException.class); + assertFutureError(results.all(), SecurityDisabledException.class); + + // Test a call where one deletion result has an error. + ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{ + add(new AclFilterResponse(null, + new ArrayList<AclDeletionResult>() {{ + add(new AclDeletionResult(null, ACL1)); + add(new AclDeletionResult(new SecurityDisabledException("No security"), ACL2)); + }})); + add(new AclFilterResponse(null, Collections.<AclDeletionResult>emptySet())); + }})); + results = ctx.client.deleteAcls( + new ArrayList<AclBindingFilter>() {{ + add(FILTER1); + add(FILTER2); + }}); + assertTrue(results.results().get(FILTER2).get().acls().isEmpty()); + assertFutureError(results.all(), SecurityDisabledException.class); + + // Test a call where there are no errors. + ctx.mockClient.prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{ + add(new AclFilterResponse(null, + new ArrayList<AclDeletionResult>() {{ + add(new AclDeletionResult(null, ACL1)); + }})); + add(new AclFilterResponse(null, + new ArrayList<AclDeletionResult>() {{ + add(new AclDeletionResult(null, ACL2)); + }})); + }})); + results = ctx.client.deleteAcls( + new ArrayList<AclBindingFilter>() {{ + add(FILTER1); + add(FILTER2); + }}); + Collection<AclBinding> deleted = results.all().get(); + assertCollectionIs(deleted, ACL1, ACL2); + } + } + + /** Asserts that the collection contains every given element and has exactly as many entries as elements were given. */ + @SafeVarargs + private static <T> void assertCollectionIs(Collection<T> collection, T... 
elements) { + for (T element : elements) { + assertTrue("Did not find " + element, collection.contains(element)); + } + assertEquals("There are unexpected extra elements in the collection.", + elements.length, collection.size()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java new file mode 100644 index 0000000..8f6f670 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.clients.admin; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** + * Checks the code, name and unknown flag of every {@link ResourceType} constant against a fixed table. + */ +public class ResourceTypeTest { + private static class AclResourceTypeTestInfo { + private final ResourceType resourceType; + private final int code; + private final String name; + private final boolean unknown; + + AclResourceTypeTestInfo(ResourceType resourceType, int code, String name, boolean unknown) { + this.resourceType = resourceType; + this.code = code; + this.name = name; + this.unknown = unknown; + } + } + + private static final AclResourceTypeTestInfo[] INFOS = { + new AclResourceTypeTestInfo(ResourceType.UNKNOWN, 0, "unknown", true), + new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false), + new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false), + new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false), + new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false) + }; + + @Test + public void testIsUnknown() throws Exception { + for (AclResourceTypeTestInfo info : INFOS) { + assertEquals(info.resourceType + " was supposed to have unknown == " + info.unknown, + info.unknown, info.resourceType.unknown()); + } + } + + @Test + public void testCode() throws Exception { + for (AclResourceTypeTestInfo info : INFOS) { + assertEquals(info.resourceType + " was supposed to have code == " + info.code, + info.code, info.resourceType.code()); + assertEquals("ResourceType.fromCode(" + info.code + ") was supposed to be " + + info.resourceType, info.resourceType, ResourceType.fromCode((byte) info.code)); + } + assertEquals(ResourceType.UNKNOWN, ResourceType.fromCode((byte) 120)); + } + + @Test + public void testName() throws Exception { + for (AclResourceTypeTestInfo info : INFOS) { + assertEquals("ResourceType.fromString(" + info.name + ") was supposed to be " + + info.resourceType, info.resourceType, ResourceType.fromString(info.name)); + } + assertEquals(ResourceType.UNKNOWN, 
ResourceType.fromString("something")); + } + + @Test + public void testExhaustive() throws Exception { + assertEquals(INFOS.length, ResourceType.values().length); + for (int i = 0; i < INFOS.length; i++) { + assertEquals(INFOS[i].resourceType, ResourceType.values()[i]); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 4946246..ede55a5 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -16,10 +16,21 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.clients.admin.AccessControlEntry; +import org.apache.kafka.clients.admin.AccessControlEntryFilter; +import org.apache.kafka.clients.admin.AclBinding; +import org.apache.kafka.clients.admin.AclBindingFilter; +import org.apache.kafka.clients.admin.AclOperation; +import org.apache.kafka.clients.admin.AclPermissionType; +import org.apache.kafka.clients.admin.Resource; +import org.apache.kafka.clients.admin.ResourceFilter; +import org.apache.kafka.clients.admin.ResourceType; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.network.ListenerName; @@ -35,6 +46,10 @@ import 
org.apache.kafka.common.record.MemoryRecordsBuilder; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.record.SimpleRecord; import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation; +import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; +import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; +import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse; import org.apache.kafka.common.utils.Utils; import org.junit.Test; @@ -44,6 +59,7 @@ import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -205,6 +221,15 @@ public class RequestResponseTest { checkRequest(createTxnOffsetCommitRequest()); checkErrorResponse(createTxnOffsetCommitRequest(), new UnknownServerException()); checkResponse(createTxnOffsetCommitResponse(), 0); + checkRequest(createListAclsRequest()); + checkErrorResponse(createListAclsRequest(), new SecurityDisabledException("Security is not enabled.")); + checkResponse(createListAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion()); + checkRequest(createCreateAclsRequest()); + checkErrorResponse(createCreateAclsRequest(), new SecurityDisabledException("Security is not enabled.")); + checkResponse(createCreateAclsResponse(), ApiKeys.CREATE_ACLS.latestVersion()); + checkRequest(createDeleteAclsRequest()); + checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled.")); + checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion()); } @Test @@ -960,6 +985,61 @@ public class RequestResponseTest { return new TxnOffsetCommitResponse(0, errorPerPartitions); } + private DescribeAclsRequest createListAclsRequest() { + return new 
DescribeAclsRequest.Builder(new AclBindingFilter( + new ResourceFilter(ResourceType.TOPIC, "mytopic"), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build(); + } + + private DescribeAclsResponse createListAclsResponse() { + return new DescribeAclsResponse(0, null, Collections.singleton(new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)))); + } + + private CreateAclsRequest createCreateAclsRequest() { + List<AclCreation> creations = new ArrayList<>(); + creations.add(new AclCreation(new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.READ, AclPermissionType.ALLOW)))); + creations.add(new AclCreation(new AclBinding( + new Resource(ResourceType.GROUP, "mygroup"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY)))); + return new CreateAclsRequest.Builder(creations).build(); + } + + private CreateAclsResponse createCreateAclsResponse() { + return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(null), + new AclCreationResponse(new InvalidRequestException("Foo bar")))); + } + + private DeleteAclsRequest createDeleteAclsRequest() { + List<AclBindingFilter> filters = new ArrayList<>(); + filters.add(new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY))); + filters.add(new AclBindingFilter( + new ResourceFilter(ResourceType.ANY, null), + new AccessControlEntryFilter("User:bob", null, AclOperation.ANY, AclPermissionType.ANY))); + return new DeleteAclsRequest.Builder(filters).build(); + } + + private DeleteAclsResponse createDeleteAclsResponse() { + List<AclFilterResponse> responses = new ArrayList<>(); + responses.add(new AclFilterResponse(null, + new HashSet<AclDeletionResult>() {{ + 
add(new AclDeletionResult(null, new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic3"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)))); + add(new AclDeletionResult(null, new AclBinding( + new Resource(ResourceType.TOPIC, "mytopic4"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)))); + }})); + responses.add(new AclFilterResponse(new SecurityDisabledException("No security"), + Collections.<AclDeletionResult>emptySet())); + return new DeleteAclsResponse(0, responses); + } + private static class ByteBufferChannel implements GatheringByteChannel { private final ByteBuffer buf; private boolean closed = false; http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/security/auth/Operation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala index 5d31c62..7d292d2 100644 --- a/core/src/main/scala/kafka/security/auth/Operation.scala +++ b/core/src/main/scala/kafka/security/auth/Operation.scala @@ -17,20 +17,50 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} +import org.apache.kafka.clients.admin.AclOperation + +import scala.util.{Failure, Success, Try} /** * Different operations a client may perform on kafka resources. 
*/ -sealed trait Operation extends BaseEnum -case object Read extends Operation { val name = "Read" } -case object Write extends Operation { val name = "Write" } -case object Create extends Operation { val name = "Create" } -case object Delete extends Operation { val name = "Delete" } -case object Alter extends Operation { val name = "Alter" } -case object Describe extends Operation { val name = "Describe" } -case object ClusterAction extends Operation { val name = "ClusterAction" } -case object All extends Operation { val name = "All" } +sealed trait Operation extends BaseEnum { + def toJava : AclOperation +} + +case object Read extends Operation { + val name = "Read" + val toJava = AclOperation.READ +} +case object Write extends Operation { + val name = "Write" + val toJava = AclOperation.WRITE +} +case object Create extends Operation { + val name = "Create" + val toJava = AclOperation.CREATE +} +case object Delete extends Operation { + val name = "Delete" + val toJava = AclOperation.DELETE +} +case object Alter extends Operation { + val name = "Alter" + val toJava = AclOperation.ALTER +} +case object Describe extends Operation { + val name = "Describe" + val toJava = AclOperation.DESCRIBE +} +case object ClusterAction extends Operation { + val name = "ClusterAction" + val toJava = AclOperation.CLUSTER_ACTION +} +case object All extends Operation { + val name = "All" + val toJava = AclOperation.ALL +} object Operation { def fromString(operation: String): Operation = { @@ -38,5 +68,13 @@ object Operation { op.getOrElse(throw new KafkaException(operation + " not a valid operation name. 
The valid names are " + values.mkString(","))) } + def fromJava(operation: AclOperation): Try[Operation] = { + try { + Success(fromString(operation.toString)) + } catch { + case throwable: Throwable => Failure(throwable) + } + } + def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, All) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9815e18f/core/src/main/scala/kafka/security/auth/PermissionType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/PermissionType.scala b/core/src/main/scala/kafka/security/auth/PermissionType.scala index fd2a0fe..c4209e5 100644 --- a/core/src/main/scala/kafka/security/auth/PermissionType.scala +++ b/core/src/main/scala/kafka/security/auth/PermissionType.scala @@ -17,20 +17,27 @@ package kafka.security.auth import kafka.common.{BaseEnum, KafkaException} +import org.apache.kafka.clients.admin.AclPermissionType + +import scala.util.{Failure, Success, Try} /** * PermissionType. */ -sealed trait PermissionType extends BaseEnum +sealed trait PermissionType extends BaseEnum { + val toJava: AclPermissionType +} case object Allow extends PermissionType { val name = "Allow" + val toJava = AclPermissionType.ALLOW } case object Deny extends PermissionType { val name = "Deny" + val toJava = AclPermissionType.DENY } object PermissionType { @@ -39,6 +46,14 @@ object PermissionType { pType.getOrElse(throw new KafkaException(permissionType + " not a valid permissionType name. The valid names are " + values.mkString(","))) } + def fromJava(permissionType: AclPermissionType): Try[PermissionType] = { + try { + Success(fromString(permissionType.toString)) + } catch { + case throwable: Throwable => Failure(throwable) + } + } + def values: Seq[PermissionType] = List(Allow, Deny) }
