This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cd38a3205b7278efda4bd5ebd634aa60bcbff691
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Wed Sep 10 00:12:36 2025 +0800

    [fix][broker] Fix memory leak when metrics are updated in a thread other than FastThreadLocalThread (#24719)
    
    (cherry picked from commit 97156533ee0afb05f0538fbe4e5cae379c71b95a)
---
 .../metrics/DataSketchesOpStatsLogger.java         |  87 ++-----------
 .../metrics/DataSketchesSummaryLogger.java         |  49 +-------
 .../prometheus/metrics/ThreadLocalAccessor.java    | 135 +++++++++++++++++++++
 .../metrics/ThreadLocalAccessorTest.java           |  93 ++++++++++++++
 4 files changed, 239 insertions(+), 125 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
index 12f54ba48a4..8973ba6a25c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java
@@ -19,15 +19,10 @@
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
 import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
 import com.yahoo.sketches.quantiles.DoublesUnion;
 import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
-import io.netty.util.concurrent.FastThreadLocal;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.StampedLock;
 import org.apache.bookkeeper.stats.OpStatsData;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 
@@ -65,15 +60,7 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
 
         failCountAdder.increment();
         failSumAdder.add((long) valueMillis);
-
-        LocalData localData = current.localData.get();
-
-        long stamp = localData.lock.readLock();
-        try {
-            localData.failSketch.update(valueMillis);
-        } finally {
-            localData.lock.unlockRead(stamp);
-        }
+        current.getLocalData().updateFail(valueMillis);
     }
 
     @Override
@@ -82,45 +69,21 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
 
         successCountAdder.increment();
         successSumAdder.add((long) valueMillis);
-
-        LocalData localData = current.localData.get();
-
-        long stamp = localData.lock.readLock();
-        try {
-            localData.successSketch.update(valueMillis);
-        } finally {
-            localData.lock.unlockRead(stamp);
-        }
+        current.getLocalData().updateSuccess(valueMillis);
     }
 
     @Override
     public void registerSuccessfulValue(long value) {
         successCountAdder.increment();
         successSumAdder.add(value);
-
-        LocalData localData = current.localData.get();
-
-        long stamp = localData.lock.readLock();
-        try {
-            localData.successSketch.update(value);
-        } finally {
-            localData.lock.unlockRead(stamp);
-        }
+        current.getLocalData().updateSuccess(value);
     }
 
     @Override
     public void registerFailedValue(long value) {
         failCountAdder.increment();
         failSumAdder.add(value);
-
-        LocalData localData = current.localData.get();
-
-        long stamp = localData.lock.readLock();
-        try {
-            localData.failSketch.update(value);
-        } finally {
-            localData.lock.unlockRead(stamp);
-        }
+        current.getLocalData().updateFail(value);
     }
 
     @Override
@@ -141,21 +104,11 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
         current = replacement;
         replacement = local;
 
-        final DoublesUnion aggregateSuccesss = new 
DoublesUnionBuilder().build();
+        final DoublesUnion aggregateSuccess = new 
DoublesUnionBuilder().build();
         final DoublesUnion aggregateFail = new DoublesUnionBuilder().build();
-        local.map.forEach((localData, b) -> {
-            long stamp = localData.lock.writeLock();
-            try {
-                aggregateSuccesss.update(localData.successSketch);
-                localData.successSketch.reset();
-                aggregateFail.update(localData.failSketch);
-                localData.failSketch.reset();
-            } finally {
-                localData.lock.unlockWrite(stamp);
-            }
-        });
-
-        successResult = aggregateSuccesss.getResultAndReset();
+        local.record(aggregateSuccess, aggregateFail);
+
+        successResult = aggregateSuccess.getResultAndReset();
         failResult = aggregateFail.getResultAndReset();
     }
 
@@ -171,28 +124,4 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger {
         DoublesSketch s = success ? successResult : failResult;
         return s != null ? s.getQuantile(quantile) : Double.NaN;
     }
