This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c78f832753d [improve][misc] Migrate Yahoo DataSketches to Apache DataSketches (#25965)
c78f832753d is described below

commit c78f832753d913d859faacb37f3caf2dea53ecda
Author: Lari Hotari <[email protected]>
AuthorDate: Mon Jun 8 14:52:18 2026 +0300

    [improve][misc] Migrate Yahoo DataSketches to Apache DataSketches (#25965)
---
 .../pulsar.client-shade-conventions.gradle.kts     |  4 +--
 distribution/server/src/assemble/LICENSE.bin.txt   |  4 +--
 distribution/shell/src/assemble/LICENSE.bin.txt    |  4 +--
 gradle/libs.versions.toml                          |  9 ++---
 .../build.gradle.kts                               |  2 +-
 .../bookkeeper/DataSketchesOpStatsLogger.java      | 33 ++++++++----------
 pulsar-broker/build.gradle.kts                     |  2 +-
 .../metrics/DataSketchesOpStatsLogger.java         | 20 +++++------
 .../metrics/DataSketchesSummaryLogger.java         | 14 ++++----
 .../prometheus/metrics/ThreadLocalAccessor.java    | 16 ++++-----
 .../metrics/ThreadLocalAccessorTest.java           | 14 ++++----
 pulsar-client/build.gradle.kts                     |  2 +-
 .../client/impl/ProducerStatsRecorderImpl.java     | 40 +++++++++++++++-------
 pulsar-functions/instance/build.gradle.kts         |  2 +-
 pulsar-functions/localrun-shaded/build.gradle.kts  |  5 ++-
 15 files changed, 88 insertions(+), 83 deletions(-)

diff --git 
a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
 
b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
index 1435aaef76d..5aacd9c48b9 100644
--- 
a/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
+++ 
b/build-logic/conventions/src/main/kotlin/pulsar.client-shade-conventions.gradle.kts
@@ -51,7 +51,7 @@ 
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
         include(dependency("org.eclipse.angus:angus-activation"))
         include(dependency("com.thoughtworks.paranamer:paranamer"))
         include(dependency("com.typesafe.netty:netty-reactive-streams"))
-        include(dependency("com.yahoo.datasketches:.*"))
+        include(dependency("org.apache.datasketches:.*"))
         include(dependency("commons-.*:.*"))
         include(dependency("io.airlift:.*"))
         include(dependency("io.grpc:.*"))
@@ -138,7 +138,6 @@ 
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
     relocateWithPrefix(shadePrefix, "org.eclipse.angus")
     relocateWithPrefix(shadePrefix, "com.thoughtworks.paranamer")
     relocateWithPrefix(shadePrefix, "com.typesafe")
-    relocateWithPrefix(shadePrefix, "com.yahoo")
     relocateWithPrefix(shadePrefix, "io.airlift")
     relocateWithPrefix(shadePrefix, "io.grpc")
     relocateWithPrefix(shadePrefix, "io.netty")
@@ -156,6 +155,7 @@ 
tasks.named<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>("shadowJ
     relocateWithPrefix(shadePrefix, "org.aopalliance")
     relocateWithPrefix(shadePrefix, "org.apache.bookkeeper")
     relocateWithPrefix(shadePrefix, "org.apache.commons")
+    relocateWithPrefix(shadePrefix, "org.apache.datasketches")
     relocateWithPrefix(shadePrefix, "org.apache.pulsar.checksum")
     relocateWithPrefix(shadePrefix, "org.asynchttpclient")
     relocateWithPrefix(shadePrefix, "org.checkerframework")
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt 
b/distribution/server/src/assemble/LICENSE.bin.txt
index 957f2ab5851..edf92edd1c1 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -277,8 +277,8 @@ The Apache Software License, Version 2.0
     - io.swagger.core.v3-swagger-annotations-jakarta-2.2.50.jar
  * slog -- io.github.merlimat.slog-slog-0.9.7.jar
  * DataSketches
-    - com.yahoo.datasketches-memory-0.8.3.jar
-    - com.yahoo.datasketches-sketches-core-0.8.3.jar
+    - org.apache.datasketches-datasketches-java-7.0.1.jar
+    - org.apache.datasketches-datasketches-memory-4.1.0.jar
  * Apache Commons
     - commons-beanutils-commons-beanutils-1.11.0.jar
     - commons-cli-commons-cli-1.11.0.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt 
b/distribution/shell/src/assemble/LICENSE.bin.txt
index 0c2e2e2ff47..f90a5d69350 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -336,8 +336,8 @@ The Apache Software License, Version 2.0
  * Netty Reactive Streams -- netty-reactive-streams-2.0.6.jar
  * Swagger -- swagger-annotations-jakarta-2.2.50.jar
  * DataSketches
-    - memory-0.8.3.jar
-    - sketches-core-0.8.3.jar
+    - datasketches-java-7.0.1.jar
+    - datasketches-memory-4.1.0.jar
  * Apache Commons
     - commons-codec-1.20.0.jar
     - commons-io-2.21.0.jar
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index c624875b70a..870a763fc52 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -121,7 +121,6 @@ oshi = "6.4.0"
 netty-reactive-streams = "2.0.6"
 cron-utils = "9.1.6"
 failsafe = "3.3.2"
-sketches = "0.8.3"
 disruptor = "3.4.3"
 ant = "1.10.12"
 guice = "5.1.0"
@@ -166,6 +165,7 @@ json = "20231013"
 consolecaptor = "1.0.3"
 restassured = "5.4.0"
 skyscreamer = "1.5.0"
+# misc
 zstd-jni = "1.5.7-3"
 lz4java = "1.10.3"
 spring = "6.2.12"
@@ -174,6 +174,8 @@ aws-sdk = "1.12.788"
 hadoop3 = "3.5.0"
 jclouds = "2.6.0"
 thrift = "0.23.0"
+datasketches-memory = "4.1.0"
+datasketches-java = "7.0.1"
 # Shading
 shadow = "9.4.1"
 
@@ -375,7 +377,6 @@ vertx-web = { module = "io.vertx:vertx-web", version.ref = 
"vertx" }
 avro = { module = "org.apache.avro:avro", version.ref = "avro" }
 avro-protobuf = { module = "org.apache.avro:avro-protobuf", version.ref = 
"avro" }
 joda-time = "joda-time:joda-time:2.10.10"
-sketches-core = { module = "com.yahoo.datasketches:sketches-core", version.ref 
= "sketches" }
 java-semver = { module = "com.github.zafarkhaja:java-semver", version.ref = 
"java-semver" }
 oshi-core = { module = "com.github.oshi:oshi-core-java11", version.ref = 
"oshi" }
 jna = { module = "net.java.dev.jna:jna", version.ref = "jna" }
@@ -466,8 +467,8 @@ commons-logging = { module = 
"commons-logging:commons-logging", version.ref = "c
 commons-beanutils = { module = "commons-beanutils:commons-beanutils", 
version.ref = "commons-beanutils" }
 commons-configuration2 = { module = 
"org.apache.commons:commons-configuration2", version.ref = 
"commons-configuration2" }
 bookkeeper-stats-api = { module = 
"org.apache.bookkeeper.stats:bookkeeper-stats-api", version.ref = "bookkeeper" }
-datasketches-memory = "org.apache.datasketches:datasketches-memory:2.2.0"
-datasketches-java = "org.apache.datasketches:datasketches-java:6.1.1"
+datasketches-memory = { module = 
"org.apache.datasketches:datasketches-memory", version.ref = 
"datasketches-memory" }
+datasketches-java = { module = "org.apache.datasketches:datasketches-java", 
version.ref = "datasketches-java" }
 
 [plugins]
 lightproto = { id = "io.streamnative.lightproto", version.ref = "lightproto" }
diff --git 
a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts 
b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
index 2870a236654..f8c8b5d055c 100644
--- a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
+++ b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/build.gradle.kts
@@ -31,7 +31,7 @@ dependencies {
     implementation(libs.jetty.ee8.servlet)
     implementation(libs.guava)
     implementation(libs.netty.common)
-    implementation(libs.sketches.core)
+    implementation(libs.datasketches.java)
 
     testImplementation(libs.netty.buffer)
 }
diff --git 
a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
 
b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
index 5c5bf560799..f9f6a882ca6 100644
--- 
a/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
+++ 
b/jetty-upgrade/bookkeeper-prometheus-metrics-provider/src/main/java/org/apache/pulsar/metrics/prometheus/bookkeeper/DataSketchesOpStatsLogger.java
@@ -18,10 +18,6 @@
  */
 package org.apache.pulsar.metrics.prometheus.bookkeeper;
 
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
-import com.yahoo.sketches.quantiles.DoublesUnion;
-import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -30,6 +26,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.StampedLock;
 import org.apache.bookkeeper.stats.OpStatsData;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.datasketches.kll.KllDoublesSketch;
 
 /**
  * OpStatsLogger implementation that uses DataSketches library to calculate the approximated latency quantiles.
@@ -45,8 +42,8 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
     /*
      * These are the sketches where all the aggregated results are published.
      */
-    private volatile DoublesSketch successResult;
-    private volatile DoublesSketch failResult;
+    private volatile KllDoublesSketch successResult;
+    private volatile KllDoublesSketch failResult;
 
     private final LongAdder successCountAdder = new LongAdder();
     private final LongAdder failCountAdder = new LongAdder();
@@ -147,22 +144,22 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
         current = replacement;
         replacement = local;
 
-        final DoublesUnion aggregateSuccesss = new 
DoublesUnionBuilder().build();
-        final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
+        final KllDoublesSketch aggregateSuccess = 
KllDoublesSketch.newHeapInstance();
+        final KllDoublesSketch aggregateFail = 
KllDoublesSketch.newHeapInstance();
         local.map.forEach((localData, b) -> {
             long stamp = localData.lock.writeLock();
             try {
-                aggregateSuccesss.update(localData.successSketch);
-                localData.successSketch.reset();
-                aggregateFail.update(localData.failSketch);
-                localData.failSketch.reset();
+                aggregateSuccess.merge(localData.successSketch);
+                aggregateFail.merge(localData.failSketch);
+                localData.successSketch = KllDoublesSketch.newHeapInstance();
+                localData.failSketch = KllDoublesSketch.newHeapInstance();
             } finally {
                 localData.lock.unlockWrite(stamp);
             }
         });
 
-        successResult = aggregateSuccesss.getResultAndReset();
-        failResult = aggregateFail.getResultAndReset();
+        successResult = aggregateSuccess;
+        failResult = aggregateFail;
     }
 
     public long getCount(boolean success) {
@@ -174,8 +171,8 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
     }
 
     public double getQuantileValue(boolean success, double quantile) {
-        DoublesSketch s = success ? successResult : failResult;
-        return s != null ? s.getQuantile(quantile) : Double.NaN;
+        KllDoublesSketch s = success ? successResult : failResult;
+        return (s != null && !s.isEmpty()) ? s.getQuantile(quantile) : 
Double.NaN;
     }
 
     public Map<String, String> getLabels() {
@@ -192,8 +189,8 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
     }
 
     private static class LocalData {
-        private final DoublesSketch successSketch = new 
DoublesSketchBuilder().build();
-        private final DoublesSketch failSketch = new 
DoublesSketchBuilder().build();
+        private KllDoublesSketch successSketch = 
KllDoublesSketch.newHeapInstance();
+        private KllDoublesSketch failSketch = 
KllDoublesSketch.newHeapInstance();
         private final StampedLock lock = new StampedLock();
     }
 
diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index 7dbd9e60de3..af9561e9a3b 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -92,7 +92,7 @@ dependencies {
     implementation(libs.bookkeeper.server)
     implementation(libs.bookkeeper.circe.checksum)
     implementation(libs.caffeine)
-    implementation(libs.sketches.core)
+    implementation(libs.datasketches.java)
     implementation(libs.netty.codec.haproxy)
     implementation(libs.opentelemetry.sdk.extension.autoconfigure)
     implementation(libs.jetty.ee10.websocket.jetty.server)
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
index 8973ba6a25c..e4e9a668fd9 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -18,13 +18,11 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesUnion;
-import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.bookkeeper.stats.OpStatsData;
 import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.datasketches.kll.KllDoublesSketch;
 
 /**
  * OpStatsLogger implementation that uses DataSketches library to calculate the approximated latency quantiles.
@@ -40,8 +38,8 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
     /**
      * These are the sketches where all the aggregated results are published.
      */
-    private volatile DoublesSketch successResult;
-    private volatile DoublesSketch failResult;
+    private volatile KllDoublesSketch successResult;
+    private volatile KllDoublesSketch failResult;
 
     private final LongAdder successCountAdder = new LongAdder();
     private final LongAdder failCountAdder = new LongAdder();
@@ -104,12 +102,12 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
         current = replacement;
         replacement = local;
 
-        final DoublesUnion aggregateSuccess = new 
DoublesUnionBuilder().build();
-        final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
+        final KllDoublesSketch aggregateSuccess = 
KllDoublesSketch.newHeapInstance();
+        final KllDoublesSketch aggregateFail = 
KllDoublesSketch.newHeapInstance();
         local.record(aggregateSuccess, aggregateFail);
 
-        successResult = aggregateSuccess.getResultAndReset();
-        failResult = aggregateFail.getResultAndReset();
+        successResult = aggregateSuccess;
+        failResult = aggregateFail;
     }
 
     public long getCount(boolean success) {
@@ -121,7 +119,7 @@ public class DataSketchesOpStatsLogger implements 
OpStatsLogger {
     }
 
     public double getQuantileValue(boolean success, double quantile) {
-        DoublesSketch s = success ? successResult : failResult;
-        return s != null ? s.getQuantile(quantile) : Double.NaN;
+        KllDoublesSketch s = success ? successResult : failResult;
+        return (s != null && !s.isEmpty()) ? s.getQuantile(quantile) : 
Double.NaN;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
index 7495f057aa0..66c8b241f68 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
@@ -18,12 +18,10 @@
  */
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesUnion;
-import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.DoubleAdder;
 import java.util.concurrent.atomic.LongAdder;
+import org.apache.datasketches.kll.KllDoublesSketch;
 
 public class DataSketchesSummaryLogger {
 
@@ -36,7 +34,7 @@ public class DataSketchesSummaryLogger {
     /*
      * These are the sketches where all the aggregated results are published.
      */
-    private volatile DoublesSketch values;
+    private volatile KllDoublesSketch values;
     private final LongAdder countAdder = new LongAdder();
     private final DoubleAdder sumAdder = new DoubleAdder();
 
@@ -60,10 +58,10 @@ public class DataSketchesSummaryLogger {
         current = replacement;
         replacement = local;
 
-        final DoublesUnion aggregateValues = new DoublesUnionBuilder().build();
+        final KllDoublesSketch aggregateValues = 
KllDoublesSketch.newHeapInstance();
         local.record(aggregateValues, null);
 
-        values = aggregateValues.getResultAndReset();
+        values = aggregateValues;
     }
 
     public long getCount() {
@@ -75,7 +73,7 @@ public class DataSketchesSummaryLogger {
     }
 
     public double getQuantileValue(double quantile) {
-        DoublesSketch s = values;
-        return s != null ? s.getQuantile(quantile) : Double.NaN;
+        KllDoublesSketch s = values;
+        return (s != null && !s.isEmpty()) ? s.getQuantile(quantile) : 
Double.NaN;
     }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
index 6a32ce5b905..c0a9f31af0a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
@@ -19,14 +19,12 @@
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
-import com.yahoo.sketches.quantiles.DoublesUnion;
 import io.netty.util.concurrent.FastThreadLocal;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.lang.ref.WeakReference;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.StampedLock;
+import org.apache.datasketches.kll.KllDoublesSketch;
 import org.jspecify.annotations.Nullable;
 
 class ThreadLocalAccessor {
@@ -47,7 +45,7 @@ class ThreadLocalAccessor {
         }
     };
 
-    void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion 
aggregateFail) {
+    void record(KllDoublesSketch aggregateSuccess, @Nullable KllDoublesSketch 
aggregateFail) {
         map.keySet().forEach(key -> {
             key.record(aggregateSuccess, aggregateFail);
             if (key.shouldRemove()) {
@@ -67,8 +65,8 @@ class ThreadLocalAccessor {
 
     static class LocalData {
 
-        private final DoublesSketch successSketch = new 
DoublesSketchBuilder().build();
-        private final DoublesSketch failSketch = new 
DoublesSketchBuilder().build();
+        private final KllDoublesSketch successSketch = 
KllDoublesSketch.newHeapInstance();
+        private final KllDoublesSketch failSketch = 
KllDoublesSketch.newHeapInstance();
         private final StampedLock lock = new StampedLock();
         // Keep a weak reference to the owner thread so that we can remove the LocalData when the thread
         // is not alive anymore or has been garbage collected.
@@ -100,13 +98,13 @@ class ThreadLocalAccessor {
             }
         }
 
-        void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion 
aggregateFail) {
+        void record(KllDoublesSketch aggregateSuccess, @Nullable 
KllDoublesSketch aggregateFail) {
             long stamp = lock.writeLock();
             try {
-                aggregateSuccess.update(successSketch);
+                aggregateSuccess.merge(successSketch);
                 successSketch.reset();
                 if (aggregateFail != null) {
-                    aggregateFail.update(failSketch);
+                    aggregateFail.merge(failSketch);
                     failSketch.reset();
                 }
             } finally {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
index 94c8337307d..3ccebd5581d 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
@@ -19,9 +19,9 @@
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
 import static org.testng.Assert.assertEquals;
-import com.yahoo.sketches.quantiles.DoublesUnion;
 import io.netty.util.concurrent.FastThreadLocalThread;
 import java.util.concurrent.Phaser;
+import org.apache.datasketches.kll.KllDoublesSketch;
 import org.jspecify.annotations.Nullable;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -33,19 +33,19 @@ public class ThreadLocalAccessorTest {
         return new Object[][] {
                 // 1st element: whether the thread is a FastThreadLocalThread
                 // 2nd element: the 2nd argument passed to the `ThreadLocalAccessor#record` method
-                { true, DoublesUnion.builder().build() },
+                { true, KllDoublesSketch.newHeapInstance() },
                 { true, null },
-                { false, DoublesUnion.builder().build() },
+                { false, KllDoublesSketch.newHeapInstance() },
                 { false, null },
         };
     }
 
     @Test(dataProvider = "provider")
     public void testShouldRemoveLocalDataWhenOwnerThreadIsNotAlive(
-            boolean fastThreadLocalThread, @Nullable DoublesUnion 
aggregateFail) throws Exception {
+            boolean fastThreadLocalThread, @Nullable KllDoublesSketch 
aggregateFail) throws Exception {
         // given a ThreadLocalAccessor instance
         final var threadLocalAccessor = new ThreadLocalAccessor();
-        DoublesUnion aggregateSuccess = DoublesUnion.builder().build();
+        KllDoublesSketch aggregateSuccess = KllDoublesSketch.newHeapInstance();
         // using phaser to synchronize threads
         Phaser phaser = new Phaser(2);
         Thread thread = getThread(fastThreadLocalThread, () -> {
@@ -75,13 +75,13 @@ public class ThreadLocalAccessorTest {
     }
 
     @Test(dataProvider = "provider")
-    public void testThreadGc(boolean fastThreadLocalThread, @Nullable 
DoublesUnion aggregateFail) throws Exception {
+    public void testThreadGc(boolean fastThreadLocalThread, @Nullable 
KllDoublesSketch aggregateFail) throws Exception {
         final var accessor = new ThreadLocalAccessor();
         getThread(fastThreadLocalThread, accessor::getLocalData).join();
         System.gc();
         // FastThreadLocalThread removes the LocalData from the map when the thread finishes
         assertEquals(accessor.getLocalDataCount(), fastThreadLocalThread ? 0 : 
1);
-        accessor.record(DoublesUnion.builder().build(), aggregateFail);
+        accessor.record(KllDoublesSketch.newHeapInstance(), aggregateFail);
         assertEquals(accessor.getLocalDataCount(), 0);
     }
 
diff --git a/pulsar-client/build.gradle.kts b/pulsar-client/build.gradle.kts
index f0618cd7efe..43c382b2478 100644
--- a/pulsar-client/build.gradle.kts
+++ b/pulsar-client/build.gradle.kts
@@ -47,7 +47,7 @@ dependencies {
     implementation(libs.netty.reactive.streams)
     implementation(libs.slog)
     implementation(libs.commons.codec)
-    implementation(libs.sketches.core)
+    implementation(libs.datasketches.java)
     implementation(libs.gson)
     implementation(libs.avro) {
         exclude(group = "org.slf4j")
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
index 19a49158c66..f8ad90be352 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java
@@ -20,14 +20,15 @@ package org.apache.pulsar.client.impl;
 
 import com.fasterxml.jackson.databind.ObjectWriter;
 import com.fasterxml.jackson.databind.SerializationFeature;
-import com.yahoo.sketches.quantiles.DoublesSketch;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
 import java.io.IOException;
 import java.text.DecimalFormat;
+import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import lombok.CustomLog;
+import org.apache.datasketches.kll.KllDoublesSketch;
 import org.apache.pulsar.client.api.ProducerStats;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
@@ -53,9 +54,9 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
     private final LongAdder totalAcksReceived;
     private static final DecimalFormat DEC = new DecimalFormat("0.000");
     private static final DecimalFormat THROUGHPUT_FORMAT = new 
DecimalFormat("0.00");
-    private final transient DoublesSketch ds;
-    private final transient DoublesSketch batchSizeDs;
-    private final transient DoublesSketch msgSizeDs;
+    private final transient KllDoublesSketch ds;
+    private final transient KllDoublesSketch batchSizeDs;
+    private final transient KllDoublesSketch msgSizeDs;
 
     private volatile double sendMsgsRate;
     private volatile double sendBytesRate;
@@ -74,9 +75,9 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent = new LongAdder();
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
-        ds = DoublesSketch.builder().build(256);
-        batchSizeDs = DoublesSketch.builder().build(256);
-        msgSizeDs = DoublesSketch.builder().build(256);
+        ds = KllDoublesSketch.newHeapInstance(256);
+        batchSizeDs = KllDoublesSketch.newHeapInstance(256);
+        msgSizeDs = KllDoublesSketch.newHeapInstance(256);
     }
 
     public ProducerStatsRecorderImpl(PulsarClientImpl pulsarClient, 
ProducerConfigurationData conf,
@@ -92,9 +93,9 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalBytesSent = new LongAdder();
         totalSendFailed = new LongAdder();
         totalAcksReceived = new LongAdder();
-        ds = DoublesSketch.builder().build(256);
-        batchSizeDs = DoublesSketch.builder().build(256);
-        msgSizeDs = DoublesSketch.builder().build(256);
+        ds = KllDoublesSketch.newHeapInstance(256);
+        batchSizeDs = KllDoublesSketch.newHeapInstance(256);
+        msgSizeDs = KllDoublesSketch.newHeapInstance(256);
         init(conf);
     }
 
@@ -154,17 +155,17 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         totalAcksReceived.add(currentNumAcksReceived);
 
         synchronized (ds) {
-            latencyPctValues = ds.getQuantiles(PERCENTILES);
+            latencyPctValues = getQuantiles(ds);
             ds.reset();
         }
 
         synchronized (batchSizeDs) {
-            batchSizePctValues = batchSizeDs.getQuantiles(PERCENTILES);
+            batchSizePctValues = getQuantiles(batchSizeDs);
             batchSizeDs.reset();
         }
 
         synchronized (msgSizeDs) {
-            msgSizePctValues = msgSizeDs.getQuantiles(PERCENTILES);
+            msgSizePctValues = getQuantiles(msgSizeDs);
             msgSizeDs.reset();
         }
 
@@ -206,6 +207,19 @@ public class ProducerStatsRecorderImpl implements 
ProducerStatsRecorder {
         }
     }
 
+    /**
+     * Returns the configured percentile quantiles for the given sketch. KllDoublesSketch throws on an empty
+     * sketch, so an array of {@link Double#NaN} is returned in that case to preserve the previous behavior.
+     */
+    private static double[] getQuantiles(KllDoublesSketch sketch) {
+        if (sketch.isEmpty()) {
+            double[] values = new double[PERCENTILES.length];
+            Arrays.fill(values, Double.NaN);
+            return values;
+        }
+        return sketch.getQuantiles(PERCENTILES);
+    }
+
     @Override
     public void updateNumMsgsSent(long numMsgs, long totalMsgsSize) {
         numMsgsSent.add(numMsgs);
diff --git a/pulsar-functions/instance/build.gradle.kts 
b/pulsar-functions/instance/build.gradle.kts
index e474f8e68e5..968f5b200a3 100644
--- a/pulsar-functions/instance/build.gradle.kts
+++ b/pulsar-functions/instance/build.gradle.kts
@@ -43,7 +43,7 @@ dependencies {
     implementation(libs.simpleclient.caffeine)
     implementation(libs.simpleclient.httpserver)
     implementation(libs.prometheus.jmx.collector)
-    implementation(libs.sketches.core)
+    implementation(libs.datasketches.java)
     implementation(libs.jackson.databind)
     implementation(libs.netty.buffer)
     implementation(libs.netty.common)
diff --git a/pulsar-functions/localrun-shaded/build.gradle.kts 
b/pulsar-functions/localrun-shaded/build.gradle.kts
index 882c0cbbbd1..2ff2100578f 100644
--- a/pulsar-functions/localrun-shaded/build.gradle.kts
+++ b/pulsar-functions/localrun-shaded/build.gradle.kts
@@ -64,7 +64,7 @@ tasks.shadowJar {
         include(dependency("info.picocli:.*"))
         include(dependency("net.jodah:.*"))
         include(dependency("io.airlift:.*"))
-        include(dependency("com.yahoo.datasketches:.*"))
+        include(dependency("org.apache.datasketches:.*"))
     }
 
     // Exclude bouncycastle from pulsar-client (signatures would break if shaded)
@@ -80,8 +80,6 @@ tasks.shadowJar {
     relocateWithPrefix(shadePrefix, "com.squareup.okhttp")
     relocateWithPrefix(shadePrefix, "com.squareup.okio")
     relocateWithPrefix(shadePrefix, "com.thoughtworks.paranamer")
-    relocateWithPrefix(shadePrefix, "com.yahoo.datasketches")
-    relocateWithPrefix(shadePrefix, "com.yahoo.sketches")
     relocateWithPrefix(shadePrefix, "commons-cli")
     relocateWithPrefix(shadePrefix, "commons-codec")
     relocateWithPrefix(shadePrefix, "commons-io")
@@ -111,6 +109,7 @@ tasks.shadowJar {
     relocateWithPrefix(shadePrefix, "org.apache.bookkeeper")
     relocateWithPrefix(shadePrefix, "org.apache.commons")
     relocateWithPrefix(shadePrefix, "org.apache.curator")
+    relocateWithPrefix(shadePrefix, "org.apache.datasketches")
     relocateWithPrefix(shadePrefix, "org.apache.distributedlog")
     relocateWithPrefix(shadePrefix, "org.apache.jute")
     relocateWithPrefix(shadePrefix, "org.apache.yetus")

Reply via email to