This is an automated email from the ASF dual-hosted git repository.

jeffkbkim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bc47ce1a53e MINOR: Fix a race and add JMH bench for HdrHistogram 
(#17221)
bc47ce1a53e is described below

commit bc47ce1a53e693709e0005517c82f33c6880afe2
Author: Dimitar Dimitrov <[email protected]>
AuthorDate: Fri Sep 27 09:49:10 2024 -0500

    MINOR: Fix a race and add JMH bench for HdrHistogram (#17221)
---
 build.gradle                                       |   1 +
 checkstyle/import-control-jmh-benchmarks.xml       |   2 +
 .../coordinator/common/runtime/HdrHistogram.java   |  26 ++---
 .../common/runtime/HdrHistogramTest.java           |  41 +++++++
 .../kafka/jmh/metrics/HistogramBenchmark.java      | 122 +++++++++++++++++++++
 5 files changed, 179 insertions(+), 13 deletions(-)

diff --git a/build.gradle b/build.gradle
index 64b9d488365..0907f0506a4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3196,6 +3196,7 @@ project(':jmh-benchmarks') {
     implementation project(':server')
     implementation project(':raft')
     implementation project(':clients')
+    implementation project(':coordinator-common')
     implementation project(':group-coordinator')
     implementation project(':group-coordinator:group-coordinator-api')
     implementation project(':metadata')
diff --git a/checkstyle/import-control-jmh-benchmarks.xml 
b/checkstyle/import-control-jmh-benchmarks.xml
index 1612477ea00..65bfbb63373 100644
--- a/checkstyle/import-control-jmh-benchmarks.xml
+++ b/checkstyle/import-control-jmh-benchmarks.xml
@@ -29,6 +29,7 @@
     <allow pkg="java.security"/>
     <allow pkg="javax.net.ssl"/>
     <allow pkg="javax.security"/>
+    <allow pkg="com.yammer.metrics.core"/>
     <allow pkg="org.apache.kafka.common"/>
     <allow pkg="org.apache.kafka.clients.producer"/>
     <allow pkg="kafka.cluster"/>
@@ -51,6 +52,7 @@
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage"/>
     <allow pkg="org.apache.kafka.clients"/>
+    <allow class="org.apache.kafka.coordinator.common.runtime.HdrHistogram"/>
     <allow pkg="org.apache.kafka.coordinator.group"/>
     <allow pkg="org.apache.kafka.image"/>
     <allow pkg="org.apache.kafka.metadata"/>
diff --git 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java
 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java
index 454a7aeb3b3..4b961d957cc 100644
--- 
a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java
+++ 
b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/HdrHistogram.java
@@ -20,8 +20,6 @@ import org.HdrHistogram.Histogram;
 import org.HdrHistogram.Recorder;
 import org.HdrHistogram.ValueRecorder;
 
-import java.util.concurrent.atomic.AtomicReference;
-
 /**
  * <p>A wrapper on top of the HdrHistogram API. It handles writing to the 
histogram by delegating
  * to an internal {@link ValueRecorder} implementation, and reading from the 
histogram by
@@ -35,6 +33,7 @@ public final class HdrHistogram {
 
     private static final long DEFAULT_MAX_SNAPSHOT_AGE_MS = 1000L;
 
+    private final Object lock = new Object();
     /**
      * The duration (in millis) after which the latest histogram snapshot is 
considered outdated and
      * subsequent calls to {@link #latestHistogram(long)} will result in the 
snapshot being recreated.
@@ -54,7 +53,7 @@ public final class HdrHistogram {
      * The latest snapshot of the internal HdrHistogram. Automatically updated 
by
      * {@link #latestHistogram(long)} if older than {@link #maxSnapshotAgeMs}.
      */
-    private final AtomicReference<Timestamped<Histogram>> 
timestampedHistogramSnapshot;
+    private volatile Timestamped<Histogram> timestampedHistogramSnapshot;
 
     public HdrHistogram(
         long highestTrackableValue,
@@ -70,19 +69,20 @@ public final class HdrHistogram {
     ) {
         this.maxSnapshotAgeMs = maxSnapshotAgeMs;
         recorder = new Recorder(highestTrackableValue, 
numberOfSignificantValueDigits);
-        this.timestampedHistogramSnapshot = new AtomicReference<>(new 
Timestamped<>(0, null));
+        this.timestampedHistogramSnapshot = new Timestamped<>(0, null);
     }
 
     private Histogram latestHistogram(long now) {
-        Timestamped<Histogram> latest = timestampedHistogramSnapshot.get();
-        while (now - latest.timestamp > maxSnapshotAgeMs) {
-            Histogram currentSnapshot = recorder.getIntervalHistogram();
-            boolean updatedLatest = timestampedHistogramSnapshot.compareAndSet(
-                latest, new Timestamped<>(now, currentSnapshot));
-
-            latest = timestampedHistogramSnapshot.get();
-            if (updatedLatest) {
-                break;
+        Timestamped<Histogram> latest = timestampedHistogramSnapshot;
+        if (now - latest.timestamp > maxSnapshotAgeMs) {
+            // Double-checked locking ensures that the thread that extracts 
the histogram data is
+            // the one that updates the internal snapshot.
+            synchronized (lock) {
+                latest = timestampedHistogramSnapshot;
+                if (now - latest.timestamp > maxSnapshotAgeMs) {
+                    latest = new Timestamped<>(now, 
recorder.getIntervalHistogram());
+                    timestampedHistogramSnapshot = latest;
+                }
             }
         }
         return latest.value;
diff --git 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java
 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java
index ad4203310d8..7703f11c81c 100644
--- 
a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java
+++ 
b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/HdrHistogramTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.coordinator.common.runtime;
 
+import org.apache.kafka.common.utils.ThreadUtils;
+
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
@@ -25,8 +27,12 @@ import org.junit.jupiter.api.Test;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
@@ -172,4 +178,39 @@ public class HdrHistogramTest {
         assertEquals(numEventsInFirstCycle, hdrHistogram.count(now + 
maxSnapshotAgeMs));
         assertEquals(numEventsInSecondCycle, hdrHistogram.count(now + 1 + 
maxSnapshotAgeMs));
     }
+
+    @Test
+    public void testLatestHistogramRace() throws InterruptedException, 
ExecutionException {
+        long maxSnapshotAgeMs = 10L;
+        long now = System.currentTimeMillis();
+        HdrHistogram hdrHistogram = new HdrHistogram(maxSnapshotAgeMs, 
MAX_VALUE, 1);
+        ExecutorService countExecutor = Executors.newFixedThreadPool(2);
+        for (int i = 1; i < 10000; i++) {
+            int numEvents = 2;
+            for (int j = 0; j < numEvents; j++) {
+                hdrHistogram.record(i);
+            }
+            final long moreThanMaxAge = now + maxSnapshotAgeMs + 1;
+            now = moreThanMaxAge;
+            CountDownLatch latch = new CountDownLatch(1);
+            Callable<Long> countTask = () -> {
+                try {
+                    assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
+                    return hdrHistogram.count(moreThanMaxAge);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            };
+            Future<Long> t1Future = countExecutor.submit(countTask);
+            Future<Long> t2Future = countExecutor.submit(countTask);
+            latch.countDown();
+            long t1Count = t1Future.get();
+            long t2Count = t2Future.get();
+            assertTrue(
+                numEvents == t1Count && numEvents == t2Count,
+                String.format("Expected %d events in both threads, got %d in 
T1 and %d in T2",
+                    numEvents, t1Count, t2Count));
+        }
+        ThreadUtils.shutdownExecutorServiceQuietly(countExecutor, 500, 
TimeUnit.MILLISECONDS);
+    }
 }
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java
new file mode 100644
index 00000000000..d5f65eaf134
--- /dev/null
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metrics/HistogramBenchmark.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.jmh.metrics;
+
+import org.apache.kafka.coordinator.common.runtime.HdrHistogram;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Group;
+import org.openjdk.jmh.annotations.GroupThreads;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@Fork(value = 1)
+@Threads(10)
+@Warmup(iterations = 5)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+public class HistogramBenchmark {
+
+    /*
+     * This benchmark compares the performance of the most commonly used in 
the Kafka codebase
+     * Yammer histogram and the new HdrHistogram. It does it by focusing on 
the write path in a
+     * multiple writers, multiple readers scenario.
+     *
+     * The benchmark relies on JMH Groups which allows us to distribute the 
number of worker threads
+     * to the different benchmark methods.
+     */
+
+    private static final long MAX_VALUE = TimeUnit.MINUTES.toMillis(1L);
+
+    private Histogram yammerHistogram;
+    private HdrHistogram hdrHistogram;
+
+    @Setup(Level.Trial)
+    public void setUp() {
+        yammerHistogram = 
KafkaYammerMetrics.defaultRegistry().newHistogram(new MetricName("a", "", ""), 
true);
+        hdrHistogram = new HdrHistogram(MAX_VALUE, 3);
+    }
+
+    /*
+     * The write benchmark methods below are the core of the benchmark. They 
use ThreadLocalRandom
+     * to generate values to record. This is much faster than the actual 
histogram recording, so
+     * the benchmark results are representative of the histogram 
implementation.
+     */
+
+    @Benchmark
+    @Group("runner")
+    @GroupThreads(3)
+    public void writeYammerHistogram() {
+        
yammerHistogram.update(ThreadLocalRandom.current().nextLong(MAX_VALUE));
+    }
+
+    @Benchmark
+    @Group("runner")
+    @GroupThreads(3)
+    public void writeHdrHistogram() {
+        hdrHistogram.record(ThreadLocalRandom.current().nextLong(MAX_VALUE));
+    }
+
+    /*
+     * The read benchmark methods below are not real benchmark methods!
+     * They are there only to simulate the concurrent exercise of the read and 
the write paths in
+     * the histogram implementations (with the read path exercised 
significantly less often). The
+     * measurements for these benchmark methods should be ignored as although 
not optimized away
+     * (that's why the methods have a return value), they practically measure 
the cost of a
+     * System.currentTimeMillis() call.
+     */
+
+    @Benchmark
+    @Group("runner")
+    @GroupThreads(2)
+    public double readYammerHistogram() {
+        long now = System.currentTimeMillis();
+        if (now % 199 == 0) {
+            return yammerHistogram.getSnapshot().get999thPercentile();
+        }
+        return now;
+    }
+
+    @Benchmark
+    @Group("runner")
+    @GroupThreads(2)
+    public double readHdrHistogram() {
+        long now = System.currentTimeMillis();
+        if (now % 199 == 0) {
+            return hdrHistogram.measurePercentile(now, 99.9);
+        }
+        return now;
+    }
+}

Reply via email to