Repository: kafka Updated Branches: refs/heads/trunk f6acfb089 -> bbb7d97ad
KAFKA-2084; Add per-client-id byte-rate metrics and quota manager; reviewed by Joel Koshy, Dong Lin, Jun Rao and Edward Ribeiro Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bbb7d97a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bbb7d97a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bbb7d97a Branch: refs/heads/trunk Commit: bbb7d97adefe5826f2e02a8e55423ea215c9f749 Parents: f6acfb0 Author: Aditya Auradkar <aaurad...@linkedin.com> Authored: Fri Aug 14 18:51:48 2015 -0700 Committer: Joel Koshy <jjko...@gmail.com> Committed: Fri Aug 14 18:51:48 2015 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../org/apache/kafka/common/metrics/Quota.java | 18 ++ .../common/metrics/QuotaViolationException.java | 1 - .../org/apache/kafka/common/metrics/Sensor.java | 11 +- .../apache/kafka/common/metrics/stats/Rate.java | 23 +- .../kafka/common/metrics/MetricsTest.java | 32 ++- .../scala/kafka/server/ClientQuotaManager.scala | 250 +++++++++++++++++++ .../src/main/scala/kafka/server/KafkaApis.scala | 88 +++++-- .../main/scala/kafka/server/KafkaConfig.scala | 56 ++++- .../main/scala/kafka/server/KafkaServer.scala | 24 +- .../scala/kafka/server/ReplicaManager.scala | 4 +- .../scala/kafka/server/ThrottledResponse.scala | 46 ++++ .../scala/kafka/utils/ShutdownableThread.scala | 3 + .../integration/kafka/api/QuotasTest.scala | 194 ++++++++++++++ .../kafka/server/ClientQuotaManagerTest.scala | 159 ++++++++++++ .../unit/kafka/server/KafkaConfigTest.scala | 6 + .../ThrottledResponseExpirationTest.scala | 90 +++++++ 17 files changed, 945 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 864427b..c7f66be 100644 --- a/build.gradle +++ b/build.gradle @@ -253,6 +253,7 @@ project(':core') { testCompile "$easymock" testCompile 'org.objenesis:objenesis:1.2' testCompile "org.scalatest:scalatest_$baseScalaVersion:2.2.5" + testCompile project(path: ':clients', configuration: 'archives') testRuntime "$slf4jlog4j" http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java index d82bb0c..a3535dc 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java @@ -49,4 +49,22 @@ public final class Quota { return (upper && value <= bound) || (!upper && value >= bound); } + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + (int) this.bound; + result = prime * result + (this.upper ? 
1 : 0); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!(obj instanceof Quota)) + return false; + Quota that = (Quota) obj; + return (that.bound == this.bound) && (that.upper == this.upper); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java index a451e53..fbe03f5 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java @@ -28,5 +28,4 @@ public class QuotaViolationException extends KafkaException { public QuotaViolationException(String m) { super(m); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index ca823fd..4d55771 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -112,8 +112,14 @@ public final class Sensor { if (config != null) { Quota quota = config.quota(); if (quota != null) { - if (!quota.acceptable(metric.value(timeMs))) - throw new QuotaViolationException(metric.metricName() + " is in violation of its quota of " + quota.bound()); + double value = metric.value(timeMs); + if (!quota.acceptable(value)) { + throw new QuotaViolationException(String.format( + "(%s) violated quota. Actual: (%f), Threshold: (%f)", + metric.metricName(), + value, + quota.bound())); + } } } } @@ -170,5 +176,4 @@ public final class Sensor { synchronized List<KafkaMetric> metrics() { return Collections.unmodifiableList(this.metrics); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java index 98429da..fe43940 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java @@ -18,6 +18,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.common.metrics.MeasurableStat; import org.apache.kafka.common.metrics.MetricConfig; + /** * The rate of the given quantity. By default this is the total observed over a set of samples from a sampled statistic * divided by the elapsed time over the sample windows.
Alternative {@link SampledStat} implementations can be provided, @@ -58,26 +59,28 @@ public class Rate implements MeasurableStat { @Override public double measure(MetricConfig config, long now) { double value = stat.measure(config, now); - double elapsed = convert(now - stat.oldest(now).lastWindowMs); - return value / elapsed; + // the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete + long elapsedCurrentWindowMs = now - stat.current(now).lastWindowMs; + long elapsedPriorWindowsMs = config.timeWindowMs() * (config.samples() - 1); + return value / convert(elapsedCurrentWindowMs + elapsedPriorWindowsMs); } - private double convert(long time) { + private double convert(long timeMs) { switch (unit) { case NANOSECONDS: - return time * 1000.0 * 1000.0; + return timeMs * 1000.0 * 1000.0; case MICROSECONDS: - return time * 1000.0; + return timeMs * 1000.0; case MILLISECONDS: - return time; + return timeMs; case SECONDS: - return time / 1000.0; + return timeMs / 1000.0; case MINUTES: - return time / (60.0 * 1000.0); + return timeMs / (60.0 * 1000.0); case HOURS: - return time / (60.0 * 60.0 * 1000.0); + return timeMs / (60.0 * 60.0 * 1000.0); case DAYS: - return time / (24.0 * 60.0 * 60.0 * 1000.0); + return timeMs / (24.0 * 60.0 * 60.0 * 1000.0); default: throw new IllegalStateException("Unknown unit: " + unit); } http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 544e120..0a7dcd8 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -37,9 +37,9 @@ import org.junit.Test; public class MetricsTest { private static final double EPS = 0.000001; - - MockTime time = new MockTime(); - Metrics metrics = new Metrics(new MetricConfig(), Arrays.asList((MetricsReporter) new JmxReporter()), time); + private MockTime time = new MockTime(); + private MetricConfig config = new MetricConfig(); + private Metrics metrics = new Metrics(config, Arrays.asList((MetricsReporter) new JmxReporter()), time); @Test public void testMetricName() { @@ -77,19 +77,33 @@ public class MetricsTest { s2.add(new MetricName("s2.total", "grp1"), new Total()); s2.record(5.0); - for (int i = 0; i < 10; i++) + int sum = 0; + int count = 10; + for (int i = 0; i < count; i++) { s.record(i); + sum += i; + } + // prior to any time passing + double elapsedSecs = (config.timeWindowMs() * (config.samples() - 1)) / 1000.0; + assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), count / elapsedSecs, + metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); // pretend 2 seconds passed... 
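+        // With the default MetricConfig (2 samples of 30 seconds each), elapsedSecs starts at 30s: one complete
+        // prior window and none of the current one. The 2 second sleep below brings it to 32s, which is where the
+        // asserted rate comes from: sum / elapsedSecs = 45 / 32 = 1.40625.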
- time.sleep(2000); + long sleepTimeMs = 2; + time.sleep(sleepTimeMs * 1000); + elapsedSecs += sleepTimeMs; assertEquals("s2 reflects the constant value", 5.0, metrics.metrics().get(new MetricName("s2.total", "grp1")).value(), EPS); assertEquals("Avg(0...9) = 4.5", 4.5, metrics.metrics().get(new MetricName("test.avg", "grp1")).value(), EPS); - assertEquals("Max(0...9) = 9", 9.0, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); + assertEquals("Max(0...9) = 9", count - 1, metrics.metrics().get(new MetricName("test.max", "grp1")).value(), EPS); assertEquals("Min(0...9) = 0", 0.0, metrics.metrics().get(new MetricName("test.min", "grp1")).value(), EPS); - assertEquals("Rate(0...9) = 22.5", 22.5, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); - assertEquals("Occurences(0...9) = 5", 5.0, metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); - assertEquals("Count(0...9) = 10", 10.0, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); + assertEquals("Rate(0...9) = 1.40625", + sum / elapsedSecs, metrics.metrics().get(new MetricName("test.rate", "grp1")).value(), EPS); + assertEquals(String.format("Occurrences(0...%d) = %f", count, count / elapsedSecs), + count / elapsedSecs, + metrics.metrics().get(new MetricName("test.occurences", "grp1")).value(), EPS); + assertEquals("Count(0...9) = 10", + (double) count, metrics.metrics().get(new MetricName("test.count", "grp1")).value(), EPS); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ClientQuotaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala new file mode 100644 index 0000000..9f8473f --- /dev/null +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import java.util.concurrent.{DelayQueue, TimeUnit} + +import kafka.utils.{ShutdownableThread, Logging} +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics._ +import org.apache.kafka.common.metrics.stats.{Total, Rate, Avg} +import java.util.concurrent.locks.ReentrantReadWriteLock + +import org.apache.kafka.common.utils.Time + +/** + * Represents the sensors aggregated per client + * @param quotaSensor @Sensor that tracks the quota + * @param throttleTimeSensor @Sensor that tracks the throttle time + */ +private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor) + +/** + * Configuration settings for quota management + * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client + * @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y" + * @param numQuotaSamples The number of samples to retain in memory + * @param quotaWindowSizeSeconds The time span of each sample + * + */ +case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long = + ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, + quotaBytesPerSecondOverrides: String = + ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides, + numQuotaSamples: Int = + ClientQuotaManagerConfig.DefaultNumQuotaSamples, + quotaWindowSizeSeconds: Int = + ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds) + +object ClientQuotaManagerConfig { + val QuotaBytesPerSecondDefault = Long.MaxValue + val QuotaBytesPerSecondOverrides = "" + // Always have 10 whole windows + 1 current window + val DefaultNumQuotaSamples = 11 + val DefaultQuotaWindowSizeSeconds = 1 + val MaxThrottleTimeSeconds = 30 +} + +/** + * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics + * for all clients. + * @param config @ClientQuotaManagerConfig quota configs + * @param metrics @Metrics Metrics instance + * @param apiKey API Key for the request + * @param time @Time object to use + */ +class ClientQuotaManager(private val config: ClientQuotaManagerConfig, + private val metrics: Metrics, + private val apiKey: String, + private val time: Time) extends Logging { + private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides) + private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault) + private val lock = new ReentrantReadWriteLock() + private val delayQueue = new DelayQueue[ThrottledResponse]() + val throttledRequestReaper = new ThrottledRequestReaper(delayQueue) + throttledRequestReaper.start() + + private val delayQueueSensor = metrics.sensor(apiKey + "-delayQueue") + delayQueueSensor.add(new MetricName("queue-size", + apiKey, + "Tracks the size of the delay queue"), new Total()) + + /** + * Reaper thread that triggers callbacks on all throttled requests + * @param delayQueue DelayQueue to dequeue from + */ + class ThrottledRequestReaper(delayQueue: DelayQueue[ThrottledResponse]) extends ShutdownableThread( + "ThrottledRequestReaper-%s".format(apiKey), false) { + + override def doWork(): Unit = { + val response: ThrottledResponse = delayQueue.poll(1, TimeUnit.SECONDS) + if (response != null) { + // Decrement the size of the delay queue + delayQueueSensor.record(-1) + trace("Response throttled for: " + response.delayTimeMs + " ms") + response.execute() + } + } + } + + /** + * Records that a clientId changed some metric being throttled (produced/consumed bytes, QPS etc.) 
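+   * Quota enforcement happens inside Sensor.record(), which throws a QuotaViolationException once the
+   * recorded rate exceeds the configured bound; the catch block below converts that into a delayed response.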
+ * @param clientId clientId that produced the data + * @param value amount of data written in bytes + * @param callback Callback function. This will be triggered immediately if quota is not violated. + * If there is a quota violation, this callback will be triggered after a delay + * @return Number of milliseconds to delay the response in case of Quota violation. + * Zero otherwise + */ + def recordAndMaybeThrottle(clientId: String, value: Int, callback: => Unit): Int = { + val clientSensors = getOrCreateQuotaSensors(clientId) + var delayTimeMs = 0L + try { + clientSensors.quotaSensor.record(value) + // trigger the callback immediately if quota is not violated + callback + } catch { + case qve: QuotaViolationException => + // Compute the delay + val clientMetric = metrics.metrics().get(clientRateMetricName(clientId)) + delayTimeMs = delayTime(clientMetric.value(), getQuotaMetricConfig(quota(clientId))) + // If delayed, add the element to the delayQueue + delayQueue.add(new ThrottledResponse(time, delayTimeMs, callback)) + delayQueueSensor.record() + clientSensors.throttleTimeSensor.record(delayTimeMs) + logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), delayTimeMs)) + } + delayTimeMs.toInt + } + + /* + * This calculates the amount of time needed to bring the metric within quota + * assuming that no new metrics are recorded. + * + * Basically, if O is the observed rate and T is the target rate over a window of W, to bring O down to T, + * we need to add a delay of X to W such that O * W / (W + X) = T. + * Solving for X, we get X = (O - T)/T * W. + * For example, with O = 600 bytes/sec, T = 500 bytes/sec and W = 11 seconds, X = (600 - 500)/500 * 11 = 2.2 seconds. + */ + private def delayTime(metricValue: Double, config: MetricConfig): Long = { + val quota = config.quota() + val difference = metricValue - quota.bound + val time = difference / quota.bound * config.timeWindowMs() * config.samples() + time.round + } + + /** + * Returns the quota for the specified clientId + * @return the overridden quota if one is configured, else the default quota + */ + private[server] def quota(clientId: String): Quota = overriddenQuota.getOrElse(clientId, defaultQuota) + + /* + * This function either returns the sensors for a given client id or creates them if they don't exist. + * The first sensor is the quota enforcement sensor; the second is the throttle time sensor + */ + private def getOrCreateQuotaSensors(clientId: String): ClientSensors = { + + // Names of the sensors to access + val quotaSensorName = apiKey + "-" + clientId + val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId + var quotaSensor: Sensor = null + var throttleTimeSensor: Sensor = null + + /* Acquire the read lock to fetch the sensors. It is safe to call getSensor from multiple threads. + * The read lock allows a thread to create a sensor in isolation. The thread creating the sensor + * will acquire the write lock and prevent the sensors from being read while they are being created. + * It should be sufficient to simply check if the sensor is null without acquiring a read lock but the + * sensor being present doesn't mean that it is fully initialized i.e. all the Metrics may not have been added. + * This read lock waits until the writer thread has released its lock i.e.
fully initialized the sensor + * at which point it is safe to read + */ + lock.readLock().lock() + try { + quotaSensor = metrics.getSensor(quotaSensorName) + throttleTimeSensor = metrics.getSensor(throttleTimeSensorName) + } + finally { + lock.readLock().unlock() + } + + /* If the quota sensor is null, try to create it; otherwise return the existing sensor. + * Also if the quota sensor is null, the throttle time sensor must be null + */ + if (quotaSensor == null) { + /* Acquire a write lock because the sensor may not have been created and we only want one thread to create it. + * Note that multiple threads may acquire the write lock if they all see a null sensor initially. + * In this case, the writer re-checks the sensor after acquiring the lock. + * This is safe from the double-checked locking problem because the references are read + * after acquiring read locks and hence cannot see a partially published reference + */ + lock.writeLock().lock() + try { + quotaSensor = metrics.getSensor(quotaSensorName) + if (quotaSensor == null) { + // create the throttle time sensor also + throttleTimeSensor = metrics.sensor(throttleTimeSensorName) + throttleTimeSensor.add(new MetricName("throttle-time", + apiKey, + "Tracking average throttle-time per client", + "client-id", + clientId), new Avg()) + quotaSensor = metrics.sensor(quotaSensorName, getQuotaMetricConfig(quota(clientId))) + quotaSensor.add(clientRateMetricName(clientId), new Rate()) + } + } finally { + lock.writeLock().unlock() + } + } + // return the read or created sensors + ClientSensors(quotaSensor, throttleTimeSensor) + } + + private def getQuotaMetricConfig(quota: Quota): MetricConfig = { + new MetricConfig() + .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) + .samples(config.numQuotaSamples) + .quota(quota) + } + + /* Construct a Map of (clientId -> Quota) + * The input config is specified as comma-separated K=V pairs + */ + private def initQuotaMap(input: String): Map[String, Quota] = { + // If empty input, return an empty map + if (input.trim.length == 0) + Map[String, Quota]() + else + input.split(",").map(entry => { + val trimmedEntry = entry.trim + val pair: Array[String] = trimmedEntry.split("=") + if (pair.length != 2) + throw new IllegalArgumentException("Incorrectly formatted override entry (%s). 
Format is k1=v1,k2=v2".format(entry)) + pair(0) -> new Quota(pair(1).toDouble, true) + }).toMap + } + + private def clientRateMetricName(clientId: String): MetricName = { + new MetricName("byte-rate", apiKey, + "Tracking byte-rate per client", + "client-id", clientId) + } + + def shutdown() = { + throttledRequestReaper.shutdown() + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 7ea509c..67f0cad 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -17,6 +17,7 @@ package kafka.server +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.TopicPartition import kafka.api._ @@ -42,9 +43,12 @@ class KafkaApis(val requestChannel: RequestChannel, val zkClient: ZkClient, val brokerId: Int, val config: KafkaConfig, - val metadataCache: MetadataCache) extends Logging { + val metadataCache: MetadataCache, + val metrics: Metrics) extends Logging { this.logIdent = "[KafkaApi-%d] ".format(brokerId) + // Store all the quota managers for each type of request + private val quotaManagers = instantiateQuotaManagers(config) /** * Top-level method that handles all requests and multiplexes to the right api @@ -250,6 +254,7 @@ class KafkaApis(val requestChannel: RequestChannel, */ def handleProducerRequest(request: RequestChannel.Request) { val produceRequest = request.requestObj.asInstanceOf[ProducerRequest] + val numBytesAppended = produceRequest.sizeInBytes // the callback for sending a produce response def sendResponseCallback(responseStatus: Map[TopicAndPartition, ProducerResponseStatus]) { @@ -265,21 +270,27 @@ class KafkaApis(val requestChannel: RequestChannel, } } - if (produceRequest.requiredAcks == 0) { - // no operation needed if producer request.required.acks = 0; however, if there is any error in handling - // the request, since no response is expected by the producer, the server will close socket server so that - // the producer client will know that some error has happened and will refresh its metadata - if (errorInResponse) { - info("Close connection due to error handling produce request with correlation id %d from client id %s with ack=0" - .format(produceRequest.correlationId, produceRequest.clientId)) - requestChannel.closeConnection(request.processor, request) + def produceResponseCallback { + if (produceRequest.requiredAcks == 0) { + // no operation needed if producer request.required.acks = 0; however, if there is any error in handling + // the request, since no response is expected by the producer, the server will close socket server so that + // the producer client will know that some error has happened and will refresh its metadata + if (errorInResponse) { + info( + "Close connection due to error handling produce request with correlation id %d from client id %s with ack=0".format( + produceRequest.correlationId, + produceRequest.clientId)) + requestChannel.closeConnection(request.processor, request) + } else { + requestChannel.noOperation(request.processor, request) + } } else { - requestChannel.noOperation(request.processor, request) + val response = ProducerResponse(produceRequest.correlationId, responseStatus) + requestChannel.sendResponse(new Response(request, new 
RequestOrResponseSend(request.connectionId, response))) } - } else { - val response = ProducerResponse(produceRequest.correlationId, responseStatus) - requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response))) } + + quotaManagers(RequestKeys.ProduceKey).recordAndMaybeThrottle(produceRequest.clientId, numBytesAppended, produceResponseCallback) } // only allow appending to internal topic partitions @@ -316,14 +327,27 @@ class KafkaApis(val requestChannel: RequestChannel, .format(fetchRequest.correlationId, fetchRequest.clientId, topicAndPartition, ErrorMapping.exceptionNameFor(data.error))) } - // record the bytes out metrics only when the response is being sent BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).bytesOutRate.mark(data.messages.sizeInBytes) BrokerTopicStats.getBrokerAllTopicsStats().bytesOutRate.mark(data.messages.sizeInBytes) } val response = FetchResponse(fetchRequest.correlationId, responsePartitionData) - requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) + def fetchResponseCallback { + requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(request.connectionId, response))) + } + + // Do not throttle replication traffic + if (fetchRequest.isFromFollower) { + fetchResponseCallback + } else { + quotaManagers.get(RequestKeys.FetchKey) match { + case Some(quotaManager) => + quotaManager.recordAndMaybeThrottle(fetchRequest.clientId, response.sizeInBytes, fetchResponseCallback) + case None => + warn("Cannot throttle Api key %s".format(RequestKeys.nameForKey(RequestKeys.FetchKey))) + } + } } // call the replica manager to fetch messages from the local replica @@ -604,9 +628,37 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback) } + /* + * Returns a Map of all quota managers configured. 
The request Api key is the key for the Map + */ + private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = { + val producerQuotaManagerCfg = ClientQuotaManagerConfig( + quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault, + quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides, + numQuotaSamples = cfg.numQuotaSamples, + quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds + ) + + val consumerQuotaManagerCfg = ClientQuotaManagerConfig( + quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault, + quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides, + numQuotaSamples = cfg.numQuotaSamples, + quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds + ) + + val quotaManagers = Map[Short, ClientQuotaManager]( + RequestKeys.ProduceKey -> + new ClientQuotaManager(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime), + RequestKeys.FetchKey -> + new ClientQuotaManager(consumerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.FetchKey), new org.apache.kafka.common.utils.SystemTime) + ) + quotaManagers + } + def close() { - // TODO currently closing the API is an no-op since the API no longer maintain any modules - // maybe removing the closing call in the end when KafkaAPI becomes a pure stateless layer - debug("Shut down complete.") + quotaManagers.foreach { case(apiKey, quotaManager) => + quotaManager.shutdown() + } + info("Shutdown complete.") } } http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a06f0bd..394f21b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -26,7 +26,10 @@ import kafka.consumer.ConsumerConfig import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet} import kafka.utils.CoreUtils import org.apache.kafka.clients.CommonClientConfigs -import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} +import org.apache.kafka.common.config.ConfigDef.Importance._ +import org.apache.kafka.common.config.ConfigDef.Range._ +import org.apache.kafka.common.config.ConfigDef.Type._ +import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef} import org.apache.kafka.common.metrics.MetricsReporter import org.apache.kafka.common.protocol.SecurityProtocol @@ -132,12 +135,21 @@ object Defaults { val OffsetCommitTimeoutMs = OffsetManagerConfig.DefaultOffsetCommitTimeoutMs val OffsetCommitRequiredAcks = OffsetManagerConfig.DefaultOffsetCommitRequiredAcks + /** ********* Quota Configuration ***********/ + val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault + val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault + val ProducerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides + val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides + val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples + val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds + val DeleteTopicEnable = false val CompressionType = "producer" + /** ********* Kafka Metrics 
Configuration ***********/ val MetricNumSamples = 2 - val MetricSampleWindowMs = 1000 + val MetricSampleWindowMs = 30000 val MetricReporterClasses = "" } @@ -250,15 +262,22 @@ object KafkaConfig { val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms" val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms" val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks" + /** ********* Quota Configuration ***********/ + val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" + val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" + val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides" + val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides" + val NumQuotaSamplesProp = "quota.window.num" + val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" val DeleteTopicEnableProp = "delete.topic.enable" val CompressionTypeProp = "compression.type" + /** ********* Kafka Metrics Configuration ***********/ val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG - /* Documentation */ /** ********* Zookeeper Configuration ***********/ val ZkConnectDoc = "Zookeeper host string" @@ -388,11 +407,22 @@ val OffsetCommitTimeoutMsDoc = "Offset commit will be delayed until all replicas for the offsets topic receive the commit " + "or this timeout is reached. This is similar to the producer request timeout." val OffsetCommitRequiredAcksDoc = "The required acks before the commit can be accepted. In general, the default (-1) should not be overridden" + /** ********* Quota Configuration ***********/ + val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes per second than this value" + val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes per second than this value" + val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId=quotaBytesPerSecond to override the default producer quota. " + + "Example: clientIdX=10485760,clientIdY=10485760" + val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId=quotaBytesPerSecond to override the default consumer quota. " + + "Example: clientIdX=10485760,clientIdY=10485760" + val NumQuotaSamplesDoc = "The number of samples to retain in memory" + val QuotaWindowSizeSecondsDoc = "The time span of each sample" + val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + "('gzip', 'snappy', 'lz4'). It additionally accepts 'uncompressed' which is equivalent to no compression; and " + "'producer' which means retain the original compression codec set by the producer."
+ /** ********* Kafka Metrics Configuration ***********/ val MetricSampleWindowMsDoc = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC val MetricNumSamplesDoc = CommonClientConfigs.METRICS_NUM_SAMPLES_DOC val MetricReporterClassesDoc = CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC @@ -518,9 +548,19 @@ object KafkaConfig { .define(OffsetCommitRequiredAcksProp, SHORT, Defaults.OffsetCommitRequiredAcks, HIGH, OffsetCommitRequiredAcksDoc) .define(DeleteTopicEnableProp, BOOLEAN, Defaults.DeleteTopicEnable, HIGH, DeleteTopicEnableDoc) .define(CompressionTypeProp, STRING, Defaults.CompressionType, HIGH, CompressionTypeDoc) + + /** ********* Kafka Metrics Configuration ***********/ .define(MetricNumSamplesProp, INT, Defaults.MetricNumSamples, atLeast(1), LOW, MetricNumSamplesDoc) .define(MetricSampleWindowMsProp, LONG, Defaults.MetricSampleWindowMs, atLeast(1), LOW, MetricSampleWindowMsDoc) .define(MetricReporterClassesProp, LIST, Defaults.MetricReporterClasses, LOW, MetricReporterClassesDoc) + + /** ********* Quota configuration ***********/ + .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc) + .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc) + .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc) + .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc) + .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) + .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) } def configNames() = { @@ -548,7 +588,6 @@ object KafkaConfig { props.putAll(overrides) fromProps(props) } - } case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(KafkaConfig.configDef, props) { @@ -661,10 +700,17 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka val metricSampleWindowMs = getLong(KafkaConfig.MetricSampleWindowMsProp) val metricReporterClasses: java.util.List[MetricsReporter] = getConfiguredInstances(KafkaConfig.MetricReporterClassesProp, classOf[MetricsReporter]) + /** ********* Quota Configuration **************/ + val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) + val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) + val producerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp) + val consumerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp) + val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) + val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp) + val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) val compressionType = getString(KafkaConfig.CompressionTypeProp) - val listeners = getListeners val advertisedListeners = getAdvertisedListeners val logRetentionTimeMillis = getLogRetentionTimeMillis http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 84d4730..6d65507 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -18,7 +18,6 @@ package kafka.server import java.util -import java.util.Properties import kafka.admin._ import kafka.log.LogConfig @@ -62,11 +61,11 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg private val reporters: java.util.List[MetricsReporter] = config.metricReporterClasses reporters.add(new JmxReporter(jmxPrefix)) - - - // This exists so SocketServer (which uses Client libraries) can use the client Time objects without having to convert all of Kafka to use them - // Once we get rid of kafka.utils.time, we can get rid of this too - private val socketServerTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime() + // This exists because the Metrics package from clients has its own Time implementation. + // SocketServer/Quotas (which uses client libraries) have to use the client Time objects without having to convert all of Kafka to use them + // Eventually, we want to merge the Time objects in core and clients + private val kafkaMetricsTime: org.apache.kafka.common.utils.Time = new org.apache.kafka.common.utils.SystemTime() + var metrics: Metrics = null val brokerState: BrokerState = new BrokerState @@ -80,7 +79,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg var dynamicConfigHandlers: Map[String, ConfigHandler] = null var dynamicConfigManager: DynamicConfigManager = null - val metrics: Metrics = new Metrics() var consumerCoordinator: ConsumerCoordinator = null @@ -92,7 +90,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val metadataCache: MetadataCache = new MetadataCache(config.brokerId) - var zkClient: ZkClient = null val correlationId: AtomicInteger = new AtomicInteger(0) val brokerMetaPropsFile = "meta.properties" @@ -121,6 +118,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg val canStartup = isStartingUp.compareAndSet(false, true) if (canStartup) { + metrics = new Metrics(metricConfig, reporters, kafkaMetricsTime) + brokerState.newState(Starting) /* start scheduler */ @@ -137,9 +136,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.brokerId = getBrokerId this.logIdent = "[Kafka Server " + config.brokerId + "], " - val metrics = new Metrics(metricConfig, reporters, socketServerTime) - - socketServer = new SocketServer(config.brokerId, config.listeners, config.numNetworkThreads, @@ -150,7 +146,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg config.maxConnectionsPerIp, config.connectionsMaxIdleMs, config.maxConnectionsPerIpOverrides, - socketServerTime, + kafkaMetricsTime, metrics) socketServer.startup() @@ -168,7 +164,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, consumerCoordinator, - kafkaController, zkClient, config.brokerId, config, metadataCache) + kafkaController, zkClient, config.brokerId, config, metadataCache, metrics) requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) @@ -362,6 +358,8 @@ class KafkaServer(val config: 
KafkaConfig, time: Time = SystemTime) extends Logg CoreUtils.swallow(kafkaController.shutdown()) if(zkClient != null) CoreUtils.swallow(zkClient.close()) + if (metrics != null) + CoreUtils.swallow(metrics.close()) brokerState.newState(NotRunning) http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 2e0bbcd..d829e18 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.metrics.Metrics import scala.collection._ @@ -98,7 +99,7 @@ class ReplicaManager(val config: KafkaConfig, val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, - val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup { + val isShuttingDown: AtomicBoolean) extends Logging with KafkaMetricsGroup { /* epoch of the controller that last changed the leader */ @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1 private val localBrokerId = config.brokerId @@ -440,7 +441,6 @@ class ReplicaManager(val config: KafkaConfig, fetchMinBytes: Int, fetchInfo: immutable.Map[TopicAndPartition, PartitionFetchInfo], responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) { - val isFromFollower = replicaId >= 0 val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId) http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/server/ThrottledResponse.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ThrottledResponse.scala b/core/src/main/scala/kafka/server/ThrottledResponse.scala new file mode 100644 index 0000000..1f80d54 --- /dev/null +++ b/core/src/main/scala/kafka/server/ThrottledResponse.scala @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{TimeUnit, Delayed} + +import org.apache.kafka.common.utils.Time + + +/** + * Represents a request whose response has been delayed. 
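+ * Implements java.util.concurrent.Delayed so instances can sit in the reaper's DelayQueue:
+ * getDelay() counts down to endTime and compareTo() orders responses by expiry time.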
+ * @param time @Time instance to use + * @param delayTimeMs delay associated with this request + * @param callback Callback to trigger after delayTimeMs milliseconds + */ +private[server] class ThrottledResponse(val time: Time, val delayTimeMs: Long, callback: => Unit) extends Delayed { + val endTime = time.milliseconds + delayTimeMs + + def execute() = callback + + override def getDelay(unit: TimeUnit): Long = { + unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS) + } + + override def compareTo(d: Delayed): Int = { + val other = d.asInstanceOf[ThrottledResponse] + if (this.endTime < other.endTime) -1 + else if (this.endTime > other.endTime) 1 + else 0 + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/main/scala/kafka/utils/ShutdownableThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ShutdownableThread.scala b/core/src/main/scala/kafka/utils/ShutdownableThread.scala index fc226c8..dc46797 100644 --- a/core/src/main/scala/kafka/utils/ShutdownableThread.scala +++ b/core/src/main/scala/kafka/utils/ShutdownableThread.scala @@ -51,6 +51,9 @@ abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean info("Shutdown completed") } + /** + * This method is repeatedly invoked until the thread shuts down or this method throws an exception + */ def doWork(): Unit override def run(): Unit = { http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/integration/kafka/api/QuotasTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala new file mode 100644 index 0000000..a11bf90 --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -0,0 +1,194 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package kafka.api + +import java.util.Properties + +import junit.framework.Assert +import kafka.consumer.SimpleConsumer +import kafka.integration.KafkaServerTestHarness +import kafka.server.{KafkaServer, KafkaConfig} +import kafka.utils.TestUtils +import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} +import org.apache.kafka.clients.producer._ +import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics.KafkaMetric +import org.junit.Assert._ +import org.junit.{After, Before, Test} +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + +import scala.collection.mutable + +class QuotasTest extends KafkaServerTestHarness { + private val producerBufferSize = 300000 + private val producerId1 = "QuotasTestProducer-1" + private val producerId2 = "QuotasTestProducer-2" + private val consumerId1 = "QuotasTestConsumer-1" + private val consumerId2 = "QuotasTestConsumer-2" + + val numServers = 2 + val overridingProps = new Properties() + + // Low enough quota that a producer sending a small payload in a tight loop should get throttled + overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000") + overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500") + + // un-throttled + overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue) + overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue) + + override def generateConfigs() = { + FixedPortTestUtils.createBrokerConfigs(numServers, + zkConnect, + enableControlledShutdown = false) + .map(KafkaConfig.fromProps(_, overridingProps)) + } + + var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() + var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() + var replicaConsumers = mutable.Buffer[SimpleConsumer]() + + var leaderNode: KafkaServer = null + var followerNode: KafkaServer = null + private val topic1 = "topic-1" + + @Before + override def setUp() { + super.setUp() + val producerProps = new Properties() + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + producerProps.put(ProducerConfig.ACKS_CONFIG, "0") + producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "false") + producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString) + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1) + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) + producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps) + + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2) + producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps) + + val numPartitions = 1 + val leaders = TestUtils.createTopic(zkClient, topic1, numPartitions, numServers, servers) + leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1) + followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1) + assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) + + // Create consumers + val consumerProps = new Properties + 
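+    // The same Properties instance is reused for both consumers; only CLIENT_ID_CONFIG changes between the two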
consumerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) + consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest") + consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) + consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapUrl) + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) + consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range") + + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1) + consumers += new KafkaConsumer(consumerProps) + // Create replica consumers with the same clientId as the high level consumer. These requests should never be throttled + replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId1) + + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2) + consumers += new KafkaConsumer(consumerProps) + replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2) + + } + + @After + override def tearDown() { + producers.foreach( _.close ) + consumers.foreach( _.close ) + replicaConsumers.foreach( _.close ) + super.tearDown() + } + + @Test + def testThrottledProducerConsumer() { + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala + + val numRecords = 1000 + produce(producers.head, numRecords) + + val producerMetricName = new MetricName("throttle-time", + RequestKeys.nameForKey(RequestKeys.ProduceKey), + "Tracking throttle-time per client", + "client-id", producerId1) + Assert.assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) + + // Consumer should read in a bursty manner and get throttled immediately + consume(consumers.head, numRecords) + // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately + val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() + replicaConsumers.head.fetch(request) + val consumerMetricName = new MetricName("throttle-time", + RequestKeys.nameForKey(RequestKeys.FetchKey), + "Tracking throttle-time per client", + "client-id", consumerId1) + Assert.assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) + } + + @Test + def testProducerConsumerOverrideUnthrottled() { + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala + val numRecords = 1000 + produce(producers(1), numRecords) + val producerMetricName = new MetricName("throttle-time", + RequestKeys.nameForKey(RequestKeys.ProduceKey), + "Tracking throttle-time per client", + "client-id", producerId2) + Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(producerMetricName).value()) + + // The "client" consumer does not get throttled. + consume(consumers(1), numRecords) + // The replica consumer should not be throttled also. 
Create a fetch request which will exceed the quota immediately + val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() + replicaConsumers(1).fetch(request) + val consumerMetricName = new MetricName("throttle-time", + RequestKeys.nameForKey(RequestKeys.FetchKey), + "Tracking throttle-time per client", + "client-id", consumerId2) + Assert.assertEquals("Should not have been throttled", Double.NaN, allMetrics(consumerMetricName).value()) + } + + def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { + var numBytesProduced = 0 + for (i <- 0 to count) { + val payload = i.toString.getBytes + numBytesProduced += payload.length + p.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, null, payload), + new ErrorLoggingCallback(topic1, null, null, true)).get() + Thread.sleep(1) + } + numBytesProduced + } + + def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) { + consumer.subscribe(topic1) + var numConsumed = 0 + while (numConsumed < numRecords) { + for (cr <- consumer.poll(100)) { + numConsumed += 1 + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala new file mode 100644 index 0000000..97dcca8 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package kafka.server + +import java.util.Collections + +import kafka.api.RequestKeys +import org.apache.kafka.common.MetricName +import org.apache.kafka.common.metrics.{Metrics, Quota, MetricConfig} +import org.apache.kafka.common.utils.MockTime +import org.scalatest.junit.JUnit3Suite +import org.junit.{Before, Test, Assert} + +class ClientQuotaManagerTest extends JUnit3Suite { + private val time = new MockTime + + private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500, + quotaBytesPerSecondOverrides = "p1=2000,p2=4000") + + var numCallbacks: Int = 0 + def callback { + numCallbacks += 1 + } + + @Before + def beforeMethod() { + numCallbacks = 0 + } + + @Test + def testQuotaParsing() { + val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time) + try { + Assert.assertEquals("Default producer quota should be 500", + new Quota(500, true), clientMetrics.quota("random-client-id")) + Assert.assertEquals("Should return the overridden value (2000)", + new Quota(2000, true), clientMetrics.quota("p1")) + Assert.assertEquals("Should return the overridden value (4000)", + new Quota(4000, true), clientMetrics.quota("p2")) + } finally { + clientMetrics.shutdown() + } + } + + @Test + def testQuotaViolation() { + val metrics = newMetrics + val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + val queueSizeMetric = metrics.metrics().get(new MetricName("queue-size", "producer", "")) + try { + /* We have 10 second windows. Make sure that there is no quota violation + * if we produce under the quota + */ + for (i <- 0 until 10) { + clientMetrics.recordAndMaybeThrottle("unknown", 400, callback) + time.sleep(1000) + } + Assert.assertEquals(10, numCallbacks) + Assert.assertEquals(0, queueSizeMetric.value().toInt) + + // Create a spike. + // 400*10 + 2000 = 6000/10 = 600 bytes per second. 
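+      // (that is, 400 bytes x 10 samples plus a 2000 byte spike = 6000 bytes over the 10 elapsed seconds,
+      // or 600 bytes/sec against the 500 bytes/sec default quota)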
+      // Throttle time = (overall rate - quota) / quota * window size
+      //               = (600 - 500) / 500 * 11 seconds = 2.2 seconds (2200 ms)
+      val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2000, callback)
+      Assert.assertEquals("Should be throttled", 2200, sleepTime)
+      Assert.assertEquals(1, queueSizeMetric.value().toInt)
+      // After a request is delayed, the callback cannot be triggered immediately
+      clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(10, numCallbacks)
+      time.sleep(sleepTime)
+
+      // The callback can only be triggered after the delay time passes
+      clientMetrics.throttledRequestReaper.doWork()
+      Assert.assertEquals(0, queueSizeMetric.value().toInt)
+      Assert.assertEquals(11, numCallbacks)
+
+      // We could continue to see delays until the bursty sample disappears
+      for (i <- 0 until 10) {
+        clientMetrics.recordAndMaybeThrottle("unknown", 400, callback)
+        time.sleep(1000)
+      }
+
+      Assert.assertEquals("Should be unthrottled since bursty sample has rolled over",
+                          0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback))
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testOverrideParse() {
+    var testConfig = ClientQuotaManagerConfig()
+    var clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+
+    try {
+      // Case 1 - Default config
+      Assert.assertEquals(new Quota(ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, true),
+                          clientMetrics.quota("p1"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+
+    // Case 2 - Empty override
+    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,")
+
+    clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+    try {
+      Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1"))
+      Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2"))
+    } finally {
+      clientMetrics.shutdown()
+    }
+
+    // Case 3 - NumberFormatException for override
+    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4")
+    try {
+      clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time)
+      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+    }
+    catch {
+      // Swallow.
+      case nfe: NumberFormatException =>
+    }
+
+    // Case 4 - IllegalArgumentException for override
+    testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500,
+                                          quotaBytesPerSecondOverrides = "p1=2000=3000")
+    try {
+      clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "producer", time)
+      Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides)
+    }
+    catch {
+      // Swallow.
+      case iae: IllegalArgumentException =>
+    }
+  }
+
+  def newMetrics: Metrics = {
+    new Metrics(new MetricConfig(), Collections.emptyList(), time)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index e26a730..9688b8c 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -474,6 +474,12 @@ class KafkaConfigTest {
       case KafkaConfig.OffsetsRetentionCheckIntervalMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
       case KafkaConfig.OffsetCommitTimeoutMsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
       case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2")
+      case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+      case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+      case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string
+      case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string
+      case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")
+      case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0")

       case KafkaConfig.DeleteTopicEnableProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_boolean", "0")

http://git-wip-us.apache.org/repos/asf/kafka/blob/bbb7d97a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
new file mode 100644
index 0000000..14a7f45
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ThrottledResponseExpirationTest.scala
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Collections
+import java.util.concurrent.{TimeUnit, DelayQueue}
+
+import org.apache.kafka.common.metrics.MetricConfig
+import org.apache.kafka.common.utils.MockTime
+import org.junit.{AfterClass, Before, Assert, Test}
+import org.scalatest.junit.JUnit3Suite
+
+class ThrottledResponseExpirationTest extends JUnit3Suite {
+  private val time = new MockTime
+  private var numCallbacks: Int = 0
+  private val metrics = new org.apache.kafka.common.metrics.Metrics(new MetricConfig(),
+                                                                    Collections.emptyList(),
+                                                                    time)
+
+  def callback {
+    numCallbacks += 1
+  }
+
+  @Before
+  def beforeMethod() {
+    numCallbacks = 0
+  }
+
+  @Test
+  def testExpire() {
+    val clientMetrics = new ClientQuotaManager(ClientQuotaManagerConfig(), metrics, "producer", time)
+
+    val delayQueue = new DelayQueue[ThrottledResponse]()
+    val reaper = new clientMetrics.ThrottledRequestReaper(delayQueue)
+    try {
+      // Add 4 elements to the queue out of order. Add 2 elements with the same expire timestamp
+      delayQueue.add(new ThrottledResponse(time, 10, callback))
+      delayQueue.add(new ThrottledResponse(time, 30, callback))
+      delayQueue.add(new ThrottledResponse(time, 30, callback))
+      delayQueue.add(new ThrottledResponse(time, 20, callback))
+
+      for (itr <- 1 to 3) {
+        time.sleep(10)
+        reaper.doWork()
+        Assert.assertEquals(itr, numCallbacks)
+      }
+      reaper.doWork()
+      Assert.assertEquals(4, numCallbacks)
+      Assert.assertEquals(0, delayQueue.size())
+      reaper.doWork()
+      Assert.assertEquals(4, numCallbacks)
+    } finally {
+      clientMetrics.shutdown()
+    }
+  }
+
+  @Test
+  def testThrottledRequest() {
+    val t1: ThrottledResponse = new ThrottledResponse(time, 10, callback)
+    val t2: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+    val t3: ThrottledResponse = new ThrottledResponse(time, 20, callback)
+    Assert.assertEquals(10, t1.delayTimeMs)
+    Assert.assertEquals(20, t2.delayTimeMs)
+    Assert.assertEquals(20, t3.delayTimeMs)
+
+    for (itr <- 0 to 2) {
+      Assert.assertEquals(10 - 10*itr, t1.getDelay(TimeUnit.MILLISECONDS))
+      Assert.assertEquals(20 - 10*itr, t2.getDelay(TimeUnit.MILLISECONDS))
+      Assert.assertEquals(20 - 10*itr, t3.getDelay(TimeUnit.MILLISECONDS))
+      time.sleep(10)
+    }
+  }
+}
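----------------------------------------------------------------------

A note for readers tracing the arithmetic in testQuotaViolation: the delay it asserts follows the formula spelled out in the test's comments, delay = (observed rate - quota) / quota * window size. The standalone sketch below reproduces that calculation; the object and parameter names are illustrative only and do not come from ClientQuotaManager.

    object ThrottleTimeSketch {
      // Delay the client long enough that, after waiting, its average rate
      // over the window drops back to the quota.
      def throttleTimeMs(observedRate: Double, quotaBytesPerSec: Double, windowMs: Long): Long =
        if (observedRate <= quotaBytesPerSec) 0
        else (((observedRate - quotaBytesPerSec) / quotaBytesPerSec) * windowMs).round

      def main(args: Array[String]): Unit = {
        // Ten 400-byte samples plus a 2000-byte spike: (400*10 + 2000) / 10 = 600 bytes/sec,
        // measured against a 500 bytes/sec quota over the 11-second window the test assumes.
        println(throttleTimeMs(600, 500, 11 * 1000)) // prints 2200, the sleepTime the test asserts
      }
    }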
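testOverrideParse likewise fixes the contract for the quotaBytesPerSecondOverrides string: comma-separated "clientId=bytesPerSec" pairs, with empty entries tolerated, a non-numeric rate raising NumberFormatException, and an entry with more than one '=' raising IllegalArgumentException. A minimal parser satisfying those assertions could look like this sketch, written against the test's expectations rather than the actual ClientQuotaManagerConfig code:

    // Parses "p1=2000,p2=4000" into a client-id -> bytes/sec map.
    def parseOverrides(overrides: String): Map[String, Long] =
      overrides.split(",").filter(_.nonEmpty).map { entry =>
        entry.split("=") match {
          case Array(clientId, rate) => clientId -> rate.toLong // "p3=p4" throws NumberFormatException
          case _ => throw new IllegalArgumentException("Invalid quota override: " + entry)
        }
      }.toMap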
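Finally, testThrottledRequest implies the java.util.concurrent.Delayed contract that ThrottledResponse must satisfy to sit in the reaper's DelayQueue: getDelay counts down from delayTimeMs as the clock advances, going negative once the deadline passes, and compareTo orders elements by remaining delay so the queue releases them in expiry order. A minimal sketch of such an element, using a plain millisecond clock function in place of Kafka's Time (the class and parameter names here are hypothetical):

    import java.util.concurrent.{Delayed, TimeUnit}

    class DelayedResponseSketch(clock: () => Long, val delayTimeMs: Long, callback: () => Unit)
        extends Delayed {
      private val endTimeMs = clock() + delayTimeMs

      // Remaining delay; negative after the deadline, matching the 10 - 10*itr
      // progression the test asserts.
      override def getDelay(unit: TimeUnit): Long =
        unit.convert(endTimeMs - clock(), TimeUnit.MILLISECONDS)

      // DelayQueue hands out the element with the smallest remaining delay first.
      override def compareTo(other: Delayed): Int =
        java.lang.Long.compare(getDelay(TimeUnit.MILLISECONDS),
                               other.getDelay(TimeUnit.MILLISECONDS))

      def execute(): Unit = callback() // run the deferred response callback
    }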