Repository: kafka Updated Branches: refs/heads/trunk 54ed3435b -> 7d6ca52a2
MINOR: Push JMX metric name mangling into the JmxReporter (KIP-190 follow up) Author: Ewen Cheslack-Postava <m...@ewencp.org> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk> Closes #3980 from ewencp/dont-mangle-names Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6ca52a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6ca52a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6ca52a Branch: refs/heads/trunk Commit: 7d6ca52a2751908c7fc6b752d70dfaaaaa9bbe8c Parents: 54ed343 Author: Ewen Cheslack-Postava <m...@ewencp.org> Authored: Wed Oct 11 17:32:40 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Wed Oct 11 17:32:40 2017 -0400 ---------------------------------------------------------------------- .../kafka/clients/admin/KafkaAdminClient.java | 14 ++- .../kafka/clients/consumer/KafkaConsumer.java | 8 +- .../kafka/clients/producer/KafkaProducer.java | 8 +- .../kafka/common/metrics/JmxReporter.java | 3 +- .../apache/kafka/common/metrics/Sanitizer.java | 61 ------------ .../kafka/common/utils/AppInfoParser.java | 3 +- .../apache/kafka/common/utils/Sanitizer.java | 61 ++++++++++++ .../kafka/common/metrics/JmxReporterTest.java | 67 ++++++++++++- .../kafka/common/metrics/SanitizerTest.java | 35 ------- .../kafka/common/utils/SanitizerTest.java | 35 +++++++ .../kafka/connect/runtime/ConnectMetrics.java | 45 +-------- .../connect/runtime/ConnectMetricsTest.java | 24 ----- .../main/scala/kafka/admin/ConfigCommand.scala | 5 +- .../scala/kafka/network/RequestChannel.scala | 3 +- .../scala/kafka/server/ClientQuotaManager.scala | 98 ++++++++++---------- .../server/ClientRequestQuotaManager.scala | 4 +- .../main/scala/kafka/server/ConfigHandler.scala | 10 +- .../kafka/server/DynamicConfigManager.scala | 2 +- .../integration/kafka/api/BaseQuotaTest.scala | 8 +- .../kafka/api/ClientIdQuotaTest.scala | 6 +- .../kafka/api/UserClientIdQuotaTest.scala | 6 +- .../integration/kafka/api/UserQuotaTest.scala | 6 +- .../unit/kafka/admin/ConfigCommandTest.scala | 4 +- .../kafka/server/ClientQuotaManagerTest.scala | 70 +++++++------- .../kafka/server/DynamicConfigChangeTest.scala | 1 + .../unit/kafka/server/RequestQuotaTest.scala | 3 +- 26 files changed, 299 insertions(+), 291 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 1a66371..ece27ca 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -55,7 +55,6 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.metrics.Sanitizer; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; @@ -290,7 +289,6 @@ public class KafkaAdminClient extends AdminClient { NetworkClient networkClient = null; Time time = Time.SYSTEM; String clientId = generateClientId(config); - String sanitizedClientId = Sanitizer.sanitize(clientId); ChannelBuilder channelBuilder = null; Selector selector = null; ApiVersions apiVersions = new ApiVersions(); @@ -303,7 +301,7 @@ public class KafkaAdminClient extends AdminClient { config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), true); List<MetricsReporter> reporters = config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); - Map<String, String> metricTags = Collections.singletonMap("client-id", sanitizedClientId); + Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -328,7 +326,7 @@ public class KafkaAdminClient extends AdminClient { true, apiVersions, logContext); - return new KafkaAdminClient(config, clientId, sanitizedClientId, time, metadata, metrics, networkClient, + return new KafkaAdminClient(config, clientId, time, metadata, metrics, networkClient, timeoutProcessorFactory, logContext); } catch (Throwable exc) { closeQuietly(metrics, "Metrics"); @@ -345,7 +343,7 @@ public class KafkaAdminClient extends AdminClient { try { metrics = new Metrics(new MetricConfig(), new LinkedList<MetricsReporter>(), time); - return new KafkaAdminClient(config, clientId, Sanitizer.sanitize(clientId), time, metadata, metrics, client, null, + return new KafkaAdminClient(config, clientId, time, metadata, metrics, client, null, createLogContext(clientId)); } catch (Throwable exc) { closeQuietly(metrics, "Metrics"); @@ -357,7 +355,7 @@ public class KafkaAdminClient extends AdminClient { return new LogContext("[AdminClient clientId=" + clientId + "] "); } - private KafkaAdminClient(AdminClientConfig config, String clientId, String sanitizedClientId, Time time, Metadata metadata, + private KafkaAdminClient(AdminClientConfig config, String clientId, Time time, Metadata metadata, Metrics metrics, KafkaClient client, TimeoutProcessorFactory timeoutProcessorFactory, LogContext logContext) { this.defaultTimeoutMs = config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG); @@ -377,7 +375,7 @@ public class KafkaAdminClient extends AdminClient { new TimeoutProcessorFactory() : timeoutProcessorFactory; this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka admin client initialized"); thread.start(); } @@ -418,7 +416,7 @@ public class KafkaAdminClient extends AdminClient { // Wait for the thread to be joined. thread.join(); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka admin client closed."); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6fb6919..9547aee 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -42,7 +42,6 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.metrics.Sanitizer; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; @@ -647,7 +646,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { if (clientId.isEmpty()) clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; - String sanitizedClientId = Sanitizer.sanitize(this.clientId); String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG); LogContext logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); @@ -661,7 +659,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG); this.time = Time.SYSTEM; - Map<String, String> metricsTags = Collections.singletonMap("client-id", sanitizedClientId); + Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -772,7 +770,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { isolationLevel); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka consumer initialized"); } catch (Throwable t) { @@ -1739,7 +1737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ClientUtils.closeQuietly(client, "consumer network client", firstException); ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException); ClientUtils.closeQuietly(valueDeserializer, "consumer value deserializer", firstException); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka consumer has been closed"); Throwable exception = firstException.get(); if (exception != null && !swallowException) { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index a202217..b6c0a53 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -51,7 +51,6 @@ import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.metrics.Sanitizer; import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; @@ -317,7 +316,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> { if (clientId.length() <= 0) clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement(); this.clientId = clientId; - String sanitizedClientId = Sanitizer.sanitize(clientId); String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ? (String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null; @@ -329,7 +327,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { log = logContext.logger(KafkaProducer.class); log.trace("Starting the Kafka producer"); - Map<String, String> metricTags = Collections.singletonMap("client-id", sanitizedClientId); + Map<String, String> metricTags = Collections.singletonMap("client-id", clientId); MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG))) @@ -427,7 +425,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.ioThread.start(); this.errors = this.metrics.sensor("errors"); config.logUnused(); - AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics); + AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka producer started"); } catch (Throwable t) { // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121 @@ -1075,7 +1073,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException); ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException); ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException); - AppInfoParser.unregisterAppInfo(JMX_PREFIX, Sanitizer.sanitize(clientId), metrics); + AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics); log.debug("Kafka producer has been closed"); if (firstException.get() != null && !swallowException) throw new KafkaException("Failed to close kafka producer", firstException.get()); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 294e1d8..fda37d1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -37,6 +37,7 @@ import javax.management.ReflectionException; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.utils.Sanitizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +134,7 @@ public class JmxReporter implements MetricsReporter { mBeanName.append(","); mBeanName.append(entry.getKey()); mBeanName.append("="); - mBeanName.append(entry.getValue()); + mBeanName.append(Sanitizer.sanitize(entry.getValue())); } return mBeanName.toString(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java deleted file mode 100644 index b98a426..0000000 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.metrics; - -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; - -import org.apache.kafka.common.KafkaException; - -/** - * Utility class for sanitizing/desanitizing user principal and client-ids - * to a safe value for use in MetricName and as Zookeeper node name - */ -public class Sanitizer { - - public static String sanitize(String name) { - String encoded = ""; - try { - encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name()); - StringBuilder builder = new StringBuilder(); - for (int i = 0; i < encoded.length(); i++) { - char c = encoded.charAt(i); - if (c == '*') { // Metric ObjectName treats * as pattern - builder.append("%2A"); - } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding - builder.append("%20"); - } else { - builder.append(c); - } - } - return builder.toString(); - } catch (UnsupportedEncodingException e) { - throw new KafkaException(e); - } - } - - public static String desanitize(String name) { - try { - return URLDecoder.decode(name, StandardCharsets.UTF_8.name()); - } catch (UnsupportedEncodingException e) { - throw new KafkaException(e); - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java index 9a1bab8..42cf312 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java @@ -28,7 +28,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.metrics.Sanitizer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,7 +70,7 @@ public class AppInfoParser { public static synchronized void unregisterAppInfo(String prefix, String id, Metrics metrics) { MBeanServer server = ManagementFactory.getPlatformMBeanServer(); try { - ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + id); + ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.sanitize(id)); if (server.isRegistered(name)) server.unregisterMBean(name); http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java new file mode 100644 index 0000000..0b68d0c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.common.KafkaException; + +/** + * Utility class for sanitizing/desanitizing user principal and client-ids + * to a safe value for use in JMX metric names and as Zookeeper node name + */ +public class Sanitizer { + + public static String sanitize(String name) { + String encoded = ""; + try { + encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name()); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < encoded.length(); i++) { + char c = encoded.charAt(i); + if (c == '*') { // Metric ObjectName treats * as pattern + builder.append("%2A"); + } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding + builder.append("%20"); + } else { + builder.append(c); + } + } + return builder.toString(); + } catch (UnsupportedEncodingException e) { + throw new KafkaException(e); + } + } + + public static String desanitize(String name) { + try { + return URLDecoder.decode(name, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new KafkaException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java index 3f09e08..3b39db6 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java @@ -20,19 +20,80 @@ import org.apache.kafka.common.metrics.stats.Avg; import org.apache.kafka.common.metrics.stats.Total; import org.junit.Test; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import java.lang.management.ManagementFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class JmxReporterTest { @Test public void testJmxRegistration() throws Exception { Metrics metrics = new Metrics(); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); try { metrics.addReporter(new JmxReporter()); + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + Sensor sensor = metrics.sensor("kafka.requests"); sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg()); sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total()); - Sensor sensor2 = metrics.sensor("kafka.blah"); - sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new Total()); - sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new Total()); + + assertTrue(server.isRegistered(new ObjectName(":type=grp1"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp1"), "pack.bean1.avg")); + assertTrue(server.isRegistered(new ObjectName(":type=grp2"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total")); + + metrics.removeMetric(metrics.metricName("pack.bean1.avg", "grp1")); + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + assertTrue(server.isRegistered(new ObjectName(":type=grp2"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=grp2"), "pack.bean2.total")); + + metrics.removeMetric(metrics.metricName("pack.bean2.total", "grp2")); + + assertFalse(server.isRegistered(new ObjectName(":type=grp1"))); + assertFalse(server.isRegistered(new ObjectName(":type=grp2"))); + } finally { + metrics.close(); + } + } + + @Test + public void testJmxRegistrationSanitization() throws Exception { + Metrics metrics = new Metrics(); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + try { + metrics.addReporter(new JmxReporter()); + + Sensor sensor = metrics.sensor("kafka.requests"); + sensor.add(metrics.metricName("name", "group", "desc", "id", "foo*"), new Total()); + sensor.add(metrics.metricName("name", "group", "desc", "id", "foo+"), new Total()); + sensor.add(metrics.metricName("name", "group", "desc", "id", "foo?"), new Total()); + sensor.add(metrics.metricName("name", "group", "desc", "id", "foo:"), new Total()); + + assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%2A"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%2A"), "name")); + assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%2B"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%2B"), "name")); + assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%3F"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%3F"), "name")); + assertTrue(server.isRegistered(new ObjectName(":type=group,id=foo%3A"))); + assertEquals(0.0, server.getAttribute(new ObjectName(":type=group,id=foo%3A"), "name")); + + metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo*")); + metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo+")); + metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo?")); + metrics.removeMetric(metrics.metricName("name", "group", "desc", "id", "foo:")); + + assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%2A"))); + assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%2B"))); + assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%3F"))); + assertFalse(server.isRegistered(new ObjectName(":type=group,id=foo%3A"))); } finally { metrics.close(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java deleted file mode 100644 index d66bda1..0000000 --- a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.common.metrics; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.UnsupportedEncodingException; - -import org.junit.Test; - -public class SanitizerTest { - - @Test - public void testSanitize() throws UnsupportedEncodingException { - String principal = "CN=Some characters !@#$%&*()_-+=';:,/~"; - String sanitizedPrincipal = Sanitizer.sanitize(principal); - assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+")); - assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java new file mode 100644 index 0000000..dd384ee --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.UnsupportedEncodingException; + +import org.junit.Test; + +public class SanitizerTest { + + @Test + public void testSanitize() throws UnsupportedEncodingException { + String principal = "CN=Some characters !@#$%&*()_-+=';:,/~"; + String sanitizedPrincipal = Sanitizer.sanitize(principal); + assertTrue(sanitizedPrincipal.replace('%', '_').matches("[a-zA-Z0-9\\._\\-]+")); + assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal)); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java index 3cd1eae..5bbe148 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java @@ -64,7 +64,7 @@ public class ConnectMetrics { * @param time the time; may not be null */ public ConnectMetrics(String workerId, WorkerConfig config, Time time) { - this.workerId = makeValidName(workerId); + this.workerId = workerId; this.time = time; MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) @@ -111,8 +111,7 @@ public class ConnectMetrics { * Get or create a {@link MetricGroup} with the specified group name and the given tags. * Each group is uniquely identified by the name and tags. * - * @param groupName the name of the metric group; may not be null and must be a - * {@link #checkNameIsValid(String) valid name} + * @param groupName the name of the metric group; may not be null * @param tagKeyValues pairs of tag name and values * @return the {@link MetricGroup} that can be used to create metrics; never null * @throws IllegalArgumentException if the group name is not valid @@ -130,7 +129,6 @@ public class ConnectMetrics { } protected MetricGroupId groupId(String groupName, String... tagKeyValues) { - checkNameIsValid(groupName); Map<String, String> tags = tags(tagKeyValues); return new MetricGroupId(groupName, tags); } @@ -262,7 +260,6 @@ public class ConnectMetrics { * @throws IllegalArgumentException if the name is not valid */ public MetricName metricName(MetricNameTemplate template) { - checkNameIsValid(template.name()); return metrics.metricInstance(template, groupId.tags()); } @@ -428,8 +425,7 @@ public class ConnectMetrics { } /** - * Create a set of tags using the supplied key and value pairs. Every tag name and value will be - * {@link #makeValidName(String) made valid} before it is used. The order of the tags will be kept. + * Create a set of tags using the supplied key and value pairs. The order of the tags will be kept. * * @param keyValue the key and value pairs for the tags; must be an even number * @return the map of tags that can be supplied to the {@link Metrics} methods; never null @@ -439,49 +435,18 @@ public class ConnectMetrics { throw new IllegalArgumentException("keyValue needs to be specified in pairs"); Map<String, String> tags = new LinkedHashMap<>(); for (int i = 0; i < keyValue.length; i += 2) { - tags.put(makeValidName(keyValue[i]), makeValidName(keyValue[i + 1])); + tags.put(keyValue[i], keyValue[i + 1]); } return tags; } /** - * Utility to ensure the supplied name contains valid characters, replacing with a single '-' sequences of - * 1 or more characters <em>other than</em> word characters (e.g., "[a-zA-Z_0-9]"). - * - * @param name the name; may not be null - * @return the validated name; never null - */ - static String makeValidName(String name) { - Objects.requireNonNull(name); - name = name.trim(); - if (!name.isEmpty()) { - name = name.replaceAll("[^\\w]+", "-"); - } - return name; - } - - /** - * Utility method that determines whether the supplied name contains only "[a-zA-Z0-9_-]" characters and thus - * would be unchanged by {@link #makeValidName(String)}. - * - * @param name the name; may not be null - * @return true if the name is valid, or false otherwise - * @throws IllegalArgumentException if the name is not valid - */ - static void checkNameIsValid(String name) { - if (!name.equals(makeValidName(name))) { - throw new IllegalArgumentException("The name '" + name + "' contains at least one invalid character"); - } - } - - /** * Utility to generate the documentation for the Connect metrics. * * @param args the arguments */ public static void main(String[] args) { ConnectMetricsRegistry metrics = new ConnectMetricsRegistry(); - System.out.println(Metrics.toHtmlTable("kafka.connect", metrics.getAllTemplates())); + System.out.println(Metrics.toHtmlTable(JMX_PREFIX, metrics.getAllTemplates())); } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java index a16ab41..2de7cb6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java @@ -57,30 +57,6 @@ public class ConnectMetricsTest { } @Test - public void testValidatingNameWithAllValidCharacters() { - String name = "abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-0123456789"; - assertEquals(name, ConnectMetrics.makeValidName(name)); - } - - @Test - public void testValidatingEmptyName() { - String name = ""; - assertSame(name, ConnectMetrics.makeValidName(name)); - } - - @Test(expected = NullPointerException.class) - public void testValidatingNullName() { - ConnectMetrics.makeValidName(null); - } - - @Test - public void testValidatingNameWithInvalidCharacters() { - assertEquals("a-b-c-d-e-f-g-h-i-j-k", ConnectMetrics.makeValidName("a:b;c/d\\e,f*.--..;;g?h[i]j=k")); - assertEquals("-a-b-c-d-e-f-g-h-", ConnectMetrics.makeValidName(":a:b;c/d\\e,f*g?[]=h:")); - assertEquals("a-f-h", ConnectMetrics.makeValidName("a:;/\\,f*?h")); - } - - @Test public void testKafkaMetricsNotNull() { assertNotNull(metrics.metrics()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/admin/ConfigCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 306d64a..febf40f 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -18,6 +18,7 @@ package kafka.admin import java.util.Properties + import joptsimple._ import kafka.common.Config import kafka.common.InvalidConfigException @@ -27,8 +28,8 @@ import kafka.utils.{CommandLineUtils, ZkUtils} import kafka.utils.Implicits._ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.security.scram._ -import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.metrics.Sanitizer +import org.apache.kafka.common.utils.{Sanitizer, Utils} + import scala.collection._ import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/network/RequestChannel.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index ec16ab0..a4ec5e3 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -26,12 +26,11 @@ import kafka.metrics.KafkaMetricsGroup import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, NoOpAction, CloseConnectionAction} import kafka.utils.{Logging, NotNothing} import org.apache.kafka.common.memory.MemoryPool -import org.apache.kafka.common.metrics.Sanitizer import org.apache.kafka.common.network.Send import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Sanitizer, Time} import org.apache.log4j.Logger import scala.collection.mutable http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index afaa5dd..5d0b966 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -19,11 +19,11 @@ package kafka.server import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit} import java.util.concurrent.locks.ReentrantReadWriteLock -import kafka.utils.{ShutdownableThread, Logging} +import kafka.utils.{Logging, ShutdownableThread} import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics._ -import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg} -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total} +import org.apache.kafka.common.utils.{Sanitizer, Time} import scala.collection.JavaConverters._ @@ -61,9 +61,9 @@ object ClientQuotaManagerConfig { val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1) val UnlimitedQuota = Quota.upperBound(Long.MaxValue) - val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default)) - val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None) - val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) + val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) + val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None, None) + val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) } object QuotaTypes { @@ -73,9 +73,9 @@ object QuotaTypes { val UserClientIdQuotaEnabled = 4 } -case class QuotaId(sanitizedUser: Option[String], sanitizedClientId: Option[String]) +case class QuotaId(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String]) -case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, sanitizedClientId: String, quota: Quota) +case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: String, sanitizedClientId: String, quota: Quota) /** * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics @@ -187,7 +187,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case _: QuotaViolationException => // Compute the delay val clientQuotaEntity = clientSensors.quotaEntity - val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.sanitizedClientId)) + val clientMetric = metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId)) throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(clientQuotaEntity.quota)).toInt clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue @@ -213,33 +213,33 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * and the associated quota override or default quota. * */ - private def quotaEntity(sanitizedUser: String, sanitizedClientId: String) : QuotaEntity = { + private def quotaEntity(sanitizedUser: String, clientId: String, sanitizedClientId: String) : QuotaEntity = { quotaTypesEnabled match { case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled => - val quotaId = QuotaId(None, Some(sanitizedClientId)) + val quotaId = QuotaId(None, Some(clientId), Some(sanitizedClientId)) var quota = overriddenQuota.get(quotaId) if (quota == null) { quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId) if (quota == null) quota = staticConfigClientIdQuota } - QuotaEntity(quotaId, "", sanitizedClientId, quota) + QuotaEntity(quotaId, "", clientId, sanitizedClientId, quota) case QuotaTypes.UserQuotaEnabled => - val quotaId = QuotaId(Some(sanitizedUser), None) + val quotaId = QuotaId(Some(sanitizedUser), None, None) var quota = overriddenQuota.get(quotaId) if (quota == null) { quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId) if (quota == null) quota = ClientQuotaManagerConfig.UnlimitedQuota } - QuotaEntity(quotaId, sanitizedUser, "", quota) + QuotaEntity(quotaId, sanitizedUser, "", "", quota) case QuotaTypes.UserClientIdQuotaEnabled => - val quotaId = QuotaId(Some(sanitizedUser), Some(sanitizedClientId)) + val quotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizedClientId)) var quota = overriddenQuota.get(quotaId) if (quota == null) { - quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default))) + quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))) if (quota == null) { - quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(sanitizedClientId))) + quota = overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizedClientId))) if (quota == null) { quota = overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId) if (quota == null) @@ -247,17 +247,17 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } } - QuotaEntity(quotaId, sanitizedUser, sanitizedClientId, quota) + QuotaEntity(quotaId, sanitizedUser, clientId, sanitizedClientId, quota) case _ => - quotaEntityWithMultipleQuotaLevels(sanitizedUser, sanitizedClientId) + quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId, sanitizedClientId) } } - private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, sanitizerClientId: String) : QuotaEntity = { - val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(sanitizerClientId)) + private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, clientId: String, sanitizerClientId: String) : QuotaEntity = { + val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId), Some(sanitizerClientId)) - val userQuotaId = QuotaId(Some(sanitizedUser), None) - val clientQuotaId = QuotaId(None, Some(sanitizerClientId)) + val userQuotaId = QuotaId(Some(sanitizedUser), None, None) + val clientQuotaId = QuotaId(None, Some(clientId), Some(sanitizerClientId)) var quotaId = userClientQuotaId var quotaConfigId = userClientQuotaId // 1) /config/users/<user>/clients/<client-id> @@ -265,7 +265,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, if (quota == null) { // 2) /config/users/<user>/clients/<default> quotaId = userClientQuotaId - quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default)) + quotaConfigId = QuotaId(Some(sanitizedUser), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { @@ -277,31 +277,31 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, if (quota == null) { // 4) /config/users/<default>/clients/<client-id> quotaId = userClientQuotaId - quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(sanitizerClientId)) + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(clientId), Some(sanitizerClientId)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { // 5) /config/users/<default>/clients/<default> quotaId = userClientQuotaId - quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { // 6) /config/users/<default> quotaId = userQuotaId - quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None) + quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None, None) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { // 7) /config/clients/<client-id> quotaId = clientQuotaId - quotaConfigId = QuotaId(None, Some(sanitizerClientId)) + quotaConfigId = QuotaId(None, Some(clientId), Some(sanitizerClientId)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { // 8) /config/clients/<default> quotaId = clientQuotaId - quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default)) + quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)) quota = overriddenQuota.get(quotaConfigId) if (quota == null) { @@ -317,8 +317,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser - val quotaClientId = if (quotaId == userQuotaId) "" else sanitizerClientId - QuotaEntity(quotaId, quotaUser, quotaClientId, quota) + val quotaClientId = if (quotaId == userQuotaId) "" else clientId + val quotaSanitizedClientId = if (quotaId == userQuotaId) "" else sanitizerClientId + QuotaEntity(quotaId, quotaUser, quotaClientId, sanitizerClientId, quota) } /** @@ -327,7 +328,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Note: this method is expensive, it is meant to be used by tests only */ def quota(user: String, clientId: String) = { - quotaEntity(Sanitizer.sanitize(user), Sanitizer.sanitize(clientId)).quota + quotaEntity(Sanitizer.sanitize(user), clientId, Sanitizer.sanitize(clientId)).quota } /* @@ -361,14 +362,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): ClientSensors = { val sanitizedClientId = Sanitizer.sanitize(clientId) - val clientQuotaEntity = quotaEntity(sanitizedUser, sanitizedClientId) + val clientQuotaEntity = quotaEntity(sanitizedUser, clientId, sanitizedClientId) // Names of the sensors to access ClientSensors( clientQuotaEntity, sensorAccessor.getOrCreate( getQuotaSensorName(clientQuotaEntity.quotaId), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds, - clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.sanitizedClientId), + clientRateMetricName(clientQuotaEntity.sanitizedUser, clientQuotaEntity.clientId), Some(getQuotaMetricConfig(clientQuotaEntity.quota)), new Rate ), @@ -381,9 +382,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, ) } - private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.sanitizedClientId.getOrElse("") + private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType + "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") - private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.sanitizedClientId.getOrElse("") + private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("") protected def getQuotaMetricConfig(quota: Quota): MetricConfig = { new MetricConfig() @@ -406,10 +407,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Overrides quotas for <user>, <client-id> or <user, client-id> or the dynamic defaults * for any of these levels. * @param sanitizedUser user to override if quota applies to <user> or <user, client-id> - * @param sanitizedClientId client to override if quota applies to <client-id> or <user, client-id> + * @param clientId client to override if quota applies to <client-id> or <user, client-id> + * @param sanitizedClientId sanitized client ID to override if quota applies to <client-id> or <user, client-id> * @param quota custom quota to apply or None if quota override is being removed */ - def updateQuota(sanitizedUser: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) { + def updateQuota(sanitizedUser: Option[String], clientId: Option[String], sanitizedClientId: Option[String], quota: Option[Quota]) { /* * Acquire the write lock to apply changes in the quota objects. * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). @@ -419,13 +421,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, */ lock.writeLock().lock() try { - val quotaId = QuotaId(sanitizedUser, sanitizedClientId) + val quotaId = QuotaId(sanitizedUser, clientId, sanitizedClientId) val userInfo = sanitizedUser match { case Some(ConfigEntityName.Default) => "default user " case Some(user) => "user " + user + " " case None => "" } - val clientIdInfo = sanitizedClientId match { + val clientIdInfo = clientId match { case Some(ConfigEntityName.Default) => "default client-id" case Some(id) => "client-id " + id case None => "" @@ -434,7 +436,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case Some(newQuota) => logger.info(s"Changing ${quotaType} quota for ${userInfo}${clientIdInfo} to $newQuota.bound}") overriddenQuota.put(quotaId, newQuota) - (sanitizedUser, sanitizedClientId) match { + (sanitizedUser, clientId) match { case (Some(_), Some(_)) => quotaTypesEnabled |= QuotaTypes.UserClientIdQuotaEnabled case (Some(_), None) => quotaTypesEnabled |= QuotaTypes.UserQuotaEnabled case (None, Some(_)) => quotaTypesEnabled |= QuotaTypes.ClientIdQuotaEnabled @@ -445,21 +447,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, overriddenQuota.remove(quotaId) } - val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), sanitizedClientId.getOrElse("")) + val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), clientId.getOrElse("")) val allMetrics = metrics.metrics() // If multiple-levels of quotas are defined or if this is a default quota update, traverse metrics // to find all affected values. Otherwise, update just the single matching one. val singleUpdate = quotaTypesEnabled match { case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled => - !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !sanitizedClientId.filter(_ == ConfigEntityName.Default).isDefined + !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && !clientId.filter(_ == ConfigEntityName.Default).isDefined case _ => false } if (singleUpdate) { // Change the underlying metric config if the sensor has been created val metric = allMetrics.get(quotaMetricName) if (metric != null) { - val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), sanitizedClientId.getOrElse("")) + val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), clientId.getOrElse(""), sanitizedClientId.getOrElse("")) val newQuota = metricConfigEntity.quota logger.info(s"Sensor for ${userInfo}${clientIdInfo} already exists. Changing quota to ${newQuota.bound()} in MetricConfig") metric.config(getQuotaMetricConfig(newQuota)) @@ -469,7 +471,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, case (metricName, metric) => val userTag = if (metricName.tags.containsKey("user")) metricName.tags.get("user") else "" val clientIdTag = if (metricName.tags.containsKey("client-id")) metricName.tags.get("client-id") else "" - val metricConfigEntity = quotaEntity(userTag, clientIdTag) + val metricConfigEntity = quotaEntity(userTag, clientIdTag, Sanitizer.sanitize(clientIdTag)) if (metricConfigEntity.quota != metric.config.quota) { val newQuota = metricConfigEntity.quota logger.info(s"Sensor for quota-id ${metricConfigEntity.quotaId} already exists. Setting quota to ${newQuota.bound} in MetricConfig") @@ -483,11 +485,11 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } - protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: String): MetricName = { + protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = { metrics.metricName("byte-rate", quotaType.toString, "Tracking byte-rate per user/client-id", "user", sanitizedUser, - "client-id", sanitizedClientId) + "client-id", clientId) } private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = { @@ -495,7 +497,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, quotaType.toString, "Tracking average throttle-time per user/client-id", "user", quotaEntity.sanitizedUser, - "client-id", quotaEntity.sanitizedClientId) + "client-id", quotaEntity.clientId) } def shutdown() = { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index d2114dc..f454483 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -64,11 +64,11 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs) } - override protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: String): MetricName = { + override protected def clientRateMetricName(sanitizedUser: String, clientId: String): MetricName = { metrics.metricName("request-time", QuotaType.Request.toString, "Tracking request-time per user/client-id", "user", sanitizedUser, - "client-id", sanitizedClientId) + "client-id", clientId) } private def exemptMetricName: MetricName = { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ConfigHandler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 6f85801..ddeecb0 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -29,8 +29,9 @@ import kafka.utils.Implicits._ import kafka.utils.Logging import org.apache.kafka.common.config.ConfigDef.Validator import org.apache.kafka.common.config.ConfigException -import org.apache.kafka.common.metrics.{Quota, Sanitizer} +import org.apache.kafka.common.metrics.Quota import org.apache.kafka.common.metrics.Quota._ +import org.apache.kafka.common.utils.Sanitizer import scala.collection.JavaConverters._ @@ -118,24 +119,25 @@ class TopicConfigHandler(private val logManager: LogManager, kafkaConfig: KafkaC class QuotaConfigHandler(private val quotaManagers: QuotaManagers) { def updateQuotaConfig(sanitizedUser: Option[String], sanitizedClientId: Option[String], config: Properties) { + val clientId = sanitizedClientId.map(Sanitizer.desanitize) val producerQuota = if (config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp)) Some(new Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong, true)) else None - quotaManagers.produce.updateQuota(sanitizedUser, sanitizedClientId, producerQuota) + quotaManagers.produce.updateQuota(sanitizedUser, clientId, sanitizedClientId, producerQuota) val consumerQuota = if (config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp)) Some(new Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong, true)) else None - quotaManagers.fetch.updateQuota(sanitizedUser, sanitizedClientId, consumerQuota) + quotaManagers.fetch.updateQuota(sanitizedUser, clientId, sanitizedClientId, consumerQuota) val requestQuota = if (config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp)) Some(new Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble, true)) else None - quotaManagers.request.updateQuota(sanitizedUser, sanitizedClientId, requestQuota) + quotaManagers.request.updateQuota(sanitizedUser, clientId, sanitizedClientId, requestQuota) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/DynamicConfigManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index 634b0c2..69f9e96 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -28,7 +28,7 @@ import kafka.admin.AdminUtils import kafka.utils.json.JsonObject import org.apache.kafka.common.config.types.Password import org.apache.kafka.common.security.scram.ScramMechanism -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Sanitizer, Time} /** * Represents all the entities that can be configured via ZK http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index e8967d1..d821f52 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.{MetricName, TopicPartition} -import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sanitizer} +import org.apache.kafka.common.metrics.{KafkaMetric, Quota} import org.junit.Assert._ import org.junit.{Before, Test} @@ -210,7 +210,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) { val tags = new HashMap[String, String] - tags.put("client-id", Sanitizer.sanitize(producerClientId)) + tags.put("client-id", producerClientId) val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags)) val maxMetric = producer.metrics.get(new MetricName("produce-throttle-time-max", "producer-metrics", "", tags)) @@ -220,7 +220,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], maxThrottleTime: Option[Double] = None) { val tags = new HashMap[String, String] - tags.put("client-id", Sanitizer.sanitize(consumerClientId)) + tags.put("client-id", consumerClientId) val avgMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", tags)) val maxMetric = consumer.metrics.get(new MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", tags)) @@ -234,7 +234,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness { quotaType.toString, "Tracking throttle-time per user/client-id", "user", quotaId.sanitizedUser.getOrElse(""), - "client-id", quotaId.sanitizedClientId.getOrElse("")) + "client-id", quotaId.clientId.getOrElse("")) } def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala index f5a2cf5..383f139 100644 --- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala @@ -18,8 +18,8 @@ import java.util.Properties import kafka.admin.AdminUtils import kafka.server.{DynamicConfig, KafkaConfig, QuotaId} -import org.apache.kafka.common.metrics.Sanitizer import org.apache.kafka.common.security.auth.KafkaPrincipal +import org.apache.kafka.common.utils.Sanitizer import org.junit.Before class ClientIdQuotaTest extends BaseQuotaTest { @@ -27,8 +27,8 @@ class ClientIdQuotaTest extends BaseQuotaTest { override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName override def producerClientId = "QuotasTestProducer-!@#$%^&*()" override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()" - override val producerQuotaId = QuotaId(None, Some(Sanitizer.sanitize(producerClientId))) - override val consumerQuotaId = QuotaId(None, Some(Sanitizer.sanitize(consumerClientId))) + override val producerQuotaId = QuotaId(None, Some(producerClientId), Some(Sanitizer.sanitize(producerClientId))) + override val consumerQuotaId = QuotaId(None, Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId))) @Before override def setUp() { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala index cb6d376..e25f886 100644 --- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala @@ -20,8 +20,8 @@ import java.util.Properties import kafka.admin.AdminUtils import kafka.server._ import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Sanitizer import org.junit.Before -import org.apache.kafka.common.metrics.Sanitizer class UserClientIdQuotaTest extends BaseQuotaTest { @@ -31,8 +31,8 @@ class UserClientIdQuotaTest extends BaseQuotaTest { override val userPrincipal = "O=A client,CN=localhost" override def producerClientId = "QuotasTestProducer-!@#$%^&*()" override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()" - override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(Sanitizer.sanitize(producerClientId))) - override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(Sanitizer.sanitize(consumerClientId))) + override def producerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(producerClientId), Some(Sanitizer.sanitize(producerClientId))) + override def consumerQuotaId = QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(consumerClientId), Some(Sanitizer.sanitize(consumerClientId))) @Before override def setUp() { http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala index a7bddc5..b5d88c0 100644 --- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala @@ -21,8 +21,8 @@ import kafka.admin.AdminUtils import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId} import kafka.utils.JaasTestUtils import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Sanitizer import org.junit.{After, Before} -import org.apache.kafka.common.metrics.Sanitizer class UserQuotaTest extends BaseQuotaTest with SaslSetup { @@ -34,8 +34,8 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup { override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2 - override val producerQuotaId = QuotaId(Some(userPrincipal), None) - override val consumerQuotaId = QuotaId(Some(userPrincipal), None) + override val producerQuotaId = QuotaId(Some(userPrincipal), None, None) + override val consumerQuotaId = QuotaId(Some(userPrincipal), None, None) @Before http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index c0aff93..87ce46e 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -23,14 +23,14 @@ import kafka.common.InvalidConfigException import kafka.server.ConfigEntityName import kafka.utils.{Logging, ZkUtils} import kafka.zk.ZooKeeperTestHarness - import org.apache.kafka.common.security.scram.ScramCredentialUtils +import org.apache.kafka.common.utils.Sanitizer import org.easymock.EasyMock import org.junit.Assert._ import org.junit.Test + import scala.collection.mutable import scala.collection.JavaConverters._ -import org.apache.kafka.common.metrics.Sanitizer class ConfigCommandTest extends ZooKeeperTestHarness with Logging { @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 54be960..4196bc1 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -18,8 +18,8 @@ package kafka.server import java.util.Collections -import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, Sanitizer} -import org.apache.kafka.common.utils.MockTime +import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota} +import org.apache.kafka.common.utils.{MockTime, Sanitizer} import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{Before, Test} @@ -43,8 +43,8 @@ class ClientQuotaManagerTest { try { // Case 1: Update the quota. Assert that the new quota value is returned - clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(2000, true))) - clientMetrics.updateQuota(client2.configUser, client2.configClientId, Some(new Quota(4000, true))) + clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(2000, true))) + clientMetrics.updateQuota(client2.configUser, client2.configClientId, client2.sanitizedConfigClientId, Some(new Quota(4000, true))) assertEquals("Default producer quota should be " + config.quotaBytesPerSecondDefault, new Quota(config.quotaBytesPerSecondDefault, true), clientMetrics.quota(randomClient.user, randomClient.clientId)) assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota(client1.user, client1.clientId)) @@ -56,22 +56,22 @@ class ClientQuotaManagerTest { // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created. // p1 should not longer be throttled after the quota change - clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(3000, true))) + clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(3000, true))) assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId)) throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback) assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs) // Case 3: Change quota back to default. Should be throttled again - clientMetrics.updateQuota(client1.configUser, client1.configClientId, Some(new Quota(500, true))) + clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, Some(new Quota(500, true))) assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(client1.user, client1.clientId)) throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 0, this.callback) assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) // Case 4: Set high default quota, remove p1 quota. p1 should no longer be throttled - clientMetrics.updateQuota(client1.configUser, client1.configClientId, None) - clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, Some(new Quota(4000, true))) + clientMetrics.updateQuota(client1.configUser, client1.configClientId, client1.sanitizedConfigClientId, None) + clientMetrics.updateQuota(defaultConfigClient.configUser, defaultConfigClient.configClientId, defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true))) assertEquals("Should return the newly overridden value (4000)", new Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId)) throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, client1.clientId, 1000 * config.numQuotaSamples, this.callback) @@ -161,16 +161,16 @@ class ClientQuotaManagerTest { } try { - quotaManager.updateQuota(Some(ConfigEntityName.Default), None, Some(new Quota(1000, true))) - quotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(new Quota(2000, true))) - quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(3000, true))) - quotaManager.updateQuota(Some("userA"), None, Some(new Quota(4000, true))) - quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new Quota(5000, true))) - quotaManager.updateQuota(Some("userB"), None, Some(new Quota(6000, true))) - quotaManager.updateQuota(Some("userB"), Some("client1"), Some(new Quota(7000, true))) - quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), Some(new Quota(8000, true))) - quotaManager.updateQuota(Some("userC"), None, Some(new Quota(10000, true))) - quotaManager.updateQuota(None, Some("client1"), Some(new Quota(9000, true))) + quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, Some(new Quota(1000, true))) + quotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(2000, true))) + quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(3000, true))) + quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(4000, true))) + quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(5000, true))) + quotaManager.updateQuota(Some("userB"), None, None, Some(new Quota(6000, true))) + quotaManager.updateQuota(Some("userB"), Some("client1"), Some("client1"), Some(new Quota(7000, true))) + quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(8000, true))) + quotaManager.updateQuota(Some("userC"), None, None, Some(new Quota(10000, true))) + quotaManager.updateQuota(None, Some("client1"), Some("client1"), Some(new Quota(9000, true))) checkQuota("userA", "client1", 5000, 4500, false) // <user, client> quota takes precedence over <user> checkQuota("userA", "client2", 4000, 4500, true) // <user> quota takes precedence over <client> and defaults @@ -186,32 +186,32 @@ class ClientQuotaManagerTest { checkQuota("userE", "client1", 3000, 2500, false) // Remove default <user, client> quota config, revert to <user> default - quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None) + quotaManager.updateQuota(Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None) checkQuota("userD", "client1", 1000, 0, false) // Metrics tags changed, restart counter checkQuota("userE", "client4", 1000, 1500, true) checkQuota("userF", "client4", 1000, 800, false) // Default <user> quota shared across clients of user checkQuota("userF", "client5", 1000, 800, true) // Remove default <user> quota config, revert to <client-id> default - quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None) + quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, None) checkQuota("userF", "client4", 2000, 0, false) // Default <client-id> quota shared across client-id of all users checkQuota("userF", "client5", 2000, 0, false) checkQuota("userF", "client5", 2000, 2500, true) checkQuota("userG", "client5", 2000, 0, true) // Update quotas - quotaManager.updateQuota(Some("userA"), None, Some(new Quota(8000, true))) - quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new Quota(10000, true))) + quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, true))) + quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), Some(new Quota(10000, true))) checkQuota("userA", "client2", 8000, 0, false) checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum of new and earlier values checkQuota("userA", "client1", 10000, 0, false) checkQuota("userA", "client1", 10000, 6000, true) - quotaManager.updateQuota(Some("userA"), Some("client1"), None) + quotaManager.updateQuota(Some("userA"), Some("client1"), Some("client1"), None) checkQuota("userA", "client6", 8000, 0, true) // Throttled due to shared user quota - quotaManager.updateQuota(Some("userA"), Some("client6"), Some(new Quota(11000, true))) + quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), Some(new Quota(11000, true))) checkQuota("userA", "client6", 11000, 8500, false) - quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), Some(new Quota(12000, true))) - quotaManager.updateQuota(Some("userA"), Some("client6"), None) + quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new Quota(12000, true))) + quotaManager.updateQuota(Some("userA"), Some("client6"), Some("client6"), None) checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to sum of new and earlier values } finally { @@ -271,7 +271,7 @@ class ClientQuotaManagerTest { def testRequestPercentageQuotaViolation() { val metrics = newMetrics val quotaManager = new ClientRequestQuotaManager(config, metrics, time) - quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some(Quota.upperBound(1))) + quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), Some("test-client"), Some(Quota.upperBound(1))) val queueSizeMetric = metrics.metrics().get(metrics.metricName("queue-size", "Request", "")) def millisToPercent(millis: Double) = millis * 1000 * 1000 * ClientQuotaManagerConfig.NanosToPercentagePerSecond try { @@ -373,17 +373,18 @@ class ClientQuotaManagerTest { } @Test - def testSanitizeClientId() { + def testClientIdNotSanitized() { val metrics = newMetrics val clientMetrics = new ClientQuotaManager(config, metrics, QuotaType.Produce, time) val clientId = "client@#$%" try { clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, callback) - - val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + Sanitizer.sanitize(clientId)) + + // The metrics should use the raw client ID, even if the reporters internally sanitize them + val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + clientId) assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) - val byteRateSensor = metrics.getSensor("Produce-:" + Sanitizer.sanitize(clientId)) + val byteRateSensor = metrics.getSensor("Produce-:" + clientId) assertTrue("Byte rate sensor should exist", byteRateSensor != null) } finally { clientMetrics.shutdown() @@ -394,5 +395,10 @@ class ClientQuotaManagerTest { new Metrics(new MetricConfig(), Collections.emptyList(), time) } - private case class UserClient(val user: String, val clientId: String, val configUser: Option[String] = None, val configClientId: Option[String] = None) + private case class UserClient(val user: String, val clientId: String, val configUser: Option[String] = None, val configClientId: Option[String] = None) { + // The class under test expects only sanitized client configs. We pass both the default value (which should not be + // sanitized to ensure it remains unique) and non-default values, so we need to take care in generating the sanitized + // client ID + def sanitizedConfigClientId = configClientId.map(x => if (x == ConfigEntityName.Default) ConfigEntityName.Default else Sanitizer.sanitize(x)) + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 9d2bb8b..2e0b454 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -28,6 +28,7 @@ import kafka.integration.KafkaServerTestHarness import kafka.utils._ import kafka.admin.{AdminOperationException, AdminUtils} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.utils.Sanitizer import scala.collection.Map http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 480dfa6..4774e1d 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -36,6 +36,7 @@ import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} import org.apache.kafka.common.security.auth.{AuthenticationContext, KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol} +import org.apache.kafka.common.utils.Sanitizer import org.junit.Assert._ import org.junit.{After, Before, Test} @@ -82,7 +83,7 @@ class RequestQuotaTest extends BaseRequestTest { quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01") AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps) quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000") - AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, quotaProps) + AdminUtils.changeClientIdConfig(zkUtils, Sanitizer.sanitize(unthrottledClientId), quotaProps) TestUtils.retry(10000) { val quotaManager = servers.head.apis.quotas.request