This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit cf703662698b75b0f163b9191c0a6c2d49895a1b Author: Chia-Ping Tsai <[email protected]> AuthorDate: Thu May 10 19:27:45 2018 +0800 KAFKA-6870 Concurrency conflicts in SampledStat (#4985) Make `KafkaMetric.measurableValue` thread-safe Reviewers: Rajini Sivaram <[email protected]> --- .../apache/kafka/common/metrics/KafkaMetric.java | 14 ++--- .../apache/kafka/common/metrics/SensorTest.java | 70 +++++++++++++++++++++- 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java index f04981a..48999e1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java @@ -55,9 +55,7 @@ public final class KafkaMetric implements Metric { @Override @Deprecated public double value() { - synchronized (this.lock) { - return measurableValue(time.milliseconds()); - } + return measurableValue(time.milliseconds()); } @Override @@ -81,10 +79,12 @@ public final class KafkaMetric implements Metric { } double measurableValue(long timeMs) { - if (this.metricValueProvider instanceof Measurable) - return ((Measurable) metricValueProvider).measure(config, timeMs); - else - return 0; + synchronized (this.lock) { + if (this.metricValueProvider instanceof Measurable) + return ((Measurable) metricValueProvider).measure(config, timeMs); + else + return 0; + } } public void config(MetricConfig config) { diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java index d22111e..74d4036 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/SensorTest.java @@ -16,13 +16,25 @@ */ package org.apache.kafka.common.metrics; +import org.apache.kafka.common.metrics.stats.Rate; +import org.apache.kafka.common.utils.SystemTime; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import org.apache.kafka.common.utils.SystemTime; -import org.junit.Test; - public class SensorTest { @Test public void testRecordLevelEnum() { @@ -59,4 +71,56 @@ public class SensorTest { 0, Sensor.RecordingLevel.DEBUG); assertFalse(debugSensor.shouldRecord()); } + + /** + * The Sensor#checkQuotas should be thread-safe since the method may be used by many ReplicaFetcherThreads. + */ + @Test + public void testCheckQuotasInMultiThreads() throws InterruptedException, ExecutionException { + final Metrics metrics = new Metrics(new MetricConfig().quota(Quota.upperBound(Double.MAX_VALUE)) + // decreasing the value of time window make SampledStat always record the given value + .timeWindow(1, TimeUnit.MILLISECONDS) + // increasing the value of samples make SampledStat store more samples + .samples(100)); + final Sensor sensor = metrics.sensor("sensor"); + + sensor.add(metrics.metricName("test-metric", "test-group"), new Rate()); + final int threadCount = 10; + final CountDownLatch latch = new CountDownLatch(1); + ExecutorService service = Executors.newFixedThreadPool(threadCount); + List<Future<Throwable>> workers = new ArrayList<>(threadCount); + boolean needShutdown = true; + try { + for (int i = 0; i != threadCount; ++i) { + final int index = i; + workers.add(service.submit(new Callable<Throwable>() { + @Override + public Throwable call() { + try { + assertTrue(latch.await(5, TimeUnit.SECONDS)); + for (int j = 0; j != 20; ++j) { + sensor.record(j * index, System.currentTimeMillis() + j, false); + sensor.checkQuotas(); + } + return null; + } catch (Throwable e) { + return e; + } + } + })); + } + latch.countDown(); + service.shutdown(); + assertTrue(service.awaitTermination(10, TimeUnit.SECONDS)); + needShutdown = false; + for (Future<Throwable> callable : workers) { + assertTrue("If this failure happen frequently, we can try to increase the wait time", callable.isDone()); + assertNull("Sensor#checkQuotas SHOULD be thread-safe!", callable.get()); + } + } finally { + if (needShutdown) { + service.shutdownNow(); + } + } + } } -- To stop receiving notification emails like this one, please contact [email protected].
