Repository: kafka
Updated Branches:
  refs/heads/trunk ecc1fb10f -> 69356fbc6


http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala 
b/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala
deleted file mode 100644
index 366507d..0000000
--- a/core/src/test/scala/integration/kafka/api/ClientQuotasTest.scala
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package kafka.api
-
-import java.util.Properties
-
-import kafka.admin.AdminUtils
-import kafka.consumer.SimpleConsumer
-import kafka.integration.KafkaServerTestHarness
-import kafka.server._
-import kafka.utils.TestUtils
-import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
-import org.apache.kafka.clients.producer._
-import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
-import org.apache.kafka.common.MetricName
-import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertTrue
-import org.junit.{After, Before, Test}
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-import scala.collection.Map
-import scala.collection.mutable
-
-class ClientQuotasTest extends KafkaServerTestHarness {
-  private val producerBufferSize = 300000
-  private val producerId1 = "QuotasTestProducer-1"
-  private val producerId2 = "QuotasTestProducer-2"
-  private val consumerId1 = "QuotasTestConsumer-1"
-  private val consumerId2 = "QuotasTestConsumer-2"
-
-  val numServers = 2
-  val overridingProps = new Properties()
-
-  // Low enough quota that a producer sending a small payload in a tight loop 
should get throttled
-  overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, 
"8000")
-  overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, 
"2500")
-
-  override def generateConfigs() = {
-    FixedPortTestUtils.createBrokerConfigs(numServers,
-                                           zkConnect,
-                                           enableControlledShutdown = false)
-            .map(KafkaConfig.fromProps(_, overridingProps))
-  }
-
-  var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
-  var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
-  var replicaConsumers = mutable.Buffer[SimpleConsumer]()
-
-  var leaderNode: KafkaServer = null
-  var followerNode: KafkaServer = null
-  private val topic1 = "topic-1"
-
-  @Before
-  override def setUp() {
-    super.setUp()
-    val producerProps = new Properties()
-    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    producerProps.put(ProducerConfig.ACKS_CONFIG, "0")
-    producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 
producerBufferSize.toString)
-    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1)
-    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                      
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                      
classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
-    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
-
-    producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2)
-    producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps)
-
-    val numPartitions = 1
-    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, 
numServers, servers)
-    leaderNode = if (leaders(0).get == servers.head.config.brokerId) 
servers.head else servers(1)
-    followerNode = if (leaders(0).get != servers.head.config.brokerId) 
servers.head else servers(1)
-    assertTrue("Leader of all partitions of the topic should exist", 
leaders.values.forall(leader => leader.isDefined))
-
-    // Create consumers
-    val consumerProps = new Properties
-    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
-    consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 
4096.toString)
-    consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
-    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                      
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                      
classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer])
-
-    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1)
-    consumers += new KafkaConsumer(consumerProps)
-    // Create replica consumers with the same clientId as the high level 
consumer. These requests should never be throttled
-    replicaConsumers += new SimpleConsumer("localhost", 
leaderNode.boundPort(), 1000000, 64*1024, consumerId1)
-
-    consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2)
-    consumers += new KafkaConsumer(consumerProps)
-    replicaConsumers += new SimpleConsumer("localhost", 
leaderNode.boundPort(), 1000000, 64*1024, consumerId2)
-  }
-
-  @After
-  override def tearDown() {
-    producers.foreach( _.close )
-    consumers.foreach( _.close )
-    replicaConsumers.foreach( _.close )
-    super.tearDown()
-  }
-
-  @Test
-  def testThrottledProducerConsumer() {
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = 
leaderNode.metrics.metrics().asScala
-
-    val numRecords = 1000
-    produce(producers.head, numRecords)
-
-    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           
ApiKeys.PRODUCE.name,
-                                                           "Tracking 
throttle-time per client",
-                                                           "client-id", 
producerId1)
-    assertTrue("Should have been throttled", 
allMetrics(producerMetricName).value() > 0)
-
-    // Consumer should read in a bursty manner and get throttled immediately
-    consume(consumers.head, numRecords)
-    // The replica consumer should not be throttled also. Create a fetch 
request which will exceed the quota immediately
-    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 
1024*1024).replicaId(followerNode.config.brokerId).build()
-    replicaConsumers.head.fetch(request)
-    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           ApiKeys.FETCH.name,
-                                                           "Tracking 
throttle-time per client",
-                                                           "client-id", 
consumerId1)
-    assertTrue("Should have been throttled", 
allMetrics(consumerMetricName).value() > 0)
-  }
-
-  @Test
-  def testProducerConsumerOverrideUnthrottled() {
-    // Give effectively unlimited quota for producerId2 and consumerId2
-    val props = new Properties()
-    props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString)
-    props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString)
-
-    AdminUtils.changeClientIdConfig(zkUtils, producerId2, props)
-    AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props)
-
-    TestUtils.retry(10000) {
-      val overrideProducerQuota = 
leaderNode.apis.quotas.produce.quota(producerId2)
-      val overrideConsumerQuota = 
leaderNode.apis.quotas.fetch.quota(consumerId2)
-
-      assertEquals(s"ClientId $producerId2 must have unlimited producer 
quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota)
-      assertEquals(s"ClientId $consumerId2 must have unlimited consumer 
quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota)
-    }
-
-
-    val allMetrics: mutable.Map[MetricName, KafkaMetric] = 
leaderNode.metrics.metrics().asScala
-    val numRecords = 1000
-    produce(producers(1), numRecords)
-    val producerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           
ApiKeys.PRODUCE.name,
-                                                           "Tracking 
throttle-time per client",
-                                                           "client-id", 
producerId2)
-    assertEquals("Should not have been throttled", 0.0, 
allMetrics(producerMetricName).value(), 0.0)
-
-    // The "client" consumer does not get throttled.
-    consume(consumers(1), numRecords)
-    // The replica consumer should not be throttled also. Create a fetch 
request which will exceed the quota immediately
-    val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 
1024*1024).replicaId(followerNode.config.brokerId).build()
-    replicaConsumers(1).fetch(request)
-    val consumerMetricName = leaderNode.metrics.metricName("throttle-time",
-                                                           ApiKeys.FETCH.name,
-                                                           "Tracking 
throttle-time per client",
-                                                           "client-id", 
consumerId2)
-    assertEquals("Should not have been throttled", 0.0, 
allMetrics(consumerMetricName).value(), 0.0)
-  }
-
-  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
-    var numBytesProduced = 0
-    for (i <- 0 to count) {
-      val payload = i.toString.getBytes
-      numBytesProduced += payload.length
-      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, 
payload),
-             new ErrorLoggingCallback(topic1, null, null, true)).get()
-      Thread.sleep(1)
-    }
-    numBytesProduced
-  }
-
-  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: 
Int) {
-    consumer.subscribe(List(topic1))
-    var numConsumed = 0
-    while (numConsumed < numRecords) {
-      for (cr <- consumer.poll(100)) {
-        numConsumed += 1
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
new file mode 100644
index 0000000..081fa0b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/UserClientIdQuotaTest.scala
@@ -0,0 +1,66 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.io.File
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+
+import kafka.server.{KafkaConfig, ConfigEntityName, QuotaConfigOverride, 
QuotaId}
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Before
+
+/**
+ * Quota test variant in which quotas are keyed by both user principal and client-id.
+ *
+ * The brokers are configured for SSL with client authentication required, and the
+ * per-client quota entities used here are addressed as
+ * "sanitized user principal" + "/clients/" + "client-id".
+ */
+class UserClientIdQuotaTest extends BaseQuotaTest {
+
+  override protected def securityProtocol = SecurityProtocol.SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+
+  override val userPrincipal = "O=A client,CN=localhost"
+  override def producerQuotaId = QuotaId(Some(QuotaId.sanitize(userPrincipal)), Some(producerClientId))
+  override def consumerQuotaId = QuotaId(Some(QuotaId.sanitize(userPrincipal)), Some(consumerClientId))
+
+  /**
+   * Sets the broker properties, runs the parent set-up, then writes the default
+   * producer/consumer quotas to the default-user/default-client entity and calls
+   * waitForQuotaUpdate with the same values.
+   */
+  @Before
+  override def setUp(): Unit = {
+    val maxQuota = Long.MaxValue.toString
+    this.serverConfig.setProperty(KafkaConfig.SslClientAuthProp, "required")
+    // Broker-level default quotas are set to Long.MaxValue; the quotas under test are written through AdminUtils below.
+    this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, maxQuota)
+    this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, maxQuota)
+    super.setUp()
+
+    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+    val defaultEntity = s"${ConfigEntityName.Default}/clients/${ConfigEntityName.Default}"
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, defaultEntity, defaultProps)
+    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+  }
+
+  /**
+   * Writes the producer override to the <user, producer client-id> entity and the
+   * consumer override to the <user, consumer client-id> entity.
+   */
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long): Unit = {
+    // Builds a Properties object holding exactly one quota override.
+    def singleOverride(key: String, quota: Long): Properties = {
+      val props = new Properties()
+      props.setProperty(key, quota.toString)
+      props
+    }
+
+    updateQuotaOverride(userPrincipal, producerClientId, singleOverride(QuotaConfigOverride.ProducerOverride, producerQuota))
+    updateQuotaOverride(userPrincipal, consumerClientId, singleOverride(QuotaConfigOverride.ConsumerOverride, consumerQuota))
+  }
+
+  /** Writes empty properties to both <user, client-id> entities, producer first. */
+  override def removeQuotaOverrides(): Unit = {
+    val noOverrides = new Properties
+    updateQuotaOverride(userPrincipal, producerClientId, noOverrides)
+    updateQuotaOverride(userPrincipal, consumerClientId, noOverrides)
+  }
+
+  /** Applies `properties` to the entity named by the sanitized principal and the given client-id. */
+  private def updateQuotaOverride(principal: String, clientId: String, properties: Properties): Unit = {
+    val entity = s"${QuotaId.sanitize(principal)}/clients/$clientId"
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, entity, properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
new file mode 100644
index 0000000..3d5d702
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.io.File
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.server.{KafkaConfig, ConfigEntityName, QuotaId}
+
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.Before
+
+/**
+ * Quota test variant in which quotas are keyed by user principal only: the quota ids
+ * built here carry a user and no client-id. Brokers are configured for SASL_SSL.
+ */
+class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
+
+  override protected def securityProtocol = SecurityProtocol.SASL_SSL
+  override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
+  override protected val zkSaslEnabled = false
+  override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms)))
+
+  override val userPrincipal = "client"
+  override val producerQuotaId = QuotaId(Some(userPrincipal), None)
+  override val consumerQuotaId = QuotaId(Some(userPrincipal), None)
+
+  /**
+   * Sets the broker-level default quotas to Long.MaxValue, runs the parent set-up,
+   * then writes the default producer/consumer quotas to the default user entity and
+   * calls waitForQuotaUpdate with the same values.
+   */
+  @Before
+  override def setUp(): Unit = {
+    this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
+    this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, Long.MaxValue.toString)
+    super.setUp()
+    val defaultProps = quotaProperties(defaultProducerQuota, defaultConsumerQuota)
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, ConfigEntityName.Default, defaultProps)
+    waitForQuotaUpdate(defaultProducerQuota, defaultConsumerQuota)
+  }
+
+  /** Writes both the producer and the consumer override to this user's entity in one update. */
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long): Unit = {
+    val props = quotaProperties(producerQuota, consumerQuota)
+    updateQuotaOverride(props)
+  }
+
+  /** Clears the user's overrides by writing empty properties to the user entity. */
+  override def removeQuotaOverrides(): Unit = {
+    // Producer and consumer overrides live on the same user entity (see overrideQuotas),
+    // so a single write of empty properties is enough; the update is not repeated.
+    updateQuotaOverride(new Properties)
+  }
+
+  /** Applies `properties` to the entity named by the sanitized user principal. */
+  private def updateQuotaOverride(properties: Properties): Unit = {
+    AdminUtils.changeUserOrUserClientIdConfig(zkUtils, QuotaId.sanitize(userPrincipal), properties)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/unit/kafka/admin/AdminTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala 
b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 5dca17f..892e26b 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -513,8 +513,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging 
with RackAwareTest {
     // Test that the existing clientId overrides are read
     val server = 
TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, 
zkConnect)))
     try {
-      assertEquals(new Quota(1000, true), 
server.apis.quotas.produce.quota(clientId))
-      assertEquals(new Quota(2000, true), 
server.apis.quotas.fetch.quota(clientId))
+      assertEquals(new Quota(1000, true), 
server.apis.quotas.produce.quota("ANONYMOUS", clientId))
+      assertEquals(new Quota(2000, true), 
server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
     } finally {
       server.shutdown()
       CoreUtils.delete(server.config.logDirs)

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index be798ff..ddfbb51 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -19,10 +19,14 @@ package kafka.admin
 import java.util.Properties
 
 import kafka.admin.ConfigCommand.ConfigCommandOptions
+import kafka.common.InvalidConfigException
+import kafka.server.{ConfigEntityName, QuotaId}
+import kafka.utils.{Logging, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+
+import org.easymock.EasyMock
 import org.junit.Assert._
 import org.junit.Test
-import kafka.utils.{ZkUtils, Logging}
-import kafka.zk.ZooKeeperTestHarness
 
 class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
   @Test
@@ -217,4 +221,182 @@ class ConfigCommandTest extends ZooKeeperTestHarness with 
Logging {
     }
     ConfigCommand.alterConfig(null, createOpts, configChange)
   }
+
+  /**
+   * Verifies how ConfigCommand.parseEntity resolves quota entities from the command
+   * line: client-id, user and <user, client-id> combinations. An empty --entity-name is
+   * expected to resolve to ConfigEntityName.Default. Valid combinations are checked for
+   * both --describe and --alter; --alter without a name for every entity type must be
+   * rejected with an IllegalArgumentException.
+   */
+  @Test
+  def testQuotaConfigEntity(): Unit = {
+
+    // Builds the options: --zookeeper, the leading entity type, its optional name, then any extra args.
+    def createOpts(entityType: String, entityName: Option[String], otherArgs: Array[String]): ConfigCommandOptions = {
+      val optArray = Array("--zookeeper", zkConnect,
+                           "--entity-type", entityType)
+      val nameArray = entityName.fold(Array.empty[String])(name => Array("--entity-name", name))
+      new ConfigCommandOptions(optArray ++ nameArray ++ otherArgs)
+    }
+
+    // Parses the options and asserts the root entity type and the full sanitized entity name.
+    def checkEntity(entityType: String, entityName: Option[String], expectedEntityName: String, otherArgs: Array[String]): Unit = {
+      val opts = createOpts(entityType, entityName, otherArgs)
+      opts.checkArgs()
+      val entity = ConfigCommand.parseEntity(opts)
+      assertEquals(entityType, entity.root.entityType)
+      assertEquals(expectedEntityName, entity.fullSanitizedName)
+    }
+
+    // Asserts that validating/parsing the options fails with an IllegalArgumentException.
+    def checkInvalidEntity(entityType: String, entityName: Option[String], otherArgs: Array[String]): Unit = {
+      val opts = createOpts(entityType, entityName, otherArgs)
+      try {
+        opts.checkArgs()
+        ConfigCommand.parseEntity(opts)
+        fail("Did not fail with invalid argument list")
+      } catch {
+        case _: IllegalArgumentException => // expected exception
+      }
+    }
+
+    val describeOpts = Array("--describe")
+    val alterOpts = Array("--alter", "--add-config", "a=b,c=d")
+
+    // <client-id> quota
+    val clientId = "client-1"
+    for (opts <- Seq(describeOpts, alterOpts)) {
+      checkEntity("clients", Some(clientId), clientId, opts)
+      checkEntity("clients", Some(""), ConfigEntityName.Default, opts)
+    }
+    checkEntity("clients", None, "", describeOpts)
+    checkInvalidEntity("clients", None, alterOpts)
+
+    // <user> quota
+    val principal = "CN=ConfigCommandTest,O=Apache,L=<default>"
+    val sanitizedPrincipal = QuotaId.sanitize(principal)
+    assertEquals(-1, sanitizedPrincipal.indexOf('='))
+    assertEquals(principal, QuotaId.desanitize(sanitizedPrincipal))
+    for (opts <- Seq(describeOpts, alterOpts)) {
+      checkEntity("users", Some(principal), sanitizedPrincipal, opts)
+      checkEntity("users", Some(""), ConfigEntityName.Default, opts)
+    }
+    checkEntity("users", None, "", describeOpts)
+    checkInvalidEntity("users", None, alterOpts)
+
+    // <user, client-id> quota
+    val userClient = sanitizedPrincipal + "/clients/" + clientId
+    def clientIdOpts(name: String) = Array("--entity-type", "clients", "--entity-name", name)
+    for (opts <- Seq(describeOpts, alterOpts)) {
+      checkEntity("users", Some(principal), userClient, opts ++ clientIdOpts(clientId))
+      checkEntity("users", Some(principal), sanitizedPrincipal + "/clients/" + ConfigEntityName.Default, opts ++ clientIdOpts(""))
+      // Uses the loop's opts (previously hard-coded to describeOpts) so that the
+      // <default user, client-id> combination is checked for --alter as well.
+      checkEntity("users", Some(""), ConfigEntityName.Default + "/clients/" + clientId, opts ++ clientIdOpts(clientId))
+      checkEntity("users", Some(""), ConfigEntityName.Default + "/clients/" + ConfigEntityName.Default, opts ++ clientIdOpts(""))
+    }
+    checkEntity("users", Some(principal), sanitizedPrincipal + "/clients", describeOpts ++ Array("--entity-type", "clients"))
+    // Both user and client-id must be provided for alter
+    checkInvalidEntity("users", Some(principal), alterOpts ++ Array("--entity-type", "clients"))
+    checkInvalidEntity("users", None, alterOpts ++ clientIdOpts(clientId))
+    checkInvalidEntity("users", None, alterOpts ++ Array("--entity-type", "clients"))
+  }
+
+  /**
+   * Checks parsing of user / client-id quota options: "<default>" is accepted as a user
+   * principal but rejected (InvalidConfigException) as a client-id, and a <user, client-id>
+   * entity is expected to parse to the same result for every ordering of the
+   * --entity-type / --entity-name options exercised below.
+   */
+  @Test
+  def testUserClientQuotaOpts(): Unit = {
+    val alterArgs = Seq("--alter", "--add-config", "a=b,c=d")
+
+    // Parses "--zookeeper <connect>" followed by args and asserts the resulting entity.
+    def checkEntity(expectedEntityType: String, expectedEntityName: String, args: String*): Unit = {
+      val commandLine = Array("--zookeeper", zkConnect) ++ args
+      val parsedOpts = new ConfigCommandOptions(commandLine)
+      parsedOpts.checkArgs()
+      val parsedEntity = ConfigCommand.parseEntity(parsedOpts)
+      assertEquals(expectedEntityType, parsedEntity.root.entityType)
+      assertEquals(expectedEntityName, parsedEntity.fullSanitizedName)
+    }
+
+    // <default> is a valid user principal (can be handled with URL-encoding),
+    // but an invalid client-id (cannot be handled since client-ids are not encoded)
+    checkEntity("users", QuotaId.sanitize("<default>"),
+        Seq("--entity-type", "users", "--entity-name", "<default>") ++ alterArgs: _*)
+    try {
+      checkEntity("clients", QuotaId.sanitize("<default>"),
+          Seq("--entity-type", "clients", "--entity-name", "<default>") ++ alterArgs: _*)
+      fail("Did not fail with invalid client-id")
+    } catch {
+      case _: InvalidConfigException => // expected
+    }
+
+    // The same <user, client-id> pair, with types and names given in different orders.
+    val orderings = Seq(
+      Seq("--entity-type", "users", "--entity-name", "CN=user1", "--entity-type", "clients", "--entity-name", "client1"),
+      Seq("--entity-name", "CN=user1", "--entity-type", "users", "--entity-name", "client1", "--entity-type", "clients"),
+      Seq("--entity-type", "clients", "--entity-name", "client1", "--entity-type", "users", "--entity-name", "CN=user1"),
+      Seq("--entity-name", "client1", "--entity-type", "clients", "--entity-name", "CN=user1", "--entity-type", "users"))
+    for (entityArgs <- orderings)
+      checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients/client1", entityArgs ++ alterArgs: _*)
+
+    // --describe with a client type but no client name
+    checkEntity("users", QuotaId.sanitize("CN=user1") + "/clients",
+        "--entity-type", "clients", "--entity-name", "CN=user1", "--entity-type", "users",
+        "--describe")
+    checkEntity("users", "/clients",
+        "--entity-type", "clients", "--entity-type", "users",
+        "--describe")
+  }
+
+  /**
+   * Checks which entities getAllEntities returns for --describe. Entity lookups go to a
+   * mocked ZkUtils whose getAllEntitiesWithConfig results are supplied per check.
+   */
+  @Test
+  def testQuotaDescribeEntities(): Unit = {
+    val mockZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
+
+    // Parses opts plus "--describe", primes the mock with expectedFetches (path -> names),
+    // and asserts the full sanitized names of the entities returned.
+    def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]): Unit = {
+      val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
+      for ((path, names) <- expectedFetches)
+        EasyMock.expect(mockZkUtils.getAllEntitiesWithConfig(path)).andReturn(names)
+      EasyMock.replay(mockZkUtils)
+      val described = entity.getAllEntities(mockZkUtils)
+      assertEquals(expectedEntityNames, described.map(_.fullSanitizedName))
+      // Reset so the next check starts from a clean mock.
+      EasyMock.reset(mockZkUtils)
+    }
+
+    val clientId = "a-client"
+    val principal = "CN=ConfigCommandTest.testQuotaDescribeEntities , O=Apache, L=<default>"
+    val sanitizedPrincipal = QuotaId.sanitize(principal)
+    val userClient = s"$sanitizedPrincipal/clients/$clientId"
+
+    // <client-id> entities
+    checkEntities(Array("--entity-type", "clients", "--entity-name", clientId),
+        Map.empty, Seq(clientId))
+    checkEntities(Array("--entity-type", "clients", "--entity-default"),
+        Map.empty, Seq("<default>"))
+    checkEntities(Array("--entity-type", "clients"),
+        Map("clients" -> Seq(clientId)), Seq(clientId))
+
+    // <user> entities
+    checkEntities(Array("--entity-type", "users", "--entity-name", principal),
+        Map.empty, Seq(sanitizedPrincipal))
+    checkEntities(Array("--entity-type", "users", "--entity-default"),
+        Map.empty, Seq("<default>"))
+    checkEntities(Array("--entity-type", "users"),
+        Map("users" -> Seq("<default>", sanitizedPrincipal)),
+        Seq("<default>", sanitizedPrincipal))
+
+    // <user, client-id> entities
+    checkEntities(Array("--entity-type", "users", "--entity-name", principal, "--entity-type", "clients", "--entity-name", clientId),
+        Map.empty, Seq(userClient))
+    checkEntities(Array("--entity-type", "users", "--entity-name", principal, "--entity-type", "clients", "--entity-default"),
+        Map.empty, Seq(sanitizedPrincipal + "/clients/<default>"))
+    checkEntities(Array("--entity-type", "users", "--entity-name", principal, "--entity-type", "clients"),
+        Map(s"users/$sanitizedPrincipal/clients" -> Seq("client-4")),
+        Seq(sanitizedPrincipal + "/clients/client-4"))
+    checkEntities(Array("--entity-type", "users", "--entity-default", "--entity-type", "clients"),
+        Map("users/<default>/clients" -> Seq("client-5")),
+        Seq("<default>/clients/client-5"))
+
+    // Neither user nor client named: users are listed first, then the clients of each user.
+    val allUsers = Map("users" -> Seq("<default>", sanitizedPrincipal))
+    val defaultUserClients = Map("users/<default>/clients" -> Seq("client-3"))
+    val principalClients = Map(s"users/$sanitizedPrincipal/clients" -> Seq("client-2"))
+    checkEntities(Array("--entity-type", "users", "--entity-type", "clients"),
+        allUsers ++ defaultUserClients ++ principalClients,
+        Seq("<default>/clients/client-3", sanitizedPrincipal + "/clients/client-2"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala 
b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
index c7e101f..55fe587 100644
--- a/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
+++ b/core/src/test/scala/unit/kafka/admin/TestAdminUtils.scala
@@ -23,5 +23,6 @@ class TestAdminUtils extends AdminUtilities {
   override def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], 
configs: Properties): Unit = {}
   override def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, 
entityName: String): Properties = {new Properties}
   override def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, 
configs: Properties): Unit = {}
+  override def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, 
sanitizedEntityName: String, configs: Properties): Unit = {}
   override def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: 