-
-    private static class LocalData {
-        private final DoublesSketch successSketch = new 
DoublesSketchBuilder().build();
-        private final DoublesSketch failSketch = new 
DoublesSketchBuilder().build();
-        private final StampedLock lock = new StampedLock();
-    }
-
-    private static class ThreadLocalAccessor {
-        private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
-        private final FastThreadLocal<LocalData> localData = new 
FastThreadLocal<LocalData>() {
-
-            @Override
-            protected LocalData initialValue() throws Exception {
-                LocalData localData = new LocalData();
-                map.put(localData, Boolean.TRUE);
-                return localData;
-            }
-
-            @Override
-            protected void onRemoval(LocalData value) throws Exception {
-                map.remove(value);
-            }
-        };
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
index cc144e0eb69..42c189d4bf3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java
@@ -19,15 +19,10 @@
 package org.apache.pulsar.broker.stats.prometheus.metrics;
 
 import com.yahoo.sketches.quantiles.DoublesSketch;
-import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
 import com.yahoo.sketches.quantiles.DoublesUnion;
 import com.yahoo.sketches.quantiles.DoublesUnionBuilder;
-import io.netty.util.concurrent.FastThreadLocal;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
-import java.util.concurrent.locks.StampedLock;
 
 public class DataSketchesSummaryLogger {
 
@@ -55,14 +50,7 @@ public class DataSketchesSummaryLogger {
         countAdder.increment();
         sumAdder.add((long) valueMillis);
 
-        LocalData localData = current.localData.get();
-
-        long stamp = localData.lock.readLock();
-        try {
-            localData.successSketch.update(valueMillis);
-        } finally {
-            localData.lock.unlockRead(stamp);
-        }
+        current.getLocalData().updateSuccess(valueMillis);
     }
 
     public void rotateLatencyCollection() {
@@ -72,15 +60,7 @@ public class DataSketchesSummaryLogger {
         replacement = local;
 
         final DoublesUnion aggregateValues = new DoublesUnionBuilder().build();
-        local.map.forEach((localData, b) -> {
-            long stamp = localData.lock.writeLock();
-            try {
-                aggregateValues.update(localData.successSketch);
-                localData.successSketch.reset();
-            } finally {
-                localData.lock.unlockWrite(stamp);
-            }
-        });
+        local.record(aggregateValues, null);
 
         values = aggregateValues.getResultAndReset();
     }
@@ -97,27 +77,4 @@ public class DataSketchesSummaryLogger {
         DoublesSketch s = values;
         return s != null ? s.getQuantile(quantile) : Double.NaN;
     }
-
-    private static class LocalData {
-        private final DoublesSketch successSketch = new 
DoublesSketchBuilder().build();
-        private final StampedLock lock = new StampedLock();
-    }
-
-    private static class ThreadLocalAccessor {
-        private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>();
-        private final FastThreadLocal<LocalData> localData = new 
FastThreadLocal<LocalData>() {
-
-            @Override
-            protected LocalData initialValue() throws Exception {
-                LocalData localData = new LocalData();
-                map.put(localData, Boolean.TRUE);
-                return localData;
-            }
-
-            @Override
-            protected void onRemoval(LocalData value) throws Exception {
-                map.remove(value);
-            }
-        };
-    }
-}
\ No newline at end of file
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
new file mode 100644
index 00000000000..6a32ce5b905
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.yahoo.sketches.quantiles.DoublesSketch;
+import com.yahoo.sketches.quantiles.DoublesSketchBuilder;
+import com.yahoo.sketches.quantiles.DoublesUnion;
+import io.netty.util.concurrent.FastThreadLocal;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import java.lang.ref.WeakReference;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.StampedLock;
+import org.jspecify.annotations.Nullable;
+
+class ThreadLocalAccessor {
+
+    private final ConcurrentHashMap<LocalData, Boolean> map = new 
ConcurrentHashMap<>();
+    private final FastThreadLocal<LocalData> localData = new 
FastThreadLocal<>() {
+
+        @Override
+        protected LocalData initialValue() {
+            LocalData localData = new LocalData(Thread.currentThread());
+            map.put(localData, Boolean.TRUE);
+            return localData;
+        }
+
+        @Override
+        protected void onRemoval(LocalData value) {
+            map.remove(value);
+        }
+    };
+
+    void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion 
aggregateFail) {
+        map.keySet().forEach(key -> {
+            key.record(aggregateSuccess, aggregateFail);
+            if (key.shouldRemove()) {
+                map.remove(key);
+            }
+        });
+    }
+
+    LocalData getLocalData() {
+        return localData.get();
+    }
+
+    @VisibleForTesting
+    int getLocalDataCount() {
+        return map.keySet().size();
+    }
+
+    static class LocalData {
+
+        private final DoublesSketch successSketch = new 
DoublesSketchBuilder().build();
+        private final DoublesSketch failSketch = new 
DoublesSketchBuilder().build();
+        private final StampedLock lock = new StampedLock();
+        // Keep a weak reference to the owner thread so that we can remove the 
LocalData when the thread
+        // is not alive anymore or has been garbage collected.
+        // This reference isn't needed when the owner thread is a 
FastThreadLocalThread and will be null in that case.
+        // The removal is handled by FastThreadLocal#onRemoval when the owner 
thread is a FastThreadLocalThread.
+        private final WeakReference<Thread> ownerThreadReference;
+
+        LocalData(Thread ownerThread) {
+            if (ownerThread instanceof FastThreadLocalThread) {
+                ownerThreadReference = null;
+            } else {
+                ownerThreadReference = new WeakReference<>(ownerThread);
+            }
+        }
+
+        private boolean shouldRemove() {
+            if (ownerThreadReference == null) {
+                // the owner is a FastThreadLocalThread which handles the 
removal using FastThreadLocal#onRemoval
+                return false;
+            } else {
+                Thread ownerThread = ownerThreadReference.get();
+                if (ownerThread == null) {
+                    // the thread has already been garbage collected, 
LocalData should be removed
+                    return true;
+                } else {
+                    // the thread isn't alive anymore, LocalData should be 
removed
+                    return !ownerThread.isAlive();
+                }
+            }
+        }
+
+        void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion 
aggregateFail) {
+            long stamp = lock.writeLock();
+            try {
+                aggregateSuccess.update(successSketch);
+                successSketch.reset();
+                if (aggregateFail != null) {
+                    aggregateFail.update(failSketch);
+                    failSketch.reset();
+                }
+            } finally {
+                lock.unlockWrite(stamp);
+            }
+        }
+
+        void updateSuccess(double value) {
+            long stamp = lock.readLock();
+            try {
+                successSketch.update(value);
+            } finally {
+                lock.unlockRead(stamp);
+            }
+        }
+
+        void updateFail(double value) {
+            long stamp = lock.readLock();
+            try {
+                failSketch.update(value);
+            } finally {
+                lock.unlockRead(stamp);
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
new file mode 100644
index 00000000000..94c8337307d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats.prometheus.metrics;
+
+import static org.testng.Assert.assertEquals;
+import com.yahoo.sketches.quantiles.DoublesUnion;
+import io.netty.util.concurrent.FastThreadLocalThread;
+import java.util.concurrent.Phaser;
+import org.jspecify.annotations.Nullable;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+public class ThreadLocalAccessorTest {
+
+    @DataProvider
+    public static Object[][] provider() {
+        return new Object[][] {
+                // 1st element: whether the thread is a FastThreadLocalThread
+                // 2nd element: the 2nd argument passed to the 
`ThreadLocalAccessor#record` method
+                { true, DoublesUnion.builder().build() },
+                { true, null },
+                { false, DoublesUnion.builder().build() },
+                { false, null },
+        };
+    }
+
+    @Test(dataProvider = "provider")
+    public void testShouldRemoveLocalDataWhenOwnerThreadIsNotAlive(
+            boolean fastThreadLocalThread, @Nullable DoublesUnion 
aggregateFail) throws Exception {
+        // given a ThreadLocalAccessor instance
+        final var threadLocalAccessor = new ThreadLocalAccessor();
+        DoublesUnion aggregateSuccess = DoublesUnion.builder().build();
+        // using phaser to synchronize threads
+        Phaser phaser = new Phaser(2);
+        Thread thread = getThread(fastThreadLocalThread, () -> {
+            // Create a new LocalData instance for the current thread.
+            threadLocalAccessor.getLocalData();
+            // sync point #1, wait and advance at the same time
+            phaser.arriveAndAwaitAdvance();
+            // sync point #2, wait and advance at the same time
+            phaser.arriveAndAwaitAdvance();
+        });
+        // sync point #1, wait and advance at the same time
+        phaser.arriveAndAwaitAdvance();
+        // and record is called
+        threadLocalAccessor.record(aggregateSuccess, aggregateFail);
+        // then LocalData should exist
+        assertEquals(threadLocalAccessor.getLocalDataCount(), 1);
+
+        // when thread is not alive anymore
+        // sync point #2, wait and advance at the same time
+        phaser.arriveAndAwaitAdvance();
+        // wait for thread to finish
+        thread.join();
+        // and record is called
+        threadLocalAccessor.record(aggregateSuccess, aggregateFail);
+        // then LocalData should be removed
+        assertEquals(threadLocalAccessor.getLocalDataCount(), 0);
+    }
+
+    @Test(dataProvider = "provider")
+    public void testThreadGc(boolean fastThreadLocalThread, @Nullable 
DoublesUnion aggregateFail) throws Exception {
+        final var accessor = new ThreadLocalAccessor();
+        getThread(fastThreadLocalThread, accessor::getLocalData).join();
+        System.gc();
+        // FastThreadLocalThread removes the LocalData from the map when the 
thread finishes
+        assertEquals(accessor.getLocalDataCount(), fastThreadLocalThread ? 0 : 
1);
+        accessor.record(DoublesUnion.builder().build(), aggregateFail);
+        assertEquals(accessor.getLocalDataCount(), 0);
+    }
+
+    private static Thread getThread(boolean fastThreadLocalThread, Runnable 
runnable) {
+        final var thread = fastThreadLocalThread ? new 
FastThreadLocalThread(runnable) : new Thread(runnable);
+        thread.start(); // when LocalData is created
+        return thread;
+    }
+}

Reply via email to