This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 2b26c23  KAFKA-9658; Fix user quota removal (#8232)
2b26c23 is described below

commit 2b26c23d36ef64a2a40044c2b7e50056ec924db8
Author: Anna Povzner <[email protected]>
AuthorDate: Tue Mar 10 12:30:58 2020 -0700

    KAFKA-9658; Fix user quota removal (#8232)
    
    Adding (add-config) a default user, user, or <user, client-id> quota and then 
removing it via delete-config does not update the quota bound in 
ClientQuotaManager.Metrics for existing users or <user, client-id> pairs. This 
causes brokers to continue to throttle with the previously set quotas until the 
brokers restart (or the <user, client-id> stops sending traffic for some time and 
its sensor expires). This happens only when removing a user or <user, client-id> 
quota where there are no more quotas to fall back to. Common [...]
    
    The cause of the issue was that `DefaultQuotaCallback.quotaLimit` returned 
`null` when no default user quota was set, which caused 
`ClientQuotaManager.updateQuotaMetricConfigs` to skip updating the appropriate 
sensor, leaving it unchanged with the previous quota. Since `null` is an 
acceptable return value for `ClientQuotaCallback.quotaLimit`, and is already 
treated as an unlimited quota in other parts of the code, this PR ensures that 
`ClientQuotaManager.updateQuotaMetricConfigs` update [...]
    
    Reviewers: Jason Gustafson <[email protected]>, Rajini Sivaram 
<[email protected]>
---
 .../scala/kafka/server/ClientQuotaManager.scala    |   5 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala | 132 +++++++++++++++------
 2 files changed, 98 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 9817f21..87e4923 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -483,7 +483,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       // Change the underlying metric config if the sensor has been created
       val metric = allMetrics.get(quotaMetricName)
       if (metric != null) {
-        Option(quotaCallback.quotaLimit(clientQuotaType, 
metricTags.asJava)).foreach { newQuota =>
+        Option(quotaLimit(metricTags.asJava)).foreach { newQuota =>
           info(s"Sensor for $quotaEntity already exists. Changing quota to 
$newQuota in MetricConfig")
           metric.config(getQuotaMetricConfig(newQuota))
         }
@@ -493,8 +493,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       allMetrics.asScala.filterKeys(n => n.name == quotaMetricName.name && 
n.group == quotaMetricName.group).foreach {
         case (metricName, metric) =>
           val metricTags = metricName.tags
-          Option(quotaCallback.quotaLimit(clientQuotaType, 
metricTags)).foreach { quota =>
-            val newQuota = quota.asInstanceOf[Double]
+          Option(quotaLimit(metricTags)).foreach { newQuota =>
             if (newQuota != metric.config.quota.bound) {
               info(s"Sensor for quota-id $metricTags already exists. Setting 
quota to $newQuota in MetricConfig")
               metric.config(getQuotaMetricConfig(newQuota))
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index e10d4b2..6598d3e 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -190,19 +190,79 @@ class ClientQuotaManagerTest {
     testQuotaParsing(config, client1, client2, randomClient, 
defaultConfigClient)
   }
 
+  private def checkQuota(quotaManager: ClientQuotaManager, user: String, 
clientId: String, expectedBound: Long, value: Int, expectThrottle: Boolean): 
Unit = {
+    assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 0.0)
+    val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * 
config.numQuotaSamples)
+    if (expectThrottle)
+      assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
+    else
+      assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, 
throttleTimeMs)
+  }
+
   @Test
-  def testQuotaConfigPrecedence() {
-    val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
-        newMetrics, Produce, time, "")
-
-    def checkQuota(user: String, clientId: String, expectedBound: Int, value: 
Int, expectThrottle: Boolean) {
-      assertEquals(expectedBound, quotaManager.quota(user, clientId).bound, 
0.0)
-      val throttleTimeMs = maybeRecord(quotaManager, user, clientId, value * 
config.numQuotaSamples)
-      if (expectThrottle)
-        assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", 
throttleTimeMs > 0)
-      else
-        assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, 
throttleTimeMs)
+  def testSetAndRemoveDefaultUserQuota(): Unit = {
+    // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+    val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 
Long.MaxValue),
+      newMetrics, Produce, time, "")
+
+    try {
+      // no quota set yet, should not throttle
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+
+      // Set default <user> quota config
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
Some(new Quota(10, true)))
+      checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+      // Remove default <user> quota config, back to no quotas
+      quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
None)
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+    } finally {
+      quotaManager.shutdown()
     }
+  }
+
+  @Test
+  def testSetAndRemoveUserQuota(): Unit = {
+    // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+    val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 
Long.MaxValue),
+      newMetrics, Produce, time, "")
+
+    try {
+      // Set <user> quota config
+      quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(10, 
true)))
+      checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+      // Remove <user> quota config, back to no quotas
+      quotaManager.updateQuota(Some("userA"), None, None, None)
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
+  def testSetAndRemoveUserClientQuota(): Unit = {
+    // quotaTypesEnabled will be QuotaTypes.NoQuotas initially
+    val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 
Long.MaxValue),
+      newMetrics, Produce, time, "")
+
+    try {
+      // Set <user, client-id> quota config
+      quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), Some(new Quota(10, true)))
+      checkQuota(quotaManager, "userA", "client1", 10, 1000, true)
+
+      // Remove <user, client-id> quota config, back to no quotas
+      quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), None)
+      checkQuota(quotaManager, "userA", "client1", Long.MaxValue, 1000, false)
+    } finally {
+      quotaManager.shutdown()
+    }
+  }
+
+  @Test
+  def testQuotaConfigPrecedence(): Unit = {
+    val quotaManager = new 
ClientQuotaManager(ClientQuotaManagerConfig(quotaBytesPerSecondDefault=Long.MaxValue),
+      newMetrics, Produce, time, "")
 
     try {
       quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
Some(new Quota(1000, true)))
@@ -216,47 +276,47 @@ class ClientQuotaManagerTest {
       quotaManager.updateQuota(Some("userC"), None, None, Some(new 
Quota(10000, true)))
       quotaManager.updateQuota(None, Some("client1"), Some("client1"), 
Some(new Quota(9000, true)))
 
-      checkQuota("userA", "client1", 5000, 4500, false) // <user, client> 
quota takes precedence over <user>
-      checkQuota("userA", "client2", 4000, 4500, true)  // <user> quota takes 
precedence over <client> and defaults
-      checkQuota("userA", "client3", 4000, 0, true)     // <user> quota is 
shared across clients of user
-      checkQuota("userA", "client1", 5000, 0, false)    // <user, client> is 
exclusive use, unaffected by other clients
+      checkQuota(quotaManager, "userA", "client1", 5000, 4500, false) // 
<user, client> quota takes precedence over <user>
+      checkQuota(quotaManager, "userA", "client2", 4000, 4500, true)  // 
<user> quota takes precedence over <client> and defaults
+      checkQuota(quotaManager, "userA", "client3", 4000, 0, true)     // 
<user> quota is shared across clients of user
+      checkQuota(quotaManager, "userA", "client1", 5000, 0, false)    // 
<user, client> is exclusive use, unaffected by other clients
 
-      checkQuota("userB", "client1", 7000, 8000, true)
-      checkQuota("userB", "client2", 8000, 7000, false) // Default per-client 
quota for exclusive use of <user, client>
-      checkQuota("userB", "client3", 8000, 7000, false)
+      checkQuota(quotaManager, "userB", "client1", 7000, 8000, true)
+      checkQuota(quotaManager, "userB", "client2", 8000, 7000, false) // 
Default per-client quota for exclusive use of <user, client>
+      checkQuota(quotaManager, "userB", "client3", 8000, 7000, false)
 
-      checkQuota("userD", "client1", 3000, 3500, true)  // Default <user, 
client> quota
-      checkQuota("userD", "client2", 3000, 2500, false)
-      checkQuota("userE", "client1", 3000, 2500, false)
+      checkQuota(quotaManager, "userD", "client1", 3000, 3500, true)  // 
Default <user, client> quota
+      checkQuota(quotaManager, "userD", "client2", 3000, 2500, false)
+      checkQuota(quotaManager, "userE", "client1", 3000, 2500, false)
 
       // Remove default <user, client> quota config, revert to <user> default
       quotaManager.updateQuota(Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(ConfigEntityName.Default), None)
-      checkQuota("userD", "client1", 1000, 0, false)    // Metrics tags 
changed, restart counter
-      checkQuota("userE", "client4", 1000, 1500, true)
-      checkQuota("userF", "client4", 1000, 800, false)  // Default <user> 
quota shared across clients of user
-      checkQuota("userF", "client5", 1000, 800, true)
+      checkQuota(quotaManager, "userD", "client1", 1000, 0, false)    // 
Metrics tags changed, restart counter
+      checkQuota(quotaManager, "userE", "client4", 1000, 1500, true)
+      checkQuota(quotaManager, "userF", "client4", 1000, 800, false)  // 
Default <user> quota shared across clients of user
+      checkQuota(quotaManager, "userF", "client5", 1000, 800, true)
 
       // Remove default <user> quota config, revert to <client-id> default
       quotaManager.updateQuota(Some(ConfigEntityName.Default), None, None, 
None)
-      checkQuota("userF", "client4", 2000, 0, false)  // Default <client-id> 
quota shared across client-id of all users
-      checkQuota("userF", "client5", 2000, 0, false)
-      checkQuota("userF", "client5", 2000, 2500, true)
-      checkQuota("userG", "client5", 2000, 0, true)
+      checkQuota(quotaManager, "userF", "client4", 2000, 0, false)  // Default 
<client-id> quota shared across client-id of all users
+      checkQuota(quotaManager, "userF", "client5", 2000, 0, false)
+      checkQuota(quotaManager, "userF", "client5", 2000, 2500, true)
+      checkQuota(quotaManager, "userG", "client5", 2000, 0, true)
 
       // Update quotas
       quotaManager.updateQuota(Some("userA"), None, None, Some(new Quota(8000, 
true)))
       quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), Some(new Quota(10000, true)))