Properties): Unit = {}
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index b14464f..291e822 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -22,6 +22,7 @@ import org.apache.kafka.common.metrics.{MetricConfig, 
Metrics, Quota}
 import org.apache.kafka.common.utils.MockTime
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.{Before, Test}
+import kafka.admin.ConfigCommand
 
 class ClientQuotaManagerTest {
   private val time = new MockTime
@@ -38,42 +39,187 @@ class ClientQuotaManagerTest {
     numCallbacks = 0
   }
 
-  @Test
-  def testQuotaParsing() {
+  private def testQuotaParsing(config: ClientQuotaManagerConfig, client1: 
UserClient, client2: UserClient, randomClient: UserClient, defaultConfigClient: 
UserClient) {
     val clientMetrics = new ClientQuotaManager(config, newMetrics, 
QuotaType.Produce, time)
 
-    // Case 1: Update the quota. Assert that the new quota value is returned
-    clientMetrics.updateQuota("p1", new Quota(2000, true))
-    clientMetrics.updateQuota("p2", new Quota(4000, true))
-
     try {
-      assertEquals("Default producer quota should be 500", new Quota(500, 
true), clientMetrics.quota("random-client-id"))
-      assertEquals("Should return the overridden value (2000)", new 
Quota(2000, true), clientMetrics.quota("p1"))
-      assertEquals("Should return the overridden value (4000)", new 
Quota(4000, true), clientMetrics.quota("p2"))
+      // Case 1: Update the quota. Assert that the new quota value is returned
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
Some(new Quota(2000, true)))
+      clientMetrics.updateQuota(client2.configUser, client2.configClientId, 
Some(new Quota(4000, true)))
+
+      assertEquals("Default producer quota should be " + 
config.quotaBytesPerSecondDefault, new Quota(config.quotaBytesPerSecondDefault, 
true), clientMetrics.quota(randomClient.user, randomClient.clientId))
+      assertEquals("Should return the overridden value (2000)", new 
Quota(2000, true), clientMetrics.quota(client1.user, client1.clientId))
+      assertEquals("Should return the overridden value (4000)", new 
Quota(4000, true), clientMetrics.quota(client2.user, client2.clientId))
 
       // p1 should be throttled using the overridden quota
-      var throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 2500 * 
config.numQuotaSamples, this.callback)
+      var throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, 
client1.clientId, 2500 * config.numQuotaSamples, this.callback)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
 
       // Case 2: Change quota again. The quota should be updated within 
KafkaMetrics as well since the sensor was created.
       // p1 should no longer be throttled after the quota change
-      clientMetrics.updateQuota("p1", new Quota(3000, true))
-      assertEquals("Should return the newly overridden value (3000)", new 
Quota(3000, true), clientMetrics.quota("p1"))
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
Some(new Quota(3000, true)))
+      assertEquals("Should return the newly overridden value (3000)", new 
Quota(3000, true), clientMetrics.quota(client1.user, client1.clientId))
 
