Repository: kafka
Updated Branches:
  refs/heads/trunk 54ed3435b -> 7d6ca52a2


MINOR: Push JMX metric name mangling into the JmxReporter (KIP-190 follow up)

Author: Ewen Cheslack-Postava <m...@ewencp.org>

Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3980 from ewencp/dont-mangle-names


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7d6ca52a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7d6ca52a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7d6ca52a

Branch: refs/heads/trunk
Commit: 7d6ca52a2751908c7fc6b752d70dfaaaaa9bbe8c
Parents: 54ed343
Author: Ewen Cheslack-Postava <m...@ewencp.org>
Authored: Wed Oct 11 17:32:40 2017 -0400
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Wed Oct 11 17:32:40 2017 -0400

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   | 14 ++-
 .../kafka/clients/consumer/KafkaConsumer.java   |  8 +-
 .../kafka/clients/producer/KafkaProducer.java   |  8 +-
 .../kafka/common/metrics/JmxReporter.java       |  3 +-
 .../apache/kafka/common/metrics/Sanitizer.java  | 61 ------------
 .../kafka/common/utils/AppInfoParser.java       |  3 +-
 .../apache/kafka/common/utils/Sanitizer.java    | 61 ++++++++++++
 .../kafka/common/metrics/JmxReporterTest.java   | 67 ++++++++++++-
 .../kafka/common/metrics/SanitizerTest.java     | 35 -------
 .../kafka/common/utils/SanitizerTest.java       | 35 +++++++
 .../kafka/connect/runtime/ConnectMetrics.java   | 45 +--------
 .../connect/runtime/ConnectMetricsTest.java     | 24 -----
 .../main/scala/kafka/admin/ConfigCommand.scala  |  5 +-
 .../scala/kafka/network/RequestChannel.scala    |  3 +-
 .../scala/kafka/server/ClientQuotaManager.scala | 98 ++++++++++----------
 .../server/ClientRequestQuotaManager.scala      |  4 +-
 .../main/scala/kafka/server/ConfigHandler.scala | 10 +-
 .../kafka/server/DynamicConfigManager.scala     |  2 +-
 .../integration/kafka/api/BaseQuotaTest.scala   |  8 +-
 .../kafka/api/ClientIdQuotaTest.scala           |  6 +-
 .../kafka/api/UserClientIdQuotaTest.scala       |  6 +-
 .../integration/kafka/api/UserQuotaTest.scala   |  6 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    |  4 +-
 .../kafka/server/ClientQuotaManagerTest.scala   | 70 +++++++-------
 .../kafka/server/DynamicConfigChangeTest.scala  |  1 +
 .../unit/kafka/server/RequestQuotaTest.scala    |  3 +-
 26 files changed, 299 insertions(+), 291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 1a66371..ece27ca 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -55,7 +55,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sanitizer;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
@@ -290,7 +289,6 @@ public class KafkaAdminClient extends AdminClient {
         NetworkClient networkClient = null;
         Time time = Time.SYSTEM;
         String clientId = generateClientId(config);
-        String sanitizedClientId = Sanitizer.sanitize(clientId);
         ChannelBuilder channelBuilder = null;
         Selector selector = null;
         ApiVersions apiVersions = new ApiVersions();
@@ -303,7 +301,7 @@ public class KafkaAdminClient extends AdminClient {
                     config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG), 
true);
             List<MetricsReporter> reporters = 
config.getConfiguredInstances(AdminClientConfig.METRIC_REPORTER_CLASSES_CONFIG,
                 MetricsReporter.class);
-            Map<String, String> metricTags = 
Collections.singletonMap("client-id", sanitizedClientId);
+            Map<String, String> metricTags = 
Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(AdminClientConfig.METRICS_NUM_SAMPLES_CONFIG))
                 
