Repository: kafka Updated Branches: refs/heads/trunk f25fe02d9 -> 5d8936544
KAFKA-3310; Fix for NPEs observed when throttling clients. The fix basically ensures that the throttleTimeSensor is non-null before handing off to record the metric value. We also record the throttle time to 0 so that we don't recreate the sensor always. Author: Aditya Auradkar <[email protected]> Reviewers: Jiangjie Qin <[email protected]>, Jun Rao <[email protected]> Closes #989 from auradkar/KAFKA-3310 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d893654 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d893654 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d893654 Branch: refs/heads/trunk Commit: 5d893654489647e8be65c4d54864ab63b7285faa Parents: f25fe02 Author: Aditya Auradkar <[email protected]> Authored: Thu Mar 3 16:16:56 2016 -0800 Committer: Jun Rao <[email protected]> Committed: Thu Mar 3 16:16:56 2016 -0800 ---------------------------------------------------------------------- .../scala/kafka/server/ClientQuotaManager.scala | 11 +++-- .../kafka/server/ClientQuotaManagerTest.scala | 52 ++++++++++++++++++-- 2 files changed, 54 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5d893654/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 5ec57ce..5863c72 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -120,9 +120,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, val clientMetric = metrics.metrics().get(clientRateMetricName(clientId)) throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId))) clientSensors.throttleTimeSensor.record(throttleTimeMs) + // If delayed, add the element to the delayQueue delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) delayQueueSensor.record() - // If delayed, add the element to the delayQueue logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) } throttleTimeMs @@ -189,9 +189,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /* If the sensor is null, try to create it else return the created sensor - * Also if quota sensor is null, the throttle time sensor must be null + * Either of the sensors can be null, hence null checks on both */ - if (quotaSensor == null) { + if (quotaSensor == null || throttleTimeSensor == null) { /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it. * Note that multiple threads may acquire the write lock if they all see a null sensor initially * In this case, the writer checks the sensor after acquiring the lock again. @@ -204,7 +204,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, // ensure that we initialise `ClientSensors` with non-null parameters. quotaSensor = metrics.getSensor(quotaSensorName) throttleTimeSensor = metrics.getSensor(throttleTimeSensorName) - if (quotaSensor == null) { + if (throttleTimeSensor == null) { // create the throttle time sensor also. Use default metric config throttleTimeSensor = metrics.sensor(throttleTimeSensorName, null, @@ -214,7 +214,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, "Tracking average throttle-time per client", "client-id", clientId), new Avg()) + } + + if (quotaSensor == null) { quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId)), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds) http://git-wip-us.apache.org/repos/asf/kafka/blob/5d893654/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 68d6932..193acfd 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -18,7 +18,6 @@ package kafka.server import java.util.Collections -import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota} import org.apache.kafka.common.utils.MockTime import org.junit.Assert.{assertEquals, assertTrue} @@ -44,8 +43,8 @@ class ClientQuotaManagerTest { val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time) // Case 1: Update the quota. Assert that the new quota value is returned - clientMetrics.updateQuota("p1", new Quota(2000, true)); - clientMetrics.updateQuota("p2", new Quota(4000, true)); + clientMetrics.updateQuota("p1", new Quota(2000, true)) + clientMetrics.updateQuota("p2", new Quota(4000, true)) try { assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id")) @@ -58,14 +57,14 @@ class ClientQuotaManagerTest { // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created. // p1 should not longer be throttled after the quota change - clientMetrics.updateQuota("p1", new Quota(3000, true)); + clientMetrics.updateQuota("p1", new Quota(3000, true)) assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota("p1")) throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs) // Case 3: Change quota back to default. Should be throttled again - clientMetrics.updateQuota("p1", new Quota(500, true)); + clientMetrics.updateQuota("p1", new Quota(500, true)) assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1")) throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) @@ -123,6 +122,49 @@ class ClientQuotaManagerTest { } } + @Test + def testExpireThrottleTimeSensor() { + val metrics = newMetrics + val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + try { + clientMetrics.recordAndMaybeThrottle("client1", 100, callback) + // remove the throttle time sensor + metrics.removeSensor("producerThrottleTime-client1") + // should not throw an exception even if the throttle time sensor does not exist. + val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback) + assertTrue("Should be throttled", throttleTime > 0) + // the sensor should get recreated + val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1") + assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) + } finally { + clientMetrics.shutdown() + } + } + + @Test + def testExpireQuotaSensors() { + val metrics = newMetrics + val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + try { + clientMetrics.recordAndMaybeThrottle("client1", 100, callback) + // remove all the sensors + metrics.removeSensor("producerThrottleTime-client1") + metrics.removeSensor("producer-client1") + // should not throw an exception + val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback) + assertTrue("Should be throttled", throttleTime > 0) + + // all the sensors should get recreated + val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1") + assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) + + val byteRateSensor = metrics.getSensor("producer-client1") + assertTrue("Byte rate sensor should exist", byteRateSensor != null) + } finally { + clientMetrics.shutdown() + } + } + def newMetrics: Metrics = { new Metrics(new MetricConfig(), Collections.emptyList(), time) }