-      throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, 
this.callback)
+      throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, 
client1.clientId, 0, this.callback)
       assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, 
throttleTimeMs)
 
       // Case 3: Change quota back to default. Should be throttled again
-      clientMetrics.updateQuota("p1", new Quota(500, true))
-      assertEquals("Should return the default value (500)", new Quota(500, 
true), clientMetrics.quota("p1"))
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
Some(new Quota(500, true)))
+      assertEquals("Should return the default value (500)", new Quota(500, 
true), clientMetrics.quota(client1.user, client1.clientId))
 
-      throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, 
this.callback)
+      throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, 
client1.clientId, 0, this.callback)
       assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
+
+      // Case 4: Set high default quota, remove p1 quota. p1 should no longer 
be throttled
+      clientMetrics.updateQuota(client1.configUser, client1.configClientId, 
None)
+      clientMetrics.updateQuota(defaultConfigClient.configUser, 
defaultConfigClient.configClientId, Some(new Quota(4000, true)))
+      assertEquals("Should return the newly overridden value (4000)", new 
Quota(4000, true), clientMetrics.quota(client1.user, client1.clientId))
+
+      throttleTimeMs = clientMetrics.recordAndMaybeThrottle(client1.user, 
client1.clientId, 1000 * config.numQuotaSamples, this.callback)
+      assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, 
throttleTimeMs)
+
     } finally {
       clientMetrics.shutdown()
     }
   }
 