-      checkQuota("userA", "client2", 8000, 0, false)
-      checkQuota("userA", "client2", 8000, 4500, true) // Throttled due to sum 
of new and earlier values
-      checkQuota("userA", "client1", 10000, 0, false)
-      checkQuota("userA", "client1", 10000, 6000, true)
+      checkQuota(quotaManager, "userA", "client2", 8000, 0, false)
+      checkQuota(quotaManager, "userA", "client2", 8000, 4500, true) // 
Throttled due to sum of new and earlier values
+      checkQuota(quotaManager, "userA", "client1", 10000, 0, false)
+      checkQuota(quotaManager, "userA", "client1", 10000, 6000, true)
       quotaManager.updateQuota(Some("userA"), Some("client1"), 
Some("client1"), None)
-      checkQuota("userA", "client6", 8000, 0, true)    // Throttled due to 
shared user quota
+      checkQuota(quotaManager, "userA", "client6", 8000, 0, true)    // 
Throttled due to shared user quota
       quotaManager.updateQuota(Some("userA"), Some("client6"), 
Some("client6"), Some(new Quota(11000, true)))
-      checkQuota("userA", "client6", 11000, 8500, false)
+      checkQuota(quotaManager, "userA", "client6", 11000, 8500, false)
       quotaManager.updateQuota(Some("userA"), Some(ConfigEntityName.Default), 
Some(ConfigEntityName.Default), Some(new Quota(12000, true)))
       quotaManager.updateQuota(Some("userA"), Some("client6"), 
Some("client6"), None)
-      checkQuota("userA", "client6", 12000, 4000, true) // Throttled due to 
sum of new and earlier values
+      checkQuota(quotaManager, "userA", "client6", 12000, 4000, true) // 
Throttled due to sum of new and earlier values
 
     } finally {
       quotaManager.shutdown()

Reply via email to