This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new dd71437 MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030) dd71437 is described below commit dd71437de7675d92ad3e4ed01ac3ee11bf5da99d Author: Rajini Sivaram <rajinisiva...@googlemail.com> AuthorDate: Fri Jul 17 08:55:53 2020 +0100 MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server (#9030) Reviewers: Manikumar Reddy <manikumar.re...@gmail.com>, David Jacot <dja...@confluent.io>, Ron Dagostino <rdagost...@confluent.io> --- .../common/requests/AlterClientQuotasResponse.java | 13 +- .../org/apache/kafka/common/requests/ApiError.java | 5 +- .../requests/DescribeClientQuotasResponse.java | 11 +- .../common/requests/SaslAuthenticateRequest.java | 5 +- .../kafka/common/requests/RequestResponseTest.java | 198 +++++++++++++-------- .../src/main/scala/kafka/admin/ConfigCommand.scala | 9 + .../src/main/scala/kafka/server/AdminManager.scala | 12 +- .../main/scala/kafka/server/DynamicConfig.scala | 13 +- .../scala/unit/kafka/admin/ConfigCommandTest.scala | 21 +++ .../kafka/server/ClientQuotasRequestTest.scala | 30 +++- 10 files changed, 220 insertions(+), 97 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java index a3b01bf..3e95671 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterClientQuotasResponse.java @@ -20,10 +20,12 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.AlterClientQuotasResponseData; import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntityData; import org.apache.kafka.common.message.AlterClientQuotasResponseData.EntryData; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.quota.ClientQuotaEntity; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; @@ -50,14 +52,13 @@ public class AlterClientQuotasResponse extends AbstractResponse { } public AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int throttleTimeMs, Throwable e) { - short errorCode = Errors.forException(e).code(); - String errorMessage = e.getMessage(); + ApiError apiError = ApiError.fromThrowable(e); List<EntryData> entries = new ArrayList<>(entities.size()); for (ClientQuotaEntity entity : entities) { entries.add(new EntryData() - .setErrorCode(errorCode) - .setErrorMessage(errorMessage) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) .setEntity(toEntityData(entity))); } @@ -120,4 +121,8 @@ public class AlterClientQuotasResponse extends AbstractResponse { } return entityData; } + + public static AlterClientQuotasResponse parse(ByteBuffer buffer, short version) { + return new AlterClientQuotasResponse(ApiKeys.ALTER_CLIENT_QUOTAS.parseResponse(version, buffer), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java index 6cb09f0..5c9ca7b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java @@ -38,9 +38,10 @@ public class ApiError { private final String message; public static ApiError fromThrowable(Throwable t) { - // Avoid populating the error message if it's a generic one + // Avoid populating the error message if it's a generic one. Also don't populate error + // message for UNKNOWN_SERVER_ERROR to ensure we don't leak sensitive information. Errors error = Errors.forException(t); - String message = error.message().equals(t.getMessage()) ? null : t.getMessage(); + String message = error == Errors.UNKNOWN_SERVER_ERROR || error.message().equals(t.getMessage()) ? null : t.getMessage(); return new ApiError(error, message); } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java index cb54b01..bda3673 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java @@ -21,10 +21,12 @@ import org.apache.kafka.common.message.DescribeClientQuotasResponseData; import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData; import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData; import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData; +import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.quota.ClientQuotaEntity; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -66,10 +68,11 @@ public class DescribeClientQuotasResponse extends AbstractResponse { } public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); this.data = new DescribeClientQuotasResponseData() .setThrottleTimeMs(throttleTimeMs) - .setErrorCode(Errors.forException(e).code()) - .setErrorMessage(e.getMessage()) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()) .setEntries(null); } @@ -115,4 +118,8 @@ public class DescribeClientQuotasResponse extends AbstractResponse { protected Struct toStruct(short version) { return data.toStruct(version); } + + public static DescribeClientQuotasResponse parse(ByteBuffer buffer, short version) { + return new DescribeClientQuotasResponse(ApiKeys.DESCRIBE_CLIENT_QUOTAS.parseResponse(version, buffer), version); + } } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java index 0ed5125..5fdfbec 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java @@ -77,9 +77,10 @@ public class SaslAuthenticateRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ApiError apiError = ApiError.fromThrowable(e); SaslAuthenticateResponseData response = new SaslAuthenticateResponseData() - .setErrorCode(ApiError.fromThrowable(e).error().code()) - .setErrorMessage(e.getMessage()); + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message()); return new SaslAuthenticateResponse(response); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index c508dbc..50775b6 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -139,6 +139,9 @@ import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.quota.ClientQuotaAlteration; +import org.apache.kafka.common.quota.ClientQuotaEntity; +import org.apache.kafka.common.quota.ClientQuotaFilter; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.RecordBatch; @@ -190,18 +193,21 @@ import static org.junit.Assert.fail; public class RequestResponseTest { + // Exception includes a message that we verify is not included in error responses + private final UnknownServerException unknownServerException = new UnknownServerException("secret"); + @Test public void testSerialization() throws Exception { checkRequest(createFindCoordinatorRequest(0), true); checkRequest(createFindCoordinatorRequest(1), true); - checkErrorResponse(createFindCoordinatorRequest(0), new UnknownServerException(), true); - checkErrorResponse(createFindCoordinatorRequest(1), new UnknownServerException(), true); + checkErrorResponse(createFindCoordinatorRequest(0), unknownServerException, true); + checkErrorResponse(createFindCoordinatorRequest(1), unknownServerException, true); checkResponse(createFindCoordinatorResponse(), 0, true); checkResponse(createFindCoordinatorResponse(), 1, true); checkRequest(createControlledShutdownRequest(), true); checkResponse(createControlledShutdownResponse(), 1, true); - checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException(), true); - checkErrorResponse(createControlledShutdownRequest(0), new UnknownServerException(), true); + checkErrorResponse(createControlledShutdownRequest(), unknownServerException, true); + checkErrorResponse(createControlledShutdownRequest(0), unknownServerException, true); checkRequest(createFetchRequest(4), true); checkResponse(createFetchResponse(), 4, true); List<TopicPartition> toForgetTopics = new ArrayList<>(); @@ -211,53 +217,53 @@ public class RequestResponseTest { checkRequest(createFetchRequest(7, new FetchMetadata(123, 456), toForgetTopics), true); checkResponse(createFetchResponse(123), 7, true); checkResponse(createFetchResponse(Errors.FETCH_SESSION_ID_NOT_FOUND, 123), 7, true); - checkErrorResponse(createFetchRequest(4), new UnknownServerException(), true); + checkErrorResponse(createFetchRequest(4), unknownServerException, true); checkRequest(createHeartBeatRequest(), true); - checkErrorResponse(createHeartBeatRequest(), new UnknownServerException(), true); + checkErrorResponse(createHeartBeatRequest(), unknownServerException, true); checkResponse(createHeartBeatResponse(), 0, true); for (int v = ApiKeys.JOIN_GROUP.oldestVersion(); v <= ApiKeys.JOIN_GROUP.latestVersion(); v++) { checkRequest(createJoinGroupRequest(v), true); - checkErrorResponse(createJoinGroupRequest(v), new UnknownServerException(), true); + checkErrorResponse(createJoinGroupRequest(v), unknownServerException, true); checkResponse(createJoinGroupResponse(v), v, true); } for (int v = ApiKeys.SYNC_GROUP.oldestVersion(); v <= ApiKeys.SYNC_GROUP.latestVersion(); v++) { checkRequest(createSyncGroupRequest(v), true); - checkErrorResponse(createSyncGroupRequest(v), new UnknownServerException(), true); + checkErrorResponse(createSyncGroupRequest(v), unknownServerException, true); checkResponse(createSyncGroupResponse(v), v, true); } checkRequest(createLeaveGroupRequest(), true); - checkErrorResponse(createLeaveGroupRequest(), new UnknownServerException(), true); + checkErrorResponse(createLeaveGroupRequest(), unknownServerException, true); checkResponse(createLeaveGroupResponse(), 0, true); for (short v = ApiKeys.LIST_GROUPS.oldestVersion(); v <= ApiKeys.LIST_GROUPS.latestVersion(); v++) { checkRequest(createListGroupsRequest(v), false); - checkErrorResponse(createListGroupsRequest(v), new UnknownServerException(), true); + checkErrorResponse(createListGroupsRequest(v), unknownServerException, true); checkResponse(createListGroupsResponse(v), v, true); } checkRequest(createDescribeGroupRequest(), true); - checkErrorResponse(createDescribeGroupRequest(), new UnknownServerException(), true); + checkErrorResponse(createDescribeGroupRequest(), unknownServerException, true); checkResponse(createDescribeGroupResponse(), 0, true); checkRequest(createDeleteGroupsRequest(), true); - checkErrorResponse(createDeleteGroupsRequest(), new UnknownServerException(), true); + checkErrorResponse(createDeleteGroupsRequest(), unknownServerException, true); checkResponse(createDeleteGroupsResponse(), 0, true); for (int i = 0; i < ApiKeys.LIST_OFFSETS.latestVersion(); i++) { checkRequest(createListOffsetRequest(i), true); - checkErrorResponse(createListOffsetRequest(i), new UnknownServerException(), true); + checkErrorResponse(createListOffsetRequest(i), unknownServerException, true); checkResponse(createListOffsetResponse(i), i, true); } checkRequest(MetadataRequest.Builder.allTopics().build((short) 2), true); checkRequest(createMetadataRequest(1, Collections.singletonList("topic1")), true); - checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true); + checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true); checkResponse(createMetadataResponse(), 2, true); - checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), new UnknownServerException(), true); + checkErrorResponse(createMetadataRequest(2, Collections.singletonList("topic1")), unknownServerException, true); checkResponse(createMetadataResponse(), 3, true); - checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), new UnknownServerException(), true); + checkErrorResponse(createMetadataRequest(3, Collections.singletonList("topic1")), unknownServerException, true); checkResponse(createMetadataResponse(), 4, true); - checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), new UnknownServerException(), true); + checkErrorResponse(createMetadataRequest(4, Collections.singletonList("topic1")), unknownServerException, true); checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true); checkErrorResponse(createOffsetFetchRequestForAllPartition("group1", false), new NotCoordinatorException("Not Coordinator"), true); @@ -268,42 +274,42 @@ public class RequestResponseTest { checkRequest(createOffsetFetchRequest(7, true), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", false), true); checkRequest(createOffsetFetchRequestForAllPartition("group1", true), true); - checkErrorResponse(createOffsetFetchRequest(0, false), new UnknownServerException(), true); - checkErrorResponse(createOffsetFetchRequest(1, false), new UnknownServerException(), true); - checkErrorResponse(createOffsetFetchRequest(2, false), new UnknownServerException(), true); - checkErrorResponse(createOffsetFetchRequest(7, true), new UnknownServerException(), true); + checkErrorResponse(createOffsetFetchRequest(0, false), unknownServerException, true); + checkErrorResponse(createOffsetFetchRequest(1, false), unknownServerException, true); + checkErrorResponse(createOffsetFetchRequest(2, false), unknownServerException, true); + checkErrorResponse(createOffsetFetchRequest(7, true), unknownServerException, true); checkResponse(createOffsetFetchResponse(), 0, true); checkRequest(createProduceRequest(2), true); - checkErrorResponse(createProduceRequest(2), new UnknownServerException(), true); + checkErrorResponse(createProduceRequest(2), unknownServerException, true); checkRequest(createProduceRequest(3), true); - checkErrorResponse(createProduceRequest(3), new UnknownServerException(), true); + checkErrorResponse(createProduceRequest(3), unknownServerException, true); checkResponse(createProduceResponse(), 2, true); checkResponse(createProduceResponseWithErrorMessage(), 8, true); for (int v = ApiKeys.STOP_REPLICA.oldestVersion(); v <= ApiKeys.STOP_REPLICA.latestVersion(); v++) { checkRequest(createStopReplicaRequest(v, true), true); checkRequest(createStopReplicaRequest(v, false), true); - checkErrorResponse(createStopReplicaRequest(v, true), new UnknownServerException(), true); - checkErrorResponse(createStopReplicaRequest(v, false), new UnknownServerException(), true); + checkErrorResponse(createStopReplicaRequest(v, true), unknownServerException, true); + checkErrorResponse(createStopReplicaRequest(v, false), unknownServerException, true); checkResponse(createStopReplicaResponse(), v, true); } checkRequest(createLeaderAndIsrRequest(0), true); - checkErrorResponse(createLeaderAndIsrRequest(0), new UnknownServerException(), false); + checkErrorResponse(createLeaderAndIsrRequest(0), unknownServerException, false); checkRequest(createLeaderAndIsrRequest(1), true); - checkErrorResponse(createLeaderAndIsrRequest(1), new UnknownServerException(), false); + checkErrorResponse(createLeaderAndIsrRequest(1), unknownServerException, false); checkRequest(createLeaderAndIsrRequest(2), true); - checkErrorResponse(createLeaderAndIsrRequest(2), new UnknownServerException(), false); + checkErrorResponse(createLeaderAndIsrRequest(2), unknownServerException, false); checkResponse(createLeaderAndIsrResponse(), 0, true); checkRequest(createSaslHandshakeRequest(), true); - checkErrorResponse(createSaslHandshakeRequest(), new UnknownServerException(), true); + checkErrorResponse(createSaslHandshakeRequest(), unknownServerException, true); checkResponse(createSaslHandshakeResponse(), 0, true); checkRequest(createSaslAuthenticateRequest(), true); - checkErrorResponse(createSaslAuthenticateRequest(), new UnknownServerException(), true); + checkErrorResponse(createSaslAuthenticateRequest(), unknownServerException, true); checkResponse(createSaslAuthenticateResponse(), 0, true); checkResponse(createSaslAuthenticateResponse(), 1, true); checkRequest(createApiVersionRequest(), true); - checkErrorResponse(createApiVersionRequest(), new UnknownServerException(), true); + checkErrorResponse(createApiVersionRequest(), unknownServerException, true); checkErrorResponse(createApiVersionRequest(), new UnsupportedVersionException("Not Supported"), true); checkResponse(createApiVersionResponse(), 0, true); checkResponse(createApiVersionResponse(), 1, true); @@ -315,107 +321,107 @@ public class RequestResponseTest { checkResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE, 3, true); checkRequest(createCreateTopicRequest(0), true); - checkErrorResponse(createCreateTopicRequest(0), new UnknownServerException(), true); + checkErrorResponse(createCreateTopicRequest(0), unknownServerException, true); checkResponse(createCreateTopicResponse(), 0, true); checkRequest(createCreateTopicRequest(1), true); - checkErrorResponse(createCreateTopicRequest(1), new UnknownServerException(), true); + checkErrorResponse(createCreateTopicRequest(1), unknownServerException, true); checkResponse(createCreateTopicResponse(), 1, true); checkRequest(createCreateTopicRequest(2), true); - checkErrorResponse(createCreateTopicRequest(2), new UnknownServerException(), true); + checkErrorResponse(createCreateTopicRequest(2), unknownServerException, true); checkResponse(createCreateTopicResponse(), 2, true); checkRequest(createCreateTopicRequest(3), true); - checkErrorResponse(createCreateTopicRequest(3), new UnknownServerException(), true); + checkErrorResponse(createCreateTopicRequest(3), unknownServerException, true); checkResponse(createCreateTopicResponse(), 3, true); checkRequest(createCreateTopicRequest(4), true); - checkErrorResponse(createCreateTopicRequest(4), new UnknownServerException(), true); + checkErrorResponse(createCreateTopicRequest(4), unknownServerException, true); checkResponse(createCreateTopicResponse(), 4, true); checkRequest(createCreateTopicRequest(5), true); - checkErrorResponse(createCreateTopicRequest(5), new UnknownServerException(), true); + checkErrorResponse(createCreateTopicRequest(5), unknownServerException, true); checkResponse(createCreateTopicResponse(), 5, true); checkRequest(createDeleteTopicsRequest(), true); - checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException(), true); + checkErrorResponse(createDeleteTopicsRequest(), unknownServerException, true); checkResponse(createDeleteTopicsResponse(), 0, true); checkRequest(createInitPidRequest(), true); - checkErrorResponse(createInitPidRequest(), new UnknownServerException(), true); + checkErrorResponse(createInitPidRequest(), unknownServerException, true); checkResponse(createInitPidResponse(), 0, true); checkRequest(createAddPartitionsToTxnRequest(), true); checkResponse(createAddPartitionsToTxnResponse(), 0, true); - checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true); + checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true); checkRequest(createAddOffsetsToTxnRequest(), true); checkResponse(createAddOffsetsToTxnResponse(), 0, true); - checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true); + checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true); checkRequest(createEndTxnRequest(), true); checkResponse(createEndTxnResponse(), 0, true); - checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true); + checkErrorResponse(createEndTxnRequest(), unknownServerException, true); checkRequest(createWriteTxnMarkersRequest(), true); checkResponse(createWriteTxnMarkersResponse(), 0, true); - checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true); + checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true); checkOlderFetchVersions(); checkResponse(createMetadataResponse(), 0, true); checkResponse(createMetadataResponse(), 1, true); - checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), new UnknownServerException(), true); + checkErrorResponse(createMetadataRequest(1, Collections.singletonList("topic1")), unknownServerException, true); checkRequest(createOffsetCommitRequest(0), true); - checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException(), true); + checkErrorResponse(createOffsetCommitRequest(0), unknownServerException, true); checkRequest(createOffsetCommitRequest(1), true); - checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException(), true); + checkErrorResponse(createOffsetCommitRequest(1), unknownServerException, true); checkRequest(createOffsetCommitRequest(2), true); - checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException(), true); + checkErrorResponse(createOffsetCommitRequest(2), unknownServerException, true); checkRequest(createOffsetCommitRequest(3), true); - checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException(), true); + checkErrorResponse(createOffsetCommitRequest(3), unknownServerException, true); checkRequest(createOffsetCommitRequest(4), true); - checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException(), true); + checkErrorResponse(createOffsetCommitRequest(4), unknownServerException, true); checkResponse(createOffsetCommitResponse(), 4, true); checkRequest(createOffsetCommitRequest(5), true); - checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException(), true); + checkErrorResponse(createOffsetCommitRequest(5), unknownServerException, true); checkResponse(createOffsetCommitResponse(), 5, true); checkRequest(createJoinGroupRequest(0), true); checkRequest(createUpdateMetadataRequest(0, null), false); - checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException(), true); + checkErrorResponse(createUpdateMetadataRequest(0, null), unknownServerException, true); checkRequest(createUpdateMetadataRequest(1, null), false); checkRequest(createUpdateMetadataRequest(1, "rack1"), false); - checkErrorResponse(createUpdateMetadataRequest(1, null), new UnknownServerException(), true); + checkErrorResponse(createUpdateMetadataRequest(1, null), unknownServerException, true); checkRequest(createUpdateMetadataRequest(2, "rack1"), false); checkRequest(createUpdateMetadataRequest(2, null), false); - checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), new UnknownServerException(), true); + checkErrorResponse(createUpdateMetadataRequest(2, "rack1"), unknownServerException, true); checkRequest(createUpdateMetadataRequest(3, "rack1"), false); checkRequest(createUpdateMetadataRequest(3, null), false); - checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), new UnknownServerException(), true); + checkErrorResponse(createUpdateMetadataRequest(3, "rack1"), unknownServerException, true); checkRequest(createUpdateMetadataRequest(4, "rack1"), false); checkRequest(createUpdateMetadataRequest(4, null), false); - checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), new UnknownServerException(), true); + checkErrorResponse(createUpdateMetadataRequest(4, "rack1"), unknownServerException, true); checkRequest(createUpdateMetadataRequest(5, "rack1"), false); checkRequest(createUpdateMetadataRequest(5, null), false); - checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), new UnknownServerException(), true); + checkErrorResponse(createUpdateMetadataRequest(5, "rack1"), unknownServerException, true); checkResponse(createUpdateMetadataResponse(), 0, true); checkRequest(createListOffsetRequest(0), true); - checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true); + checkErrorResponse(createListOffsetRequest(0), unknownServerException, true); checkResponse(createListOffsetResponse(0), 0, true); checkRequest(createLeaderEpochRequestForReplica(0, 1), true); checkRequest(createLeaderEpochRequestForConsumer(), true); checkResponse(createLeaderEpochResponse(), 0, true); - checkErrorResponse(createLeaderEpochRequestForConsumer(), new UnknownServerException(), true); + checkErrorResponse(createLeaderEpochRequestForConsumer(), unknownServerException, true); checkRequest(createAddPartitionsToTxnRequest(), true); - checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true); + checkErrorResponse(createAddPartitionsToTxnRequest(), unknownServerException, true); checkResponse(createAddPartitionsToTxnResponse(), 0, true); checkRequest(createAddOffsetsToTxnRequest(), true); - checkErrorResponse(createAddOffsetsToTxnRequest(), new UnknownServerException(), true); + checkErrorResponse(createAddOffsetsToTxnRequest(), unknownServerException, true); checkResponse(createAddOffsetsToTxnResponse(), 0, true); checkRequest(createEndTxnRequest(), true); - checkErrorResponse(createEndTxnRequest(), new UnknownServerException(), true); + checkErrorResponse(createEndTxnRequest(), unknownServerException, true); checkResponse(createEndTxnResponse(), 0, true); checkRequest(createWriteTxnMarkersRequest(), true); - checkErrorResponse(createWriteTxnMarkersRequest(), new UnknownServerException(), true); + checkErrorResponse(createWriteTxnMarkersRequest(), unknownServerException, true); checkResponse(createWriteTxnMarkersResponse(), 0, true); checkRequest(createTxnOffsetCommitRequest(0), true); checkRequest(createTxnOffsetCommitRequest(3), true); checkRequest(createTxnOffsetCommitRequestWithAutoDowngrade(2), true); - checkErrorResponse(createTxnOffsetCommitRequest(0), new UnknownServerException(), true); - checkErrorResponse(createTxnOffsetCommitRequest(3), new UnknownServerException(), true); - checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), new UnknownServerException(), true); + checkErrorResponse(createTxnOffsetCommitRequest(0), unknownServerException, true); + checkErrorResponse(createTxnOffsetCommitRequest(3), unknownServerException, true); + checkErrorResponse(createTxnOffsetCommitRequestWithAutoDowngrade(2), unknownServerException, true); checkResponse(createTxnOffsetCommitResponse(), 0, true); checkRequest(createDescribeAclsRequest(), true); checkErrorResponse(createDescribeAclsRequest(), new SecurityDisabledException("Security is not enabled."), true); @@ -427,18 +433,18 @@ public class RequestResponseTest { checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled."), true); checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion(), true); checkRequest(createAlterConfigsRequest(), false); - checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException(), true); + checkErrorResponse(createAlterConfigsRequest(), unknownServerException, true); checkResponse(createAlterConfigsResponse(), 0, false); checkRequest(createDescribeConfigsRequest(0), true); checkRequest(createDescribeConfigsRequestWithConfigEntries(0), false); - checkErrorResponse(createDescribeConfigsRequest(0), new UnknownServerException(), true); + checkErrorResponse(createDescribeConfigsRequest(0), unknownServerException, true); checkResponse(createDescribeConfigsResponse(), 0, false); checkRequest(createDescribeConfigsRequest(1), true); checkRequest(createDescribeConfigsRequestWithConfigEntries(1), false); checkRequest(createDescribeConfigsRequestWithDocumentation(1), false); checkRequest(createDescribeConfigsRequestWithDocumentation(2), false); checkRequest(createDescribeConfigsRequestWithDocumentation(3), false); - checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException(), true); + checkErrorResponse(createDescribeConfigsRequest(1), unknownServerException, true); checkResponse(createDescribeConfigsResponse(), 1, false); checkDescribeConfigsResponseVersions(); checkRequest(createCreatePartitionsRequest(), true); @@ -446,33 +452,40 @@ public class RequestResponseTest { checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException(), true); checkResponse(createCreatePartitionsResponse(), 0, true); checkRequest(createCreateTokenRequest(), true); - checkErrorResponse(createCreateTokenRequest(), new UnknownServerException(), true); + checkErrorResponse(createCreateTokenRequest(), unknownServerException, true); checkResponse(createCreateTokenResponse(), 0, true); checkRequest(createDescribeTokenRequest(), true); - checkErrorResponse(createDescribeTokenRequest(), new UnknownServerException(), true); + checkErrorResponse(createDescribeTokenRequest(), unknownServerException, true); checkResponse(createDescribeTokenResponse(), 0, true); checkRequest(createExpireTokenRequest(), true); - checkErrorResponse(createExpireTokenRequest(), new UnknownServerException(), true); + checkErrorResponse(createExpireTokenRequest(), unknownServerException, true); checkResponse(createExpireTokenResponse(), 0, true); checkRequest(createRenewTokenRequest(), true); - checkErrorResponse(createRenewTokenRequest(), new UnknownServerException(), true); + checkErrorResponse(createRenewTokenRequest(), unknownServerException, true); checkResponse(createRenewTokenResponse(), 0, true); checkRequest(createElectLeadersRequest(), true); checkRequest(createElectLeadersRequestNullPartitions(), true); - checkErrorResponse(createElectLeadersRequest(), new UnknownServerException(), true); + checkErrorResponse(createElectLeadersRequest(), unknownServerException, true); checkResponse(createElectLeadersResponse(), 1, true); checkRequest(createIncrementalAlterConfigsRequest(), true); - checkErrorResponse(createIncrementalAlterConfigsRequest(), new UnknownServerException(), true); + checkErrorResponse(createIncrementalAlterConfigsRequest(), unknownServerException, true); checkResponse(createIncrementalAlterConfigsResponse(), 0, true); checkRequest(createAlterPartitionReassignmentsRequest(), true); - checkErrorResponse(createAlterPartitionReassignmentsRequest(), new UnknownServerException(), true); + checkErrorResponse(createAlterPartitionReassignmentsRequest(), unknownServerException, true); checkResponse(createAlterPartitionReassignmentsResponse(), 0, true); checkRequest(createListPartitionReassignmentsRequest(), true); - checkErrorResponse(createListPartitionReassignmentsRequest(), new UnknownServerException(), true); + checkErrorResponse(createListPartitionReassignmentsRequest(), unknownServerException, true); checkResponse(createListPartitionReassignmentsResponse(), 0, true); checkRequest(createOffsetDeleteRequest(), true); - checkErrorResponse(createOffsetDeleteRequest(), new UnknownServerException(), true); + checkErrorResponse(createOffsetDeleteRequest(), unknownServerException, true); checkResponse(createOffsetDeleteResponse(), 0, true); + + checkRequest(createDescribeClientQuotasRequest(), true); + checkErrorResponse(createDescribeClientQuotasRequest(), unknownServerException, true); + checkResponse(createDescribeClientQuotasResponse(), 0, true); + checkRequest(createAlterClientQuotasRequest(), true); + checkErrorResponse(createAlterClientQuotasRequest(), unknownServerException, true); + checkResponse(createAlterClientQuotasResponse(), 0, true); } @Test @@ -486,7 +499,7 @@ public class RequestResponseTest { private void checkOlderFetchVersions() throws Exception { int latestVersion = FETCH.latestVersion(); for (int i = 0; i < latestVersion; ++i) { - checkErrorResponse(createFetchRequest(i), new UnknownServerException(), true); + checkErrorResponse(createFetchRequest(i), unknownServerException, true); checkRequest(createFetchRequest(i), true); checkResponse(createFetchResponse(), i, true); } @@ -532,7 +545,13 @@ public class RequestResponseTest { } private void checkErrorResponse(AbstractRequest req, Throwable e, boolean checkEqualityAndHashCode) { - checkResponse(req.getErrorResponse(e), req.version(), checkEqualityAndHashCode); + AbstractResponse response = req.getErrorResponse(e); + checkResponse(response, req.version(), checkEqualityAndHashCode); + if (e instanceof UnknownServerException) { + String responseStr = response.toStruct(req.version()).toString(); + assertFalse(String.format("Unknown message included in response for %s: %s ", req.api, responseStr), + responseStr.contains(e.getMessage())); + } } private void checkRequest(AbstractRequest req, boolean checkEqualityAndHashCode) { @@ -2234,4 +2253,25 @@ public class RequestResponseTest { return new OffsetDeleteResponse(data); } + private DescribeClientQuotasRequest createDescribeClientQuotasRequest() { + ClientQuotaFilter filter = ClientQuotaFilter.all(); + return new DescribeClientQuotasRequest.Builder(filter).build((short) 0); + } + + private DescribeClientQuotasResponse createDescribeClientQuotasResponse() { + ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user")); + return new DescribeClientQuotasResponse(Collections.singletonMap(entity, Collections.singletonMap("request_percentage", 1.0)), 0); + } + + private AlterClientQuotasRequest createAlterClientQuotasRequest() { + ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user")); + ClientQuotaAlteration.Op op = new ClientQuotaAlteration.Op("request_percentage", 2.0); + ClientQuotaAlteration alteration = new ClientQuotaAlteration(entity, Collections.singleton(op)); + return new AlterClientQuotasRequest.Builder(Collections.singleton(alteration), false).build((short) 0); + } + + private AlterClientQuotasResponse createAlterClientQuotasResponse() { + ClientQuotaEntity entity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user")); + return new AlterClientQuotasResponse(Collections.singletonMap(entity, ApiError.NONE), 0); + } } diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index fdc83fd..3f968ef 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -23,6 +23,7 @@ import java.util.{Collections, Properties} import joptsimple._ import kafka.common.Config import kafka.log.LogConfig +import kafka.server.DynamicConfig.QuotaConfigs import kafka.server.{ConfigEntityName, ConfigType, Defaults, DynamicBrokerConfig, DynamicConfig, KafkaConfig} import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit, PasswordEncoder} import kafka.utils.Implicits._ @@ -364,6 +365,14 @@ object ConfigCommand extends Config { adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) case ConfigType.User | ConfigType.Client => + val nonQuotaConfigsToAdd = configsToBeAdded.keys.filterNot(QuotaConfigs.isQuotaConfig) + if (nonQuotaConfigsToAdd.nonEmpty) + throw new IllegalArgumentException(s"Only quota configs can be added for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToAdd") + val nonQuotaConfigsToDelete = configsToBeDeleted.filterNot(QuotaConfigs.isQuotaConfig) + if (nonQuotaConfigsToDelete.nonEmpty) + throw new IllegalArgumentException(s"Only quota configs can be deleted for '$entityTypeHead' using --bootstrap-server. Unexpected config names: $nonQuotaConfigsToDelete") + + val oldConfig = getClientQuotasConfig(adminClient, entityTypes, entityNames) val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains) diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 183a5d3..5fc411d 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -23,6 +23,7 @@ import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.log.LogConfig import kafka.utils.Log4jController import kafka.metrics.KafkaMetricsGroup +import kafka.server.DynamicConfig.QuotaConfigs import kafka.utils._ import kafka.zk.{AdminZkClient, KafkaZkClient} import org.apache.kafka.clients.admin.AlterConfigOp @@ -864,19 +865,20 @@ class AdminManager(val config: KafkaConfig, !name.isDefined || !strict } - def fromProps(props: Properties): Map[String, Double] = { - props.asScala.map { case (key, value) => + def fromProps(props: Map[String, String]): Map[String, Double] = { + props.map { case (key, value) => val doubleValue = try value.toDouble catch { case _: NumberFormatException => - throw new IllegalStateException(s"Unexpected client quota configuration value: ${key} -> ${value}") + throw new IllegalStateException(s"Unexpected client quota configuration value: $key -> $value") } (key -> doubleValue) } } (userEntries ++ clientIdEntries ++ bothEntries).map { case ((u, c), p) => - if (!p.isEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) - Some((userClientIdToEntity(u, c) -> fromProps(p))) + val quotaProps = p.asScala.filter { case (key, _) => QuotaConfigs.isQuotaConfig(key) } + if (quotaProps.nonEmpty && matches(userComponent, u) && matches(clientIdComponent, c)) + Some(userClientIdToEntity(u, c) -> fromProps(quotaProps)) else None }.flatten.toMap diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala index 13c64bf..f3d40a1 100644 --- a/core/src/main/scala/kafka/server/DynamicConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicConfig.scala @@ -67,11 +67,20 @@ object DynamicConfig { def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true) } - object Client { - //Properties + object QuotaConfigs { val ProducerByteRateOverrideProp = "producer_byte_rate" val ConsumerByteRateOverrideProp = "consumer_byte_rate" val RequestPercentageOverrideProp = "request_percentage" + private val configNames = Set(ProducerByteRateOverrideProp, ConsumerByteRateOverrideProp, RequestPercentageOverrideProp) + + def isQuotaConfig(name: String): Boolean = configNames.contains(name) + } + + object Client { + //Properties + val ProducerByteRateOverrideProp = QuotaConfigs.ProducerByteRateOverrideProp + val ConsumerByteRateOverrideProp = QuotaConfigs.ConsumerByteRateOverrideProp + val RequestPercentageOverrideProp = QuotaConfigs.RequestPercentageOverrideProp //Defaults val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 2b357b1..d412123 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -486,6 +486,27 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { } @Test + def shouldNotAlterNonQuotaClientConfigUsingBootstrapServer(): Unit = { + val node = new Node(1, "localhost", 9092) + val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) + + def verifyCommand(entityType: String, alterOpts: String*): Unit = { + val opts = new ConfigCommandOptions(Array("--bootstrap-server", "localhost:9092", + "--entity-type", entityType, "--entity-name", "admin", + "--alter") ++ alterOpts) + val e = intercept[IllegalArgumentException] { + ConfigCommand.alterConfig(mockAdminClient, opts) + } + assertTrue(s"Unexpected exception: $e", e.getMessage.contains("some_config")) + } + + verifyCommand("users", "--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000,some_config=10") + verifyCommand("clients", "--add-config", "some_config=10") + verifyCommand("users", "--delete-config", "consumer_byte_rate=20000,some_config=10") + verifyCommand("clients", "--delete-config", "some_config=10") + } + + @Test def shouldAddTopicConfigUsingZookeeper(): Unit = { val createOpts = new ConfigCommandOptions(Array("--zookeeper", zkConnect, "--entity-name", "my-topic", diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala index b5c694a..6ddfd3b 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala @@ -23,9 +23,10 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, import org.apache.kafka.common.requests.{AlterClientQuotasRequest, AlterClientQuotasResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse} import org.junit.Assert._ import org.junit.Test - import java.util.concurrent.{ExecutionException, TimeUnit} +import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, ScramFormatter, ScramMechanism} + import scala.jdk.CollectionConverters._ class ClientQuotasRequestTest extends BaseRequestTest { @@ -37,6 +38,7 @@ class ClientQuotasRequestTest extends BaseRequestTest { @Test def testAlterClientQuotasRequest(): Unit = { + val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "user"), (ClientQuotaEntity.CLIENT_ID -> "client-id")).asJava) // Expect an empty configuration. @@ -162,6 +164,32 @@ class ClientQuotasRequestTest extends BaseRequestTest { )) } + @Test + def testClientQuotasForScramUsers(): Unit = { + val entityType = ConfigType.User + val userName = "user" + + val mechanism = ScramMechanism.SCRAM_SHA_256 + val credential = new ScramFormatter(mechanism).generateCredential("password", 4096) + val configs = adminZkClient.fetchEntityConfig(entityType, userName) + configs.setProperty(mechanism.mechanismName, ScramCredentialUtils.credentialToString(credential)) + adminZkClient.changeConfigs(entityType, userName, configs) + + val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.USER -> userName).asJava) + + verifyDescribeEntityQuotas(entity, Map.empty) + + alterEntityQuotas(entity, Map( + (ProducerByteRateProp -> Some(10000.0)), + (ConsumerByteRateProp -> Some(20000.0)) + ), validateOnly = false) + + verifyDescribeEntityQuotas(entity, Map( + (ProducerByteRateProp -> 10000.0), + (ConsumerByteRateProp -> 20000.0) + )) + } + @Test(expected = classOf[InvalidRequestException]) def testAlterClientQuotasBadUser(): Unit = { val entity = new ClientQuotaEntity(Map((ClientQuotaEntity.USER -> "")).asJava)