+  /**
+   * Tests parsing for <client-id> quotas.
+   * Quota overrides persisted in Zookeeper in /config/clients/<client-id>, 
default persisted in /config/clients/<default>
+   */
+  @Test
+  def testClientIdQuotaParsing() {
+    val client1 = UserClient("ANONYMOUS", "p1", None, Some("p1"))
+    val client2 = UserClient("ANONYMOUS", "p2", None, Some("p2"))
+    val randomClient = UserClient("ANONYMOUS", "random-client-id", None, None)
+    val defaultConfigClient = UserClient("", "", None, 
Some(ConfigEntityName.Default))
+    testQuotaParsing(config, client1, client2, randomClient, 
defaultConfigClient)
+  }
+
+  /**
+   * Tests parsing for <user> quotas.
+   * Quota overrides persisted in Zookeeper in /config/users/<user>, default 
persisted in /config/users/<default>
+   */
+  @Test
+  def testUserQuotaParsing() {
+    val client1 = UserClient("User1", "p1", Some("User1"), None)
+    val client2 = UserClient("User2", "p2", Some("User2"), None)
+    val randomClient = UserClient("RandomUser", "random-client-id", None, None)
+    val defaultConfigClient = UserClient("", "", 
Some(ConfigEntityName.Default), None)
+    val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 
Long.MaxValue)
+    testQuotaParsing(config, client1, client2, randomClient, 
defaultConfigClient)
+  }
+
+  /**
+   * Tests parsing for <user, client-id> quotas.
+   * Quotas persisted in Zookeeper in 
/config/users/<user>/clients/<client-id>, default in 
/config/users/<default>/clients/<default>
+   */
+  @Test
+  def testUserClientIdQuotaParsing() {
+    val client1 = UserClient("User1", "p1", Some("User1"), Some("p1"))
+    val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
+    val randomClient = UserClient("RandomUser", "random-client-id", None, None)
+    val defaultConfigClient = UserClient("", "", 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+    val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 
Long.MaxValue)
+    testQuotaParsing(config, client1, client2, randomClient, 
defaultConfigClient)
+  }
+
+  /**
+   * Tests parsing for <user> quotas when client-id default quota properties 
are set.
+   */
+  @Test
+  def testUserQuotaParsingWithDefaultClientIdQuota() {
+    val client1 = UserClient("User1", "p1", Some("User1"), None)
+    val client2 = UserClient("User2", "p2", Some("User2"), None)
+    val randomClient = UserClient("RandomUser", "random-client-id", None, None)
+    val defaultConfigClient = UserClient("", "", 
Some(ConfigEntityName.Default), None)
+    testQuotaParsing(config, client1, client2, randomClient, 
defaultConfigClient)
+  }
+
+  /**
+   * Tests parsing for <user, client-id> quotas when client-id default quota 
properties are set.
+   */
+  @Test
+  def testUserClientQuotaParsingIdWithDefaultClientIdQuota() {
+    val client1 = UserClient("User1", "p1", Some("User1"), Some("p1"))
+    val client2 = UserClient("User2", "p2", Some("User2"), Some("p2"))
+    val randomClient = UserClient("RandomUser", "random-client-id", None, None)
+    val defaultConfigClient = UserClient("", "", 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default))
+    testQuotaParsing(config, client1, client2, randomClient, 
defaultConfigClient)
+  }
+
+  @Test
+  def testQuotaConfigPrecedence() {
+    val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
 newMetrics, QuotaType.Produce, time)
