KAFKA-3492; Secure quotas for authenticated users

Implementation and tests for secure quotas at <user> and <user, client-id> 
levels as described in KIP-55. Also adds dynamic default quotas for 
<client-id>, <user> and <user-client-id>. For each client connection, the most 
specific quota matching the connection is used, with user quota taking 
precedence over client-id quota.

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jun Rao <jun...@gmail.com>

Closes #1753 from rajinisivaram/KAFKA-3492


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/69356fbc
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/69356fbc
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/69356fbc

Branch: refs/heads/trunk
Commit: 69356fbc6e76ab4291ff4957f0d6ea04e7245909
Parents: ecc1fb1
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Sat Sep 17 10:06:05 2016 -0700
Committer: Jun Rao <jun...@gmail.com>
Committed: Sat Sep 17 10:06:05 2016 -0700

----------------------------------------------------------------------
 .../kafka/common/metrics/KafkaMetric.java       |   2 +-
 .../org/apache/kafka/common/metrics/Quota.java  |   5 +
 .../src/main/scala/kafka/admin/AdminUtils.scala |  72 +++-
 .../main/scala/kafka/admin/ConfigCommand.scala  | 192 +++++++++--
 .../scala/kafka/network/RequestChannel.scala    |   5 +-
 .../scala/kafka/server/ClientQuotaManager.scala | 332 ++++++++++++++++---
 .../main/scala/kafka/server/ConfigHandler.scala |  66 ++--
 .../kafka/server/DynamicConfigManager.scala     |  94 ++++--
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../main/scala/kafka/server/KafkaConfig.scala   |   6 +-
 .../main/scala/kafka/server/KafkaServer.scala   |   7 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   3 +
 .../integration/kafka/api/BaseQuotaTest.scala   | 195 +++++++++++
 .../kafka/api/ClientIdQuotaTest.scala           |  55 +++
 .../kafka/api/ClientQuotasTest.scala            | 206 ------------
 .../kafka/api/UserClientIdQuotaTest.scala       |  66 ++++
 .../integration/kafka/api/UserQuotaTest.scala   |  61 ++++
 .../test/scala/unit/kafka/admin/AdminTest.scala |   4 +-
 .../unit/kafka/admin/ConfigCommandTest.scala    | 186 ++++++++++-
 .../scala/unit/kafka/admin/TestAdminUtils.scala |   1 +
 .../kafka/server/ClientQuotaManagerTest.scala   | 217 ++++++++++--
 .../kafka/server/DynamicConfigChangeTest.scala  |  77 +++--
 22 files changed, 1449 insertions(+), 406 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index e4d3ae8..86014e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -37,7 +37,7 @@ public final class KafkaMetric implements Metric {
         this.time = time;
     }
 
-    MetricConfig config() {
+    public MetricConfig config() {
         return this.config;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
index 8431e50..663b963 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java
@@ -67,4 +67,9 @@ public final class Quota {
         Quota that = (Quota) obj;
         return (that.bound == this.bound) && (that.upper == this.upper);
     }
+
+    @Override
+    public String toString() {
+        return (upper ? "upper=" : "lower=") + bound;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 400cc47..b3f8e5c 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -41,6 +41,7 @@ import org.I0Itec.zkclient.exception.ZkNodeExistsException
 trait AdminUtilities {
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties)
   def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: 
Properties)
+  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: 
String, configs: Properties)
   def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: 
Properties)
   def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: 
String): Properties
 }
@@ -449,7 +450,7 @@ object AdminUtils extends Logging with AdminUtilities {
     if (!update) {
       // write out the config if there is any, this isn't transactional with 
the partition assignments
       LogConfig.validate(config)
-      writeEntityConfig(zkUtils, ConfigType.Topic, topic, config)
+      writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), 
config)
     }
 
     // create the partition assignment
@@ -476,7 +477,9 @@ object AdminUtils extends Logging with AdminUtilities {
   }
 
   /**
-   * Update the config for a client and create a change notification so the 
change will propagate to other brokers
+   * Update the config for a client and create a change notification so the 
change will propagate to other brokers.
+   * If clientId is <default>, default clientId config is updated. ClientId 
configs are used only if <user, clientId>
+   * and <user> configs are not specified.
    *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
    * @param clientId: The clientId for which configs are being changed
@@ -489,6 +492,21 @@ object AdminUtils extends Logging with AdminUtilities {
   }
 
   /**
+   * Update the config for a <user> or <user, clientId> and create a change 
notification so the change will propagate to other brokers.
+   * User and/or clientId components of the path may be <default>, indicating 
that the configuration is the default
+   * value to be applied if a more specific override is not configured.
+   *
+   * @param zkUtils Zookeeper utilities used to write the config to ZK
+   * @param sanitizedEntityName: <sanitizedUserPrincipal> or 
<sanitizedUserPrincipal>/clients/<clientId>
+   * @param configs: The final set of configs that will be applied to the 
topic. If any new configs need to be added or
+   *                 existing configs need to be deleted, it should be done 
prior to invoking this API
+   *
+   */
+  def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: 
String, configs: Properties) {
+    changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
+  }
+
+  /**
    * Update the config for an existing topic and create a change notification 
so the change will propagate to other brokers
    *
    * @param zkUtils Zookeeper utilities used to write the config to ZK
@@ -520,37 +538,41 @@ object AdminUtils extends Logging with AdminUtilities {
     }
   }
 
-  private def changeEntityConfig(zkUtils: ZkUtils, entityType: String, 
entityName: String, configs: Properties) {
+  private def changeEntityConfig(zkUtils: ZkUtils, rootEntityType: String, 
fullSanitizedEntityName: String, configs: Properties) {
+    val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
+    val entityConfigPath = getEntityConfigPath(rootEntityType, 
fullSanitizedEntityName)
     // write the new config--may not exist if there were previously no 
overrides
-    writeEntityConfig(zkUtils, entityType, entityName, configs)
+    writeEntityConfig(zkUtils, entityConfigPath, configs)
 
     // create the change notification
     val seqNode = ZkUtils.EntityConfigChangesPath + "/" + 
EntityConfigChangeZnodePrefix
-    val content = Json.encode(getConfigChangeZnodeData(entityType, entityName))
+    val content = Json.encode(getConfigChangeZnodeData(sanitizedEntityPath))
     zkUtils.zkClient.createPersistentSequential(seqNode, content)
   }
 
-  def getConfigChangeZnodeData(entityType: String, entityName: String) : 
Map[String, Any] = {
-    Map("version" -> 1, "entity_type" -> entityType, "entity_name" -> 
entityName)
+  def getConfigChangeZnodeData(sanitizedEntityPath: String) : Map[String, Any] 
= {
+    Map("version" -> 2, "entity_path" -> sanitizedEntityPath)
   }
 
   /**
-   * Write out the topic config to zk, if there is any
+   * Write out the entity config to zk, if there is any
    */