.timeWindow(config.getLong(AdminClientConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
                 
.recordLevel(Sensor.RecordingLevel.forName(config.getString(AdminClientConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -328,7 +326,7 @@ public class KafkaAdminClient extends AdminClient {
                 true,
                 apiVersions,
                 logContext);
-            return new KafkaAdminClient(config, clientId, sanitizedClientId, 
time, metadata, metrics, networkClient,
+            return new KafkaAdminClient(config, clientId, time, metadata, 
metrics, networkClient,
                 timeoutProcessorFactory, logContext);
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
@@ -345,7 +343,7 @@ public class KafkaAdminClient extends AdminClient {
 
         try {
             metrics = new Metrics(new MetricConfig(), new 
LinkedList<MetricsReporter>(), time);
-            return new KafkaAdminClient(config, clientId, 
Sanitizer.sanitize(clientId), time, metadata, metrics, client, null,
+            return new KafkaAdminClient(config, clientId, time, metadata, 
metrics, client, null,
                     createLogContext(clientId));
         } catch (Throwable exc) {
             closeQuietly(metrics, "Metrics");
@@ -357,7 +355,7 @@ public class KafkaAdminClient extends AdminClient {
         return new LogContext("[AdminClient clientId=" + clientId + "] ");
     }
 
-    private KafkaAdminClient(AdminClientConfig config, String clientId, String 
sanitizedClientId, Time time, Metadata metadata,
+    private KafkaAdminClient(AdminClientConfig config, String clientId, Time 
time, Metadata metadata,
                      Metrics metrics, KafkaClient client, 
TimeoutProcessorFactory timeoutProcessorFactory,
                      LogContext logContext) {
         this.defaultTimeoutMs = 
config.getInt(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG);
@@ -377,7 +375,7 @@ public class KafkaAdminClient extends AdminClient {
             new TimeoutProcessorFactory() : timeoutProcessorFactory;
         this.maxRetries = config.getInt(AdminClientConfig.RETRIES_CONFIG);
         config.logUnused();
-        AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, metrics);
+        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka admin client initialized");
         thread.start();
     }
@@ -418,7 +416,7 @@ public class KafkaAdminClient extends AdminClient {
             // Wait for the thread to be joined.
             thread.join();
 
-            AppInfoParser.unregisterAppInfo(JMX_PREFIX, 
Sanitizer.sanitize(clientId), metrics);
+            AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
 
             log.debug("Kafka admin client closed.");
         } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6fb6919..9547aee 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -42,7 +42,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sanitizer;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
@@ -647,7 +646,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (clientId.isEmpty())
                 clientId = "consumer-" + 
CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
-            String sanitizedClientId = Sanitizer.sanitize(this.clientId);
             String groupId = config.getString(ConsumerConfig.GROUP_ID_CONFIG);
 
             LogContext logContext = new LogContext("[Consumer clientId=" + 
clientId + ", groupId=" + groupId + "] ");
@@ -661,7 +659,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                 throw new 
ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater 
than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + 
ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
             this.time = Time.SYSTEM;
 
-            Map<String, String> metricsTags = 
Collections.singletonMap("client-id", sanitizedClientId);
+            Map<String, String> metricsTags = 
Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
                     
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -772,7 +770,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     isolationLevel);
 
             config.logUnused();
-            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, 
metrics);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
 
             log.debug("Kafka consumer initialized");
         } catch (Throwable t) {
@@ -1739,7 +1737,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         ClientUtils.closeQuietly(client, "consumer network client", 
firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", 
firstException);
         ClientUtils.closeQuietly(valueDeserializer, "consumer value 
deserializer", firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, 
Sanitizer.sanitize(clientId), metrics);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka consumer has been closed");
         Throwable exception = firstException.get();
         if (exception != null && !swallowException) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a202217..b6c0a53 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
-import org.apache.kafka.common.metrics.Sanitizer;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
@@ -317,7 +316,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             if (clientId.length() <= 0)
                 clientId = "producer-" + 
PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
-            String sanitizedClientId = Sanitizer.sanitize(clientId);
 
             String transactionalId = 
userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?
                     (String) 
userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;
@@ -329,7 +327,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             log = logContext.logger(KafkaProducer.class);
             log.trace("Starting the Kafka producer");
 
-            Map<String, String> metricTags = 
Collections.singletonMap("client-id", sanitizedClientId);
+            Map<String, String> metricTags = 
Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), 
TimeUnit.MILLISECONDS)
                     
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
@@ -427,7 +425,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.ioThread.start();
             this.errors = this.metrics.sensor("errors");
             config.logUnused();
-            AppInfoParser.registerAppInfo(JMX_PREFIX, sanitizedClientId, 
metrics);
+            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
             log.debug("Kafka producer started");
         } catch (Throwable t) {
             // call close methods if internal objects are already constructed 
this is to prevent resource leak. see KAFKA-2121
@@ -1075,7 +1073,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", 
firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", 
firstException);
         ClientUtils.closeQuietly(partitioner, "producer partitioner", 
firstException);
-        AppInfoParser.unregisterAppInfo(JMX_PREFIX, 
Sanitizer.sanitize(clientId), metrics);
+        AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka producer has been closed");
         if (firstException.get() != null && !swallowException)
             throw new KafkaException("Failed to close kafka producer", 
firstException.get());

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 294e1d8..fda37d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -37,6 +37,7 @@ import javax.management.ReflectionException;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.utils.Sanitizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -133,7 +134,7 @@ public class JmxReporter implements MetricsReporter {
             mBeanName.append(",");
             mBeanName.append(entry.getKey());
             mBeanName.append("=");
-            mBeanName.append(entry.getValue());
+            mBeanName.append(Sanitizer.sanitize(entry.getValue()));
         }
         return mBeanName.toString();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
deleted file mode 100644
index b98a426..0000000
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Sanitizer.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.metrics;
-
-import java.io.UnsupportedEncodingException;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.kafka.common.KafkaException;
-
-/**
- * Utility class for sanitizing/desanitizing user principal and client-ids
- * to a safe value for use in MetricName and as Zookeeper node name
- */
-public class Sanitizer {
-
-    public static String sanitize(String name) {
-        String encoded = "";
-        try {
-            encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
-            StringBuilder builder = new StringBuilder();
-            for (int i = 0; i < encoded.length(); i++) {
-                char c = encoded.charAt(i);
-                if (c == '*') {         // Metric ObjectName treats * as 
pattern
-                    builder.append("%2A");
-                } else if (c == '+') {  // Space URL-encoded as +, replace 
with percent encoding
-                    builder.append("%20");
-                } else {
-                    builder.append(c);
-                }
-            }
-            return builder.toString();
-        } catch (UnsupportedEncodingException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-    public static String desanitize(String name) {
-        try {
-            return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
-        } catch (UnsupportedEncodingException e) {
-            throw new KafkaException(e);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
index 9a1bab8..42cf312 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sanitizer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,7 +70,7 @@ public class AppInfoParser {
     public static synchronized void unregisterAppInfo(String prefix, String 
id, Metrics metrics) {
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         try {
-            ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + 
id);
+            ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + 
Sanitizer.sanitize(id));
             if (server.isRegistered(name))
                 server.unregisterMBean(name);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
new file mode 100644
index 0000000..0b68d0c
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Sanitizer.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLDecoder;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.kafka.common.KafkaException;
+
+/**
+ * Utility class for sanitizing/desanitizing user principal and client-ids
+ * to a safe value for use in JMX metric names and as Zookeeper node name
+ */
+public class Sanitizer {
+
+    public static String sanitize(String name) {
+        String encoded = "";
+        try {
+            encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
+            StringBuilder builder = new StringBuilder();
+            for (int i = 0; i < encoded.length(); i++) {
+                char c = encoded.charAt(i);
+                if (c == '*') {         // Metric ObjectName treats * as 
pattern
+                    builder.append("%2A");
+                } else if (c == '+') {  // Space URL-encoded as +, replace 
with percent encoding
+                    builder.append("%20");
+                } else {
+                    builder.append(c);
+                }
+            }
+            return builder.toString();
+        } catch (UnsupportedEncodingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    public static String desanitize(String name) {
+        try {
+            return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
+        } catch (UnsupportedEncodingException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index 3f09e08..3b39db6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -20,19 +20,80 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.junit.Test;
 
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 public class JmxReporterTest {
 
     @Test
     public void testJmxRegistration() throws Exception {
         Metrics metrics = new Metrics();
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         try {
             metrics.addReporter(new JmxReporter());
+
+            assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
+
             Sensor sensor = metrics.sensor("kafka.requests");
             sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new 
Avg());
             sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new 
Total());
-            Sensor sensor2 = metrics.sensor("kafka.blah");
-            sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new 
Total());
-            sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new 
Total());
+
+            assertTrue(server.isRegistered(new ObjectName(":type=grp1")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=grp1"), "pack.bean1.avg"));
+            assertTrue(server.isRegistered(new ObjectName(":type=grp2")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=grp2"), "pack.bean2.total"));
+
+            metrics.removeMetric(metrics.metricName("pack.bean1.avg", "grp1"));
+
+            assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
+            assertTrue(server.isRegistered(new ObjectName(":type=grp2")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=grp2"), "pack.bean2.total"));
+
+            metrics.removeMetric(metrics.metricName("pack.bean2.total", 
"grp2"));
+
+            assertFalse(server.isRegistered(new ObjectName(":type=grp1")));
+            assertFalse(server.isRegistered(new ObjectName(":type=grp2")));
+        } finally {
+            metrics.close();
+        }
+    }
+
+    @Test
+    public void testJmxRegistrationSanitization() throws Exception {
+        Metrics metrics = new Metrics();
+        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        try {
+            metrics.addReporter(new JmxReporter());
+
+            Sensor sensor = metrics.sensor("kafka.requests");
+            sensor.add(metrics.metricName("name", "group", "desc", "id", 
"foo*"), new Total());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", 
"foo+"), new Total());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", 
"foo?"), new Total());
+            sensor.add(metrics.metricName("name", "group", "desc", "id", 
"foo:"), new Total());
+
+            assertTrue(server.isRegistered(new 
ObjectName(":type=group,id=foo%2A")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=group,id=foo%2A"), "name"));
+            assertTrue(server.isRegistered(new 
ObjectName(":type=group,id=foo%2B")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=group,id=foo%2B"), "name"));
+            assertTrue(server.isRegistered(new 
ObjectName(":type=group,id=foo%3F")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=group,id=foo%3F"), "name"));
+            assertTrue(server.isRegistered(new 
ObjectName(":type=group,id=foo%3A")));
+            assertEquals(0.0, server.getAttribute(new 
ObjectName(":type=group,id=foo%3A"), "name"));
+
+            metrics.removeMetric(metrics.metricName("name", "group", "desc", 
"id", "foo*"));
+            metrics.removeMetric(metrics.metricName("name", "group", "desc", 
"id", "foo+"));
+            metrics.removeMetric(metrics.metricName("name", "group", "desc", 
"id", "foo?"));
+            metrics.removeMetric(metrics.metricName("name", "group", "desc", 
"id", "foo:"));
+
+            assertFalse(server.isRegistered(new 
ObjectName(":type=group,id=foo%2A")));
+            assertFalse(server.isRegistered(new 
ObjectName(":type=group,id=foo%2B")));
+            assertFalse(server.isRegistered(new 
ObjectName(":type=group,id=foo%3F")));
+            assertFalse(server.isRegistered(new 
ObjectName(":type=group,id=foo%3A")));
         } finally {
             metrics.close();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
deleted file mode 100644
index d66bda1..0000000
--- a/clients/src/test/java/org/apache/kafka/common/metrics/SanitizerTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.metrics;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.UnsupportedEncodingException;
-
-import org.junit.Test;
-
-public class SanitizerTest {
-
-    @Test
-    public void testSanitize() throws UnsupportedEncodingException {
-        String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
-        String sanitizedPrincipal = Sanitizer.sanitize(principal);
-        assertTrue(sanitizedPrincipal.replace('%', 
'_').matches("[a-zA-Z0-9\\._\\-]+"));
-        assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
new file mode 100644
index 0000000..dd384ee
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SanitizerTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.UnsupportedEncodingException;
+
+import org.junit.Test;
+
+public class SanitizerTest {
+
+    @Test
+    public void testSanitize() throws UnsupportedEncodingException {
+        String principal = "CN=Some characters !@#$%&*()_-+=';:,/~";
+        String sanitizedPrincipal = Sanitizer.sanitize(principal);
+        assertTrue(sanitizedPrincipal.replace('%', 
'_').matches("[a-zA-Z0-9\\._\\-]+"));
+        assertEquals(principal, Sanitizer.desanitize(sanitizedPrincipal));
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
index 3cd1eae..5bbe148 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectMetrics.java
@@ -64,7 +64,7 @@ public class ConnectMetrics {
      * @param time     the time; may not be null
      */
     public ConnectMetrics(String workerId, WorkerConfig config, Time time) {
-        this.workerId = makeValidName(workerId);
+        this.workerId = workerId;
         this.time = time;
 
         MetricConfig metricConfig = new 
MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
@@ -111,8 +111,7 @@ public class ConnectMetrics {
      * Get or create a {@link MetricGroup} with the specified group name and 
the given tags.
      * Each group is uniquely identified by the name and tags.
      *
-     * @param groupName    the name of the metric group; may not be null and 
must be a
-     *                     {@link #checkNameIsValid(String) valid name}
+     * @param groupName    the name of the metric group; may not be null
      * @param tagKeyValues pairs of tag name and values
      * @return the {@link MetricGroup} that can be used to create metrics; 
never null
      * @throws IllegalArgumentException if the group name is not valid
@@ -130,7 +129,6 @@ public class ConnectMetrics {
     }
 
     protected MetricGroupId groupId(String groupName, String... tagKeyValues) {
-        checkNameIsValid(groupName);
         Map<String, String> tags = tags(tagKeyValues);
         return new MetricGroupId(groupName, tags);
     }
@@ -262,7 +260,6 @@ public class ConnectMetrics {
          * @throws IllegalArgumentException if the name is not valid
          */
         public MetricName metricName(MetricNameTemplate template) {
-            checkNameIsValid(template.name());
             return metrics.metricInstance(template, groupId.tags());
         }
 
@@ -428,8 +425,7 @@ public class ConnectMetrics {
     }
 
     /**
-     * Create a set of tags using the supplied key and value pairs. Every tag 
name and value will be
-     * {@link #makeValidName(String) made valid} before it is used. The order 
of the tags will be kept.
+     * Create a set of tags using the supplied key and value pairs. The order 
of the tags will be kept.
      *
      * @param keyValue the key and value pairs for the tags; must be an even 
number
      * @return the map of tags that can be supplied to the {@link Metrics} 
methods; never null
@@ -439,49 +435,18 @@ public class ConnectMetrics {
             throw new IllegalArgumentException("keyValue needs to be specified 
in pairs");
         Map<String, String> tags = new LinkedHashMap<>();
         for (int i = 0; i < keyValue.length; i += 2) {
-            tags.put(makeValidName(keyValue[i]), makeValidName(keyValue[i + 
1]));
+            tags.put(keyValue[i], keyValue[i + 1]);
         }
         return tags;
     }
 
     /**
-     * Utility to ensure the supplied name contains valid characters, 
replacing with a single '-' sequences of
-     * 1 or more characters <em>other than</em> word characters (e.g., 
"[a-zA-Z_0-9]").
-     *
-     * @param name the name; may not be null
-     * @return the validated name; never null
-     */
-    static String makeValidName(String name) {
-        Objects.requireNonNull(name);
-        name = name.trim();
-        if (!name.isEmpty()) {
-            name = name.replaceAll("[^\\w]+", "-");
-        }
-        return name;
-    }
-
-    /**
-     * Utility method that determines whether the supplied name contains only 
"[a-zA-Z0-9_-]" characters and thus
-     * would be unchanged by {@link #makeValidName(String)}.
-     *
-     * @param name the name; may not be null
-     * @return true if the name is valid, or false otherwise
-     * @throws IllegalArgumentException if the name is not valid
-     */
-    static void checkNameIsValid(String name) {
-        if (!name.equals(makeValidName(name))) {
-            throw new IllegalArgumentException("The name '" + name + "' 
contains at least one invalid character");
-        }
-    }
-
-    /**
      * Utility to generate the documentation for the Connect metrics.
      *
      * @param args the arguments
      */
     public static void main(String[] args) {
         ConnectMetricsRegistry metrics = new ConnectMetricsRegistry();
-        System.out.println(Metrics.toHtmlTable("kafka.connect", 
metrics.getAllTemplates()));
+        System.out.println(Metrics.toHtmlTable(JMX_PREFIX, 
metrics.getAllTemplates()));
     }
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
index a16ab41..2de7cb6 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectMetricsTest.java
@@ -57,30 +57,6 @@ public class ConnectMetricsTest {
     }
 
     @Test
-    public void testValidatingNameWithAllValidCharacters() {
-        String name = 
"abcdefghijklmnopqrstuvwxyz_ABCDEFGHIJKLMNOPQRSTUVWXYZ-0123456789";
-        assertEquals(name, ConnectMetrics.makeValidName(name));
-    }
-
-    @Test
-    public void testValidatingEmptyName() {
-        String name = "";
-        assertSame(name, ConnectMetrics.makeValidName(name));
-    }
-
-    @Test(expected = NullPointerException.class)
-    public void testValidatingNullName() {
-        ConnectMetrics.makeValidName(null);
-    }
-
-    @Test
-    public void testValidatingNameWithInvalidCharacters() {
-        assertEquals("a-b-c-d-e-f-g-h-i-j-k", 
ConnectMetrics.makeValidName("a:b;c/d\\e,f*.--..;;g?h[i]j=k"));
-        assertEquals("-a-b-c-d-e-f-g-h-", 
ConnectMetrics.makeValidName(":a:b;c/d\\e,f*g?[]=h:"));
-        assertEquals("a-f-h", ConnectMetrics.makeValidName("a:;/\\,f*?h"));
-    }
-
-    @Test
     public void testKafkaMetricsNotNull() {
         assertNotNull(metrics.metrics());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 306d64a..febf40f 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -18,6 +18,7 @@
 package kafka.admin
 
 import java.util.Properties
+
 import joptsimple._
 import kafka.common.Config
 import kafka.common.InvalidConfigException
@@ -27,8 +28,8 @@ import kafka.utils.{CommandLineUtils, ZkUtils}
 import kafka.utils.Implicits._
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.scram._
-import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.common.metrics.Sanitizer
+import org.apache.kafka.common.utils.{Sanitizer, Utils}
+
 import scala.collection._
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index ec16ab0..a4ec5e3 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -26,12 +26,11 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.network.RequestChannel.{BaseRequest, SendAction, ShutdownRequest, 
NoOpAction, CloseConnectionAction}
 import kafka.utils.{Logging, NotNothing}
 import org.apache.kafka.common.memory.MemoryPool
-import org.apache.kafka.common.metrics.Sanitizer
 import org.apache.kafka.common.network.Send
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Sanitizer, Time}
 import org.apache.log4j.Logger
 
 import scala.collection.mutable

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index afaa5dd..5d0b966 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -19,11 +19,11 @@ package kafka.server
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
-import kafka.utils.{ShutdownableThread, Logging}
+import kafka.utils.{Logging, ShutdownableThread}
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
-import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.metrics.stats.{Avg, Rate, Total}
+import org.apache.kafka.common.utils.{Sanitizer, Time}
 
 import scala.collection.JavaConverters._
 
@@ -61,9 +61,9 @@ object ClientQuotaManagerConfig {
   val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
 
   val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
-  val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
-  val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None)
-  val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default))
+  val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default))
+  val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None, None)
+  val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
 }
 
 object QuotaTypes {
@@ -73,9 +73,9 @@ object QuotaTypes {
   val UserClientIdQuotaEnabled = 4
 }
 
-case class QuotaId(sanitizedUser: Option[String], sanitizedClientId: 
Option[String])
+case class QuotaId(sanitizedUser: Option[String], clientId: Option[String], 
sanitizedClientId: Option[String])
 
-case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, 
sanitizedClientId: String, quota: Quota)
+case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: 
String, sanitizedClientId: String, quota: Quota)
 
 /**
  * Helper class that records per-client metrics. It is also responsible for 
maintaining Quota usage statistics
@@ -187,7 +187,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       case _: QuotaViolationException =>
         // Compute the delay
         val clientQuotaEntity = clientSensors.quotaEntity
-        val clientMetric = 
metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.sanitizedClientId))
+        val clientMetric = 
metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId))
         throttleTimeMs = throttleTime(clientMetric, 
getQuotaMetricConfig(clientQuotaEntity.quota)).toInt
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
@@ -213,33 +213,33 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * and the associated quota override or default quota.
    *
    */
-  private def quotaEntity(sanitizedUser: String, sanitizedClientId: String) : 
QuotaEntity = {
+  private def quotaEntity(sanitizedUser: String, clientId: String, 
sanitizedClientId: String) : QuotaEntity = {
     quotaTypesEnabled match {
       case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
-        val quotaId = QuotaId(None, Some(sanitizedClientId))
+        val quotaId = QuotaId(None, Some(clientId), Some(sanitizedClientId))
         var quota = overriddenQuota.get(quotaId)
         if (quota == null) {
           quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
           if (quota == null)
             quota = staticConfigClientIdQuota
         }
-        QuotaEntity(quotaId, "", sanitizedClientId, quota)
+        QuotaEntity(quotaId, "", clientId, sanitizedClientId, quota)
       case QuotaTypes.UserQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), None)
+        val quotaId = QuotaId(Some(sanitizedUser), None, None)
         var quota = overriddenQuota.get(quotaId)
         if (quota == null) {
           quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId)
           if (quota == null)
             quota = ClientQuotaManagerConfig.UnlimitedQuota
         }
-        QuotaEntity(quotaId, sanitizedUser, "", quota)
+        QuotaEntity(quotaId, sanitizedUser, "", "", quota)
       case QuotaTypes.UserClientIdQuotaEnabled =>
-        val quotaId = QuotaId(Some(sanitizedUser), Some(sanitizedClientId))
+        val quotaId = QuotaId(Some(sanitizedUser), Some(clientId), 
Some(sanitizedClientId))
         var quota = overriddenQuota.get(quotaId)
         if (quota == null) {
-          quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default)))
+          quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default)))
           if (quota == null) {
-            quota = 
overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), 
Some(sanitizedClientId)))
+            quota = 
overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId), 
Some(sanitizedClientId)))
             if (quota == null) {
               quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
               if (quota == null)
@@ -247,17 +247,17 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
             }
           }
         }
-        QuotaEntity(quotaId, sanitizedUser, sanitizedClientId, quota)
+        QuotaEntity(quotaId, sanitizedUser, clientId, sanitizedClientId, quota)
       case _ =>
-        quotaEntityWithMultipleQuotaLevels(sanitizedUser, sanitizedClientId)
+        quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId, 
sanitizedClientId)
     }
   }
 
-  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, 
sanitizerClientId: String) : QuotaEntity = {
-    val userClientQuotaId = QuotaId(Some(sanitizedUser), 
Some(sanitizerClientId))
+  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, 
clientId: String, sanitizerClientId: String) : QuotaEntity = {
+    val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId), 
Some(sanitizerClientId))
 
-    val userQuotaId = QuotaId(Some(sanitizedUser), None)
-    val clientQuotaId = QuotaId(None, Some(sanitizerClientId))
+    val userQuotaId = QuotaId(Some(sanitizedUser), None, None)
+    val clientQuotaId = QuotaId(None, Some(clientId), Some(sanitizerClientId))
     var quotaId = userClientQuotaId
     var quotaConfigId = userClientQuotaId
     // 1) /config/users/<user>/clients/<client-id>
@@ -265,7 +265,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     if (quota == null) {
       // 2) /config/users/<user>/clients/<default>
       quotaId = userClientQuotaId
-      quotaConfigId = QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default))
+      quotaConfigId = QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
       quota = overriddenQuota.get(quotaConfigId)
 
       if (quota == null) {
@@ -277,31 +277,31 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
         if (quota == null) {
           // 4) /config/users/<default>/clients/<client-id>
           quotaId = userClientQuotaId
-          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(sanitizerClientId))
+          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(clientId), Some(sanitizerClientId))
           quota = overriddenQuota.get(quotaConfigId)
 
           if (quota == null) {
             // 5) /config/users/<default>/clients/<default>
             quotaId = userClientQuotaId
-            quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default))
+            quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
             quota = overriddenQuota.get(quotaConfigId)
 
             if (quota == null) {
               // 6) /config/users/<default>
               quotaId = userQuotaId
-              quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None)
+              quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None, 
None)
               quota = overriddenQuota.get(quotaConfigId)
 
               if (quota == null) {
                 // 7) /config/clients/<client-id>
                 quotaId = clientQuotaId
-                quotaConfigId = QuotaId(None, Some(sanitizerClientId))
+                quotaConfigId = QuotaId(None, Some(clientId), 
Some(sanitizerClientId))
                 quota = overriddenQuota.get(quotaConfigId)
 
                 if (quota == null) {
                   // 8) /config/clients/<default>
                   quotaId = clientQuotaId
-                  quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default))
+                  quotaConfigId = QuotaId(None, 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
                   quota = overriddenQuota.get(quotaConfigId)
 
                   if (quota == null) {
@@ -317,8 +317,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       }
     }
     val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
-    val quotaClientId = if (quotaId == userQuotaId) "" else sanitizerClientId
-    QuotaEntity(quotaId, quotaUser, quotaClientId, quota)
+    val quotaClientId = if (quotaId == userQuotaId) "" else clientId
+    val quotaSanitizedClientId = if (quotaId == userQuotaId) "" else 
sanitizerClientId
+    QuotaEntity(quotaId, quotaUser, quotaClientId, quotaSanitizedClientId, quota) // both client-id forms are "" when the quota resolved at user level
   }
 
   /**
@@ -327,7 +328,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * Note: this method is expensive, it is meant to be used by tests only
    */
   def quota(user: String, clientId: String) = {
-    quotaEntity(Sanitizer.sanitize(user), Sanitizer.sanitize(clientId)).quota
+    quotaEntity(Sanitizer.sanitize(user), clientId, 
Sanitizer.sanitize(clientId)).quota
   }
 
   /*
@@ -361,14 +362,14 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    */
   def getOrCreateQuotaSensors(sanitizedUser: String, clientId: String): 
ClientSensors = {
     val sanitizedClientId = Sanitizer.sanitize(clientId)
-    val clientQuotaEntity = quotaEntity(sanitizedUser, sanitizedClientId)
+    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId, 
sanitizedClientId)
     // Names of the sensors to access
     ClientSensors(
       clientQuotaEntity,
       sensorAccessor.getOrCreate(
         getQuotaSensorName(clientQuotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
-        clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.sanitizedClientId),
+        clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId),
         Some(getQuotaMetricConfig(clientQuotaEntity.quota)),
         new Rate
       ),
@@ -381,9 +382,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     )
   }
 
-  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType 
+ "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.sanitizedClientId.getOrElse("")
+  private def getThrottleTimeSensorName(quotaId: QuotaId): String = quotaType 
+ "ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.clientId.getOrElse("")
 
-  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + 
quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.sanitizedClientId.getOrElse("")
+  private def getQuotaSensorName(quotaId: QuotaId): String = quotaType + "-" + 
quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
   protected def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
@@ -406,10 +407,11 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * Overrides quotas for <user>, <client-id> or <user, client-id> or the 
dynamic defaults
    * for any of these levels.
    * @param sanitizedUser user to override if quota applies to <user> or 
<user, client-id>
-   * @param sanitizedClientId client to override if quota applies to 
<client-id> or <user, client-id>
+   * @param clientId client to override if quota applies to <client-id> or 
<user, client-id>
+   * @param sanitizedClientId sanitized client ID to override if quota applies 
to <client-id> or <user, client-id>
    * @param quota custom quota to apply or None if quota override is being 
removed
    */
-  def updateQuota(sanitizedUser: Option[String], sanitizedClientId: 
Option[String], quota: Option[Quota]) {
+  def updateQuota(sanitizedUser: Option[String], clientId: Option[String], 
sanitizedClientId: Option[String], quota: Option[Quota]) {
     /*
      * Acquire the write lock to apply changes in the quota objects.
      * This method changes the quota in the overriddenQuota map and applies 
the update on the actual KafkaMetric object (if it exists).
@@ -419,13 +421,13 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
      */
     lock.writeLock().lock()
     try {
-      val quotaId = QuotaId(sanitizedUser, sanitizedClientId)
+      val quotaId = QuotaId(sanitizedUser, clientId, sanitizedClientId)
       val userInfo = sanitizedUser match {
         case Some(ConfigEntityName.Default) => "default user "
         case Some(user) => "user " + user + " "
         case None => ""
       }
-      val clientIdInfo = sanitizedClientId match {
+      val clientIdInfo = clientId match {
         case Some(ConfigEntityName.Default) => "default client-id"
         case Some(id) => "client-id " + id
         case None => ""
@@ -434,7 +436,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
         case Some(newQuota) =>
           logger.info(s"Changing ${quotaType} quota for 
${userInfo}${clientIdInfo} to $newQuota.bound}")
           overriddenQuota.put(quotaId, newQuota)
-          (sanitizedUser, sanitizedClientId) match {
+          (sanitizedUser, clientId) match {
             case (Some(_), Some(_)) => quotaTypesEnabled |= 
QuotaTypes.UserClientIdQuotaEnabled
             case (Some(_), None) => quotaTypesEnabled |= 
QuotaTypes.UserQuotaEnabled
             case (None, Some(_)) => quotaTypesEnabled |= 
QuotaTypes.ClientIdQuotaEnabled
@@ -445,21 +447,21 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
           overriddenQuota.remove(quotaId)
       }
 
-      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), 
sanitizedClientId.getOrElse(""))
+      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), 
clientId.getOrElse(""))
       val allMetrics = metrics.metrics()
 
       // If multiple-levels of quotas are defined or if this is a default 
quota update, traverse metrics
       // to find all affected values. Otherwise, update just the single 
matching one.
       val singleUpdate = quotaTypesEnabled match {
         case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | 
QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
-          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && 
!sanitizedClientId.filter(_ == ConfigEntityName.Default).isDefined
+          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && 
!clientId.filter(_ == ConfigEntityName.Default).isDefined
         case _ => false
       }
       if (singleUpdate) {
           // Change the underlying metric config if the sensor has been created
           val metric = allMetrics.get(quotaMetricName)
           if (metric != null) {
-            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), 
sanitizedClientId.getOrElse(""))
+            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), 
clientId.getOrElse(""), sanitizedClientId.getOrElse(""))
             val newQuota = metricConfigEntity.quota
             logger.info(s"Sensor for ${userInfo}${clientIdInfo} already 
exists. Changing quota to ${newQuota.bound()} in MetricConfig")
             metric.config(getQuotaMetricConfig(newQuota))
@@ -469,7 +471,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
             case (metricName, metric) =>
               val userTag = if (metricName.tags.containsKey("user")) 
metricName.tags.get("user") else ""
               val clientIdTag = if (metricName.tags.containsKey("client-id")) 
metricName.tags.get("client-id") else ""
-              val metricConfigEntity = quotaEntity(userTag, clientIdTag)
+              val metricConfigEntity = quotaEntity(userTag, clientIdTag, 
Sanitizer.sanitize(clientIdTag))
               if (metricConfigEntity.quota != metric.config.quota) {
                 val newQuota = metricConfigEntity.quota
                 logger.info(s"Sensor for quota-id 
${metricConfigEntity.quotaId} already exists. Setting quota to 
${newQuota.bound} in MetricConfig")
@@ -483,11 +485,11 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
-  protected def clientRateMetricName(sanitizedUser: String, sanitizedClientId: 
String): MetricName = {
+  protected def clientRateMetricName(sanitizedUser: String, clientId: String): 
MetricName = {
     metrics.metricName("byte-rate", quotaType.toString,
                    "Tracking byte-rate per user/client-id",
                    "user", sanitizedUser,
-                   "client-id", sanitizedClientId)
+                   "client-id", clientId)
   }
 
   private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
@@ -495,7 +497,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
                        quotaType.toString,
                        "Tracking average throttle-time per user/client-id",
                        "user", quotaEntity.sanitizedUser,
-                       "client-id", quotaEntity.sanitizedClientId)
+                       "client-id", quotaEntity.clientId)
   }
 
   def shutdown() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
index d2114dc..f454483 100644
--- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
@@ -64,11 +64,11 @@ class ClientRequestQuotaManager(private val config: 
ClientQuotaManagerConfig,
     math.min(super.throttleTime(clientMetric, config), maxThrottleTimeMs)
   }
 
-  override protected def clientRateMetricName(sanitizedUser: String, 
sanitizedClientId: String): MetricName = {
+  override protected def clientRateMetricName(sanitizedUser: String, clientId: 
String): MetricName = {
     metrics.metricName("request-time", QuotaType.Request.toString,
                    "Tracking request-time per user/client-id",
                    "user", sanitizedUser,
-                   "client-id", sanitizedClientId)
+                   "client-id", clientId)
   }
 
   private def exemptMetricName: MetricName = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 6f85801..ddeecb0 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -29,8 +29,9 @@ import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.config.ConfigDef.Validator
 import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.metrics.{Quota, Sanitizer}
+import org.apache.kafka.common.metrics.Quota
 import org.apache.kafka.common.metrics.Quota._
+import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.JavaConverters._
 
@@ -118,24 +119,25 @@ class TopicConfigHandler(private val logManager: 
LogManager, kafkaConfig: KafkaC
 class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
 
   def updateQuotaConfig(sanitizedUser: Option[String], sanitizedClientId: 
Option[String], config: Properties) {
+    val clientId = sanitizedClientId.map(Sanitizer.desanitize)
     val producerQuota =
       if 
(config.containsKey(DynamicConfig.Client.ProducerByteRateOverrideProp))
         Some(new 
Quota(config.getProperty(DynamicConfig.Client.ProducerByteRateOverrideProp).toLong,
 true))
       else
         None
-    quotaManagers.produce.updateQuota(sanitizedUser, sanitizedClientId, 
producerQuota)
+    quotaManagers.produce.updateQuota(sanitizedUser, clientId, 
sanitizedClientId, producerQuota)
     val consumerQuota =
       if 
(config.containsKey(DynamicConfig.Client.ConsumerByteRateOverrideProp))
         Some(new 
Quota(config.getProperty(DynamicConfig.Client.ConsumerByteRateOverrideProp).toLong,
 true))
       else
         None
-    quotaManagers.fetch.updateQuota(sanitizedUser, sanitizedClientId, 
consumerQuota)
+    quotaManagers.fetch.updateQuota(sanitizedUser, clientId, 
sanitizedClientId, consumerQuota)
     val requestQuota =
       if 
(config.containsKey(DynamicConfig.Client.RequestPercentageOverrideProp))
         Some(new 
Quota(config.getProperty(DynamicConfig.Client.RequestPercentageOverrideProp).toDouble,
 true))
       else
         None
-    quotaManagers.request.updateQuota(sanitizedUser, sanitizedClientId, 
requestQuota)
+    quotaManagers.request.updateQuota(sanitizedUser, clientId, 
sanitizedClientId, requestQuota)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala 
b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 634b0c2..69f9e96 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -28,7 +28,7 @@ import kafka.admin.AdminUtils
 import kafka.utils.json.JsonObject
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.security.scram.ScramMechanism
-import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.utils.{Sanitizer, Time}
 
 /**
  * Represents all the entities that can be configured via ZK

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index e8967d1..d821f52 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -22,7 +22,7 @@ import org.apache.kafka.clients.consumer.{ConsumerConfig, 
KafkaConsumer}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
 import org.apache.kafka.common.{MetricName, TopicPartition}
-import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sanitizer}
+import org.apache.kafka.common.metrics.{KafkaMetric, Quota}
 import org.junit.Assert._
 import org.junit.{Before, Test}
 
@@ -210,7 +210,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
 
   private def verifyProducerThrottleTimeMetric(producer: KafkaProducer[_, _]) {
     val tags = new HashMap[String, String]
-    tags.put("client-id", Sanitizer.sanitize(producerClientId))
+    tags.put("client-id", producerClientId)
     val avgMetric = producer.metrics.get(new 
MetricName("produce-throttle-time-avg", "producer-metrics", "", tags))
     val maxMetric = producer.metrics.get(new 
MetricName("produce-throttle-time-max", "producer-metrics", "", tags))
 
@@ -220,7 +220,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
 
   private def verifyConsumerThrottleTimeMetric(consumer: KafkaConsumer[_, _], 
maxThrottleTime: Option[Double] = None) {
     val tags = new HashMap[String, String]
-    tags.put("client-id", Sanitizer.sanitize(consumerClientId))
+    tags.put("client-id", consumerClientId)
     val avgMetric = consumer.metrics.get(new 
MetricName("fetch-throttle-time-avg", "consumer-fetch-manager-metrics", "", 
tags))
     val maxMetric = consumer.metrics.get(new 
MetricName("fetch-throttle-time-max", "consumer-fetch-manager-metrics", "", 
tags))
 
@@ -234,7 +234,7 @@ abstract class BaseQuotaTest extends IntegrationTestHarness 
{
                                   quotaType.toString,
                                   "Tracking throttle-time per user/client-id",
                                   "user", quotaId.sanitizedUser.getOrElse(""),
-                                  "client-id", 
quotaId.sanitizedClientId.getOrElse(""))
+                                  "client-id", quotaId.clientId.getOrElse(""))
   }
 
   def throttleMetric(quotaType: QuotaType, quotaId: QuotaId): KafkaMetric = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
index f5a2cf5..383f139 100644
--- a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -18,8 +18,8 @@ import java.util.Properties
 
 import kafka.admin.AdminUtils
 import kafka.server.{DynamicConfig, KafkaConfig, QuotaId}
-import org.apache.kafka.common.metrics.Sanitizer
 import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Before
 
 class ClientIdQuotaTest extends BaseQuotaTest {
@@ -27,8 +27,8 @@ class ClientIdQuotaTest extends BaseQuotaTest {
   override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
   override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
-  override val producerQuotaId = QuotaId(None, 
Some(Sanitizer.sanitize(producerClientId)))
-  override val consumerQuotaId = QuotaId(None, 
Some(Sanitizer.sanitize(consumerClientId)))
+  override val producerQuotaId = QuotaId(None, Some(producerClientId), 
Some(Sanitizer.sanitize(producerClientId)))
+  override val consumerQuotaId = QuotaId(None, Some(consumerClientId), 
Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
index cb6d376..e25f886 100644
--- a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -20,8 +20,8 @@ import java.util.Properties
 import kafka.admin.AdminUtils
 import kafka.server._
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Before
-import org.apache.kafka.common.metrics.Sanitizer
 
 class UserClientIdQuotaTest extends BaseQuotaTest {
 
@@ -31,8 +31,8 @@ class UserClientIdQuotaTest extends BaseQuotaTest {
   override val userPrincipal = "O=A client,CN=localhost"
   override def producerClientId = "QuotasTestProducer-!@#$%^&*()"
   override def consumerClientId = "QuotasTestConsumer-!@#$%^&*()"
-  override def producerQuotaId = 
QuotaId(Some(Sanitizer.sanitize(userPrincipal)), 
Some(Sanitizer.sanitize(producerClientId)))
-  override def consumerQuotaId = 
QuotaId(Some(Sanitizer.sanitize(userPrincipal)), 
Some(Sanitizer.sanitize(consumerClientId)))
+  override def producerQuotaId = 
QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(producerClientId), 
Some(Sanitizer.sanitize(producerClientId)))
+  override def consumerQuotaId = 
QuotaId(Some(Sanitizer.sanitize(userPrincipal)), Some(consumerClientId), 
Some(Sanitizer.sanitize(consumerClientId)))
 
   @Before
   override def setUp() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
index a7bddc5..b5d88c0 100644
--- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -21,8 +21,8 @@ import kafka.admin.AdminUtils
 import kafka.server.{ConfigEntityName, KafkaConfig, QuotaId}
 import kafka.utils.JaasTestUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.utils.Sanitizer
 import org.junit.{After, Before}
-import org.apache.kafka.common.metrics.Sanitizer
 
 class UserQuotaTest extends BaseQuotaTest with SaslSetup {
 
@@ -34,8 +34,8 @@ class UserQuotaTest extends BaseQuotaTest with SaslSetup {
   override protected val clientSaslProperties = 
Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
 
   override val userPrincipal = 
JaasTestUtils.KafkaClientPrincipalUnqualifiedName2
-  override val producerQuotaId = QuotaId(Some(userPrincipal), None)
-  override val consumerQuotaId = QuotaId(Some(userPrincipal), None)
+  override val producerQuotaId = QuotaId(Some(userPrincipal), None, None)
+  override val consumerQuotaId = QuotaId(Some(userPrincipal), None, None)
 
 
   @Before

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index c0aff93..87ce46e 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -23,14 +23,14 @@ import kafka.common.InvalidConfigException
 import kafka.server.ConfigEntityName
 import kafka.utils.{Logging, ZkUtils}
 import kafka.zk.ZooKeeperTestHarness
-
 import org.apache.kafka.common.security.scram.ScramCredentialUtils
+import org.apache.kafka.common.utils.Sanitizer
 import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
+
 import scala.collection.mutable
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.metrics.Sanitizer
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 54be960..4196bc1 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -18,8 +18,8 @@ package kafka.server
 
 import java.util.Collections
 
-import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota, 
Sanitizer}
-import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota}
+import org.apache.kafka.common.utils.{MockTime, Sanitizer}
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
 
@@ -43,8 +43,8 @@ class ClientQuotaManagerTest {
 
     try {
       // Case 1: Update the quota. Assert that the new quota value is returned
-      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
Some(new Quota(2000, true)))
-      clientMetrics.updateQuota(client2.configUser, client2.configClientId, 
Some(new Quota(4000, true)))
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
client1.sanitizedConfigClientId, Some(new Quota(2000, true)))
+      clientMetrics.updateQuota(client2.configUser, client2.configClientId, 
client2.sanitizedConfigClientId, Some(new Quota(4000, true)))
 
       assertEquals("Default producer quota should be " + 
config.quotaBytesPerSecondDefault, new Quota(config.quotaBytesPerSecondDefault, 
true), clientMetrics.quota(randomClient.user, randomClient.clientId))
       assertEquals("Should return the overridden value (2000)", new 
Quota(2000, true), clientMetrics.quota(client1.user, client1.clientId))
@@ -56,22 +56,22 @@ class ClientQuotaManagerTest {
 
       // Case 2: Change quota again. The quota should be updated within 
KafkaMetrics as well since the sensor was created.
       // p1 should not longer be throttled after the quota change
-      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
Some(new Quota(3000, true)))
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
client1.sanitizedConfigClientId, Some(new Quota(3000, true)))
       assertEquals("Should return the newly overridden value (3000)", new 
Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId))
 
       throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, 
client1.clientId, 0, this.callback)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, 
throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
-      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
Some(new Quota(500, true)))
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
client1.sanitizedConfigClientId, Some(new Quota(500, true)))
       assertEquals("Should return the default value (500)", new Quota(500, 
true), clientMetrics.quota(client1.user, client1.clientId))
 
       throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, 
client1.clientId, 0, this.callback)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
 
       // Case 4: Set high default quota, remove p1 quota. p1 should no longer 
be throttled
-      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
None)
-      clientMetrics.updateQuota(defaultConfigClient.configUser, 
defaultConfigClient.configClientId, Some(new Quota(4000, true)))
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
client1.sanitizedConfigClientId, None)
+      clientMetrics.updateQuota(defaultConfigClient.configUser, 
defaultConfigClient.configClientId, 
defaultConfigClient.sanitizedConfigClientId, Some(new Quota(4000, true)))
       assertEquals("Should return the newly overridden value (4000)", new 
Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId))
 
       throttleTimeMs = clientMetrics.maybeRecordAndThrottle(client1.user, 
client1.clientId, 1000 * config.numQuotaSamples, this.callback)
@@ -161,16 +161,16 @@ class ClientQuotaManagerTest {
     }
 
     try {
-      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, Some(new 
Quota(1000, true)))
-      quotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(new 
Quota(2000, true)))
-      quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(new Quota(3000, true)))
-      quotaManager.updateQuota(Some("userA"), None, Some(new Quota(4000, 
true)))
-      quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new 
Quota(5000, true)))
-      quotaManager.updateQuota(Some("userB"), None, Some(new Quota(6000, 
true)))
-      quotaManager.updateQuota(Some("userB"), Some("client1"), Some(new 
Quota(7000, true)))
-      quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), 
Some(new Quota(8000, true)))
-      quotaManager.updateQuota(Some("userC"), None, Some(new Quota(10000, 
true)))
-      quotaManager.updateQuota(None, Some("client1"), Some(new Quota(9000, 
true)))
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
Some(new Quota(1000, true)))
+      quotaManager.updateQuota(None, Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(new Quota(2000, true)))
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), Some(new 
Quota(3000, true)))
+      quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(4000, 
true)))
+      quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), Some(new Quota(5000, true)))
+      quotaManager.updateQuota(Some("userB"), None, None, Some(new Quota(6000, 
true)))
+      quotaManager.updateQuota(Some("userB"), Some("client1"), 
Some("client1"), Some(new Quota(7000, true)))
+      quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(new Quota(8000, true)))
+      quotaManager.updateQuota(Some("userC"), None, None, Some(new 
Quota(10000, true)))
+      quotaManager.updateQuota(None, Some("client1"), Some("client1"), 
Some(new Quota(9000, true)))
 
       checkQuota("userA", "client1", 5000, 4500, false) // <user, client> 
quota takes precedence over <user>
       checkQuota("userA", "client2", 4000, 4500, true)  // <user> quota takes 
precedence over <client> and defaults
@@ -186,32 +186,32 @@ class ClientQuotaManagerTest {
       checkQuota("userE", "client1", 3000, 2500, false)
 
       // Remove default <user, client> quota config, revert to <user> default
-      quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), None)
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None)
       checkQuota("userD", "client1", 1000, 0, false)    // Metrics tags 
changed, restart counter
       checkQuota("userE", "client4", 1000, 1500, true)
       checkQuota("userF", "client4", 1000, 800, false)  // Default <user> 
quota shared across clients of user
       checkQuota("userF", "client5", 1000, 800, true)
 
       // Remove default <user> quota config, revert to <client-id> default
-      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None)
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
None)
       checkQuota("userF", "client4", 2000, 0, false)  // Default <client-id> 
quota shared across client-id of all users
       checkQuota("userF", "client5", 2000, 0, false)
       checkQuota("userF", "client5", 2000, 2500, true)
       checkQuota("userG", "client5", 2000, 0, true)
 
       // Update quotas
-      quotaManager.updateQuota(Some("userA"), None, Some(new Quota(8000, 
true)))
-      quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new 
Quota(10000, true)))
+      quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, 
true)))
+      quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), Some(new Quota(10000, true)))
       checkQuota("userA", "client2", 8000, 0, false)
       checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum 
of new and earlier values
       checkQuota("userA", "client1", 10000, 0, false)
       checkQuota("userA", "client1", 10000, 6000, true)
-      quotaManager.updateQuota(Some("userA"), Some("client1"), None)
+      quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), None)
       checkQuota("userA", "client6", 8000, 0, true)    // Throttled due to 
shared user quota
-      quotaManager.updateQuota(Some("userA"), Some("client6"), Some(new 
Quota(11000, true)))
+      quotaManager.updateQuota(Some("userA"), Some("client6"), 
Some("client6"), Some(new Quota(11000, true)))
       checkQuota("userA", "client6", 11000, 8500, false)
-      quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), 
Some(new Quota(12000, true)))
-      quotaManager.updateQuota(Some("userA"), Some("client6"), None)
+      quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(new Quota(12000, true)))
+      quotaManager.updateQuota(Some("userA"), Some("client6"), 
Some("client6"), None)
       checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to 
sum of new and earlier values
 
     } finally {
@@ -271,7 +271,7 @@ class ClientQuotaManagerTest {
   def testRequestPercentageQuotaViolation() {
     val metrics = newMetrics
     val quotaManager = new ClientRequestQuotaManager(config, metrics, time)
-    quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), 
Some(Quota.upperBound(1)))
+    quotaManager.updateQuota(Some("ANONYMOUS"), Some("test-client"), 
Some("test-client"), Some(Quota.upperBound(1)))
     val queueSizeMetric = 
metrics.metrics().get(metrics.metricName("queue-size", "Request", ""))
     def millisToPercent(millis: Double) = millis * 1000 * 1000 * 
ClientQuotaManagerConfig.NanosToPercentagePerSecond
     try {
@@ -373,17 +373,18 @@ class ClientQuotaManagerTest {
   }
 
   @Test
-  def testSanitizeClientId() {
+  def testClientIdNotSanitized() {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, 
QuotaType.Produce, time)
     val clientId = "client@#$%"
     try {
       clientMetrics.maybeRecordAndThrottle("ANONYMOUS", clientId, 100, 
callback)
-      
-      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + 
Sanitizer.sanitize(clientId))
+
+      // The metrics should use the raw client ID, even if the reporters 
internally sanitize them
+      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-:" + 
clientId)
       assertTrue("Throttle time sensor should exist", throttleTimeSensor != 
null)
 
-      val byteRateSensor = metrics.getSensor("Produce-:"  + 
Sanitizer.sanitize(clientId))
+      val byteRateSensor = metrics.getSensor("Produce-:"  + clientId)
       assertTrue("Byte rate sensor should exist", byteRateSensor != null)
     } finally {
       clientMetrics.shutdown()
@@ -394,5 +395,10 @@ class ClientQuotaManagerTest {
     new Metrics(new MetricConfig(), Collections.emptyList(), time)
   }
 
-  private case class UserClient(val user: String, val clientId: String, val 
configUser: Option[String] = None, val configClientId: Option[String] = None)
+  private case class UserClient(val user: String, val clientId: String, val 
configUser: Option[String] = None, val configClientId: Option[String] = None) {
+    // The class under test expects only sanitized client configs. We pass 
both the default value (which should not be
+    // sanitized to ensure it remains unique) and non-default values, so we 
need to take care in generating the sanitized
+    // client ID
+    def sanitizedConfigClientId = configClientId.map(x => if (x == 
ConfigEntityName.Default) ConfigEntityName.Default else Sanitizer.sanitize(x))
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 9d2bb8b..2e0b454 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -28,6 +28,7 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.utils._
 import kafka.admin.{AdminOperationException, AdminUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Sanitizer
 
 import scala.collection.Map
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d6ca52a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala 
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 480dfa6..4774e1d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType 
=> RResourceType, _}
 import org.apache.kafka.common.security.auth.{AuthenticationContext, 
KafkaPrincipal, KafkaPrincipalBuilder, SecurityProtocol}
+import org.apache.kafka.common.utils.Sanitizer
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
 
@@ -82,7 +83,7 @@ class RequestQuotaTest extends BaseRequestTest {
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.01")
     AdminUtils.changeClientIdConfig(zkUtils, "<default>", quotaProps)
     quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "2000")
-    AdminUtils.changeClientIdConfig(zkUtils, unthrottledClientId, quotaProps)
+    AdminUtils.changeClientIdConfig(zkUtils, 
Sanitizer.sanitize(unthrottledClientId), quotaProps)
 
     TestUtils.retry(10000) {
       val quotaManager = servers.head.apis.quotas.request

Reply via email to