+
+    def checkQuota(user: String, clientId: String, expectedBound: Int, value: 
Int, expectThrottle: Boolean) {
+      assertEquals(new Quota(expectedBound, true), quotaManager.quota(user, 
clientId))
+      val throttleTimeMs = quotaManager.recordAndMaybeThrottle(user, clientId, 
value * config.numQuotaSamples, this.callback)
+      if (expectThrottle)
+        assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
+      else
+        assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, 
throttleTimeMs)
+    }
+
+    try {
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, Some(new 
Quota(1000, true)))
+      quotaManager.updateQuota(None, Some(ConfigEntityName.Default), Some(new 
Quota(2000, true)))
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(new Quota(3000, true)))
+      quotaManager.updateQuota(Some("userA"), None, Some(new Quota(4000, 
true)))
+      quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new 
Quota(5000, true)))
+      quotaManager.updateQuota(Some("userB"), None, Some(new Quota(6000, 
true)))
+      quotaManager.updateQuota(Some("userB"), Some("client1"), Some(new 
Quota(7000, true)))
+      quotaManager.updateQuota(Some("userB"), Some(ConfigEntityName.Default), 
Some(new Quota(8000, true)))
+      quotaManager.updateQuota(Some("userC"), None, Some(new Quota(10000, 
true)))
+      quotaManager.updateQuota(None, Some("client1"), Some(new Quota(9000, 
true)))
+
+      checkQuota("userA", "client1", 5000, 4500, false) // <user, client> 
quota takes precedence over <user>
+      checkQuota("userA", "client2", 4000, 4500, true)  // <user> quota takes 
precedence over <client> and defaults
+      checkQuota("userA", "client3", 4000, 0, true)     // <user> quota is 
shared across clients of user
+      checkQuota("userA", "client1", 5000, 0, false)    // <user, client> is 
exclusive use, unaffected by other clients
+
+      checkQuota("userB", "client1", 7000, 8000, true)
+      checkQuota("userB", "client2", 8000, 7000, false) // Default per-client 
quota for exclusive use of <user, client>
+      checkQuota("userB", "client3", 8000, 7000, false)
+
+      checkQuota("userD", "client1", 3000, 3500, true)  // Default <user, 
client> quota
+      checkQuota("userD", "client2", 3000, 2500, false)
+      checkQuota("userE", "client1", 3000, 2500, false)
+
+      // Remove default <user, client> quota config, revert to <user> default
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), None)
+      checkQuota("userD", "client1", 1000, 0, false)    // Metrics tags 
changed, restart counter
+      checkQuota("userE", "client4", 1000, 1500, true)
+      checkQuota("userF", "client4", 1000, 800, false)  // Default <user> 
quota shared across clients of user
+      checkQuota("userF", "client5", 1000, 800, true)
+
+      // Remove default <user> quota config, revert to <client-id> default
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None)
+      checkQuota("userF", "client4", 2000, 0, false)  // Default <client-id> 
quota shared across client-id of all users
+      checkQuota("userF", "client5", 2000, 0, false)
+      checkQuota("userF", "client5", 2000, 2500, true)
+      checkQuota("userG", "client5", 2000, 0, true)
+
+      // Update quotas
+      quotaManager.updateQuota(Some("userA"), None, Some(new Quota(8000, 
true)))
+      quotaManager.updateQuota(Some("userA"), Some("client1"), Some(new 
Quota(10000, true)))
+      checkQuota("userA", "client2", 8000, 0, false)
+      checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum 
of new and earlier values
+      checkQuota("userA", "client1", 10000, 0, false)
+      checkQuota("userA", "client1", 10000, 6000, true)
+      quotaManager.updateQuota(Some("userA"), Some("client1"), None)
+      checkQuota("userA", "client6", 8000, 0, true)    // Throttled due to 
shared user quota
+      quotaManager.updateQuota(Some("userA"), Some("client6"), Some(new 
Quota(11000, true)))
+      checkQuota("userA", "client6", 11000, 8500, false)
+      quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), 
Some(new Quota(12000, true)))
+      quotaManager.updateQuota(Some("userA"), Some("client6"), None)
+      checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to 
sum of new and earlier values
+
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
   @Test
   def testQuotaViolation() {
     val metrics = newMetrics
@@ -84,7 +230,7 @@ class ClientQuotaManagerTest {
        * if we produce under the quota
        */
       for (i <- 0 until 10) {
-        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+        clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, 
callback)
         time.sleep(1000)
       }
       assertEquals(10, numCallbacks)
@@ -95,7 +241,7 @@ class ClientQuotaManagerTest {
       // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100
       // 10.5 seconds because the last window is half complete
       time.sleep(500)
-      val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, 
callback)
+      val sleepTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", 
"unknown", 2300, callback)
 
       assertEquals("Should be throttled", 2100, sleepTime)
       assertEquals(1, queueSizeMetric.value().toInt)
