This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new b1fc625 KAFKA-9027, KAFKA-9028: Convert create/delete acls
requests/response to use generated protocol (#7725)
b1fc625 is described below
commit b1fc625182586785b82ea511e41e1e9f91769b0b
Author: Ismael Juma <[email protected]>
AuthorDate: Mon Feb 3 07:12:00 2020 -0800
KAFKA-9027, KAFKA-9028: Convert create/delete acls requests/response to use
generated protocol (#7725)
Also add support for flexible versions to both protocol types.
Reviewers: Rajini Sivaram <[email protected]>, Colin Patrick
McCabe <[email protected]>
Co-authored-by: Rajini Sivaram <[email protected]>
Co-authored-by: Jason Gustafson <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 71 +++---
.../org/apache/kafka/common/protocol/ApiKeys.java | 12 +-
.../apache/kafka/common/protocol/CommonFields.java | 27 ---
.../kafka/common/requests/AbstractResponse.java | 4 +-
.../kafka/common/requests/CreateAclsRequest.java | 188 ++++++---------
.../kafka/common/requests/CreateAclsResponse.java | 88 +------
.../kafka/common/requests/DeleteAclsRequest.java | 169 ++++++-------
.../kafka/common/requests/DeleteAclsResponse.java | 266 ++++++---------------
.../kafka/common/requests/DescribeAclsRequest.java | 43 ++--
.../common/requests/DescribeAclsResponse.java | 43 ++--
.../apache/kafka/common/requests/RequestUtils.java | 80 -------
.../common/message/CreateAclsRequest.json | 7 +-
.../common/message/CreateAclsResponse.json | 7 +-
.../common/message/DeleteAclsRequest.json | 5 +-
.../common/message/DeleteAclsResponse.json | 5 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 72 +++---
.../kafka/common/protocol/MessageTestUtil.java | 15 --
.../common/requests/CreateAclsRequestTest.java | 23 +-
.../common/requests/DeleteAclsRequestTest.java | 35 +--
.../common/requests/DeleteAclsResponseTest.java | 133 ++++++-----
.../common/requests/DescribeAclsRequestTest.java | 11 +-
.../common/requests/DescribeAclsResponseTest.java | 8 +-
.../kafka/common/requests/RequestResponseTest.java | 90 ++++---
.../main/scala/kafka/server/DelayedFuture.scala | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 93 +++----
.../kafka/api/AuthorizerIntegrationTest.scala | 43 ++--
.../scala/unit/kafka/server/RequestQuotaTest.scala | 29 ++-
27 files changed, 652 insertions(+), 919 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 800fab3..6b8e962 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -69,6 +69,9 @@ import
org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData;
import
org.apache.kafka.common.message.AlterPartitionReassignmentsRequestData.ReassignableTopic;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
+import
org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
@@ -80,6 +83,11 @@ import
org.apache.kafka.common.message.CreateTopicsRequestData;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
@@ -126,9 +134,7 @@ import
org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsRequest;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreateDelegationTokenRequest;
import org.apache.kafka.common.requests.CreateDelegationTokenResponse;
import org.apache.kafka.common.requests.CreatePartitionsRequest;
@@ -137,8 +143,6 @@ import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
@@ -1765,44 +1769,48 @@ public class KafkaAdminClient extends AdminClient {
final long now = time.milliseconds();
final Map<AclBinding, KafkaFutureImpl<Void>> futures = new HashMap<>();
final List<AclCreation> aclCreations = new ArrayList<>();
+ final List<AclBinding> aclBindingsSent = new ArrayList<>();
for (AclBinding acl : acls) {
if (futures.get(acl) == null) {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
futures.put(acl, future);
String indefinite = acl.toFilter().findIndefiniteField();
if (indefinite == null) {
- aclCreations.add(new AclCreation(acl));
+ aclCreations.add(CreateAclsRequest.aclCreation(acl));
+ aclBindingsSent.add(acl);
} else {
future.completeExceptionally(new
InvalidRequestException("Invalid ACL creation: " +
indefinite));
}
}
}
+ final CreateAclsRequestData data = new
CreateAclsRequestData().setCreations(aclCreations);
runnable.call(new Call("createAcls", calcDeadlineMs(now,
options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
CreateAclsRequest.Builder createRequest(int timeoutMs) {
- return new CreateAclsRequest.Builder(aclCreations);
+ return new CreateAclsRequest.Builder(data);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
CreateAclsResponse response = (CreateAclsResponse)
abstractResponse;
- List<AclCreationResponse> responses =
response.aclCreationResponses();
- Iterator<AclCreationResponse> iter = responses.iterator();
- for (AclCreation aclCreation : aclCreations) {
- KafkaFutureImpl<Void> future =
futures.get(aclCreation.acl());
+ List<AclCreationResult> responses = response.results();
+ Iterator<AclCreationResult> iter = responses.iterator();
+ for (AclBinding aclBinding : aclBindingsSent) {
+ KafkaFutureImpl<Void> future = futures.get(aclBinding);
if (!iter.hasNext()) {
future.completeExceptionally(new
UnknownServerException(
- "The broker reported no creation result for the
given ACL."));
+ "The broker reported no creation result for the
given ACL: " + aclBinding));
} else {
- AclCreationResponse creation = iter.next();
- if (creation.error().isFailure()) {
-
future.completeExceptionally(creation.error().exception());
- } else {
+ AclCreationResult creation = iter.next();
+ Errors error = Errors.forCode(creation.errorCode());
+ ApiError apiError = new ApiError(error,
creation.errorMessage());
+ if (apiError.isFailure())
+ future.completeExceptionally(apiError.exception());
+ else
future.complete(null);
- }
}
}
}
@@ -1819,39 +1827,46 @@ public class KafkaAdminClient extends AdminClient {
public DeleteAclsResult deleteAcls(Collection<AclBindingFilter> filters,
DeleteAclsOptions options) {
final long now = time.milliseconds();
final Map<AclBindingFilter, KafkaFutureImpl<FilterResults>> futures =
new HashMap<>();
- final List<AclBindingFilter> filterList = new ArrayList<>();
+ final List<AclBindingFilter> aclBindingFiltersSent = new ArrayList<>();
+ final List<DeleteAclsFilter> deleteAclsFilters = new ArrayList<>();
for (AclBindingFilter filter : filters) {
if (futures.get(filter) == null) {
- filterList.add(filter);
+ aclBindingFiltersSent.add(filter);
+
deleteAclsFilters.add(DeleteAclsRequest.deleteAclsFilter(filter));
futures.put(filter, new KafkaFutureImpl<>());
}
}
+ final DeleteAclsRequestData data = new
DeleteAclsRequestData().setFilters(deleteAclsFilters);
runnable.call(new Call("deleteAcls", calcDeadlineMs(now,
options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
DeleteAclsRequest.Builder createRequest(int timeoutMs) {
- return new DeleteAclsRequest.Builder(filterList);
+ return new DeleteAclsRequest.Builder(data);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
DeleteAclsResponse response = (DeleteAclsResponse)
abstractResponse;
- List<AclFilterResponse> responses = response.responses();
- Iterator<AclFilterResponse> iter = responses.iterator();
- for (AclBindingFilter filter : filterList) {
- KafkaFutureImpl<FilterResults> future =
futures.get(filter);
+ List<DeleteAclsResponseData.DeleteAclsFilterResult> results =
response.filterResults();
+ Iterator<DeleteAclsResponseData.DeleteAclsFilterResult> iter =
results.iterator();
+ for (AclBindingFilter bindingFilter : aclBindingFiltersSent) {
+ KafkaFutureImpl<FilterResults> future =
futures.get(bindingFilter);
if (!iter.hasNext()) {
future.completeExceptionally(new
UnknownServerException(
"The broker reported no deletion result for the
given filter."));
} else {
- AclFilterResponse deletion = iter.next();
- if (deletion.error().isFailure()) {
-
future.completeExceptionally(deletion.error().exception());
+ DeleteAclsFilterResult filterResult = iter.next();
+ ApiError error = new
ApiError(Errors.forCode(filterResult.errorCode()), filterResult.errorMessage());
+ if (error.isFailure()) {
+ future.completeExceptionally(error.exception());
} else {
List<FilterResult> filterResults = new
ArrayList<>();
- for (AclDeletionResult deletionResult :
deletion.deletions()) {
- filterResults.add(new
FilterResult(deletionResult.acl(), deletionResult.error().exception()));
+ for (DeleteAclsMatchingAcl matchingAcl :
filterResult.matchingAcls()) {
+ ApiError aclError = new
ApiError(Errors.forCode(matchingAcl.errorCode()),
+ matchingAcl.errorMessage());
+ AclBinding aclBinding =
DeleteAclsResponse.aclBinding(matchingAcl);
+ filterResults.add(new FilterResult(aclBinding,
aclError.exception()));
}
future.complete(new FilterResults(filterResults));
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 000c654..f48f112 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -21,12 +21,16 @@ import
org.apache.kafka.common.message.ApiVersionsRequestData;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteTopicsRequestData;
@@ -98,10 +102,6 @@ import org.apache.kafka.common.requests.AlterConfigsRequest;
import org.apache.kafka.common.requests.AlterConfigsResponse;
import org.apache.kafka.common.requests.AlterReplicaLogDirsRequest;
import org.apache.kafka.common.requests.AlterReplicaLogDirsResponse;
-import org.apache.kafka.common.requests.CreateAclsRequest;
-import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.DeleteAclsRequest;
-import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteRecordsRequest;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DescribeConfigsRequest;
@@ -178,8 +178,8 @@ public enum ApiKeys {
TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false,
RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequestData.SCHEMAS,
TxnOffsetCommitResponseData.SCHEMAS),
DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequestData.SCHEMAS,
DescribeAclsResponseData.SCHEMAS),
- CREATE_ACLS(30, "CreateAcls", CreateAclsRequest.schemaVersions(),
CreateAclsResponse.schemaVersions()),
- DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequest.schemaVersions(),
DeleteAclsResponse.schemaVersions()),
+ CREATE_ACLS(30, "CreateAcls", CreateAclsRequestData.SCHEMAS,
CreateAclsResponseData.SCHEMAS),
+ DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequestData.SCHEMAS,
DeleteAclsResponseData.SCHEMAS),
DESCRIBE_CONFIGS(32, "DescribeConfigs",
DescribeConfigsRequest.schemaVersions(),
DescribeConfigsResponse.schemaVersions()),
ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
diff --git
a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 5da38aa..fe756a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -17,7 +17,6 @@
package org.apache.kafka.common.protocol;
import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.resource.PatternType;
public class CommonFields {
public static final Field.Int32 THROTTLE_TIME_MS = new
Field.Int32("throttle_time_ms",
@@ -43,30 +42,4 @@ public class CommonFields {
"The transactional id or null if the producer is not
transactional");
public static final Field.Int64 PRODUCER_ID = new
Field.Int64("producer_id", "Current producer id in use by the transactional
id.");
public static final Field.Int16 PRODUCER_EPOCH = new
Field.Int16("producer_epoch", "Current epoch associated with the producer id.");
-
- // ACL APIs
- public static final Field.Int8 RESOURCE_TYPE = new
Field.Int8("resource_type", "The resource type");
- public static final Field.Str RESOURCE_NAME = new
Field.Str("resource_name", "The resource name");
- public static final Field.NullableStr RESOURCE_NAME_FILTER = new
Field.NullableStr("resource_name", "The resource name filter");
- public static final Field.Int8 RESOURCE_PATTERN_TYPE = new
Field.Int8("resource_pattern_type", "The resource pattern type",
PatternType.LITERAL.code());
- public static final Field.Int8 RESOURCE_PATTERN_TYPE_FILTER = new
Field.Int8("resource_pattern_type_filter", "The resource pattern type filter",
PatternType.LITERAL.code());
- public static final Field.Str PRINCIPAL = new Field.Str("principal", "The
ACL principal");
- public static final Field.NullableStr PRINCIPAL_FILTER = new
Field.NullableStr("principal", "The ACL principal filter");
- public static final Field.Str HOST = new Field.Str("host", "The ACL host");
- public static final Field.NullableStr HOST_FILTER = new
Field.NullableStr("host", "The ACL host filter");
- public static final Field.Int8 OPERATION = new Field.Int8("operation",
"The ACL operation");
- public static final Field.Int8 PERMISSION_TYPE = new
Field.Int8("permission_type", "The ACL permission type");
-
- public static final Field.Str PRINCIPAL_TYPE = new
Field.Str("principal_type", "principalType of the Kafka principal");
- public static final Field.Str PRINCIPAL_NAME = new Field.Str("name", "name
of the Kafka principal");
-
- public static final Field.Int64 COMMITTED_OFFSET = new
Field.Int64("offset",
- "Message offset to be committed");
- public static final Field.NullableStr COMMITTED_METADATA = new
Field.NullableStr("metadata",
- "Any associated metadata the client wants to keep.");
- public static final Field.Int32 COMMITTED_LEADER_EPOCH = new
Field.Int32("leader_epoch",
- "The leader epoch, if provided is derived from the last consumed
record. " +
- "This is used by the consumer to check for log truncation
and to ensure partition " +
- "metadata is up to date following a group rebalance.");
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8187bf8..621b6a1 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -141,9 +141,9 @@ public abstract class AbstractResponse implements
AbstractRequestResponse {
case DESCRIBE_ACLS:
return new DescribeAclsResponse(struct, version);
case CREATE_ACLS:
- return new CreateAclsResponse(struct);
+ return new CreateAclsResponse(struct, version);
case DELETE_ACLS:
- return new DeleteAclsResponse(struct);
+ return new DeleteAclsResponse(struct, version);
case DESCRIBE_CONFIGS:
return new DescribeConfigsResponse(struct);
case ALTER_CONFIGS:
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 992e3d4..3eb88a9 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -19,171 +19,123 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsRequestData.AclCreation;
+import org.apache.kafka.common.message.CreateAclsResponseData;
+import
org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.resource.ResourceType;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
-import static org.apache.kafka.common.protocol.CommonFields.HOST;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
-
public class CreateAclsRequest extends AbstractRequest {
- private final static String CREATIONS_KEY_NAME = "creations";
-
- private static final Schema CREATE_ACLS_REQUEST_V0 = new Schema(
- new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
- RESOURCE_TYPE,
- RESOURCE_NAME,
- PRINCIPAL,
- HOST,
- OPERATION,
- PERMISSION_TYPE))));
-
- /**
- * Version 1 adds RESOURCE_PATTERN_TYPE, to support more than just literal
resource patterns.
- * For more info, see {@link PatternType}.
- *
- * Also, when the quota is violated, brokers will respond to a version 1
or later request before throttling.
- */
- private static final Schema CREATE_ACLS_REQUEST_V1 = new Schema(
- new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
- RESOURCE_TYPE,
- RESOURCE_NAME,
- RESOURCE_PATTERN_TYPE,
- PRINCIPAL,
- HOST,
- OPERATION,
- PERMISSION_TYPE))));
-
- public static Schema[] schemaVersions() {
- return new Schema[]{CREATE_ACLS_REQUEST_V0, CREATE_ACLS_REQUEST_V1};
- }
-
- public static class AclCreation {
- private final AclBinding acl;
-
- public AclCreation(AclBinding acl) {
- this.acl = acl;
- }
-
- static AclCreation fromStruct(Struct struct) {
- ResourcePattern pattern =
RequestUtils.resourcePatternromStructFields(struct);
- AccessControlEntry entry =
RequestUtils.aceFromStructFields(struct);
- return new AclCreation(new AclBinding(pattern, entry));
- }
-
- public AclBinding acl() {
- return acl;
- }
-
- void setStructFields(Struct struct) {
- RequestUtils.resourcePatternSetStructFields(acl.pattern(), struct);
- RequestUtils.aceSetStructFields(acl.entry(), struct);
- }
-
- @Override
- public String toString() {
- return "(acl=" + acl + ")";
- }
- }
public static class Builder extends
AbstractRequest.Builder<CreateAclsRequest> {
- private final List<AclCreation> creations;
+ private final CreateAclsRequestData data;
- public Builder(List<AclCreation> creations) {
+ public Builder(CreateAclsRequestData data) {
super(ApiKeys.CREATE_ACLS);
- this.creations = creations;
+ this.data = data;
}
@Override
public CreateAclsRequest build(short version) {
- return new CreateAclsRequest(version, creations);
+ return new CreateAclsRequest(version, data);
}
@Override
public String toString() {
- return "(type=CreateAclsRequest, creations=" +
Utils.join(creations, ", ") + ")";
+ return data.toString();
}
}
- private final List<AclCreation> aclCreations;
+ private final CreateAclsRequestData data;
- CreateAclsRequest(short version, List<AclCreation> aclCreations) {
+ CreateAclsRequest(short version, CreateAclsRequestData data) {
super(ApiKeys.CREATE_ACLS, version);
- this.aclCreations = aclCreations;
-
- validate(aclCreations);
+ validate(data);
+ this.data = data;
}
public CreateAclsRequest(Struct struct, short version) {
- super(ApiKeys.CREATE_ACLS, version);
- this.aclCreations = new ArrayList<>();
- for (Object creationStructObj : struct.getArray(CREATIONS_KEY_NAME)) {
- Struct creationStruct = (Struct) creationStructObj;
- aclCreations.add(AclCreation.fromStruct(creationStruct));
- }
+ this(version, new CreateAclsRequestData(struct, version));
}
- @Override
- protected Struct toStruct() {
- Struct struct = new
Struct(ApiKeys.CREATE_ACLS.requestSchema(version()));
- List<Struct> requests = new ArrayList<>();
- for (AclCreation creation : aclCreations) {
- Struct creationStruct = struct.instance(CREATIONS_KEY_NAME);
- creation.setStructFields(creationStruct);
- requests.add(creationStruct);
- }
- struct.set(CREATIONS_KEY_NAME, requests.toArray());
- return struct;
+ public List<AclCreation> aclCreations() {
+ return data.creations();
}
- public List<AclCreation> aclCreations() {
- return aclCreations;
+ @Override
+ protected Struct toStruct() {
+ return data.toStruct(version());
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable
throwable) {
- List<CreateAclsResponse.AclCreationResponse> responses = new
ArrayList<>();
- for (int i = 0; i < aclCreations.size(); i++)
- responses.add(new
CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
- return new CreateAclsResponse(throttleTimeMs, responses);
+ CreateAclsResponseData.AclCreationResult result =
CreateAclsRequest.aclResult(throwable);
+ List<CreateAclsResponseData.AclCreationResult> results =
Collections.nCopies(data.creations().size(), result);
+ return new CreateAclsResponse(new CreateAclsResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setResults(results));
}
public static CreateAclsRequest parse(ByteBuffer buffer, short version) {
return new CreateAclsRequest(ApiKeys.CREATE_ACLS.parseRequest(version,
buffer), version);
}
- private void validate(List<AclCreation> aclCreations) {
+ private void validate(CreateAclsRequestData data) {
if (version() == 0) {
- final boolean unsupported = aclCreations.stream()
- .map(AclCreation::acl)
- .map(AclBinding::pattern)
- .map(ResourcePattern::patternType)
- .anyMatch(patternType -> patternType != PatternType.LITERAL);
- if (unsupported) {
+ final boolean unsupported =
data.creations().stream().anyMatch(creation ->
+ creation.resourcePatternType() != PatternType.LITERAL.code());
+ if (unsupported)
throw new UnsupportedVersionException("Version 0 only supports
literal resource pattern types");
- }
}
- final boolean unknown = aclCreations.stream()
- .map(AclCreation::acl)
- .anyMatch(AclBinding::isUnknown);
- if (unknown) {
- throw new IllegalArgumentException("You can not create ACL
bindings with unknown elements");
- }
+ final boolean unknown = data.creations().stream().anyMatch(creation ->
+ creation.resourcePatternType() == PatternType.UNKNOWN.code()
+ || creation.resourceType() == ResourceType.UNKNOWN.code()
+ || creation.permissionType() ==
AclPermissionType.UNKNOWN.code()
+ || creation.operation() == AclOperation.UNKNOWN.code());
+ if (unknown)
+ throw new IllegalArgumentException("CreatableAcls contain unknown
elements: " + data.creations());
+ }
+
+ public static AclBinding aclBinding(AclCreation acl) {
+ ResourcePattern pattern = new ResourcePattern(
+ ResourceType.fromCode(acl.resourceType()),
+ acl.resourceName(),
+ PatternType.fromCode(acl.resourcePatternType()));
+ AccessControlEntry entry = new AccessControlEntry(
+ acl.principal(),
+ acl.host(),
+ AclOperation.fromCode(acl.operation()),
+ AclPermissionType.fromCode(acl.permissionType()));
+ return new AclBinding(pattern, entry);
+ }
+
+ public static AclCreation aclCreation(AclBinding binding) {
+ return new AclCreation()
+ .setHost(binding.entry().host())
+ .setOperation(binding.entry().operation().code())
+ .setPermissionType(binding.entry().permissionType().code())
+ .setPrincipal(binding.entry().principal())
+ .setResourceName(binding.pattern().name())
+ .setResourceType(binding.pattern().resourceType().code())
+ .setResourcePatternType(binding.pattern().patternType().code());
+ }
+
+ private static AclCreationResult aclResult(Throwable throwable) {
+ ApiError apiError = ApiError.fromThrowable(throwable);
+ return new AclCreationResult()
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message());
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index d5f52dd..f9ef457 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -16,110 +16,48 @@
*/
package org.apache.kafka.common.requests;
+import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import java.util.stream.Collectors;
public class CreateAclsResponse extends AbstractResponse {
- private final static String CREATION_RESPONSES_KEY_NAME =
"creation_responses";
-
- private static final Schema CREATE_ACLS_RESPONSE_V0 = new Schema(
- THROTTLE_TIME_MS,
- new Field(CREATION_RESPONSES_KEY_NAME, new ArrayOf(new Schema(
- ERROR_CODE,
- ERROR_MESSAGE))));
-
- /**
- * The version number is bumped to indicate that, on quota violation,
brokers send out responses before throttling.
- */
- private static final Schema CREATE_ACLS_RESPONSE_V1 =
CREATE_ACLS_RESPONSE_V0;
-
- public static Schema[] schemaVersions() {
- return new Schema[]{CREATE_ACLS_RESPONSE_V0, CREATE_ACLS_RESPONSE_V1};
- }
-
- public static class AclCreationResponse {
- private final ApiError error;
-
- public AclCreationResponse(ApiError error) {
- this.error = error;
- }
-
- public ApiError error() {
- return error;
- }
-
- @Override
- public String toString() {
- return "(" + error + ")";
- }
- }
-
- private final int throttleTimeMs;
-
- private final List<AclCreationResponse> aclCreationResponses;
+ private final CreateAclsResponseData data;
- public CreateAclsResponse(int throttleTimeMs, List<AclCreationResponse>
aclCreationResponses) {
- this.throttleTimeMs = throttleTimeMs;
- this.aclCreationResponses = aclCreationResponses;
+ public CreateAclsResponse(CreateAclsResponseData data) {
+ this.data = data;
}
- public CreateAclsResponse(Struct struct) {
- this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
- this.aclCreationResponses = new ArrayList<>();
- for (Object responseStructObj :
struct.getArray(CREATION_RESPONSES_KEY_NAME)) {
- Struct responseStruct = (Struct) responseStructObj;
- ApiError error = new ApiError(responseStruct);
- this.aclCreationResponses.add(new AclCreationResponse(error));
- }
+ public CreateAclsResponse(Struct struct, short version) {
+ this.data = new CreateAclsResponseData(struct, version);
}
@Override
protected Struct toStruct(short version) {
- Struct struct = new
Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
- List<Struct> responseStructs = new ArrayList<>();
- for (AclCreationResponse response : aclCreationResponses) {
- Struct responseStruct =
struct.instance(CREATION_RESPONSES_KEY_NAME);
- response.error.write(responseStruct);
- responseStructs.add(responseStruct);
- }
- struct.set(CREATION_RESPONSES_KEY_NAME, responseStructs.toArray());
- return struct;
+ return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
- public List<AclCreationResponse> aclCreationResponses() {
- return aclCreationResponses;
+ public List<CreateAclsResponseData.AclCreationResult> results() {
+ return data.results();
}
@Override
public Map<Errors, Integer> errorCounts() {
- Map<Errors, Integer> errorCounts = new HashMap<>();
- for (AclCreationResponse response : aclCreationResponses)
- updateErrorCounts(errorCounts, response.error.error());
- return errorCounts;
+ return errorCounts(results().stream().map(r ->
Errors.forCode(r.errorCode())).collect(Collectors.toList()));
}
public static CreateAclsResponse parse(ByteBuffer buffer, short version) {
- return new
CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer));
+ return new
CreateAclsResponse(ApiKeys.CREATE_ACLS.responseSchema(version).read(buffer),
version);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 5a20ceb..b020b2b 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -16,149 +16,136 @@
*/
package org.apache.kafka.common.requests;
+import java.util.Collections;
+import java.util.stream.Collectors;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBindingFilter;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.resource.ResourceType;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import static org.apache.kafka.common.protocol.ApiKeys.DELETE_ACLS;
-import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
public class DeleteAclsRequest extends AbstractRequest {
- private final static String FILTERS = "filters";
-
- private static final Schema DELETE_ACLS_REQUEST_V0 = new Schema(
- new Field(FILTERS, new ArrayOf(new Schema(
- RESOURCE_TYPE,
- RESOURCE_NAME_FILTER,
- PRINCIPAL_FILTER,
- HOST_FILTER,
- OPERATION,
- PERMISSION_TYPE))));
-
- /**
- * V1 sees a new `RESOURCE_PATTERN_TYPE_FILTER` that controls how the filter handles different resource pattern types.
- * For more info, see {@link PatternType}.
- *
- * Also, when the quota is violated, brokers will respond to a version 1 or later request before throttling.
- */
- private static final Schema DELETE_ACLS_REQUEST_V1 = new Schema(
- new Field(FILTERS, new ArrayOf(new Schema(
- RESOURCE_TYPE,
- RESOURCE_NAME_FILTER,
- RESOURCE_PATTERN_TYPE_FILTER,
- PRINCIPAL_FILTER,
- HOST_FILTER,
- OPERATION,
- PERMISSION_TYPE))));
-
- public static Schema[] schemaVersions() {
- return new Schema[]{DELETE_ACLS_REQUEST_V0, DELETE_ACLS_REQUEST_V1};
- }
-
public static class Builder extends
AbstractRequest.Builder<DeleteAclsRequest> {
- private final List<AclBindingFilter> filters;
+ private final DeleteAclsRequestData data;
- public Builder(List<AclBindingFilter> filters) {
+ public Builder(DeleteAclsRequestData data) {
super(DELETE_ACLS);
- this.filters = filters;
+ this.data = data;
}
@Override
public DeleteAclsRequest build(short version) {
- return new DeleteAclsRequest(version, filters);
+ return new DeleteAclsRequest(version, data);
}
@Override
public String toString() {
- return "(type=DeleteAclsRequest, filters=" + Utils.join(filters,
", ") + ")";
+ return data.toString();
}
+
}
- private final List<AclBindingFilter> filters;
+ private final DeleteAclsRequestData data;
- DeleteAclsRequest(short version, List<AclBindingFilter> filters) {
+ private DeleteAclsRequest(short version, DeleteAclsRequestData data) {
super(ApiKeys.DELETE_ACLS, version);
- this.filters = filters;
+ this.data = data;
+ normalizeAndValidate();
+ }
+
+ private void normalizeAndValidate() {
+ if (version() == 0) {
+ for (DeleteAclsRequestData.DeleteAclsFilter filter :
data.filters()) {
+ PatternType patternType =
PatternType.fromCode(filter.patternTypeFilter());
+
+ // On older brokers, no pattern types existed except LITERAL (effectively). So even though ANY is not
+ // directly supported on those brokers, we can get the same effect as ANY by setting the pattern type
+ // to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
+ if (patternType == PatternType.ANY)
+ filter.setPatternTypeFilter(PatternType.LITERAL.code());
+ else if (patternType != PatternType.LITERAL)
+ throw new UnsupportedVersionException("Version 0 does not
support pattern type " +
+ patternType + " (only LITERAL and ANY are
supported)");
+ }
+ }
+
+ final boolean unknown = data.filters().stream().anyMatch(filter ->
+ filter.patternTypeFilter() == PatternType.UNKNOWN.code()
+ || filter.resourceTypeFilter() ==
ResourceType.UNKNOWN.code()
+ || filter.operation() == AclOperation.UNKNOWN.code()
+ || filter.permissionType() ==
AclPermissionType.UNKNOWN.code()
+ );
- validate(version, filters);
+ if (unknown) {
+ throw new IllegalArgumentException("Filters contain UNKNOWN
elements, filters: " + data.filters());
+ }
}
public DeleteAclsRequest(Struct struct, short version) {
super(ApiKeys.DELETE_ACLS, version);
- this.filters = new ArrayList<>();
- for (Object filterStructObj : struct.getArray(FILTERS)) {
- Struct filterStruct = (Struct) filterStructObj;
- ResourcePatternFilter resourceFilter =
RequestUtils.resourcePatternFilterFromStructFields(filterStruct);
- AccessControlEntryFilter aceFilter =
RequestUtils.aceFilterFromStructFields(filterStruct);
- this.filters.add(new AclBindingFilter(resourceFilter, aceFilter));
- }
+ this.data = new DeleteAclsRequestData(struct, version);
}
public List<AclBindingFilter> filters() {
- return filters;
+ return
data.filters().stream().map(DeleteAclsRequest::aclBindingFilter).collect(Collectors.toList());
}
@Override
protected Struct toStruct() {
- Struct struct = new Struct(DELETE_ACLS.requestSchema(version()));
- List<Struct> filterStructs = new ArrayList<>();
- for (AclBindingFilter filter : filters) {
- Struct filterStruct = struct.instance(FILTERS);
-
RequestUtils.resourcePatternFilterSetStructFields(filter.patternFilter(),
filterStruct);
- RequestUtils.aceFilterSetStructFields(filter.entryFilter(),
filterStruct);
- filterStructs.add(filterStruct);
- }
- struct.set(FILTERS, filterStructs.toArray());
- return struct;
+ return data.toStruct(version());
}
@Override
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable
throwable) {
- List<DeleteAclsResponse.AclFilterResponse> responses = new
ArrayList<>();
- for (int i = 0; i < filters.size(); i++) {
- responses.add(new DeleteAclsResponse.AclFilterResponse(
- ApiError.fromThrowable(throwable), Collections.emptySet()));
- }
- return new DeleteAclsResponse(throttleTimeMs, responses);
+ ApiError apiError = ApiError.fromThrowable(throwable);
+ List<DeleteAclsFilterResult> filterResults =
Collections.nCopies(data.filters().size(),
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+ .setErrorCode(apiError.error().code())
+ .setErrorMessage(apiError.message()));
+ return new DeleteAclsResponse(new DeleteAclsResponseData()
+ .setThrottleTimeMs(throttleTimeMs)
+ .setFilterResults(filterResults));
}
public static DeleteAclsRequest parse(ByteBuffer buffer, short version) {
return new DeleteAclsRequest(DELETE_ACLS.parseRequest(version,
buffer), version);
}
- private void validate(short version, List<AclBindingFilter> filters) {
- if (version == 0) {
- final boolean unsupported = filters.stream()
- .map(AclBindingFilter::patternFilter)
- .map(ResourcePatternFilter::patternType)
- .anyMatch(patternType -> patternType != PatternType.LITERAL &&
patternType != PatternType.ANY);
- if (unsupported) {
- throw new UnsupportedVersionException("Version 0 only supports
literal resource pattern types");
- }
- }
+ public static DeleteAclsFilter deleteAclsFilter(AclBindingFilter filter) {
+ return new DeleteAclsFilter()
+ .setResourceNameFilter(filter.patternFilter().name())
+
.setResourceTypeFilter(filter.patternFilter().resourceType().code())
+ .setPatternTypeFilter(filter.patternFilter().patternType().code())
+ .setHostFilter(filter.entryFilter().host())
+ .setOperation(filter.entryFilter().operation().code())
+ .setPermissionType(filter.entryFilter().permissionType().code())
+ .setPrincipalFilter(filter.entryFilter().principal());
+ }
- final boolean unknown =
filters.stream().anyMatch(AclBindingFilter::isUnknown);
- if (unknown) {
- throw new IllegalArgumentException("Filters contain UNKNOWN
elements");
- }
+ private static AclBindingFilter aclBindingFilter(DeleteAclsFilter filter) {
+ ResourcePatternFilter patternFilter = new ResourcePatternFilter(
+ ResourceType.fromCode(filter.resourceTypeFilter()),
+ filter.resourceNameFilter(),
+ PatternType.fromCode(filter.patternTypeFilter()));
+ AccessControlEntryFilter entryFilter = new AccessControlEntryFilter(
+ filter.principalFilter(),
+ filter.hostFilter(),
+ AclOperation.fromCode(filter.operation()),
+ AclPermissionType.fromCode(filter.permissionType()));
+ return new AclBindingFilter(patternFilter, entryFilter);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index a3b81cc..cb0ca07 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -18,223 +18,66 @@ package org.apache.kafka.common.requests;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_MESSAGE;
-import static org.apache.kafka.common.protocol.CommonFields.HOST;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
+import java.util.stream.Collectors;
public class DeleteAclsResponse extends AbstractResponse {
public static final Logger log =
LoggerFactory.getLogger(DeleteAclsResponse.class);
- private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses";
- private final static String MATCHING_ACLS_KEY_NAME = "matching_acls";
-
- private static final Schema MATCHING_ACL_V0 = new Schema(
- ERROR_CODE,
- ERROR_MESSAGE,
- RESOURCE_TYPE,
- RESOURCE_NAME,
- PRINCIPAL,
- HOST,
- OPERATION,
- PERMISSION_TYPE);
-
- /**
- * V1 sees a new `RESOURCE_PATTERN_TYPE` that defines the type of the resource pattern.
- *
- * For more info, see {@link PatternType}.
- */
- private static final Schema MATCHING_ACL_V1 = new Schema(
- ERROR_CODE,
- ERROR_MESSAGE,
- RESOURCE_TYPE,
- RESOURCE_NAME,
- RESOURCE_PATTERN_TYPE,
- PRINCIPAL,
- HOST,
- OPERATION,
- PERMISSION_TYPE);
-
- private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
- THROTTLE_TIME_MS,
- new Field(FILTER_RESPONSES_KEY_NAME,
- new ArrayOf(new Schema(
- ERROR_CODE,
- ERROR_MESSAGE,
- new Field(MATCHING_ACLS_KEY_NAME, new
ArrayOf(MATCHING_ACL_V0), "The matching ACLs")))));
- /**
- * V1 sees a new `RESOURCE_PATTERN_TYPE` field added to MATCHING_ACL_V1, that describes how the resource pattern is interpreted
- * and version was bumped to indicate that, on quota violation, brokers send out responses before throttling.
- *
- * For more info, see {@link PatternType}.
- */
- private static final Schema DELETE_ACLS_RESPONSE_V1 = new Schema(
- THROTTLE_TIME_MS,
- new Field(FILTER_RESPONSES_KEY_NAME,
- new ArrayOf(new Schema(
- ERROR_CODE,
- ERROR_MESSAGE,
- new Field(MATCHING_ACLS_KEY_NAME, new
ArrayOf(MATCHING_ACL_V1), "The matching ACLs")))));
+ private final DeleteAclsResponseData data;
- public static Schema[] schemaVersions() {
- return new Schema[]{DELETE_ACLS_RESPONSE_V0, DELETE_ACLS_RESPONSE_V1};
+ public DeleteAclsResponse(DeleteAclsResponseData data) {
+ this.data = data;
}
- public static class AclDeletionResult {
- private final ApiError error;
- private final AclBinding acl;
-
- public AclDeletionResult(ApiError error, AclBinding acl) {
- this.error = error;
- this.acl = acl;
- }
-
- public AclDeletionResult(AclBinding acl) {
- this(ApiError.NONE, acl);
- }
-
- public ApiError error() {
- return error;
- }
-
- public AclBinding acl() {
- return acl;
- }
-
- @Override
- public String toString() {
- return "(error=" + error + ", acl=" + acl + ")";
- }
- }
-
- public static class AclFilterResponse {
- private final ApiError error;
- private final Collection<AclDeletionResult> deletions;
-
- public AclFilterResponse(ApiError error, Collection<AclDeletionResult>
deletions) {
- this.error = error;
- this.deletions = deletions;
- }
-
- public AclFilterResponse(Collection<AclDeletionResult> deletions) {
- this(ApiError.NONE, deletions);
- }
-
- public ApiError error() {
- return error;
- }
-
- public Collection<AclDeletionResult> deletions() {
- return deletions;
- }
-
- @Override
- public String toString() {
- return "(error=" + error + ", deletions=" + Utils.join(deletions,
",") + ")";
- }
- }
-
- private final int throttleTimeMs;
-
- private final List<AclFilterResponse> responses;
-
- public DeleteAclsResponse(int throttleTimeMs, List<AclFilterResponse>
responses) {
- this.throttleTimeMs = throttleTimeMs;
- this.responses = responses;
- }
-
- public DeleteAclsResponse(Struct struct) {
- this.throttleTimeMs = struct.get(THROTTLE_TIME_MS);
- this.responses = new ArrayList<>();
- for (Object responseStructObj :
struct.getArray(FILTER_RESPONSES_KEY_NAME)) {
- Struct responseStruct = (Struct) responseStructObj;
- ApiError error = new ApiError(responseStruct);
- List<AclDeletionResult> deletions = new ArrayList<>();
- for (Object matchingAclStructObj :
responseStruct.getArray(MATCHING_ACLS_KEY_NAME)) {
- Struct matchingAclStruct = (Struct) matchingAclStructObj;
- ApiError matchError = new ApiError(matchingAclStruct);
- AccessControlEntry entry =
RequestUtils.aceFromStructFields(matchingAclStruct);
- ResourcePattern resource =
RequestUtils.resourcePatternromStructFields(matchingAclStruct);
- deletions.add(new AclDeletionResult(matchError, new
AclBinding(resource, entry)));
- }
- this.responses.add(new AclFilterResponse(error, deletions));
- }
+ public DeleteAclsResponse(Struct struct, short version) {
+ data = new DeleteAclsResponseData(struct, version);
}
@Override
protected Struct toStruct(short version) {
validate(version);
-
- Struct struct = new
Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
- struct.set(THROTTLE_TIME_MS, throttleTimeMs);
- List<Struct> responseStructs = new ArrayList<>();
- for (AclFilterResponse response : responses) {
- Struct responseStruct = struct.instance(FILTER_RESPONSES_KEY_NAME);
- response.error.write(responseStruct);
- List<Struct> deletionStructs = new ArrayList<>();
- for (AclDeletionResult deletion : response.deletions()) {
- Struct deletionStruct =
responseStruct.instance(MATCHING_ACLS_KEY_NAME);
- deletion.error.write(deletionStruct);
-
RequestUtils.resourcePatternSetStructFields(deletion.acl().pattern(),
deletionStruct);
- RequestUtils.aceSetStructFields(deletion.acl().entry(),
deletionStruct);
- deletionStructs.add(deletionStruct);
- }
- responseStruct.set(MATCHING_ACLS_KEY_NAME,
deletionStructs.toArray(new Struct[0]));
- responseStructs.add(responseStruct);
- }
- struct.set(FILTER_RESPONSES_KEY_NAME, responseStructs.toArray());
- return struct;
+ return data.toStruct(version);
}
@Override
public int throttleTimeMs() {
- return throttleTimeMs;
+ return data.throttleTimeMs();
}
- public List<AclFilterResponse> responses() {
- return responses;
+ public List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults()
{
+ return data.filterResults();
}
@Override
public Map<Errors, Integer> errorCounts() {
- Map<Errors, Integer> errorCounts = new HashMap<>();
- for (AclFilterResponse response : responses)
- updateErrorCounts(errorCounts, response.error.error());
- return errorCounts;
+ return errorCounts(filterResults().stream().map(r ->
Errors.forCode(r.errorCode())).collect(Collectors.toList()));
}
public static DeleteAclsResponse parse(ByteBuffer buffer, short version) {
- return new
DeleteAclsResponse(ApiKeys.DELETE_ACLS.responseSchema(version).read(buffer));
+ return new
DeleteAclsResponse(ApiKeys.DELETE_ACLS.parseResponse(version, buffer), version);
}
public String toString() {
- return "(responses=" + Utils.join(responses, ",") + ")";
+ return data.toString();
}
@Override
@@ -244,23 +87,60 @@ public class DeleteAclsResponse extends AbstractResponse {
private void validate(short version) {
if (version == 0) {
- final boolean unsupported = responses.stream()
- .flatMap(r -> r.deletions.stream())
- .map(AclDeletionResult::acl)
- .map(AclBinding::pattern)
- .map(ResourcePattern::patternType)
- .anyMatch(patternType -> patternType != PatternType.LITERAL);
- if (unsupported) {
+ final boolean unsupported = filterResults().stream()
+ .flatMap(r -> r.matchingAcls().stream())
+ .anyMatch(matchingAcl -> matchingAcl.patternType() !=
PatternType.LITERAL.code());
+ if (unsupported)
throw new UnsupportedVersionException("Version 0 only supports
literal resource pattern types");
- }
}
- final boolean unknown = responses.stream()
- .flatMap(r -> r.deletions.stream())
- .map(AclDeletionResult::acl)
- .anyMatch(AclBinding::isUnknown);
- if (unknown) {
- throw new IllegalArgumentException("Response contains UNKNOWN
elements");
- }
+ final boolean unknown = filterResults().stream()
+ .flatMap(r -> r.matchingAcls().stream())
+ .anyMatch(matchingAcl -> matchingAcl.patternType() ==
PatternType.UNKNOWN.code()
+ || matchingAcl.resourceType() ==
ResourceType.UNKNOWN.code()
+ || matchingAcl.permissionType() ==
AclPermissionType.UNKNOWN.code()
+ || matchingAcl.operation() == AclOperation.UNKNOWN.code());
+ if (unknown)
+ throw new IllegalArgumentException("DeleteAclsMatchingAcls contain
UNKNOWN elements");
+ }
+
+ public static DeleteAclsFilterResult filterResult(AclDeleteResult result) {
+ ApiError error = result.exception().map(e ->
ApiError.fromThrowable(e)).orElse(ApiError.NONE);
+ List<DeleteAclsMatchingAcl> matchingAcls =
result.aclBindingDeleteResults().stream()
+ .map(DeleteAclsResponse::matchingAcl)
+ .collect(Collectors.toList());
+ return new DeleteAclsFilterResult()
+ .setErrorCode(error.error().code())
+ .setErrorMessage(error.message())
+ .setMatchingAcls(matchingAcls);
+ }
+
+ private static DeleteAclsMatchingAcl
matchingAcl(AclDeleteResult.AclBindingDeleteResult result) {
+ ApiError error = result.exception().map(e ->
ApiError.fromThrowable(e)).orElse(ApiError.NONE);
+ AclBinding acl = result.aclBinding();
+ return matchingAcl(acl, error);
}
+
+ // Visible for testing
+ public static DeleteAclsMatchingAcl matchingAcl(AclBinding acl, ApiError
error) {
+ return new DeleteAclsMatchingAcl()
+ .setErrorCode(error.error().code())
+ .setErrorMessage(error.message())
+ .setResourceName(acl.pattern().name())
+ .setResourceType(acl.pattern().resourceType().code())
+ .setPatternType(acl.pattern().patternType().code())
+ .setHost(acl.entry().host())
+ .setOperation(acl.entry().operation().code())
+ .setPermissionType(acl.entry().permissionType().code())
+ .setPrincipal(acl.entry().principal());
+ }
+
+ public static AclBinding aclBinding(DeleteAclsMatchingAcl matchingAcl) {
+ ResourcePattern resourcePattern = new
ResourcePattern(ResourceType.fromCode(matchingAcl.resourceType()),
+ matchingAcl.resourceName(),
PatternType.fromCode(matchingAcl.patternType()));
+ AccessControlEntry accessControlEntry = new
AccessControlEntry(matchingAcl.principal(), matchingAcl.host(),
+ AclOperation.fromCode(matchingAcl.operation()),
AclPermissionType.fromCode(matchingAcl.permissionType()));
+ return new AclBinding(resourcePattern, accessControlEntry);
+ }
+
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index f5c58f4..153001d 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.resource.ResourceType;
import java.nio.ByteBuffer;
-
public class DescribeAclsRequest extends AbstractRequest {
public static class Builder extends
AbstractRequest.Builder<DescribeAclsRequest> {
@@ -64,10 +63,30 @@ public class DescribeAclsRequest extends AbstractRequest {
private final DescribeAclsRequestData data;
- public DescribeAclsRequest(DescribeAclsRequestData data, short version) {
+ private DescribeAclsRequest(DescribeAclsRequestData data, short version) {
super(ApiKeys.DESCRIBE_ACLS, version);
this.data = data;
- validate(version);
+ normalizeAndValidate(version);
+ }
+
+ private void normalizeAndValidate(short version) {
+ if (version == 0) {
+ PatternType patternType =
PatternType.fromCode(data.resourcePatternType());
+ // On older brokers, no pattern types existed except LITERAL (effectively). So even though ANY is not
+ // directly supported on those brokers, we can get the same effect as ANY by setting the pattern type
+ // to LITERAL. Note that the wildcard `*` is considered `LITERAL` for compatibility reasons.
+ if (patternType == PatternType.ANY)
+ data.setResourcePatternType(PatternType.LITERAL.code());
+ else if (patternType != PatternType.LITERAL)
+ throw new UnsupportedVersionException("Version 0 only supports
literal resource pattern types");
+ }
+
+ if (data.resourcePatternType() == PatternType.UNKNOWN.code()
+ || data.resourceType() == ResourceType.UNKNOWN.code()
+ || data.permissionType() == AclPermissionType.UNKNOWN.code()
+ || data.operation() == AclOperation.UNKNOWN.code()) {
+ throw new IllegalArgumentException("DescribeAclsRequest contains
UNKNOWN elements: " + data);
+ }
}
public DescribeAclsRequest(Struct struct, short version) {
@@ -111,22 +130,4 @@ public class DescribeAclsRequest extends AbstractRequest {
return new AclBindingFilter(rpf, acef);
}
- private void validate(short version) {
- if (version == 0) {
- if (data.resourcePatternType() == PatternType.ANY.code()) {
- data.setResourcePatternType(PatternType.LITERAL.code());
- }
- if (data.resourcePatternType() != PatternType.LITERAL.code()) {
- throw new UnsupportedVersionException("Version 0 only supports
literal resource pattern types");
- }
- }
-
- if (data.resourcePatternType() == PatternType.UNKNOWN.code()
- || data.resourceType() == ResourceType.UNKNOWN.code()
- || data.permissionType() == AclPermissionType.UNKNOWN.code()
- || data.operation() == AclOperation.UNKNOWN.code()) {
- throw new IllegalArgumentException("Filter contain UNKNOWN
elements");
- }
- }
-
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index bbf90b2..cb1bc43 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
@@ -105,9 +107,8 @@ public class DescribeAclsResponse extends AbstractResponse {
}
}
- private static List<AclBinding> aclBindings(DescribeAclsResource resource)
{
- List<AclBinding> acls = new ArrayList<>();
- for (AclDescription acl : resource.acls()) {
+ private static Stream<AclBinding> aclBindings(DescribeAclsResource
resource) {
+ return resource.acls().stream().map(acl -> {
ResourcePattern pattern = new ResourcePattern(
ResourceType.fromCode(resource.type()),
resource.name(),
@@ -117,26 +118,21 @@ public class DescribeAclsResponse extends
AbstractResponse {
acl.host(),
AclOperation.fromCode(acl.operation()),
AclPermissionType.fromCode(acl.permissionType()));
- acls.add(new AclBinding(pattern, entry));
- }
- return acls;
+ return new AclBinding(pattern, entry);
+ });
}
public static List<AclBinding> aclBindings(List<DescribeAclsResource>
resources) {
- List<AclBinding> acls = new ArrayList<>();
- for (DescribeAclsResource resource : resources) {
- acls.addAll(aclBindings(resource));
- }
- return acls;
+ return
resources.stream().flatMap(DescribeAclsResponse::aclBindings).collect(Collectors.toList());
}
- public static DescribeAclsResponse prepareResponse(int throttleTimeMs,
ApiError error, Collection<AclBinding> acls) {
- Map<ResourcePattern, List<AccessControlEntry>> map = new HashMap<>();
+ public static List<DescribeAclsResource>
aclsResources(Collection<AclBinding> acls) {
+ Map<ResourcePattern, List<AccessControlEntry>> patternToEntries = new
HashMap<>();
for (AclBinding acl : acls) {
- map.computeIfAbsent(acl.pattern(), v -> new
ArrayList<>()).add(acl.entry());
+ patternToEntries.computeIfAbsent(acl.pattern(), v -> new
ArrayList<>()).add(acl.entry());
}
- List<DescribeAclsResource> resources = new ArrayList<>();
- for (Entry<ResourcePattern, List<AccessControlEntry>> entry :
map.entrySet()) {
+ List<DescribeAclsResource> resources = new
ArrayList<>(patternToEntries.size());
+ for (Entry<ResourcePattern, List<AccessControlEntry>> entry :
patternToEntries.entrySet()) {
ResourcePattern key = entry.getKey();
List<AclDescription> aclDescriptions = new ArrayList<>();
for (AccessControlEntry ace : entry.getValue()) {
@@ -148,17 +144,12 @@ public class DescribeAclsResponse extends
AbstractResponse {
aclDescriptions.add(ad);
}
DescribeAclsResource dar = new DescribeAclsResource()
- .setName(key.name())
- .setPatternType(key.patternType().code())
- .setType(key.resourceType().code())
- .setAcls(aclDescriptions);
+ .setName(key.name())
+ .setPatternType(key.patternType().code())
+ .setType(key.resourceType().code())
+ .setAcls(aclDescriptions);
resources.add(dar);
}
- DescribeAclsResponseData data = new DescribeAclsResponseData()
- .setThrottleTimeMs(throttleTimeMs)
- .setErrorCode(error.error().code())
- .setErrorMessage(error.message())
- .setResources(resources);
- return new DescribeAclsResponse(data);
+ return resources;
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index c3dfaa1..2b1f04c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -16,97 +16,17 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.acl.AccessControlEntry;
-import org.apache.kafka.common.acl.AccessControlEntryFilter;
-import org.apache.kafka.common.acl.AclOperation;
-import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.protocol.types.Field;
import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.resource.ResourcePattern;
-import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.resource.ResourceType;
import java.nio.ByteBuffer;
import java.util.Optional;
-import static org.apache.kafka.common.protocol.CommonFields.HOST;
-import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.OPERATION;
-import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
-import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE;
-import static
org.apache.kafka.common.protocol.CommonFields.RESOURCE_PATTERN_TYPE_FILTER;
-import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
-
public final class RequestUtils {
private RequestUtils() {}
- static ResourcePattern resourcePatternromStructFields(Struct struct) {
- byte resourceType = struct.get(RESOURCE_TYPE);
- String name = struct.get(RESOURCE_NAME);
- PatternType patternType = PatternType.fromCode(
- struct.getOrElse(RESOURCE_PATTERN_TYPE,
PatternType.LITERAL.code()));
- return new ResourcePattern(ResourceType.fromCode(resourceType), name,
patternType);
- }
-
- static void resourcePatternSetStructFields(ResourcePattern pattern, Struct
struct) {
- struct.set(RESOURCE_TYPE, pattern.resourceType().code());
- struct.set(RESOURCE_NAME, pattern.name());
- struct.setIfExists(RESOURCE_PATTERN_TYPE,
pattern.patternType().code());
- }
-
- static ResourcePatternFilter resourcePatternFilterFromStructFields(Struct
struct) {
- byte resourceType = struct.get(RESOURCE_TYPE);
- String name = struct.get(RESOURCE_NAME_FILTER);
- PatternType patternType = PatternType.fromCode(
- struct.getOrElse(RESOURCE_PATTERN_TYPE_FILTER,
PatternType.LITERAL.code()));
- return new ResourcePatternFilter(ResourceType.fromCode(resourceType),
name, patternType);
- }
-
- static void resourcePatternFilterSetStructFields(ResourcePatternFilter
patternFilter, Struct struct) {
- struct.set(RESOURCE_TYPE, patternFilter.resourceType().code());
- struct.set(RESOURCE_NAME_FILTER, patternFilter.name());
- struct.setIfExists(RESOURCE_PATTERN_TYPE_FILTER,
patternFilter.patternType().code());
- }
-
- static AccessControlEntry aceFromStructFields(Struct struct) {
- String principal = struct.get(PRINCIPAL);
- String host = struct.get(HOST);
- byte operation = struct.get(OPERATION);
- byte permissionType = struct.get(PERMISSION_TYPE);
- return new AccessControlEntry(principal, host,
AclOperation.fromCode(operation),
- AclPermissionType.fromCode(permissionType));
- }
-
- static void aceSetStructFields(AccessControlEntry data, Struct struct) {
- struct.set(PRINCIPAL, data.principal());
- struct.set(HOST, data.host());
- struct.set(OPERATION, data.operation().code());
- struct.set(PERMISSION_TYPE, data.permissionType().code());
- }
-
- static AccessControlEntryFilter aceFilterFromStructFields(Struct struct) {
- String principal = struct.get(PRINCIPAL_FILTER);
- String host = struct.get(HOST_FILTER);
- byte operation = struct.get(OPERATION);
- byte permissionType = struct.get(PERMISSION_TYPE);
- return new AccessControlEntryFilter(principal, host,
AclOperation.fromCode(operation),
- AclPermissionType.fromCode(permissionType));
- }
-
- static void aceFilterSetStructFields(AccessControlEntryFilter filter,
Struct struct) {
- struct.set(PRINCIPAL_FILTER, filter.principal());
- struct.set(HOST_FILTER, filter.host());
- struct.set(OPERATION, filter.operation().code());
- struct.set(PERMISSION_TYPE, filter.permissionType().code());
- }
-
static void setLeaderEpochIfExists(Struct struct, Field.Int32
leaderEpochField, Optional<Integer> leaderEpoch) {
struct.setIfExists(leaderEpochField,
leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH));
}
diff --git a/clients/src/main/resources/common/message/CreateAclsRequest.json
b/clients/src/main/resources/common/message/CreateAclsRequest.json
index 72d4d11..a9bd9c5 100644
--- a/clients/src/main/resources/common/message/CreateAclsRequest.json
+++ b/clients/src/main/resources/common/message/CreateAclsRequest.json
@@ -18,10 +18,11 @@
"type": "request",
"name": "CreateAclsRequest",
// Version 1 adds resource pattern type.
- "validVersions": "0-1",
- "flexibleVersions": "none",
+ // Version 2 enables flexible versions.
+ "validVersions": "0-2",
+ "flexibleVersions": "2+",
"fields": [
- { "name": "Creations", "type": "[]CreatableAcl", "versions": "0+",
+ { "name": "Creations", "type": "[]AclCreation", "versions": "0+",
"about": "The ACLs that we want to create.", "fields": [
{ "name": "ResourceType", "type": "int8", "versions": "0+",
"about": "The type of the resource." },
diff --git a/clients/src/main/resources/common/message/CreateAclsResponse.json
b/clients/src/main/resources/common/message/CreateAclsResponse.json
index d84b723..7b0de7e 100644
--- a/clients/src/main/resources/common/message/CreateAclsResponse.json
+++ b/clients/src/main/resources/common/message/CreateAclsResponse.json
@@ -18,12 +18,13 @@
"type": "response",
"name": "CreateAclsResponse",
// Starting in version 1, on quota violation, brokers send out responses
before throttling.
- "validVersions": "0-1",
- "flexibleVersions": "none",
+ // Version 2 enables flexible versions.
+ "validVersions": "0-2",
+ "flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
- { "name": "Results", "type": "[]CreatableAclResult", "versions": "0+",
+ { "name": "Results", "type": "[]AclCreationResult", "versions": "0+",
"about": "The results for each ACL creation.", "fields": [
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The result error, or zero if there was no error." },
diff --git a/clients/src/main/resources/common/message/DeleteAclsRequest.json
b/clients/src/main/resources/common/message/DeleteAclsRequest.json
index c9d6d4a..664737e 100644
--- a/clients/src/main/resources/common/message/DeleteAclsRequest.json
+++ b/clients/src/main/resources/common/message/DeleteAclsRequest.json
@@ -18,8 +18,9 @@
"type": "request",
"name": "DeleteAclsRequest",
// Version 1 adds the pattern type.
- "validVersions": "0-1",
- "flexibleVersions": "none",
+ // Version 2 enables flexible versions.
+ "validVersions": "0-2",
+ "flexibleVersions": "2+",
"fields": [
{ "name": "Filters", "type": "[]DeleteAclsFilter", "versions": "0+",
"about": "The filters to use when deleting ACLs.", "fields": [
diff --git a/clients/src/main/resources/common/message/DeleteAclsResponse.json
b/clients/src/main/resources/common/message/DeleteAclsResponse.json
index 303fa2b..08f5702 100644
--- a/clients/src/main/resources/common/message/DeleteAclsResponse.json
+++ b/clients/src/main/resources/common/message/DeleteAclsResponse.json
@@ -19,8 +19,9 @@
"name": "DeleteAclsResponse",
// Version 1 adds the resource pattern type.
// Starting in version 1, on quota violation, brokers send out responses
before throttling.
- "validVersions": "0-1",
- "flexibleVersions": "none",
+ // Version 2 enables flexible versions.
+ "validVersions": "0-2",
+ "flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
"about": "The duration in milliseconds for which the request was
throttled due to a quota violation, or zero if the request did not violate any
quota." },
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index f82fddd..c869aaa 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -61,13 +61,16 @@ import
org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
-import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResultCollection;
import org.apache.kafka.common.message.DeleteTopicsResponseData;
import
org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult;
+import org.apache.kafka.common.message.DescribeAclsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import
org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember;
import
org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
@@ -88,13 +91,10 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AlterPartitionReassignmentsResponse;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.CreateAclsResponse;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
import org.apache.kafka.common.requests.DeleteTopicsRequest;
@@ -713,18 +713,18 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test a call where we get back ACL1 and ACL2.
-
env.kafkaClient().prepareResponse(DescribeAclsResponse.prepareResponse(0,
ApiError.NONE,
- asList(ACL1, ACL2)));
+ env.kafkaClient().prepareResponse(new DescribeAclsResponse(new
DescribeAclsResponseData()
+ .setResources(DescribeAclsResponse.aclsResources(asList(ACL1,
ACL2)))));
assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(),
ACL1, ACL2);
// Test a call where we get back no results.
-
env.kafkaClient().prepareResponse(DescribeAclsResponse.prepareResponse(0,
ApiError.NONE,
- Collections.<AclBinding>emptySet()));
+ env.kafkaClient().prepareResponse(new DescribeAclsResponse(new
DescribeAclsResponseData()));
assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
// Test a call where we get back an error.
-
env.kafkaClient().prepareResponse(DescribeAclsResponse.prepareResponse(0,
- new ApiError(Errors.SECURITY_DISABLED, "Security is
disabled"), Collections.<AclBinding>emptySet()));
+ env.kafkaClient().prepareResponse(new DescribeAclsResponse(new
DescribeAclsResponseData()
+ .setErrorCode(Errors.SECURITY_DISABLED.code())
+ .setErrorMessage("Security is disabled")));
TestUtils.assertFutureError(env.adminClient().describeAcls(FILTER2).values(),
SecurityDisabledException.class);
// Test a call where we supply an invalid filter.
@@ -739,8 +739,9 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test a call where we successfully create two ACLs.
- env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
- asList(new AclCreationResponse(ApiError.NONE), new
AclCreationResponse(ApiError.NONE))));
+ env.kafkaClient().prepareResponse(new CreateAclsResponse(new
CreateAclsResponseData().setResults(asList(
+ new CreateAclsResponseData.AclCreationResult(),
+ new CreateAclsResponseData.AclCreationResult()))));
CreateAclsResult results =
env.adminClient().createAcls(asList(ACL1, ACL2));
assertCollectionIs(results.values().keySet(), ACL1, ACL2);
for (KafkaFuture<Void> future : results.values().values())
@@ -748,10 +749,11 @@ public class KafkaAdminClientTest {
results.all().get();
// Test a call where we fail to create one ACL.
- env.kafkaClient().prepareResponse(new CreateAclsResponse(0, asList(
- new AclCreationResponse(new ApiError(Errors.SECURITY_DISABLED,
"Security is disabled")),
- new AclCreationResponse(ApiError.NONE))
- ));
+ env.kafkaClient().prepareResponse(new CreateAclsResponse(new
CreateAclsResponseData().setResults(asList(
+ new CreateAclsResponseData.AclCreationResult()
+ .setErrorCode(Errors.SECURITY_DISABLED.code())
+ .setErrorMessage("Security is disabled"),
+ new CreateAclsResponseData.AclCreationResult()))));
results = env.adminClient().createAcls(asList(ACL1, ACL2));
assertCollectionIs(results.values().keySet(), ACL1, ACL2);
TestUtils.assertFutureError(results.values().get(ACL1),
SecurityDisabledException.class);
@@ -766,10 +768,16 @@ public class KafkaAdminClientTest {
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
// Test a call where one filter has an error.
- env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
- new AclFilterResponse(asList(new AclDeletionResult(ACL1),
new AclDeletionResult(ACL2))),
- new AclFilterResponse(new
ApiError(Errors.SECURITY_DISABLED, "No security"),
- Collections.<AclDeletionResult>emptySet()))));
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(0)
+ .setFilterResults(asList(
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+ .setMatchingAcls(asList(
+ DeleteAclsResponse.matchingAcl(ACL1,
ApiError.NONE),
+ DeleteAclsResponse.matchingAcl(ACL2,
ApiError.NONE))),
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+ .setErrorCode(Errors.SECURITY_DISABLED.code())
+ .setErrorMessage("No security")))));
DeleteAclsResult results =
env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults =
results.values();
FilterResults filter1Results = filterResults.get(FILTER1).get();
@@ -781,18 +789,28 @@ public class KafkaAdminClientTest {
TestUtils.assertFutureError(results.all(),
SecurityDisabledException.class);
// Test a call where one deletion result has an error.
- env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
- new AclFilterResponse(asList(new AclDeletionResult(ACL1),
- new AclDeletionResult(new
ApiError(Errors.SECURITY_DISABLED, "No security"), ACL2))),
- new
AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(0)
+ .setFilterResults(asList(
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+ .setMatchingAcls(asList(
+ DeleteAclsResponse.matchingAcl(ACL1,
ApiError.NONE),
+ new DeleteAclsResponseData.DeleteAclsMatchingAcl()
+ .setErrorCode(Errors.SECURITY_DISABLED.code())
+ .setErrorMessage("No security"))),
+ new DeleteAclsResponseData.DeleteAclsFilterResult()))));
results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
assertTrue(results.values().get(FILTER2).get().values().isEmpty());
TestUtils.assertFutureError(results.all(),
SecurityDisabledException.class);
// Test a call where there are no errors.
- env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
- new AclFilterResponse(asList(new AclDeletionResult(ACL1))),
- new AclFilterResponse(asList(new
AclDeletionResult(ACL2))))));
+ env.kafkaClient().prepareResponse(new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(0)
+ .setFilterResults(asList(
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+
.setMatchingAcls(asList(DeleteAclsResponse.matchingAcl(ACL1, ApiError.NONE))),
+ new DeleteAclsResponseData.DeleteAclsFilterResult()
+
.setMatchingAcls(asList(DeleteAclsResponse.matchingAcl(ACL2,
ApiError.NONE)))))));
results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
Collection<AclBinding> deleted = results.all().get();
assertCollectionIs(deleted, ACL1, ACL2);
diff --git
a/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
b/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
index 942ff7f..945246d 100644
---
a/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
+++
b/clients/src/test/java/org/apache/kafka/common/protocol/MessageTestUtil.java
@@ -32,19 +32,4 @@ public final class MessageTestUtil {
bytes.flip();
return bytes;
}
-
- public static void messageFromByteBuffer(ByteBuffer bytes, Message
message, short version) {
- message.read(new ByteBufferAccessor(bytes.duplicate()), version);
- }
-
- public static String byteBufferToString(ByteBuffer buf) {
- ByteBuffer buf2 = buf.duplicate();
- StringBuilder bld = new StringBuilder();
- String prefix = "";
- while (buf2.hasRemaining()) {
- bld.append(String.format("%s%02x", prefix, (int) buf2.get()));
- prefix = " ";
- }
- return bld.toString();
- }
}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
index 5642677..06b4b9f 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/CreateAclsRequestTest.java
@@ -22,8 +22,8 @@ import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.CreateAclsRequestData;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
@@ -53,17 +53,17 @@ public class CreateAclsRequestTest {
@Test(expected = UnsupportedVersionException.class)
public void shouldThrowOnV0IfNotLiteral() {
- new CreateAclsRequest(V0, aclCreations(PREFIXED_ACL1));
+ new CreateAclsRequest(V0, data(PREFIXED_ACL1));
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnIfUnknown() {
- new CreateAclsRequest(V0, aclCreations(UNKNOWN_ACL1));
+ new CreateAclsRequest(V0, data(UNKNOWN_ACL1));
}
@Test
public void shouldRoundTripV0() {
- final CreateAclsRequest original = new CreateAclsRequest(V0,
aclCreations(LITERAL_ACL1, LITERAL_ACL2));
+ final CreateAclsRequest original = new CreateAclsRequest(V0,
data(LITERAL_ACL1, LITERAL_ACL2));
final Struct struct = original.toStruct();
final CreateAclsRequest result = new CreateAclsRequest(struct, V0);
@@ -73,7 +73,7 @@ public class CreateAclsRequestTest {
@Test
public void shouldRoundTripV1() {
- final CreateAclsRequest original = new CreateAclsRequest(V1,
aclCreations(LITERAL_ACL1, PREFIXED_ACL1));
+ final CreateAclsRequest original = new CreateAclsRequest(V1,
data(LITERAL_ACL1, PREFIXED_ACL1));
final Struct struct = original.toStruct();
final CreateAclsRequest result = new CreateAclsRequest(struct, V1);
@@ -85,15 +85,16 @@ public class CreateAclsRequestTest {
assertEquals("Number of Acls wrong", original.aclCreations().size(),
actual.aclCreations().size());
for (int idx = 0; idx != original.aclCreations().size(); ++idx) {
- final AclBinding originalBinding =
original.aclCreations().get(idx).acl();
- final AclBinding actualBinding =
actual.aclCreations().get(idx).acl();
+ final AclBinding originalBinding =
CreateAclsRequest.aclBinding(original.aclCreations().get(idx));
+ final AclBinding actualBinding =
CreateAclsRequest.aclBinding(actual.aclCreations().get(idx));
assertEquals(originalBinding, actualBinding);
}
}
- private static List<AclCreation> aclCreations(final AclBinding... acls) {
- return Arrays.stream(acls)
- .map(AclCreation::new)
+ private static CreateAclsRequestData data(final AclBinding... acls) {
+ List<CreateAclsRequestData.AclCreation> aclCreations =
Arrays.stream(acls)
+ .map(CreateAclsRequest::aclCreation)
.collect(Collectors.toList());
+ return new CreateAclsRequestData().setCreations(aclCreations);
}
-}
\ No newline at end of file
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
index 9be8d59..19b65ed 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsRequestTest.java
@@ -22,16 +22,18 @@ import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePatternFilter;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.List;
+import java.util.stream.Collectors;
+import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class DeleteAclsRequestTest {
private static final short V0 = 0;
@@ -49,19 +51,19 @@ public class DeleteAclsRequestTest {
private static final AclBindingFilter UNKNOWN_FILTER = new
AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "prefix",
PatternType.PREFIXED),
new AccessControlEntryFilter("User:*", "127.0.0.1",
AclOperation.CREATE, AclPermissionType.ALLOW));
- @Test(expected = UnsupportedVersionException.class)
+ @Test
public void shouldThrowOnV0IfPrefixed() {
- new DeleteAclsRequest(V0, aclFilters(PREFIXED_FILTER));
+ assertThrows(UnsupportedVersionException.class, () -> new
DeleteAclsRequest.Builder(requestData(PREFIXED_FILTER)).build(V0));
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void shouldThrowOnUnknownElements() {
- new DeleteAclsRequest(V1, aclFilters(UNKNOWN_FILTER));
+ assertThrows(IllegalArgumentException.class, () -> new
DeleteAclsRequest.Builder(requestData(UNKNOWN_FILTER)).build(V1));
}
@Test
public void shouldRoundTripLiteralV0() {
- final DeleteAclsRequest original = new DeleteAclsRequest(V0,
aclFilters(LITERAL_FILTER));
+ final DeleteAclsRequest original = new
DeleteAclsRequest.Builder(requestData(LITERAL_FILTER)).build(V0);
final Struct struct = original.toStruct();
final DeleteAclsRequest result = new DeleteAclsRequest(struct, V0);
@@ -71,13 +73,14 @@ public class DeleteAclsRequestTest {
@Test
public void shouldRoundTripAnyV0AsLiteral() {
- final DeleteAclsRequest original = new DeleteAclsRequest(V0,
aclFilters(ANY_FILTER));
- final DeleteAclsRequest expected = new DeleteAclsRequest(V0,
aclFilters(
+ final DeleteAclsRequest original = new
DeleteAclsRequest.Builder(requestData(ANY_FILTER)).build(V0);
+ final DeleteAclsRequest expected = new
DeleteAclsRequest.Builder(requestData(
new AclBindingFilter(new ResourcePatternFilter(
ANY_FILTER.patternFilter().resourceType(),
ANY_FILTER.patternFilter().name(),
PatternType.LITERAL),
- ANY_FILTER.entryFilter())));
+ ANY_FILTER.entryFilter()))
+ ).build(V0);
final DeleteAclsRequest result = new
DeleteAclsRequest(original.toStruct(), V0);
@@ -86,7 +89,9 @@ public class DeleteAclsRequestTest {
@Test
public void shouldRoundTripV1() {
- final DeleteAclsRequest original = new DeleteAclsRequest(V1,
aclFilters(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER));
+ final DeleteAclsRequest original = new DeleteAclsRequest.Builder(
+ requestData(LITERAL_FILTER, PREFIXED_FILTER, ANY_FILTER)
+ ).build(V1);
final Struct struct = original.toStruct();
final DeleteAclsRequest result = new DeleteAclsRequest(struct, V1);
@@ -104,7 +109,9 @@ public class DeleteAclsRequestTest {
}
}
- private static List<AclBindingFilter> aclFilters(final AclBindingFilter...
acls) {
- return Arrays.asList(acls);
+ private static DeleteAclsRequestData requestData(AclBindingFilter... acls)
{
+ return new DeleteAclsRequestData().setFilters(asList(acls).stream()
+ .map(DeleteAclsRequest::deleteAclsFilter)
+ .collect(Collectors.toList()));
}
-}
\ No newline at end of file
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
index f8bec15..aaebc0c 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/DeleteAclsResponseTest.java
@@ -17,101 +17,106 @@
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.acl.AccessControlEntry;
-import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
-import org.apache.kafka.common.resource.PatternType;
-import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsFilterResult;
+import
org.apache.kafka.common.message.DeleteAclsResponseData.DeleteAclsMatchingAcl;
import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
+import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-import java.util.stream.Collectors;
-
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
import static org.junit.Assert.assertEquals;
public class DeleteAclsResponseTest {
private static final short V0 = 0;
private static final short V1 = 1;
- private static final AclBinding LITERAL_ACL1 = new AclBinding(new
ResourcePattern(ResourceType.TOPIC, "foo", PatternType.LITERAL),
- new AccessControlEntry("User:ANONYMOUS", "127.0.0.1",
AclOperation.READ, AclPermissionType.DENY));
-
- private static final AclBinding LITERAL_ACL2 = new AclBinding(new
ResourcePattern(ResourceType.GROUP, "group", PatternType.LITERAL),
- new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE,
AclPermissionType.ALLOW));
-
- private static final AclBinding PREFIXED_ACL1 = new AclBinding(new
ResourcePattern(ResourceType.GROUP, "prefix", PatternType.PREFIXED),
- new AccessControlEntry("User:*", "127.0.0.1", AclOperation.CREATE,
AclPermissionType.ALLOW));
-
- private static final AclBinding UNKNOWN_ACL = new AclBinding(new
ResourcePattern(ResourceType.UNKNOWN, "group", PatternType.LITERAL),
- new AccessControlEntry("User:*", "127.0.0.1", AclOperation.WRITE,
AclPermissionType.ALLOW));
-
- private static final AclFilterResponse LITERAL_RESPONSE = new
AclFilterResponse(aclDeletions(LITERAL_ACL1, LITERAL_ACL2));
-
- private static final AclFilterResponse PREFIXED_RESPONSE = new
AclFilterResponse(aclDeletions(LITERAL_ACL1, PREFIXED_ACL1));
-
- private static final AclFilterResponse UNKNOWN_RESPONSE = new
AclFilterResponse(aclDeletions(UNKNOWN_ACL));
+ private static final DeleteAclsMatchingAcl LITERAL_ACL1 = new
DeleteAclsMatchingAcl()
+ .setResourceType(ResourceType.TOPIC.code())
+ .setResourceName("foo")
+ .setPatternType(PatternType.LITERAL.code())
+ .setPrincipal("User:ANONYMOUS")
+ .setHost("127.0.0.1")
+ .setOperation(AclOperation.READ.code())
+ .setPermissionType(AclPermissionType.DENY.code());
+
+ private static final DeleteAclsMatchingAcl LITERAL_ACL2 = new
DeleteAclsMatchingAcl()
+ .setResourceType(ResourceType.GROUP.code())
+ .setResourceName("group")
+ .setPatternType(PatternType.LITERAL.code())
+ .setPrincipal("User:*")
+ .setHost("127.0.0.1")
+ .setOperation(AclOperation.WRITE.code())
+ .setPermissionType(AclPermissionType.ALLOW.code());
+
+ private static final DeleteAclsMatchingAcl PREFIXED_ACL1 = new
DeleteAclsMatchingAcl()
+ .setResourceType(ResourceType.GROUP.code())
+ .setResourceName("prefix")
+ .setPatternType(PatternType.PREFIXED.code())
+ .setPrincipal("User:*")
+ .setHost("127.0.0.1")
+ .setOperation(AclOperation.CREATE.code())
+ .setPermissionType(AclPermissionType.ALLOW.code());
+
+ private static final DeleteAclsMatchingAcl UNKNOWN_ACL = new
DeleteAclsMatchingAcl()
+ .setResourceType(ResourceType.UNKNOWN.code())
+ .setResourceName("group")
+ .setPatternType(PatternType.LITERAL.code())
+ .setPrincipal("User:*")
+ .setHost("127.0.0.1")
+ .setOperation(AclOperation.WRITE.code())
+ .setPermissionType(AclPermissionType.ALLOW.code());
+
+ private static final DeleteAclsFilterResult LITERAL_RESPONSE = new
DeleteAclsFilterResult().setMatchingAcls(asList(
+ LITERAL_ACL1, LITERAL_ACL2));
+
+ private static final DeleteAclsFilterResult PREFIXED_RESPONSE = new
DeleteAclsFilterResult().setMatchingAcls(asList(
+ LITERAL_ACL1, PREFIXED_ACL1));
+
+ private static final DeleteAclsFilterResult UNKNOWN_RESPONSE = new
DeleteAclsFilterResult().setMatchingAcls(asList(
+ UNKNOWN_ACL));
@Test(expected = UnsupportedVersionException.class)
public void shouldThrowOnV0IfNotLiteral() {
- new DeleteAclsResponse(10,
aclResponses(PREFIXED_RESPONSE)).toStruct(V0);
+ new DeleteAclsResponse(new DeleteAclsResponseData()
+ .setThrottleTimeMs(10)
+ .setFilterResults(singletonList(PREFIXED_RESPONSE))
+ ).toStruct(V0);
}
@Test(expected = IllegalArgumentException.class)
public void shouldThrowOnIfUnknown() {
- new DeleteAclsResponse(10,
aclResponses(UNKNOWN_RESPONSE)).toStruct(V1);
+ new DeleteAclsResponse(new DeleteAclsResponseData()
+ .setThrottleTimeMs(10)
+ .setFilterResults(singletonList(UNKNOWN_RESPONSE))
+ ).toStruct(V1);
}
@Test
public void shouldRoundTripV0() {
- final DeleteAclsResponse original = new DeleteAclsResponse(10,
aclResponses(LITERAL_RESPONSE));
+ final DeleteAclsResponse original = new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(10)
+ .setFilterResults(singletonList(LITERAL_RESPONSE)));
final Struct struct = original.toStruct(V0);
- final DeleteAclsResponse result = new DeleteAclsResponse(struct);
-
- assertResponseEquals(original, result);
+ final DeleteAclsResponse result = new DeleteAclsResponse(struct, V0);
+ assertEquals(original.filterResults(), result.filterResults());
}
@Test
public void shouldRoundTripV1() {
- final DeleteAclsResponse original = new DeleteAclsResponse(100,
aclResponses(LITERAL_RESPONSE, PREFIXED_RESPONSE));
+ final DeleteAclsResponse original = new DeleteAclsResponse(new
DeleteAclsResponseData()
+ .setThrottleTimeMs(10)
+ .setFilterResults(asList(LITERAL_RESPONSE, PREFIXED_RESPONSE)));
final Struct struct = original.toStruct(V1);
- final DeleteAclsResponse result = new DeleteAclsResponse(struct);
-
- assertResponseEquals(original, result);
+ final DeleteAclsResponse result = new DeleteAclsResponse(struct, V1);
+ assertEquals(original.filterResults(), result.filterResults());
}
- private static void assertResponseEquals(final DeleteAclsResponse
original, final DeleteAclsResponse actual) {
- assertEquals("Number of responses wrong", original.responses().size(),
actual.responses().size());
-
- for (int idx = 0; idx != original.responses().size(); ++idx) {
- final List<AclBinding> originalBindings =
original.responses().get(idx).deletions().stream()
- .map(AclDeletionResult::acl)
- .collect(Collectors.toList());
-
- final List<AclBinding> actualBindings =
actual.responses().get(idx).deletions().stream()
- .map(AclDeletionResult::acl)
- .collect(Collectors.toList());
-
- assertEquals(originalBindings, actualBindings);
- }
- }
-
- private static List<AclFilterResponse> aclResponses(final
AclFilterResponse... responses) {
- return Arrays.asList(responses);
- }
-
- private static Collection<AclDeletionResult> aclDeletions(final
AclBinding... acls) {
- return Arrays.stream(acls)
- .map(AclDeletionResult::new)
- .collect(Collectors.toList());
- }
-}
\ No newline at end of file
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
index 8be1658..e9203ae 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsRequestTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.resource.ResourceType;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
public class DescribeAclsRequestTest {
private static final short V0 = 0;
@@ -46,14 +47,14 @@ public class DescribeAclsRequestTest {
private static final AclBindingFilter UNKNOWN_FILTER = new
AclBindingFilter(new ResourcePatternFilter(ResourceType.UNKNOWN, "foo",
PatternType.LITERAL),
new AccessControlEntryFilter("User:ANONYMOUS", "127.0.0.1",
AclOperation.READ, AclPermissionType.DENY));
- @Test(expected = UnsupportedVersionException.class)
+ @Test
public void shouldThrowOnV0IfPrefixed() {
- new DescribeAclsRequest.Builder(PREFIXED_FILTER).build(V0);
+ assertThrows(UnsupportedVersionException.class, () -> new
DescribeAclsRequest.Builder(PREFIXED_FILTER).build(V0));
}
- @Test(expected = IllegalArgumentException.class)
+ @Test
public void shouldThrowIfUnknown() {
- new DescribeAclsRequest.Builder(UNKNOWN_FILTER).build(V0);
+ assertThrows(IllegalArgumentException.class, () -> new
DescribeAclsRequest.Builder(UNKNOWN_FILTER).build(V0));
}
@Test
@@ -117,4 +118,4 @@ public class DescribeAclsRequestTest {
final AclBindingFilter acttualFilter = actual.filter();
assertEquals(originalFilter, acttualFilter);
}
-}
\ No newline at end of file
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
index ec07bc3..77ecd18 100644
---
a/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/requests/DescribeAclsResponseTest.java
@@ -99,7 +99,8 @@ public class DescribeAclsResponseTest {
final DescribeAclsResponse result = new DescribeAclsResponse(struct,
V0);
assertResponseEquals(original, result);
- final DescribeAclsResponse result2 =
DescribeAclsResponse.prepareResponse(10, ApiError.NONE,
DescribeAclsResponse.aclBindings(resources));
+ final DescribeAclsResponse result2 = buildResponse(10, Errors.NONE,
DescribeAclsResponse.aclsResources(
+ DescribeAclsResponse.aclBindings(resources)));
assertResponseEquals(original, result2);
}
@@ -112,7 +113,8 @@ public class DescribeAclsResponseTest {
final DescribeAclsResponse result = new DescribeAclsResponse(struct,
V1);
assertResponseEquals(original, result);
- final DescribeAclsResponse result2 =
DescribeAclsResponse.prepareResponse(100, ApiError.NONE,
DescribeAclsResponse.aclBindings(resources));
+ final DescribeAclsResponse result2 = buildResponse(100, Errors.NONE,
DescribeAclsResponse.aclsResources(
+ DescribeAclsResponse.aclBindings(resources)));
assertResponseEquals(original, result2);
}
@@ -156,4 +158,4 @@ public class DescribeAclsResponseTest {
.setOperation(operation.code())
.setPermissionType(permission.code());
}
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 7943101..2304fcf 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -43,21 +43,25 @@ import org.apache.kafka.common.message.ControlledShutdownRequestData;
import org.apache.kafka.common.message.ControlledShutdownResponseData;
import
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartition;
import
org.apache.kafka.common.message.ControlledShutdownResponseData.RemainingPartitionCollection;
+import org.apache.kafka.common.message.CreateAclsRequestData;
+import org.apache.kafka.common.message.CreateAclsResponseData;
import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
import
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData;
-import
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
+import
org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData;
import
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import
org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig;
import org.apache.kafka.common.message.CreateTopicsResponseData;
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
import
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.DeleteAclsRequestData;
+import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import
org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
@@ -133,11 +137,7 @@ import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation;
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest.Builder;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
-import org.apache.kafka.common.requests.DeleteAclsResponse.AclFilterResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
@@ -1780,44 +1780,72 @@ public class RequestResponseTest {
}
private CreateAclsRequest createCreateAclsRequest() {
- List<AclCreation> creations = new ArrayList<>();
- creations.add(new AclCreation(new AclBinding(
+ List<CreateAclsRequestData.AclCreation> creations = new ArrayList<>();
+ creations.add(CreateAclsRequest.aclCreation(new AclBinding(
new ResourcePattern(ResourceType.TOPIC, "mytopic",
PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "127.0.0.1",
AclOperation.READ, AclPermissionType.ALLOW))));
- creations.add(new AclCreation(new AclBinding(
+ creations.add(CreateAclsRequest.aclCreation(new AclBinding(
new ResourcePattern(ResourceType.GROUP, "mygroup",
PatternType.LITERAL),
new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE,
AclPermissionType.DENY))));
- return new CreateAclsRequest.Builder(creations).build();
+ CreateAclsRequestData data = new
CreateAclsRequestData().setCreations(creations);
+ return new CreateAclsRequest.Builder(data).build();
}
private CreateAclsResponse createCreateAclsResponse() {
- return new CreateAclsResponse(0, Arrays.asList(new
AclCreationResponse(ApiError.NONE),
- new AclCreationResponse(new ApiError(Errors.INVALID_REQUEST, "Foo
bar"))));
+ return new CreateAclsResponse(new
CreateAclsResponseData().setResults(asList(
+ new CreateAclsResponseData.AclCreationResult(),
+ new CreateAclsResponseData.AclCreationResult()
+ .setErrorCode(Errors.INVALID_REQUEST.code())
+ .setErrorMessage("Foo bar"))));
}
private DeleteAclsRequest createDeleteAclsRequest() {
- List<AclBindingFilter> filters = new ArrayList<>();
- filters.add(new AclBindingFilter(
- new ResourcePatternFilter(ResourceType.ANY, null,
PatternType.LITERAL),
- new AccessControlEntryFilter("User:ANONYMOUS", null,
AclOperation.ANY, AclPermissionType.ANY)));
- filters.add(new AclBindingFilter(
- new ResourcePatternFilter(ResourceType.ANY, null,
PatternType.LITERAL),
- new AccessControlEntryFilter("User:bob", null, AclOperation.ANY,
AclPermissionType.ANY)));
- return new DeleteAclsRequest.Builder(filters).build();
+ DeleteAclsRequestData data = new
DeleteAclsRequestData().setFilters(asList(
+ new DeleteAclsRequestData.DeleteAclsFilter()
+ .setResourceTypeFilter(ResourceType.ANY.code())
+ .setResourceNameFilter(null)
+ .setPatternTypeFilter(PatternType.LITERAL.code())
+ .setPrincipalFilter("User:ANONYMOUS")
+ .setHostFilter(null)
+ .setOperation(AclOperation.ANY.code())
+ .setPermissionType(AclPermissionType.ANY.code()),
+ new DeleteAclsRequestData.DeleteAclsFilter()
+ .setResourceTypeFilter(ResourceType.ANY.code())
+ .setResourceNameFilter(null)
+ .setPatternTypeFilter(PatternType.LITERAL.code())
+ .setPrincipalFilter("User:bob")
+ .setHostFilter(null)
+ .setOperation(AclOperation.ANY.code())
+ .setPermissionType(AclPermissionType.ANY.code())
+ ));
+ return new DeleteAclsRequest.Builder(data).build();
}
private DeleteAclsResponse createDeleteAclsResponse() {
- List<AclFilterResponse> responses = new ArrayList<>();
- responses.add(new AclFilterResponse(Utils.mkSet(
- new AclDeletionResult(new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "mytopic3",
PatternType.LITERAL),
- new AccessControlEntry("User:ANONYMOUS", "*",
AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
- new AclDeletionResult(new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "mytopic4",
PatternType.LITERAL),
- new AccessControlEntry("User:ANONYMOUS", "*",
AclOperation.DESCRIBE, AclPermissionType.DENY))))));
- responses.add(new AclFilterResponse(new
ApiError(Errors.SECURITY_DISABLED, "No security"),
- Collections.<AclDeletionResult>emptySet()));
- return new DeleteAclsResponse(0, responses);
+ List<DeleteAclsResponseData.DeleteAclsFilterResult> filterResults =
new ArrayList<>();
+ filterResults.add(new
DeleteAclsResponseData.DeleteAclsFilterResult().setMatchingAcls(asList(
+ new DeleteAclsResponseData.DeleteAclsMatchingAcl()
+ .setResourceType(ResourceType.TOPIC.code())
+ .setResourceName("mytopic3")
+ .setPatternType(PatternType.LITERAL.code())
+ .setPrincipal("User:ANONYMOUS")
+ .setHost("*")
+ .setOperation(AclOperation.DESCRIBE.code())
+ .setPermissionType(AclPermissionType.ALLOW.code()),
+ new DeleteAclsResponseData.DeleteAclsMatchingAcl()
+ .setResourceType(ResourceType.TOPIC.code())
+ .setResourceName("mytopic4")
+ .setPatternType(PatternType.LITERAL.code())
+ .setPrincipal("User:ANONYMOUS")
+ .setHost("*")
+ .setOperation(AclOperation.DESCRIBE.code())
+ .setPermissionType(AclPermissionType.DENY.code()))));
+ filterResults.add(new DeleteAclsResponseData.DeleteAclsFilterResult()
+ .setErrorCode(Errors.SECURITY_DISABLED.code())
+ .setErrorMessage("No security"));
+ return new DeleteAclsResponse(new DeleteAclsResponseData()
+ .setThrottleTimeMs(0)
+ .setFilterResults(filterResults));
}
private DescribeConfigsRequest createDescribeConfigsRequest(int version) {
diff --git a/core/src/main/scala/kafka/server/DelayedFuture.scala b/core/src/main/scala/kafka/server/DelayedFuture.scala
index 823a851..cf522ab 100644
--- a/core/src/main/scala/kafka/server/DelayedFuture.scala
+++ b/core/src/main/scala/kafka/server/DelayedFuture.scala
@@ -30,7 +30,7 @@ import scala.collection.Seq
* in a DelayedFuturePurgatory purgatory. This is used for ACL updates using async Authorizers.
*/
class DelayedFuture[T](timeoutMs: Long,
- futures: List[CompletableFuture[T]],
+ futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit)
extends DelayedOperation(timeoutMs) {
@@ -79,7 +79,7 @@ class DelayedFuturePurgatory(purgatoryName: String, brokerId: Int) {
val purgatoryKey = new Object
def tryCompleteElseWatch[T](timeoutMs: Long,
- futures: List[CompletableFuture[T]],
+ futures: Seq[CompletableFuture[T]],
responseCallback: () => Unit): DelayedFuture[T]
= {
val delayedFuture = new DelayedFuture[T](timeoutMs, futures,
responseCallback)
val done = purgatory.tryCompleteElseWatch(delayedFuture, Seq(purgatoryKey))
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9e817ea..e3adb04 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -49,11 +49,12 @@ import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
TRANSACTION_STATE_TOPIC_NAME, isInternal}
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import
org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData,
CreatePartitionsResponseData, CreateTopicsResponseData,
DeleteGroupsResponseData, DeleteTopicsResponseData, DescribeGroupsResponseData,
EndTxnResponseData, ExpireDelegationTokenResponseData,
FindCoordinatorResponseData, HeartbeatResponseData, InitProducerIdResponseData,
JoinGroupResponseData, LeaveGroupResponseData, ListGroupsResponseData,
ListPartitionReassignmentsResponseData, OffsetCommitRequestData, Offs [...]
+import
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
+import
org.apache.kafka.common.message.{AlterPartitionReassignmentsResponseData,
CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData,
DeleteAclsResponseData, DeleteGroupsResponseData, DeleteTopicsResponseData,
DescribeAclsResponseData, DescribeGroupsResponseData, EndTxnResponseData,
ExpireDelegationTokenResponseData, FindCoordinatorResponseData,
HeartbeatResponseData, InitProducerIdResponseData, JoinGroupResponseData,
LeaveGroupResponseData, ListGroupsResponseD [...]
import
org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult,
CreatableTopicResultCollection}
import
org.apache.kafka.common.message.DeleteGroupsResponseData.{DeletableGroupResult,
DeletableGroupResultCollection}
import
org.apache.kafka.common.message.AlterPartitionReassignmentsResponseData.{ReassignablePartitionResponse,
ReassignableTopicResponse}
-import
org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
+import org.apache.kafka.common.message.CreateAclsResponseData.AclCreationResult
import
org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult,
DeletableTopicResultCollection}
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult
import
org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult
@@ -64,8 +65,6 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record._
import org.apache.kafka.common.replica.ClientMetadata
import org.apache.kafka.common.replica.ClientMetadata.DefaultClientMetadata
-import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
-import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult,
AclFilterResponse}
import org.apache.kafka.common.requests.DescribeLogDirsResponse.LogDirInfo
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -81,6 +80,7 @@ import org.apache.kafka.server.authorizer._
import scala.compat.java8.OptionConverters._
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.util.{Failure, Success, Try}
@@ -2210,14 +2210,18 @@ class KafkaApis(val requestChannel: RequestChannel,
authorizer match {
case None =>
sendResponseMaybeThrottle(request, requestThrottleMs =>
- DescribeAclsResponse.prepareResponse(requestThrottleMs,
- new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is
configured on the broker"), util.Collections.emptySet()))
+ new DescribeAclsResponse(new DescribeAclsResponseData()
+ .setErrorCode(Errors.SECURITY_DISABLED.code)
+ .setErrorMessage("No Authorizer is configured on the broker")
+ .setThrottleTimeMs(requestThrottleMs)))
case Some(auth) =>
val filter = describeAclsRequest.filter
val returnedAcls = new util.HashSet[AclBinding]()
auth.acls(filter).forEach(returnedAcls.add)
sendResponseMaybeThrottle(request, requestThrottleMs =>
- DescribeAclsResponse.prepareResponse(requestThrottleMs,
ApiError.NONE, returnedAcls))
+ new DescribeAclsResponse(new DescribeAclsResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setResources(DescribeAclsResponse.aclsResources(returnedAcls))))
}
}
@@ -2226,40 +2230,46 @@ class KafkaApis(val requestChannel: RequestChannel,
val createAclsRequest = request.body[CreateAclsRequest]
authorizer match {
- case None =>
- sendResponseMaybeThrottle(request, requestThrottleMs =>
- createAclsRequest.getErrorResponse(requestThrottleMs,
- new SecurityDisabledException("No Authorizer is configured on the
broker.")))
+ case None => sendResponseMaybeThrottle(request, requestThrottleMs =>
+ createAclsRequest.getErrorResponse(requestThrottleMs,
+ new SecurityDisabledException("No Authorizer is configured on the
broker.")))
case Some(auth) =>
-
+ val allBindings =
createAclsRequest.aclCreations.asScala.map(CreateAclsRequest.aclBinding)
val errorResults = mutable.Map[AclBinding, AclCreateResult]()
- val aclBindings = createAclsRequest.aclCreations.asScala.map(_.acl)
- val validBindings = aclBindings
- .filter { acl =>
- val resource = acl.pattern
- val throwable = if (resource.resourceType == ResourceType.CLUSTER
&& !AuthorizerUtils.isClusterResource(resource.name))
- new InvalidRequestException("The only valid name for the
CLUSTER resource is " + CLUSTER_NAME)
- else if (resource.name.isEmpty)
- new InvalidRequestException("Invalid empty resource name")
- else
- null
- if (throwable != null) {
- debug(s"Failed to add acl $acl to $resource", throwable)
- errorResults(acl) = new AclCreateResult(throwable)
- false
- } else
- true
- }
+ val validBindings = new ArrayBuffer[AclBinding]
+ allBindings.foreach { acl =>
+ val resource = acl.pattern
+ val throwable = if (resource.resourceType == ResourceType.CLUSTER &&
!AuthorizerUtils.isClusterResource(resource.name))
+ new InvalidRequestException("The only valid name for the CLUSTER
resource is " + CLUSTER_NAME)
+ else if (resource.name.isEmpty)
+ new InvalidRequestException("Invalid empty resource name")
+ else
+ null
+ if (throwable != null) {
+ debug(s"Failed to add acl $acl to $resource", throwable)
+ errorResults(acl) = new AclCreateResult(throwable)
+ } else
+ validBindings += acl
+ }
+
+ val createResults = auth.createAcls(request.context,
validBindings.asJava).asScala.map(_.toCompletableFuture)
- val createResults = auth.createAcls(request.context,
validBindings.asJava)
- .asScala.map(_.toCompletableFuture).toList
def sendResponseCallback(): Unit = {
- val aclCreationResults = aclBindings.map { acl =>
+ val aclCreationResults = allBindings.map { acl =>
val result = errorResults.getOrElse(acl,
createResults(validBindings.indexOf(acl)).get)
- new
AclCreationResponse(result.exception.asScala.map(ApiError.fromThrowable).getOrElse(ApiError.NONE))
+ val creationResult = new AclCreationResult()
+ result.exception.asScala.foreach { throwable =>
+ val apiError = ApiError.fromThrowable(throwable)
+ creationResult
+ .setErrorCode(apiError.error.code)
+ .setErrorMessage(apiError.message)
+ }
+ creationResult
}
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new CreateAclsResponse(requestThrottleMs,
aclCreationResults.asJava))
+ new CreateAclsResponse(new CreateAclsResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setResults(aclCreationResults.asJava)))
}
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs,
createResults, sendResponseCallback)
@@ -2279,19 +2289,12 @@ class KafkaApis(val requestChannel: RequestChannel,
val deleteResults = auth.deleteAcls(request.context,
deleteAclsRequest.filters)
.asScala.map(_.toCompletableFuture).toList
- def toErrorCode(exception: Optional[ApiException]): ApiError = {
-
exception.asScala.map(ApiError.fromThrowable).getOrElse(ApiError.NONE)
- }
-
def sendResponseCallback(): Unit = {
- val filterResponses = deleteResults.map(_.get).map { result =>
- val deletions =
result.aclBindingDeleteResults().asScala.toList.map { deletionResult =>
- new AclDeletionResult(toErrorCode(deletionResult.exception),
deletionResult.aclBinding)
- }.asJava
- new AclFilterResponse(toErrorCode(result.exception), deletions)
- }.asJava
+ val filterResults =
deleteResults.map(_.get).map(DeleteAclsResponse.filterResult).asJava
sendResponseMaybeThrottle(request, requestThrottleMs =>
- new DeleteAclsResponse(requestThrottleMs, filterResponses))
+ new DeleteAclsResponse(new DeleteAclsResponseData()
+ .setThrottleTimeMs(requestThrottleMs)
+ .setFilterResults(filterResults)))
}
alterAclsPurgatory.tryCompleteElseWatch(config.connectionsMaxIdleMs,
deleteResults, sendResponseCallback)
}
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a6d0887..cf570ca 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -29,8 +29,8 @@ import org.apache.kafka.clients.consumer._
import
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.acl.AclOperation._
-import org.apache.kafka.common.acl.AclPermissionType.{ALLOW, DENY}
-import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation}
+import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation,
AclPermissionType}
import org.apache.kafka.common.config.{ConfigResource, LogLevelConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
@@ -41,15 +41,14 @@ import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProt
import
org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
UpdateMetadataEndpoint, UpdateMetadataPartitionState}
-import
org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData,
ControlledShutdownRequestData, CreatePartitionsRequestData,
CreateTopicsRequestData, DeleteGroupsRequestData, DeleteTopicsRequestData,
DescribeGroupsRequestData, FindCoordinatorRequestData, HeartbeatRequestData,
IncrementalAlterConfigsRequestData, JoinGroupRequestData,
ListPartitionReassignmentsRequestData, OffsetCommitRequestData,
SyncGroupRequestData}
+import
org.apache.kafka.common.message.{AlterPartitionReassignmentsRequestData,
ControlledShutdownRequestData, CreateAclsRequestData,
CreatePartitionsRequestData, CreateTopicsRequestData, DeleteAclsRequestData,
DeleteGroupsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData,
FindCoordinatorRequestData, HeartbeatRequestData,
IncrementalAlterConfigsRequestData, JoinGroupRequestData,
ListPartitionReassignmentsRequestData, OffsetCommitRequestData,
SyncGroupRequestData}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, MemoryRecords,
RecordBatch, Records, SimpleRecord}
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.PatternType.LITERAL
import org.apache.kafka.common.resource.ResourceType._
-import org.apache.kafka.common.resource.{Resource, ResourcePattern,
ResourcePatternFilter, ResourceType}
+import org.apache.kafka.common.resource.{PatternType, Resource,
ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.{ElectionType, IsolationLevel, KafkaException,
Node, TopicPartition, requests}
import org.apache.kafka.test.{TestUtils => JTestUtils}
@@ -166,9 +165,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) =>
resp.error),
ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) =>
resp.errors.get(tp)),
- ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) =>
resp.aclCreationResponses.asScala.head.error.error),
+ ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) =>
Errors.forCode(resp.results.asScala.head.errorCode)),
ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) =>
resp.error.error),
- ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) =>
resp.responses.asScala.head.error.error),
+ ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) =>
Errors.forCode(resp.filterResults.asScala.head.errorCode)),
ApiKeys.ALTER_REPLICA_LOG_DIRS -> ((resp: AlterReplicaLogDirsResponse) =>
resp.responses.get(tp)),
ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) =>
if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error
else Errors.CLUSTER_AUTHORIZATION_FAILED),
@@ -470,15 +469,29 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
private def describeAclsRequest = new
DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
- private def createAclsRequest = new CreateAclsRequest.Builder(
- Collections.singletonList(new AclCreation(new AclBinding(
- new ResourcePattern(ResourceType.TOPIC, "mytopic", LITERAL),
- new AccessControlEntry(userPrincipalStr, "*", AclOperation.WRITE,
DENY))))).build()
+ private def createAclsRequest: CreateAclsRequest = new
CreateAclsRequest.Builder(
+ new CreateAclsRequestData().setCreations(Collections.singletonList(
+ new CreateAclsRequestData.AclCreation()
+ .setResourceType(ResourceType.TOPIC.code)
+ .setResourceName("mytopic")
+ .setResourcePatternType(PatternType.LITERAL.code)
+ .setPrincipal(userPrincipalStr)
+ .setHost("*")
+ .setOperation(AclOperation.WRITE.code)
+ .setPermissionType(AclPermissionType.DENY.code)))
+ ).build()
- private def deleteAclsRequest = new DeleteAclsRequest.Builder(
- Collections.singletonList(new AclBindingFilter(
- new ResourcePatternFilter(ResourceType.TOPIC, null, LITERAL),
- new AccessControlEntryFilter(userPrincipalStr, "*", AclOperation.ANY,
DENY)))).build()
+ private def deleteAclsRequest: DeleteAclsRequest = new
DeleteAclsRequest.Builder(
+ new DeleteAclsRequestData().setFilters(Collections.singletonList(
+ new DeleteAclsRequestData.DeleteAclsFilter()
+ .setResourceTypeFilter(ResourceType.TOPIC.code)
+ .setResourceNameFilter(null)
+ .setPatternTypeFilter(PatternType.LITERAL.code)
+ .setPrincipalFilter(userPrincipalStr)
+ .setHostFilter("*")
+ .setOperation(AclOperation.ANY.code)
+ .setPermissionType(AclPermissionType.DENY.code)))
+ ).build()
private def alterReplicaLogDirsRequest = new
AlterReplicaLogDirsRequest.Builder(Collections.singletonMap(tp, logDir)).build()
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 8bae11e..8bc13ec 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -35,9 +35,8 @@ import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor}
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
import org.apache.kafka.common.requests._
-import org.apache.kafka.common.resource.{PatternType, ResourcePattern,
ResourcePatternFilter, ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{PatternType, ResourceType =>
AdminResourceType}
import org.apache.kafka.common.security.auth.{AuthenticationContext,
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils}
import org.apache.kafka.common.{ElectionType, IsolationLevel, Node,
TopicPartition}
@@ -414,15 +413,25 @@ class RequestQuotaTest extends BaseRequestTest {
new DescribeAclsRequest.Builder(AclBindingFilter.ANY)
case ApiKeys.CREATE_ACLS =>
- new CreateAclsRequest.Builder(Collections.singletonList(new
AclCreation(new AclBinding(
- new ResourcePattern(AdminResourceType.TOPIC, "mytopic",
PatternType.LITERAL),
- new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE,
AclPermissionType.DENY)))))
-
+ new CreateAclsRequest.Builder(new
CreateAclsRequestData().setCreations(Collections.singletonList(
+ new CreateAclsRequestData.AclCreation()
+ .setResourceType(AdminResourceType.TOPIC.code)
+ .setResourceName("mytopic")
+ .setResourcePatternType(PatternType.LITERAL.code)
+ .setPrincipal("User:ANONYMOUS")
+ .setHost("*")
+ .setOperation(AclOperation.WRITE.code)
+ .setPermissionType(AclPermissionType.DENY.code))))
case ApiKeys.DELETE_ACLS =>
- new DeleteAclsRequest.Builder(Collections.singletonList(new
AclBindingFilter(
- new ResourcePatternFilter(AdminResourceType.TOPIC, null,
PatternType.LITERAL),
- new AccessControlEntryFilter("User:ANONYMOUS", "*",
AclOperation.ANY, AclPermissionType.DENY))))
-
+ new DeleteAclsRequest.Builder(new
DeleteAclsRequestData().setFilters(Collections.singletonList(
+ new DeleteAclsRequestData.DeleteAclsFilter()
+ .setResourceTypeFilter(AdminResourceType.TOPIC.code)
+ .setResourceNameFilter(null)
+ .setPatternTypeFilter(PatternType.LITERAL.code)
+ .setPrincipalFilter("User:ANONYMOUS")
+ .setHostFilter("*")
+ .setOperation(AclOperation.ANY.code)
+ .setPermissionType(AclPermissionType.DENY.code))))
case ApiKeys.DESCRIBE_CONFIGS =>
new DescribeConfigsRequest.Builder(Collections.singleton(new
ConfigResource(ConfigResource.Type.TOPIC, tp.topic)))