-  private def writeEntityConfig(zkUtils: ZkUtils, entityType: String, 
entityName: String, config: Properties) {
+  private def writeEntityConfig(zkUtils: ZkUtils, entityPath: String, config: 
Properties) {
     val configMap: mutable.Map[String, String] = {
       import JavaConversions._
       config
     }
     val map = Map("version" -> 1, "config" -> configMap)
-    zkUtils.updatePersistentPath(getEntityConfigPath(entityType, entityName), 
Json.encode(map))
+    zkUtils.updatePersistentPath(entityPath, Json.encode(map))
   }
 
   /**
-   * Read the entity (topic or client) config (if any) from zk
+   * Read the entity (topic, broker, client, user or <user, client>) config 
(if any) from zk
+   * sanitizedEntityName is <topic>, <broker>, <client-id>, <user> or 
<user>/clients/<client-id>.
    */
-  def fetchEntityConfig(zkUtils: ZkUtils, entityType: String, entity: String): 
Properties = {
-    val str: String = 
zkUtils.zkClient.readData(getEntityConfigPath(entityType, entity), true)
+  def fetchEntityConfig(zkUtils: ZkUtils, rootEntityType: String, 
sanitizedEntityName: String): Properties = {
+    val entityConfigPath = getEntityConfigPath(rootEntityType, 
sanitizedEntityName)
+    val str: String = zkUtils.zkClient.readData(entityConfigPath, true)
     val props = new Properties()
     if (str != null) {
       Json.parseFull(str) match {
@@ -564,13 +586,12 @@ object AdminUtils extends Logging with AdminUtilities {
                 configTup match {
                   case (k: String, v: String) =>
                     props.setProperty(k, v)
-                  case _ => throw new IllegalArgumentException("Invalid " + 
entityType + " config: " + str)
+                  case _ => throw new IllegalArgumentException(s"Invalid 
${entityConfigPath} config: ${str}")
                 }
-            case _ => throw new IllegalArgumentException("Invalid " + 
entityType + " config: " + str)
+            case _ => throw new IllegalArgumentException(s"Invalid 
${entityConfigPath} config: ${str}")
           }
 
-        case o => throw new IllegalArgumentException("Unexpected value in 
config:(%s), entity_type: (%s), entity: (%s)"
-                                                             .format(str, 
entityType, entity))
+        case o => throw new IllegalArgumentException(s"Unexpected value in 
config:(${str}), entity_config_path: ${entityConfigPath}")
       }
     }
     props
@@ -582,6 +603,23 @@ object AdminUtils extends Logging with AdminUtilities {
   def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, 
Properties] =
     zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, 
fetchEntityConfig(zkUtils, entityType, entity))).toMap
 
+  def fetchAllChildEntityConfigs(zkUtils: ZkUtils, rootEntityType: String, 
childEntityType: String): Map[String, Properties] = {
+    def entityPaths(zkUtils: ZkUtils, rootPath: Option[String]): Seq[String] = 
{
+      val root = rootPath match {
+        case Some(path) => rootEntityType + '/' + rootPath
+        case None => rootEntityType
+      }
+      val entityNames = zkUtils.getAllEntitiesWithConfig(root)
+      rootPath match {
+        case Some(path) => entityNames.map(entityName => path + '/' + 
entityName)
+        case None => entityNames
+      }
+    }
+    entityPaths(zkUtils, None)
+      .flatMap(entity => entityPaths(zkUtils, Some(entity + '/' + 
childEntityType)))
+      .map(entityPath => (entityPath, fetchEntityConfig(zkUtils, 
rootEntityType, entityPath))).toMap
+  }
+
   def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): 
MetadataResponse.TopicMetadata =
     fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker])
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index ebf9e61..58bdb7a 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -21,8 +21,9 @@ import java.util.Properties
 
 import joptsimple._
 import kafka.admin.TopicCommand._
+import kafka.common.Config
 import kafka.log.{Defaults, LogConfig}
-import kafka.server.{KafkaConfig, ClientConfigOverride, ConfigType}
+import kafka.server.{KafkaConfig, QuotaConfigOverride, ConfigType, 
ConfigEntityName, QuotaId}
 import kafka.utils.{CommandLineUtils, ZkUtils}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.utils.Utils
@@ -33,15 +34,26 @@ import scala.collection._
 
 /**
  * This script can be used to change configs for topics/clients/brokers 
dynamically
+ * This script can be used to change configs for topics/clients/users/brokers 
dynamically
+ * An entity described or altered by the command may be one of:
+ * <ul>
+ *     <li> topic: --entity-type topics --entity-name <topic>
+ *     <li> client: --entity-type clients --entity-name <client-id>
+ *     <li> user: --entity-type users --entity-name <user-principal>
+ *     <li> <user, client>: --entity-type users --entity-name <user-principal> 
--entity-type clients --entity-name <client-id>
+ *     <li> broker: --entity-type brokers --entity-name <broker>
+ * </ul>
+ * --entity-default may be used instead of --entity-name when describing or 
altering default configuration for users and clients.
+ *
  */
-object ConfigCommand {
+object ConfigCommand extends Config {
 
   def main(args: Array[String]): Unit = {
 
     val opts = new ConfigCommandOptions(args)
 
     if(args.length == 0)
-      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config 
for a topic, client or broker")
+      CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity config 
for a topic, client, user or broker")
 
     opts.checkArgs()
 
@@ -57,18 +69,19 @@ object ConfigCommand {
         describeConfig(zkUtils, opts)
     } catch {
       case e: Throwable =>
-        println("Error while executing topic command " + e.getMessage)
+        println("Error while executing config command " + e.getMessage)
         println(Utils.stackTrace(e))
     } finally {
       zkUtils.close()
     }
   }
 
-  def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, utils: 
AdminUtilities = AdminUtils) {
+  private[admin] def alterConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions, 
utils: AdminUtilities = AdminUtils) {
     val configsToBeAdded = parseConfigsToBeAdded(opts)
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
-    val entityType = opts.options.valueOf(opts.entityType)
-    val entityName = opts.options.valueOf(opts.entityName)
+    val entity = parseEntity(opts)
+    val entityType = entity.root.entityType
+    val entityName = entity.fullSanitizedName
     warnOnMaxMessagesChange(configsToBeAdded, opts.options.has(opts.forceOpt))
 
     // compile the final set of configs
@@ -77,12 +90,13 @@ object ConfigCommand {
     configsToBeDeleted.foreach(config => configs.remove(config))
 
     entityType match {
-      case ConfigType.Topic =>  utils.changeTopicConfig(zkUtils, entityName, 
configs)
-      case ConfigType.Client =>  utils.changeClientIdConfig(zkUtils, 
entityName, configs)
+      case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, 
configs)
+      case ConfigType.Client => utils.changeClientIdConfig(zkUtils, 
entityName, configs)
+      case ConfigType.User => utils.changeUserOrUserClientIdConfig(zkUtils, 
entityName, configs)
       case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, 
Seq(parseBroker(entityName)), configs)
       case _ => throw new IllegalArgumentException(s"$entityType is not a 
known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, 
${ConfigType.Broker}")
     }
-    println(s"Updated config for EntityType:$entityType => 
EntityName:'$entityName'.")
+    println(s"Updated config for entity: $entity.")
   }
 
   def warnOnMaxMessagesChange(configs: Properties, force: Boolean): Unit = {
@@ -107,17 +121,16 @@ object ConfigCommand {
   }
 
   private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
-    val entityType = opts.options.valueOf(opts.entityType)
-    val entityNames: Seq[String] =
-      if (opts.options.has(opts.entityName))
-        Seq(opts.options.valueOf(opts.entityName))
-      else
-        zkUtils.getAllEntitiesWithConfig(entityType)
-
-    for (entityName <- entityNames) {
-      val configs = AdminUtils.fetchEntityConfig(zkUtils, entityType, 
entityName)
-      println("Configs for %s:%s are %s"
-        .format(entityType, entityName, configs.map(kv => kv._1 + "=" + 
kv._2).mkString(",")))
+    val configEntity = parseEntity(opts)
+    val describeAllUsers = configEntity.root.entityType == ConfigType.User && 
!configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined
+    val entities = configEntity.getAllEntities(zkUtils)
+    for (entity <- entities) {
+      val configs = AdminUtils.fetchEntityConfig(zkUtils, 
entity.root.entityType, entity.fullSanitizedName)
+      // When describing all users, don't include empty user nodes with only 
<user, client> quota overrides.
+      if (!configs.isEmpty || !describeAllUsers) {
+        println("Configs for %s are %s"
+          .format(entity, configs.map(kv => kv._1 + "=" + 
kv._2).mkString(",")))
+      }
     }
   }
 
@@ -150,6 +163,115 @@ object ConfigCommand {
       Seq.empty
   }
 
+  case class Entity(entityType: String, sanitizedName: Option[String]) {
+    val entityPath = sanitizedName match {
+      case Some(n) => entityType + "/" + n
+      case None => entityType
+    }
+    override def toString: String = {
+      val typeName = entityType match {
+        case ConfigType.User => "user-principal"
+        case ConfigType.Client => "client-id"
+        case ConfigType.Topic => "topic"
+        case t => t
+      }
+      sanitizedName match {
+        case Some(ConfigEntityName.Default) => "default " + typeName
+        case Some(n) =>
+          val desanitized = if (entityType == ConfigType.User) 
QuotaId.desanitize(n) else n
+          s"$typeName '$desanitized'"
+        case None => entityType
+      }
+    }
+  }
+
+  case class ConfigEntity(root: Entity, child: Option[Entity]) {
+    val fullSanitizedName = root.sanitizedName.getOrElse("") + child.map(s => 
"/" + s.entityPath).getOrElse("")
+
+    def getAllEntities(zkUtils: ZkUtils) : Seq[ConfigEntity] = {
+      // Describe option examples:
+      //   Describe entity with specified name:
+      //     --entity-type topics --entity-name topic1 (topic1)
+      //   Describe all entities of a type (topics/brokers/users/clients):
+      //     --entity-type topics (all topics)
+      //   Describe <user, client> quotas:
+      //     --entity-type users --entity-name user1 --entity-type clients 
--entity-name client2 (<user1, client2>)
+      //     --entity-type users --entity-name userA --entity-type clients 
(all clients of userA)
+      //     --entity-type users --entity-type clients (all <user, client>s))
+      //   Describe default quotas:
+      //     --entity-type users --entity-default (Default user)
+      //     --entity-type users --entity-default --entity-type clients 
--entity-default (Default <user, client>)
+      (root.sanitizedName, child) match {
+        case (None, _) =>
+          val rootEntities = zkUtils.getAllEntitiesWithConfig(root.entityType)
+                                   .map(name => 
ConfigEntity(Entity(root.entityType, Some(name)), child))
+          child match {
+            case Some (s) =>
+                rootEntities.flatMap(rootEntity =>
+                  ConfigEntity(rootEntity.root, Some(Entity(s.entityType, 
None))).getAllEntities(zkUtils))
+            case None => rootEntities
+          }
+        case (rootName, Some(childEntity)) =>
+          childEntity.sanitizedName match {
+            case Some(subName) => Seq(this)
+            case None =>
+                zkUtils.getAllEntitiesWithConfig(root.entityPath + "/" + 
childEntity.entityType)
+                       .map(name => ConfigEntity(root, 
Some(Entity(childEntity.entityType, Some(name)))))
+
+          }
+        case (rootName, None) =>
+          Seq(this)
+      }
+    }
+
+    override def toString: String = {
+      root.toString + child.map(s => ", " + s.toString).getOrElse("")
+    }
+  }
+
+  private[admin] def parseEntity(opts: ConfigCommandOptions): ConfigEntity = {
+    val entityTypes = opts.options.valuesOf(opts.entityType)
+    if (entityTypes.head == ConfigType.User || entityTypes.head == 
ConfigType.Client)
+      parseQuotaEntity(opts)
+    else {
+      // Exactly one entity type and at-most one entity name expected for 
other entities
+      val name = if (opts.options.has(opts.entityName)) 
Some(opts.options.valueOf(opts.entityName)) else None
+      ConfigEntity(Entity(entityTypes.head, name), None)
+    }
+  }
+
+  private def parseQuotaEntity(opts: ConfigCommandOptions): ConfigEntity = {
+    val types = opts.options.valuesOf(opts.entityType)
+    val namesIterator = opts.options.valuesOf(opts.entityName).iterator
+    val names = opts.options.specs
+                    .filter(spec => spec.options.contains("entity-name") || 
spec.options.contains("entity-default"))
+                    .map(spec => if (spec.options.contains("entity-name")) 
namesIterator.next else "")
+
+    if (opts.options.has(opts.alterOpt) && names.size != types.size)
+      throw new IllegalArgumentException("--entity-name or --entity-default 
must be specified with each --entity-type for --alter")
+
+    val reverse = types.size == 2 && types(0) == ConfigType.Client
+    val entityTypes = if (reverse) types.reverse else types.toBuffer
+    val sortedNames = (if (reverse && names.length == 2) names.reverse else 
names).iterator
+
+    def sanitizeName(entityType: String, name: String) = {
+      if (name.isEmpty)
+        ConfigEntityName.Default
+      else {
+        entityType match {
+          case ConfigType.User => QuotaId.sanitize(name)
+          case ConfigType.Client =>
+            validateChars("Client-id", name)
+            name
+          case _ => throw new IllegalArgumentException("Invalid entity type " 
+ entityType)
+        }
+      }
+    }
+
+    val entities = entityTypes.map(t => Entity(t, if (sortedNames.hasNext) 
Some(sanitizeName(t, sortedNames.next)) else None))
+    ConfigEntity(entities.head, if (entities.size > 1) Some(entities(1)) else 
None)
+  }
+
   class ConfigCommandOptions(args: Array[String]) {
     val parser = new OptionParser
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection 
string for the zookeeper connection in the form host:port. " +
@@ -159,19 +281,23 @@ object ConfigCommand {
             .ofType(classOf[String])
     val alterOpt = parser.accepts("alter", "Alter the configuration for the 
entity.")
     val describeOpt = parser.accepts("describe", "List configs for the given 
entity.")
-    val entityType = parser.accepts("entity-type", "Type of entity 
(topics/clients/brokers)")
+    val entityType = parser.accepts("entity-type", "Type of entity 
(topics/clients/users/brokers)")
             .withRequiredArg
             .ofType(classOf[String])
-    val entityName = parser.accepts("entity-name", "Name of entity (topic 
name/client id/broker id)")
+    val entityName = parser.accepts("entity-name", "Name of entity (topic 
name/client id/user principal name/broker id)")
             .withRequiredArg
             .ofType(classOf[String])
+    val entityDefault = parser.accepts("entity-default", "Default entity name 
for clients/users (applies to corresponding entity type in command line)")
 
     val nl = System.getProperty("line.separator")
     val addConfig = parser.accepts("add-config", "Key Value pairs of configs 
to add. Square brackets can be used to group values which contain commas: 
'k1=v1,k2=[v1,v2,v2],k3=v3'. The following is a list of valid configurations: " 
+
             "For entity_type '" + ConfigType.Topic + "': " + nl + 
LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
             "For entity_type '" + ConfigType.Broker + "': " + nl + 
KafkaConfig.dynamicBrokerConfigs.map("\t" + _).mkString(nl) + nl +
-            "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + 
ClientConfigOverride.ProducerOverride
-                                                            + nl + "\t" + 
ClientConfigOverride.ConsumerOverride)
+            "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + 
QuotaConfigOverride.ProducerOverride
+                                                            + nl + "\t" + 
QuotaConfigOverride.ConsumerOverride + nl +
+            "For entity_type '" + ConfigType.User + "': " + nl + "\t" + 
QuotaConfigOverride.ProducerOverride
+                                                          + nl + "\t" + 
QuotaConfigOverride.ConsumerOverride + nl +
+            s"Entity types '${ConfigType.User}' and '${ConfigType.Client}' may 
be specified together to update config for clients of a specific user.")
             .withRequiredArg
             .ofType(classOf[String])
     val deleteConfig = parser.accepts("delete-config", "config keys to remove 
'k1,k2'")
@@ -194,15 +320,27 @@ object ConfigCommand {
       CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, 
entityType)
       CommandLineUtils.checkInvalidArgs(parser, options, alterOpt, 
Set(describeOpt))
       CommandLineUtils.checkInvalidArgs(parser, options, describeOpt, 
Set(alterOpt, addConfig, deleteConfig))
+      val entityTypeVals = options.valuesOf(entityType)
       if(options.has(alterOpt)) {
-        require(options.has(entityName), "--entity-name must be specified with 
--alter")
+        if (entityTypeVals.contains(ConfigType.User) || 
entityTypeVals.contains(ConfigType.Client)) {
+          if (!options.has(entityName) && !options.has(entityDefault))
+            throw new IllegalArgumentException("--entity-name or 
--entity-default must be specified with --alter of users/clients")
+        } else if (!options.has(entityName))
+            throw new IllegalArgumentException(s"--entity-name must be 
specified with --alter of ${entityTypeVals}")
 
         val isAddConfigPresent: Boolean = options.has(addConfig)
         val isDeleteConfigPresent: Boolean = options.has(deleteConfig)
         if(! isAddConfigPresent && ! isDeleteConfigPresent)
           throw new IllegalArgumentException("At least one of --add-config or 
--delete-config must be specified with --alter")
       }
-      require(ConfigType.all.contains(options.valueOf(entityType)), 
s"--entity-type must be one of ${ConfigType.all}")
+      entityTypeVals.foreach(entityTypeVal =>
+        if (!ConfigType.all.contains(entityTypeVal))
+          throw new IllegalArgumentException(s"Invalid entity-type 
${entityTypeVal}, --entity-type must be one of ${ConfigType.all}")
+      )
+      if (entityTypeVals.isEmpty)
+        throw new IllegalArgumentException("At least one --entity-type must be 
specified")
+      else if (entityTypeVals.size > 1 && 
!entityTypeVals.toSet.equals(Set(ConfigType.User, ConfigType.Client)))
+        throw new IllegalArgumentException(s"Only '${ConfigType.User}' and 
'${ConfigType.Client}' entity types may be specified together")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala 
b/core/src/main/scala/kafka/network/RequestChannel.scala
index cff7b1a..8aec2d2 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -25,6 +25,7 @@ import java.util.concurrent._
 import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server.QuotaId
 import kafka.utils.{Logging, SystemTime}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.InvalidRequestException
@@ -44,7 +45,9 @@ object RequestChannel extends Logging {
     RequestSend.serialize(emptyRequestHeader, emptyProduceRequest.toStruct)
   }
 
-  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress)
+  case class Session(principal: KafkaPrincipal, clientAddress: InetAddress) {
+    val sanitizedUser = QuotaId.sanitize(principal.getName)
+  }
 
   case class Request(processor: Int, connectionId: String, session: Session, 
private var buffer: ByteBuffer, startTimeMs: Long, securityProtocol: 
SecurityProtocol) {
     // These need to be volatile because the readers are in the network thread 
and the writers are in the request

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/ClientQuotaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index e6cac5d..c4472c6 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -16,15 +16,19 @@
  */
 package kafka.server
 
+import java.net.{URLEncoder, URLDecoder}
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
 import kafka.utils.{ShutdownableThread, Logging}
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg}
-import java.util.concurrent.locks.ReentrantReadWriteLock
-
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConversions._
+
 /**
  * Represents the sensors aggregated per client
  * @param quotaSensor @Sensor that tracks the quota
@@ -34,7 +38,8 @@ private case class ClientSensors(quotaSensor: Sensor, 
throttleTimeSensor: Sensor
 
 /**
  * Configuration settings for quota management
- * @param quotaBytesPerSecondDefault The default bytes per second quota 
allocated to any client
+ * @param quotaBytesPerSecondDefault The default bytes per second quota 
allocated to any client-id if
+ *        dynamic defaults or user quotas are not set
  * @param numQuotaSamples The number of samples to retain in memory
  * @param quotaWindowSizeSeconds The time span of each sample
  *
@@ -53,11 +58,72 @@ object ClientQuotaManagerConfig {
   val DefaultQuotaWindowSizeSeconds = 1
   // Purge sensors after 1 hour of inactivity
   val InactiveSensorExpirationTimeSeconds  = 3600
+
+  val UnlimitedQuota = Quota.upperBound(Long.MaxValue)
+  val DefaultClientIdQuotaId = QuotaId(None, Some(ConfigEntityName.Default))
+  val DefaultUserQuotaId = QuotaId(Some(ConfigEntityName.Default), None)
+  val DefaultUserClientIdQuotaId = QuotaId(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default))
+}
+
+object QuotaTypes {
+  val NoQuotas = 0
+  val ClientIdQuotaEnabled = 1
+  val UserQuotaEnabled = 2
+  val UserClientIdQuotaEnabled = 4
 }
 
+object QuotaId {
+
+  /**
+   * Sanitizes user principal to a safe value for use in MetricName
+   * and as Zookeeper node name
+   */
+  def sanitize(user: String): String = {
+    val encoded = URLEncoder.encode(user, StandardCharsets.UTF_8.name)
+    val builder = new StringBuilder
+    for (i <- 0 until encoded.length) {
+      encoded.charAt(i) match {
+        case '*' => builder.append("%2A") // Metric ObjectName treats * as 
pattern
+        case '+' => builder.append("%20") // Space URL-encoded as +, replace 
with percent encoding
+        case c => builder.append(c)
+      }
+    }
+    builder.toString
+  }
+
+  /**
+   * Decodes sanitized user principal
+   */
+  def desanitize(sanitizedUser: String): String = {
+    URLDecoder.decode(sanitizedUser, StandardCharsets.UTF_8.name)
+  }
+}
+
+case class QuotaId(sanitizedUser: Option[String], clientId: Option[String])
+
+case class QuotaEntity(quotaId: QuotaId, sanitizedUser: String, clientId: 
String, quota: Quota)
+
 /**
  * Helper class that records per-client metrics. It is also responsible for 
maintaining Quota usage statistics
  * for all clients.
+ * <p/>
+ * Quotas can be set at <user, client-id>, user or client-id levels. For a 
given client connection,
+ * the most specific quota matching the connection will be applied. For 
example, if both a <user, client-id>
+ * and a user quota match a connection, the <user, client-id> quota will be 
used. Otherwise, user quota takes
+ * precedence over client-id quota. The order of precedence is:
+ * <ul>
+ *   <li>/config/users/<user>/clients/<client-id>
+ *   <li>/config/users/<user>/clients/<default>
+ *   <li>/config/users/<user>
+ *   <li>/config/users/<default>/clients/<client-id>
+ *   <li>/config/users/<default>/clients/<default>
+ *   <li>/config/users/<default>
+ *   <li>/config/clients/<client-id>
+ *   <li>/config/clients/<default>
+ * </ul>
+ * Quota limits including defaults may be updated dynamically. The 
implementation is optimized for the case
+ * where a single level of quotas is configured.
+ *
  * @param config @ClientQuotaManagerConfig quota configs
  * @param metrics @Metrics Metrics instance
  * @param apiKey API Key for the request
@@ -67,8 +133,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
                          private val metrics: Metrics,
                          private val apiKey: QuotaType,
                          private val time: Time) extends Logging {
-  private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
-  private val defaultQuota = 
Quota.upperBound(config.quotaBytesPerSecondDefault)
+  private val overriddenQuota = new ConcurrentHashMap[QuotaId, Quota]()
+  private val staticConfigClientIdQuota = 
Quota.upperBound(config.quotaBytesPerSecondDefault)
+  private var quotaTypesEnabled = if (config.quotaBytesPerSecondDefault == 
Long.MaxValue) QuotaTypes.NoQuotas else QuotaTypes.ClientIdQuotaEnabled
   private val lock = new ReentrantReadWriteLock()
   private val delayQueue = new DelayQueue[ThrottledResponse]()
   private val sensorAccessor = new SensorAccess
@@ -107,8 +174,9 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
    * @return Number of milliseconds to delay the response in case of Quota 
violation.
    *         Zero otherwise
    */
-  def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => 
Unit): Int = {
-    val clientSensors = getOrCreateQuotaSensors(clientId)
+  def recordAndMaybeThrottle(sanitizedUser: String, clientId: String, value: 
Int, callback: Int => Unit): Int = {
+    val clientQuotaEntity = quotaEntity(sanitizedUser, clientId)
+    val clientSensors = getOrCreateQuotaSensors(clientQuotaEntity)
     var throttleTimeMs = 0
     try {
       clientSensors.quotaSensor.record(value)
@@ -117,8 +185,8 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     } catch {
       case qve: QuotaViolationException =>
         // Compute the delay
-        val clientMetric = 
metrics.metrics().get(clientRateMetricName(clientId))
-        throttleTimeMs = throttleTime(clientMetric, 
getQuotaMetricConfig(quota(clientId)))
+        val clientMetric = 
metrics.metrics().get(clientRateMetricName(clientQuotaEntity.sanitizedUser, 
clientQuotaEntity.clientId))
+        throttleTimeMs = throttleTime(clientMetric, 
getQuotaMetricConfig(clientQuotaEntity.quota))
         clientSensors.throttleTimeSensor.record(throttleTimeMs)
         // If delayed, add the element to the delayQueue
         delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
@@ -128,6 +196,127 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     throttleTimeMs
   }
 
+  /**
+   * Determines the quota-id for the client with the specified user principal
+   * and client-id and returns the quota entity that encapsulates the quota-id
+   * and the associated quota override or default quota.
+   *
+   */
+  private def quotaEntity(sanitizedUser: String, clientId: String) : 
QuotaEntity = {
+    quotaTypesEnabled match {
+      case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled =>
+        val quotaId = QuotaId(None, Some(clientId))
+        var quota = overriddenQuota.get(quotaId)
+        if (quota == null) {
+          quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultClientIdQuotaId)
+          if (quota == null)
+            quota = staticConfigClientIdQuota
+        }
+        QuotaEntity(quotaId, "", clientId, quota)
+      case QuotaTypes.UserQuotaEnabled =>
+        val quotaId = QuotaId(Some(sanitizedUser), None)
+        var quota = overriddenQuota.get(quotaId)
+        if (quota == null) {
+          quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserQuotaId)
+          if (quota == null)
+            quota = ClientQuotaManagerConfig.UnlimitedQuota
+        }
+        QuotaEntity(quotaId, sanitizedUser, "", quota)
+      case QuotaTypes.UserClientIdQuotaEnabled =>
+        val quotaId = QuotaId(Some(sanitizedUser), Some(clientId))
+        var quota = overriddenQuota.get(quotaId)
+        if (quota == null) {
+          quota = overriddenQuota.get(QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default)))
+          if (quota == null) {
+            quota = 
overriddenQuota.get(QuotaId(Some(ConfigEntityName.Default), Some(clientId)))
+            if (quota == null) {
+              quota = 
overriddenQuota.get(ClientQuotaManagerConfig.DefaultUserClientIdQuotaId)
+              if (quota == null)
+                quota = ClientQuotaManagerConfig.UnlimitedQuota
+            }
+          }
+        }
+        QuotaEntity(quotaId, sanitizedUser, clientId, quota)
+      case _ =>
+        quotaEntityWithMultipleQuotaLevels(sanitizedUser, clientId)
+    }
+  }
+
+  private def quotaEntityWithMultipleQuotaLevels(sanitizedUser: String, 
clientId: String) : QuotaEntity = {
+    val userClientQuotaId = QuotaId(Some(sanitizedUser), Some(clientId))
+
+    val userQuotaId = QuotaId(Some(sanitizedUser), None)
+    val clientQuotaId = QuotaId(None, Some(clientId))
+    var quotaId = userClientQuotaId
+    var quotaConfigId = userClientQuotaId
+    // 1) /config/users/<user>/clients/<client-id>
+    var quota = overriddenQuota.get(quotaConfigId)
+    if (quota == null) {
+      // 2) /config/users/<user>/clients/<default>
+      quotaId = userClientQuotaId
+      quotaConfigId = QuotaId(Some(sanitizedUser), 
Some(ConfigEntityName.Default))
+      quota = overriddenQuota.get(quotaConfigId)
+
+      if (quota == null) {
+        // 3) /config/users/<user>
+        quotaId = userQuotaId
+        quotaConfigId = quotaId
+        quota = overriddenQuota.get(quotaConfigId)
+
+        if (quota == null) {
+          // 4) /config/users/<default>/clients/<client-id>
+          quotaId = userClientQuotaId
+          quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(clientId))
+          quota = overriddenQuota.get(quotaConfigId)
+
+          if (quota == null) {
+            // 5) /config/users/<default>/clients/<default>
+            quotaId = userClientQuotaId
+            quotaConfigId = QuotaId(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default))
+            quota = overriddenQuota.get(quotaConfigId)
+
+            if (quota == null) {
+              // 6) /config/users/<default>
+              quotaId = userQuotaId
+              quotaConfigId = QuotaId(Some(ConfigEntityName.Default), None)
+              quota = overriddenQuota.get(quotaConfigId)
+
+              if (quota == null) {
+                // 7) /config/clients/<client-id>
+                quotaId = clientQuotaId
+                quotaConfigId = QuotaId(None, Some(clientId))
+                quota = overriddenQuota.get(quotaConfigId)
+
+                if (quota == null) {
+                  // 8) /config/clients/<default>
+                  quotaId = clientQuotaId
+                  quotaConfigId = QuotaId(None, Some(ConfigEntityName.Default))
+                  quota = overriddenQuota.get(quotaConfigId)
+
+                  if (quota == null) {
+                    quotaId = clientQuotaId
+                    quotaConfigId = null
+                    quota = staticConfigClientIdQuota
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    val quotaUser = if (quotaId == clientQuotaId) "" else sanitizedUser
+    val quotaClientId = if (quotaId == userQuotaId) "" else clientId
+    QuotaEntity(quotaId, quotaUser, quotaClientId, quota)
+  }
+
+  /**
+   * Returns the quota for the client with the specified (non-encoded) user 
principal and client-id.
+   */
+  def quota(user: String, clientId: String) = {
+    quotaEntity(QuotaId.sanitize(user), clientId).quota
+  }
+
   /*
    * This calculates the amount of time needed to bring the metric within quota
    * assuming that no new metrics are recorded.
@@ -153,40 +342,35 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
     }
   }
 
-  /**
-   * Returns the quota for the specified clientId
-   */
-  def quota(clientId: String): Quota =
-    if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) 
else defaultQuota
-
   /*
    * This function either returns the sensors for a given client id or creates 
them if they don't exist
    * First sensor of the tuple is the quota enforcement sensor. Second one is 
the throttle time sensor
    */
-  private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
+  private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors 
= {
+    // Names of the sensors to access
     ClientSensors(
       sensorAccessor.getOrCreate(
-        getQuotaSensorName(clientId),
+        getQuotaSensorName(quotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock, metrics,
-        () => clientRateMetricName(clientId),
-        () => getQuotaMetricConfig(quota(clientId)),
+        () => clientRateMetricName(quotaEntity.sanitizedUser, 
quotaEntity.clientId),
+        () => getQuotaMetricConfig(quotaEntity.quota),
         () => new Rate()
       ),
-      sensorAccessor.getOrCreate(getThrottleTimeSensorName(clientId),
+      
sensorAccessor.getOrCreate(getThrottleTimeSensorName(quotaEntity.quotaId),
         ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds,
         lock,
         metrics,
-        () => metrics.metricName("throttle-time", apiKey.toString, "Tracking 
average throttle-time per client", "client-id", clientId),
+        () => throttleMetricName(quotaEntity),
         () => null,
         () => new Avg()
       )
     )
   }
 
-  private def getThrottleTimeSensorName(clientId: String): String = apiKey + 
"ThrottleTime-" + clientId
+  private def getThrottleTimeSensorName(quotaId: QuotaId): String = apiKey + 
"ThrottleTime-" + quotaId.sanitizedUser.getOrElse("") + ':' + 
quotaId.clientId.getOrElse("")
 
-  private def getQuotaSensorName(clientId: String): String = apiKey + "-" + 
clientId
+  private def getQuotaSensorName(quotaId: QuotaId): String = apiKey + "-" + 
quotaId.sanitizedUser.getOrElse("") + ':' + quotaId.clientId.getOrElse("")
 
   private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
     new MetricConfig()
@@ -196,19 +380,13 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
   }
 
   /**
-   * Reset quotas to the default value for the given clientId
-   * @param clientId client to override
-   */
-  def resetQuota(clientId: String) = {
-    updateQuota(clientId, defaultQuota)
-  }
-
-  /**
-   * Overrides quotas per clientId
-   * @param clientId client to override
-   * @param quota custom quota to apply
+   * Overrides quotas for <user>, <client-id> or <user, client-id> or the 
dynamic defaults
+   * for any of these levels.
+   * @param sanitizedUser user to override if quota applies to <user> or 
<user, client-id>
+   * @param clientId client to override if quota applies to <client-id> or 
<user, client-id>
+   * @param quota custom quota to apply or None if quota override is being 
removed
    */
-  def updateQuota(clientId: String, quota: Quota) = {
+  def updateQuota(sanitizedUser: Option[String], clientId: Option[String], 
quota: Option[Quota]) {
     /*
      * Acquire the write lock to apply changes in the quota objects.
      * This method changes the quota in the overriddenQuota map and applies 
the update on the actual KafkaMetric object (if it exists).
@@ -218,31 +396,85 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
      */
     lock.writeLock().lock()
     try {
-      logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}")
-
-      if (quota.equals(defaultQuota))
-        this.overriddenQuota.remove(clientId)
-      else
-        this.overriddenQuota.put(clientId, quota)
-
-      // Change the underlying metric config if the sensor has been created.
-      // Note the metric could be expired by another thread, so use a local 
variable and null check.
-      val metric = metrics.metrics.get(clientRateMetricName(clientId))
-      if (metric != null) {
-        logger.info(s"Sensor for clientId $clientId already exists. Changing 
quota to ${quota.bound()} in MetricConfig")
-        metric.config(getQuotaMetricConfig(quota))
+      val quotaId = QuotaId(sanitizedUser, clientId)
+      val userInfo = sanitizedUser match {
+        case Some(ConfigEntityName.Default) => "default user "
+        case Some(user) => "user " + user + " "
+        case None => ""
+      }
+      val clientIdInfo = clientId match {
+        case Some(ConfigEntityName.Default) => "default client-id"
+        case Some(id) => "client-id " + id
+        case None => ""
+      }
+      quota match {
+        case Some(newQuota) =>
+          logger.info(s"Changing ${apiKey} quota for 
${userInfo}${clientIdInfo} to ${newQuota.bound}")
+          overriddenQuota.put(quotaId, newQuota)
+          (sanitizedUser, clientId) match {
+            case (Some(u), Some(c)) => quotaTypesEnabled |= 
QuotaTypes.UserClientIdQuotaEnabled
+            case (Some(u), None) => quotaTypesEnabled |= 
QuotaTypes.UserQuotaEnabled
+            case (None, Some(c)) => quotaTypesEnabled |= 
QuotaTypes.ClientIdQuotaEnabled
+            case (None, None) =>
+          }
+        case None =>
+          logger.info(s"Removing ${apiKey} quota for 
${userInfo}${clientIdInfo}")
+          overriddenQuota.remove(quotaId)
       }
+
+      val quotaMetricName = clientRateMetricName(sanitizedUser.getOrElse(""), 
clientId.getOrElse(""))
+      val allMetrics = metrics.metrics()
+
+      // If multiple-levels of quotas are defined or if this is a default 
quota update, traverse metrics
+      // to find all affected values. Otherwise, update just the single 
matching one.
+      val singleUpdate = quotaTypesEnabled match {
+        case QuotaTypes.NoQuotas | QuotaTypes.ClientIdQuotaEnabled | 
QuotaTypes.UserQuotaEnabled | QuotaTypes.UserClientIdQuotaEnabled =>
+          !sanitizedUser.filter(_ == ConfigEntityName.Default).isDefined && 
!clientId.filter(_ == ConfigEntityName.Default).isDefined
+        case _ => false
+      }
+      if (singleUpdate) {
+          // Change the underlying metric config if the sensor has been created
+          val metric = allMetrics.get(quotaMetricName)
+          if (metric != null) {
+            val metricConfigEntity = quotaEntity(sanitizedUser.getOrElse(""), 
clientId.getOrElse(""))
+            val newQuota = metricConfigEntity.quota
+            logger.info(s"Sensor for ${userInfo}${clientIdInfo} already 
exists. Changing quota to ${newQuota.bound()} in MetricConfig")
+            metric.config(getQuotaMetricConfig(newQuota))
+          }
+      } else {
+          allMetrics.filterKeys(n => n.name == quotaMetricName.name && n.group 
== quotaMetricName.group).foreach {
+            case (metricName, metric) =>
+              val userTag = if (metricName.tags.containsKey("user")) 
metricName.tags.get("user") else ""
+              val clientIdTag = if (metricName.tags.containsKey("client-id")) 
metricName.tags.get("client-id") else ""
+              val metricConfigEntity = quotaEntity(userTag, clientIdTag)
+              if (metricConfigEntity.quota != metric.config.quota) {
+                val newQuota = metricConfigEntity.quota
+                logger.info(s"Sensor for quota-id 
${metricConfigEntity.quotaId} already exists. Setting quota to 
${newQuota.bound} in MetricConfig")
+                metric.config(getQuotaMetricConfig(newQuota))
+              }
+          }
+      }
+
     } finally {
       lock.writeLock().unlock()
     }
   }
 
-  private def clientRateMetricName(clientId: String): MetricName = {
+  private def clientRateMetricName(sanitizedUser: String, clientId: String): 
MetricName = {
     metrics.metricName("byte-rate", apiKey.toString,
-                   "Tracking byte-rate per client",
+                   "Tracking byte-rate per user/client-id",
+                   "user", sanitizedUser,
                    "client-id", clientId)
   }
 
+  private def throttleMetricName(quotaEntity: QuotaEntity): MetricName = {
+    metrics.metricName("throttle-time",
+                       apiKey.toString,
+                       "Tracking average throttle-time per user/client-id",
+                       "user", quotaEntity.sanitizedUser,
+                       "client-id", quotaEntity.clientId)
+  }
+
   def shutdown() = {
     throttledRequestReaper.shutdown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/ConfigHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala 
b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 67b74a7..5be9c12 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -96,31 +96,59 @@ class TopicConfigHandler(private val logManager: 
LogManager, kafkaConfig: KafkaC
   }
 }
 
-object ClientConfigOverride {
+object QuotaConfigOverride {
   val ProducerOverride = "producer_byte_rate"
   val ConsumerOverride = "consumer_byte_rate"
 }
 
 /**
-  * The ClientIdConfigHandler will process clientId config changes in ZK.
-  * The callback provides the clientId and the full properties set read from 
ZK.
-  * This implementation reports the overrides to the respective 
ClientQuotaManager objects
-  */
-class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends 
ConfigHandler {
+ * Handles <client-id>, <user> or <user, client-id> quota config updates in ZK.
+ * This implementation reports the overrides to the respective 
ClientQuotaManager objects
+ */
+class QuotaConfigHandler(private val quotaManagers: QuotaManagers) {
+
+  def updateQuotaConfig(sanitizedUser: Option[String], clientId: 
Option[String], config: Properties) {
+    val producerQuota =
+      if (config.containsKey(QuotaConfigOverride.ProducerOverride))
+        Some(new 
Quota(config.getProperty(QuotaConfigOverride.ProducerOverride).toLong, true))
+      else
+        None
+    quotaManagers.produce.updateQuota(sanitizedUser, clientId, producerQuota)
+    val consumerQuota =
+      if (config.containsKey(QuotaConfigOverride.ConsumerOverride))
+        Some(new 
Quota(config.getProperty(QuotaConfigOverride.ConsumerOverride).toLong, true))
+      else
+        None
+    quotaManagers.fetch.updateQuota(sanitizedUser, clientId, consumerQuota)
+  }
+}
+
+/**
+ * The ClientIdConfigHandler will process clientId config changes in ZK.
+ * The callback provides the clientId and the full properties set read from ZK.
+ */
+class ClientIdConfigHandler(private val quotaManagers: QuotaManagers) extends 
QuotaConfigHandler(quotaManagers) with ConfigHandler {
+
   def processConfigChanges(clientId: String, clientConfig: Properties) {
-    if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
-      quotaManagers.produce.updateQuota(clientId,
-        new 
Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, 
true))
-    } else {
-      quotaManagers.fetch.resetQuota(clientId)
-    }
+    updateQuotaConfig(None, Some(clientId), clientConfig)
+  }
+}
 
-    if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
-      quotaManagers.fetch.updateQuota(clientId,
-        new 
Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, 
true))
-    } else {
-      quotaManagers.produce.resetQuota(clientId)
-    }
+/**
+ * The UserConfigHandler will process <user> and <user, client-id> quota 
changes in ZK.
+ * The callback provides the node name containing sanitized user principal, 
client-id if this is
+ * a <user, client-id> update and the full properties set read from ZK.
+ */
+class UserConfigHandler(private val quotaManagers: QuotaManagers) extends 
QuotaConfigHandler(quotaManagers) with ConfigHandler {
+
+  def processConfigChanges(quotaEntityPath: String, config: Properties) {
+    // Entity path is <user> or <user>/clients/<client>
+    val entities = quotaEntityPath.split("/")
+    if (entities.length != 1 && entities.length != 3)
+      throw new IllegalArgumentException("Invalid quota entity path: " + 
quotaEntityPath);
+    val sanitizedUser = entities(0)
+    val clientId = if (entities.length == 3) Some(entities(2)) else None
+    updateQuotaConfig(Some(sanitizedUser), clientId, config)
   }
 }
 
@@ -151,4 +179,4 @@ object ThrottledReplicaValidator extends Validator {
   private def isValid(proposed: String): Boolean = {
     proposed.trim.equals("*") || 
proposed.trim.matches("([0-9]+:[0-9]+)?(,[0-9]+:[0-9]+)*")
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/DynamicConfigManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala 
b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
index 556534a..b31d838 100644
--- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala
@@ -37,8 +37,13 @@ import org.I0Itec.zkclient.{IZkStateListener, 
IZkChildListener, ZkClient}
 object ConfigType {
   val Topic = "topics"
   val Client = "clients"
+  val User = "users"
   val Broker = "brokers"
-  val all = Seq(Topic, Client, Broker)
+  val all = Seq(Topic, Client, User, Broker)
+}
+
+object ConfigEntityName {
+  val Default = "<default>"
 }
 
 /**
@@ -48,7 +53,9 @@ object ConfigType {
  *
  * Config is stored under the path: /config/entityType/entityName
  *   E.g. /config/topics/<topic_name> and /config/clients/<clientId>
- * This znode stores the overrides for this entity (but no defaults) in 
properties format.
+ * This znode stores the overrides for this entity in properties format with 
defaults stored using entityName "<default>".
+ * Multiple entity names may be specified (eg. <user, client-id> quotas) using 
a hierarchical path:
+ *   E.g. /config/users/<user>/clients/<clientId>
  *
  * To avoid watching all topics for changes instead we have a notification path
  *   /config/changes
@@ -57,8 +64,10 @@ object ConfigType {
  * To update a config we first update the config properties. Then we create a 
new sequential
  * znode under the change path which contains the name of the entityType and 
entityName that was updated, say
  *   /config/changes/config_change_13321
- * The sequential znode contains data in this format: {"version" : 1, 
"entityType":"topic/client", "entityName" : "topic_name/client_id"}
+ * The sequential znode contains data in this format: {"version" : 1, 
"entity_type":"topic/client", "entity_name" : "topic_name/client_id"}
  * This is just a notification--the actual config change is stored only once 
under the /config/entityType/entityName path.
+ * Version 2 of notifications has the format: {"version" : 2, 
"entity_path":"entity_type/entity_name"}
+ * Multiple entities may be specified as a hierarchical path (eg. 
users/<user>/clients/<clientId>).
  *
  * This will fire a watcher on all brokers. This watcher works as follows. It 
reads all the config change notifications.
  * It keeps track of the highest config change suffix number it has applied 
previously. For any previously applied change it finds
@@ -89,30 +98,60 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
         case Some(mapAnon: Map[_, _]) =>
           val map = mapAnon collect
             { case (k: String, v: Any) => k -> v }
-          require(map("version") == 1)
-
-          val entityType = map.get("entity_type") match {
-            case Some(ConfigType.Topic) => ConfigType.Topic
-            case Some(ConfigType.Client) => ConfigType.Client
-            case Some(ConfigType.Broker) => ConfigType.Broker
-            case _ => throw new IllegalArgumentException(s"Config change 
notification must have 'entity_type' set to one of ${ConfigType.all}. Received: 
$json")
-          }
 
-          val entity = map.get("entity_name") match {
-            case Some(value: String) => value
-            case _ => throw new IllegalArgumentException("Config change 
notification does not specify 'entity_name'. Received: " + json)
+          map("version") match {
+            case 1 => processEntityConfigChangeVersion1(json, map)
+            case 2 => processEntityConfigChangeVersion2(json, map)
+            case _ => throw new IllegalArgumentException("Config change 
notification has an unsupported version " + map("version") +
+                "Supported versions are 1 and 2.")
           }
-          val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, 
entity)
-          logger.info(s"Processing override for entityType: $entityType, 
entity: $entity with config: $entityConfig")
-          configHandlers(entityType).processConfigChanges(entity, entityConfig)
 
         case o => throw new IllegalArgumentException("Config change 
notification has an unexpected value. The format is:" +
-          "{\"version\" : 1," +
-          " \"entity_type\":\"topic/client\"," +
-          " \"entity_name\" : \"topic_name/client_id\"}." +
+          "{\"version\" : 1, \"entity_type\":\"topics/clients\", 
\"entity_name\" : \"topic_name/client_id\"}." + " or " +
+          "{\"version\" : 2, \"entity_path\":\"entity_type/entity_name\"}." +
           " Received: " + json)
       }
     }
+
+    private def processEntityConfigChangeVersion1(json: String, map: 
Map[String, Any]) {
+
+      val entityType = map.get("entity_type") match {
+        case Some(ConfigType.Topic) => ConfigType.Topic
+        case Some(ConfigType.Client) => ConfigType.Client
+        case _ => throw new IllegalArgumentException("Version 1 config change 
notification must have 'entity_type' set to 'clients' or 'topics'." +
+              " Received: " + json)
+      }
+
+      val entity = map.get("entity_name") match {
+        case Some(value: String) => value
+        case _ => throw new IllegalArgumentException("Version 1 config change 
notification does not specify 'entity_name'. Received: " + json)
+      }
+
+      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, 
entity)
+      logger.info(s"Processing override for entityType: $entityType, entity: 
$entity with config: $entityConfig")
+      configHandlers(entityType).processConfigChanges(entity, entityConfig)
+
+    }
+
+    private def processEntityConfigChangeVersion2(json: String, map: 
Map[String, Any]) {
+
+      val entityPath = map.get("entity_path") match {
+        case Some(value: String) => value
+        case _ => throw new IllegalArgumentException("Version 2 config change 
notification does not specify 'entity_path'. Received: " + json)
+      }
+
+      val index = entityPath.indexOf('/')
+      val rootEntityType = entityPath.substring(0, index)
+      if (index < 0 || !configHandlers.contains(rootEntityType))
+        throw new IllegalArgumentException("Version 2 config change 
notification must have 'entity_path' starting with 'clients/', 'topics/' or 
'users/'." +
+              " Received: " + json)
+      val fullSanitizedEntityName = entityPath.substring(index + 1)
+
+      val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, rootEntityType, 
fullSanitizedEntityName)
+      logger.info(s"Processing override for entityPath: $entityPath with 
config: $entityConfig")
+      
configHandlers(rootEntityType).processConfigChanges(fullSanitizedEntityName, 
entityConfig)
+
+    }
   }
 
   private val configChangeListener = new 
ZkNodeChangeNotificationListener(zkUtils, ZkUtils.EntityConfigChangesPath, 
AdminUtils.EntityConfigChangeZnodePrefix, ConfigChangedNotificationHandler)
@@ -122,5 +161,20 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
    */
   def startup(): Unit = {
     configChangeListener.init()
+
+    // Apply all existing client/user configs to the 
ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides
+    configHandlers.foreach {
+      case (ConfigType.User, handler) =>
+          AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach {
+            case (sanitizedUser, properties) => 
handler.processConfigChanges(sanitizedUser, properties)
+          }
+          AdminUtils.fetchAllChildEntityConfigs(zkUtils, ConfigType.User, 
ConfigType.Client).foreach {
+            case (sanitizedUserClientId, properties) => 
handler.processConfigChanges(sanitizedUserClientId, properties)
+          }
+      case (configType, handler) =>
+          AdminUtils.fetchAllEntityConfigs(zkUtils, configType).foreach {
+            case (entityName, properties) => 
handler.processConfigChanges(entityName, properties)
+          }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 3008426..d3ba5ef 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -396,6 +396,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiRemoteCompleteTimeMs = SystemTime.milliseconds
 
       quotas.produce.recordAndMaybeThrottle(
+        request.session.sanitizedUser,
         request.header.clientId,
         numBytesAppended,
         produceResponseCallback)
@@ -494,7 +495,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         fetchResponseCallback(0)
       } else {
         val size = 
FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), 
fetchRequest.versionId)
-        quotas.fetch.recordAndMaybeThrottle(fetchRequest.clientId, size, 
fetchResponseCallback)
+        quotas.fetch.recordAndMaybeThrottle(request.session.sanitizedUser, 
fetchRequest.clientId, size, fetchResponseCallback)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 3671297..b37be5b 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -523,8 +523,10 @@ object KafkaConfig {
   "or this timeout is reached. This is similar to the producer request 
timeout."
   val OffsetCommitRequiredAcksDoc = "The required acks before the commit can 
be accepted. In general, the default (-1) should not be overridden"
   /** ********* Quota Configuration ***********/
-  val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by 
clientId will get throttled if it produces more bytes than this value 
per-second"
-  val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by 
clientId/consumer group will get throttled if it fetches more bytes than this 
value per-second"
+  val ProducerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when 
dynamic default quotas are not configured for <user>, <client-id> or <user, 
client-id> in Zookeeper. " +
+  "Any producer distinguished by clientId will get throttled if it produces 
more bytes than this value per-second"
+  val ConsumerQuotaBytesPerSecondDefaultDoc = "DEPRECATED: Used only when 
dynamic default quotas are not configured for <user, <client-id> or <user, 
client-id> in Zookeeper. " +
+  "Any consumer distinguished by clientId/consumer group will get throttled if 
it fetches more bytes than this value per-second"
   val NumQuotaSamplesDoc = "The number of samples to retain in memory for 
client quotas"
   val NumReplicationQuotaSamplesDoc = "The number of samples to retain in 
memory for replication quotas"
   val QuotaWindowSizeSecondsDoc = "The time span of each sample for client 
quotas"

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index db92cb8..5055c87 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -251,14 +251,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
SystemTime, threadNamePr
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> 
new TopicConfigHandler(logManager, config, quotaManagers),
                                                            ConfigType.Client 
-> new ClientIdConfigHandler(quotaManagers),
+                                                           ConfigType.User -> 
new UserConfigHandler(quotaManagers),
                                                            ConfigType.Broker 
-> new BrokerConfigHandler(config, quotaManagers))
 
-        // Apply all existing client configs to the ClientIdConfigHandler to 
bootstrap the overrides
-        // TODO: Move this logic to DynamicConfigManager
-        AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach {
-          case (clientId, properties) => 
dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, 
properties)
-        }
-
         // Create the config manager. start listening to notifications
         dynamicConfigManager = new DynamicConfigManager(zkUtils, 
dynamicConfigHandlers)
         dynamicConfigManager.startup()

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 503ed54..96779ff 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -119,6 +119,9 @@ object ZkUtils {
   def getEntityConfigPath(entityType: String, entity: String): String =
     getEntityConfigRootPath(entityType) + "/" + entity
 
+  def getEntityConfigPath(entityPath: String): String =
+    ZkUtils.EntityConfigPath + "/" + entityPath
+
   def getDeleteTopicPath(topic: String): String =
     DeleteTopicsPath + "/" + topic
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
new file mode 100644
index 0000000..c9b7787
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -0,0 +1,195 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.server.{QuotaConfigOverride, KafkaConfig, KafkaServer, QuotaId}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.{Quota, KafkaMetric}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.Map
+import scala.collection.mutable
+
+abstract class BaseQuotaTest extends IntegrationTestHarness {
+
+  def userPrincipal : String
+  def producerQuotaId : QuotaId
+  def consumerQuotaId : QuotaId
+  def overrideQuotas(producerQuota: Long, consumerQuota: Long)
+  def removeQuotaOverrides()
+
+  override val serverCount = 2
+  val producerCount = 1
+  val consumerCount = 1
+
+  private val producerBufferSize = 300000
+  protected val producerClientId = "QuotasTestProducer-1"
+  protected val consumerClientId = "QuotasTestConsumer-1"
+
+  this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, 
"false")
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, 
"2")
+  this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1")
+  this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, 
"100")
+  this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, 
"30000")
+  this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "0")
+  this.producerConfig.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, 
producerBufferSize.toString)
+  this.producerConfig.setProperty(ProducerConfig.CLIENT_ID_CONFIG, 
producerClientId)
+  this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest")
+  
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,
 4096.toString)
+  this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest")
+  this.consumerConfig.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, 
consumerClientId)
+
+  // Low enough quota that a producer sending a small payload in a tight loop 
should get throttled
+  val defaultProducerQuota = 8000
+  val defaultConsumerQuota = 2500
+
+  var leaderNode: KafkaServer = null
+  var followerNode: KafkaServer = null
+  private val topic1 = "topic-1"
+
+  @Before
+  override def setUp() {
+    super.setUp()
+
+    val numPartitions = 1
+    val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, 
serverCount, servers)
+    leaderNode = if (leaders(0).get == servers.head.config.brokerId) 
servers.head else servers(1)
+    followerNode = if (leaders(0).get != servers.head.config.brokerId) 
servers.head else servers(1)
+    assertTrue("Leader of all partitions of the topic should exist", 
leaders.values.forall(leader => leader.isDefined))
+  }
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  @Test
+  def testThrottledProducerConsumer() {
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = 
leaderNode.metrics.metrics().asScala
+
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+
+    val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, 
producerQuotaId)
+    assertTrue("Should have been throttled", 
allMetrics(producerMetricName).value() > 0)
+
+    // Consumer should read in a bursty manner and get throttled immediately
+    consume(consumers.head, numRecords)
+    val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId)
+    assertTrue("Should have been throttled", 
allMetrics(consumerMetricName).value() > 0)
+  }
+
+  @Test
+  def testProducerConsumerOverrideUnthrottled() {
+    // Give effectively unlimited quota for producer and consumer
+    val props = new Properties()
+    props.put(QuotaConfigOverride.ProducerOverride, Long.MaxValue.toString)
+    props.put(QuotaConfigOverride.ConsumerOverride, Long.MaxValue.toString)
+
+    overrideQuotas(Long.MaxValue, Long.MaxValue)
+    waitForQuotaUpdate(Long.MaxValue, Long.MaxValue)
+
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = 
leaderNode.metrics.metrics().asScala
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+    val producerMetricName = throttleMetricName(ApiKeys.PRODUCE, 
producerQuotaId)
+    assertEquals("Should not have been throttled", 0.0, 
allMetrics(producerMetricName).value(), 0.0)
+
+    // The "client" consumer does not get throttled.
+    consume(consumers.head, numRecords)
+    val consumerMetricName = throttleMetricName(ApiKeys.FETCH, consumerQuotaId)
+    assertEquals("Should not have been throttled", 0.0, 
allMetrics(consumerMetricName).value(), 0.0)
+  }
+
+  @Test
+  def testQuotaOverrideDelete() {
+    // Override producer and consumer quotas to unlimited
+    overrideQuotas(Long.MaxValue, Long.MaxValue)
+
+    val allMetrics: mutable.Map[MetricName, KafkaMetric] = 
leaderNode.metrics.metrics().asScala
+    val numRecords = 1000
+    produce(producers.head, numRecords)
+    assertTrue("Should not have been throttled", 
allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() == 0)
+    consume(consumers.head, numRecords)
+    assertTrue("Should not have been throttled", 
allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() == 0)
+
+    // Delete producer and consumer quota overrides. Consumer and producer 
should now be
+    // throttled since broker defaults are very small
+    removeQuotaOverrides()
+    produce(producers.head, numRecords)
+
+    assertTrue("Should have been throttled", 
allMetrics(throttleMetricName(ApiKeys.PRODUCE, producerQuotaId)).value() > 0)
+    consume(consumers.head, numRecords)
+    assertTrue("Should have been throttled", 
allMetrics(throttleMetricName(ApiKeys.FETCH, consumerQuotaId)).value() > 0)
+  }
+
+  def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = {
+    var numBytesProduced = 0
+    for (i <- 0 to count) {
+      val payload = i.toString.getBytes
+      numBytesProduced += payload.length
+      p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, 
payload),
+             new ErrorLoggingCallback(topic1, null, null, true)).get()
+      Thread.sleep(1)
+    }
+    numBytesProduced
+  }
+
+  def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: 
Int) {
+    consumer.subscribe(List(topic1))
+    var numConsumed = 0
+    while (numConsumed < numRecords) {
+      for (cr <- consumer.poll(100)) {
+        numConsumed += 1
+      }
+    }
+  }
+
+  def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long) {
+    TestUtils.retry(10000) {
+      val quotaManagers = leaderNode.apis.quotas
+      val overrideProducerQuota = quotaManagers.produce.quota(userPrincipal, 
producerClientId)
+      val overrideConsumerQuota = quotaManagers.fetch.quota(userPrincipal, 
consumerClientId)
+
+      assertEquals(s"ClientId $producerClientId of user $userPrincipal must 
have producer quota", Quota.upperBound(producerQuota), overrideProducerQuota)
+      assertEquals(s"ClientId $consumerClientId of user $userPrincipal must 
have consumer quota", Quota.upperBound(consumerQuota), overrideConsumerQuota)
+    }
+  }
+
+  private def throttleMetricName(apiKey: ApiKeys, quotaId: QuotaId): 
MetricName = {
+    leaderNode.metrics.metricName("throttle-time",
+                                  apiKey.name,
+                                  "Tracking throttle-time per user/client-id",
+                                  "user", quotaId.sanitizedUser.getOrElse(""),
+                                  "client-id", quotaId.clientId.getOrElse(""))
+  }
+
+  def quotaProperties(producerQuota: Long, consumerQuota: Long): Properties = {
+    val props = new Properties()
+    props.put(QuotaConfigOverride.ProducerOverride, producerQuota.toString)
+    props.put(QuotaConfigOverride.ConsumerOverride, consumerQuota.toString)
+    props
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/69356fbc/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala 
b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
new file mode 100644
index 0000000..7477f7f
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotaTest.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.admin.AdminUtils
+import kafka.server.{KafkaConfig, QuotaConfigOverride, QuotaId}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+import org.junit.Before
+
+class ClientIdQuotaTest extends BaseQuotaTest {
+
+  override val userPrincipal = KafkaPrincipal.ANONYMOUS.getName
+  override val producerQuotaId = QuotaId(None, Some(producerClientId))
+  override val consumerQuotaId = QuotaId(None, Some(consumerClientId))
+
+  @Before
+  override def setUp() {
+    
this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp,
 defaultProducerQuota.toString)
+    
this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp,
 defaultConsumerQuota.toString)
+    super.setUp()
+  }
+
+  override def overrideQuotas(producerQuota: Long, consumerQuota: Long) {
+    val producerProps = new Properties()
+    producerProps.put(QuotaConfigOverride.ProducerOverride, 
producerQuota.toString)
+    updateQuotaOverride(producerClientId, producerProps)
+
+    val consumerProps = new Properties()
+    consumerProps.put(QuotaConfigOverride.ConsumerOverride, 
consumerQuota.toString)
+    updateQuotaOverride(consumerClientId, consumerProps)
+  }
+  override def removeQuotaOverrides() {
+    val emptyProps = new Properties
+    updateQuotaOverride(producerClientId, emptyProps)
+    updateQuotaOverride(consumerClientId, emptyProps)
+  }
+
+  private def updateQuotaOverride(clientId: String, properties: Properties) {
+    AdminUtils.changeClientIdConfig(zkUtils, clientId, properties)
+  }
+}

Reply via email to