@@ -111,12 +257,12 @@ class ClientQuotaManagerTest {
 
       // Could continue to see delays until the bursty sample disappears
       for (i <- 0 until 10) {
-        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+        clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "unknown", 400, 
callback)
         time.sleep(1000)
       }
 
       assertEquals("Should be unthrottled since bursty sample has rolled over",
-                   0, clientMetrics.recordAndMaybeThrottle("unknown", 0, 
callback))
+                   0, clientMetrics.recordAndMaybeThrottle("ANONYMOUS", 
"unknown", 0, callback))
     } finally {
       clientMetrics.shutdown()
     }
@@ -127,14 +273,14 @@ class ClientQuotaManagerTest {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, 
QuotaType.Produce, time)
     try {
-      clientMetrics.recordAndMaybeThrottle("client1", 100, callback)
+      clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 100, 
callback)
       // remove the throttle time sensor
-      metrics.removeSensor("ProduceThrottleTime-client1")
+      metrics.removeSensor("ProduceThrottleTime-:client1")
       // should not throw an exception even if the throttle time sensor does 
not exist.
-      val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 
10000, callback)
+      val throttleTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", 
"client1", 10000, callback)
       assertTrue("Should be throttled", throttleTime > 0)
       // the sensor should get recreated
-      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-client1")
+      val throttleTimeSensor = 
metrics.getSensor("ProduceThrottleTime-:client1")
       assertTrue("Throttle time sensor should exist", throttleTimeSensor != 
null)
     } finally {
       clientMetrics.shutdown()
@@ -146,26 +292,37 @@ class ClientQuotaManagerTest {
     val metrics = newMetrics
     val clientMetrics = new ClientQuotaManager(config, metrics, 
QuotaType.Produce, time)
     try {
-      clientMetrics.recordAndMaybeThrottle("client1", 100, callback)
+      clientMetrics.recordAndMaybeThrottle("ANONYMOUS", "client1", 100, 
callback)
       // remove all the sensors
-      metrics.removeSensor("ProduceThrottleTime-client1")
-      metrics.removeSensor("Produce-client1")
+      metrics.removeSensor("ProduceThrottleTime-:client1")
+      metrics.removeSensor("Produce-ANONYMOUS:client1")
       // should not throw an exception
-      val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 
10000, callback)
+      val throttleTime = clientMetrics.recordAndMaybeThrottle("ANONYMOUS", 
"client1", 10000, callback)
       assertTrue("Should be throttled", throttleTime > 0)
 
       // all the sensors should get recreated
-      val throttleTimeSensor = metrics.getSensor("ProduceThrottleTime-client1")
+      val throttleTimeSensor = 
metrics.getSensor("ProduceThrottleTime-:client1")
       assertTrue("Throttle time sensor should exist", throttleTimeSensor != 
null)
 
-      val byteRateSensor = metrics.getSensor("Produce-client1")
+      val byteRateSensor = metrics.getSensor("Produce-:client1")
       assertTrue("Byte rate sensor should exist", byteRateSensor != null)
     } finally {
       clientMetrics.shutdown()
     }
   }
 
+  @Test
+  def testQuotaUserSanitize() {
+    val principal = "CN=Some characters !@#$%&*()_-+=';:,/~"
+    val sanitizedPrincipal = QuotaId.sanitize(principal)
+    // Apart from % used in percent-encoding all characters of sanitized 
principal must be characters allowed in client-id
+    ConfigCommand.validateChars("sanitized-principal", 
sanitizedPrincipal.replace('%', '_'))
+    assertEquals(principal, QuotaId.desanitize(sanitizedPrincipal))
+  }
+
   def newMetrics: Metrics = {
     new Metrics(new MetricConfig(), Collections.emptyList(), time)
   }
+
+  private case class UserClient(val user: String, val clientId: String, val 
configUser: Option[String] = None, val configClientId: Option[String] = None)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index bf11332..14faa80 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -56,45 +56,78 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     }
   }
 
-  @Test
-  def testClientQuotaConfigChange() {
-    assertTrue("Should contain a ConfigHandler for topics",
-      this.servers.head.dynamicConfigHandlers.contains(ConfigType.Client))
-    val clientId = "testClient"
+  private def testQuotaConfigChange(user: String, clientId: String, 
rootEntityType: String, configEntityName: String) {
+    assertTrue("Should contain a ConfigHandler for " + rootEntityType ,
+               
this.servers.head.dynamicConfigHandlers.contains(rootEntityType))
     val props = new Properties()
-    props.put(ClientConfigOverride.ProducerOverride, "1000")
-    props.put(ClientConfigOverride.ConsumerOverride, "2000")
-    AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
+    props.put(QuotaConfigOverride.ProducerOverride, "1000")
+    props.put(QuotaConfigOverride.ConsumerOverride, "2000")
     val quotaManagers = servers.head.apis.quotas
+    rootEntityType match {
+      case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, 
configEntityName, props)
+      case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
configEntityName, props)
+    }
 
     TestUtils.retry(10000) {
-      val overrideProducerQuota = quotaManagers.produce.quota(clientId)
-      val overrideConsumerQuota = quotaManagers.fetch.quota(clientId)
+      val overrideProducerQuota = quotaManagers.produce.quota(user, clientId)
+      val overrideConsumerQuota = quotaManagers.fetch.quota(user, clientId)
 
-      assertEquals(s"ClientId $clientId must have overridden producer quota of 
1000",
+      assertEquals(s"User $user clientId $clientId must have overridden 
producer quota of 1000",
         Quota.upperBound(1000), overrideProducerQuota)
-      assertEquals(s"ClientId $clientId must have overridden consumer quota of 
2000",
+      assertEquals(s"User $user clientId $clientId must have overridden 
consumer quota of 2000",
         Quota.upperBound(2000), overrideConsumerQuota)
     }
 
-    val defaultProducerQuota = 
servers.head.apis.config.producerQuotaBytesPerSecondDefault.doubleValue
-    val defaultConsumerQuota = 
servers.head.apis.config.consumerQuotaBytesPerSecondDefault.doubleValue
-    assertNotEquals("defaultProducerQuota should be different from 1000", 
1000, defaultProducerQuota)
-    assertNotEquals("defaultConsumerQuota should be different from 2000", 
2000, defaultConsumerQuota)
-    AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties())
+    val defaultProducerQuota = Long.MaxValue.asInstanceOf[Double]
+    val defaultConsumerQuota = Long.MaxValue.asInstanceOf[Double]
 
+    val emptyProps = new Properties()
+    rootEntityType match {
+      case ConfigType.Client => AdminUtils.changeClientIdConfig(zkUtils, 
configEntityName, emptyProps)
+      case _ => AdminUtils.changeUserOrUserClientIdConfig(zkUtils, 
configEntityName, emptyProps)
+    }
     TestUtils.retry(10000) {
-      val producerQuota = quotaManagers.produce.quota(clientId)
-      val consumerQuota = quotaManagers.fetch.quota(clientId)
+      val producerQuota = quotaManagers.produce.quota(user, clientId)
+      val consumerQuota = quotaManagers.fetch.quota(user, clientId)
 
-      assertEquals(s"ClientId $clientId must have reset producer quota to " + 
defaultProducerQuota,
+      assertEquals(s"User $user clientId $clientId must have reset producer 
quota to " + defaultProducerQuota,
         Quota.upperBound(defaultProducerQuota), producerQuota)
-      assertEquals(s"ClientId $clientId must have reset consumer quota to " + 
defaultConsumerQuota,
+      assertEquals(s"User $user clientId $clientId must have reset consumer 
quota to " + defaultConsumerQuota,
         Quota.upperBound(defaultConsumerQuota), consumerQuota)
     }
   }
 
   @Test
+  def testClientIdQuotaConfigChange() {
+    testQuotaConfigChange("ANONYMOUS", "testClient", ConfigType.Client, 
"testClient")
+  }
+
+  @Test
+  def testUserQuotaConfigChange() {
+    testQuotaConfigChange("ANONYMOUS", "testClient", ConfigType.User, 
"ANONYMOUS")
+  }
+
+  @Test
+  def testUserClientIdQuotaChange() {
+    testQuotaConfigChange("ANONYMOUS", "testClient", ConfigType.User, 
"ANONYMOUS/clients/testClient")
+  }
+
+  @Test
+  def testDefaultClientIdQuotaConfigChange() {
+    testQuotaConfigChange("ANONYMOUS", "testClient", ConfigType.Client, 
"<default>")
+  }
+
+  @Test
+  def testDefaultUserQuotaConfigChange() {
+    testQuotaConfigChange("ANONYMOUS", "testClient", ConfigType.User, 
"<default>")
+  }
+
+  @Test
+  def testDefaultUserClientIdQuotaConfigChange() {
+    testQuotaConfigChange("ANONYMOUS", "testClient", ConfigType.User, 
"<default>/clients/<default>")
+  }
+
+  @Test
   def testConfigChangeOnNonExistingTopic() {
     val topic = TestUtils.tempTopic
     try {
@@ -205,4 +238,4 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
     //Then
     assertEquals(Seq(), result)
   }
-}
\ No newline at end of file
+}

Reply via email to