This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cd38a3205b7278efda4bd5ebd634aa60bcbff691 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Wed Sep 10 00:12:36 2025 +0800 [fix][broker] Fix memory leak when metrics are updated in a thread other than FastThreadLocalThread (#24719) (cherry picked from commit 97156533ee0afb05f0538fbe4e5cae379c71b95a) --- .../metrics/DataSketchesOpStatsLogger.java | 87 ++----------- .../metrics/DataSketchesSummaryLogger.java | 49 +------- .../prometheus/metrics/ThreadLocalAccessor.java | 135 +++++++++++++++++++++ .../metrics/ThreadLocalAccessorTest.java | 93 ++++++++++++++ 4 files changed, 239 insertions(+), 125 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java index 12f54ba48a4..8973ba6a25c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesOpStatsLogger.java @@ -19,15 +19,10 @@ package org.apache.pulsar.broker.stats.prometheus.metrics; import com.yahoo.sketches.quantiles.DoublesSketch; -import com.yahoo.sketches.quantiles.DoublesSketchBuilder; import com.yahoo.sketches.quantiles.DoublesUnion; import com.yahoo.sketches.quantiles.DoublesUnionBuilder; -import io.netty.util.concurrent.FastThreadLocal; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.StampedLock; import org.apache.bookkeeper.stats.OpStatsData; import org.apache.bookkeeper.stats.OpStatsLogger; @@ -65,15 +60,7 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { failCountAdder.increment(); failSumAdder.add((long) valueMillis); - - LocalData localData = current.localData.get(); - - long stamp = localData.lock.readLock(); - try { - localData.failSketch.update(valueMillis); - } finally { - localData.lock.unlockRead(stamp); - } + current.getLocalData().updateFail(valueMillis); } @Override @@ -82,45 +69,21 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { successCountAdder.increment(); successSumAdder.add((long) valueMillis); - - LocalData localData = current.localData.get(); - - long stamp = localData.lock.readLock(); - try { - localData.successSketch.update(valueMillis); - } finally { - localData.lock.unlockRead(stamp); - } + current.getLocalData().updateSuccess(valueMillis); } @Override public void registerSuccessfulValue(long value) { successCountAdder.increment(); successSumAdder.add(value); - - LocalData localData = current.localData.get(); - - long stamp = localData.lock.readLock(); - try { - localData.successSketch.update(value); - } finally { - localData.lock.unlockRead(stamp); - } + current.getLocalData().updateSuccess(value); } @Override public void registerFailedValue(long value) { failCountAdder.increment(); failSumAdder.add(value); - - LocalData localData = current.localData.get(); - - long stamp = localData.lock.readLock(); - try { - localData.failSketch.update(value); - } finally { - localData.lock.unlockRead(stamp); - } + current.getLocalData().updateFail(value); } @Override @@ -141,21 +104,11 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { current = replacement; replacement = local; - final DoublesUnion aggregateSuccesss = new DoublesUnionBuilder().build(); + final DoublesUnion aggregateSuccess = new DoublesUnionBuilder().build(); final DoublesUnion aggregateFail = new DoublesUnionBuilder().build(); - local.map.forEach((localData, b) -> { - long stamp = localData.lock.writeLock(); - try { - aggregateSuccesss.update(localData.successSketch); - localData.successSketch.reset(); - aggregateFail.update(localData.failSketch); - localData.failSketch.reset(); - } finally { - localData.lock.unlockWrite(stamp); - } - }); - - successResult = aggregateSuccesss.getResultAndReset(); + local.record(aggregateSuccess, aggregateFail); + + successResult = aggregateSuccess.getResultAndReset(); failResult = aggregateFail.getResultAndReset(); } @@ -171,28 +124,4 @@ public class DataSketchesOpStatsLogger implements OpStatsLogger { DoublesSketch s = success ? successResult : failResult; return s != null ? s.getQuantile(quantile) : Double.NaN; } - - private static class LocalData { - private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); - private final DoublesSketch failSketch = new DoublesSketchBuilder().build(); - private final StampedLock lock = new StampedLock(); - } - - private static class ThreadLocalAccessor { - private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>(); - private final FastThreadLocal<LocalData> localData = new FastThreadLocal<LocalData>() { - - @Override - protected LocalData initialValue() throws Exception { - LocalData localData = new LocalData(); - map.put(localData, Boolean.TRUE); - return localData; - } - - @Override - protected void onRemoval(LocalData value) throws Exception { - map.remove(value); - } - }; - } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java index cc144e0eb69..42c189d4bf3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/DataSketchesSummaryLogger.java @@ -19,15 +19,10 @@ package org.apache.pulsar.broker.stats.prometheus.metrics; import com.yahoo.sketches.quantiles.DoublesSketch; -import com.yahoo.sketches.quantiles.DoublesSketchBuilder; import com.yahoo.sketches.quantiles.DoublesUnion; import com.yahoo.sketches.quantiles.DoublesUnionBuilder; -import io.netty.util.concurrent.FastThreadLocal; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.LongAdder; -import java.util.concurrent.locks.StampedLock; public class DataSketchesSummaryLogger { @@ -55,14 +50,7 @@ public class DataSketchesSummaryLogger { countAdder.increment(); sumAdder.add((long) valueMillis); - LocalData localData = current.localData.get(); - - long stamp = localData.lock.readLock(); - try { - localData.successSketch.update(valueMillis); - } finally { - localData.lock.unlockRead(stamp); - } + current.getLocalData().updateSuccess(valueMillis); } public void rotateLatencyCollection() { @@ -72,15 +60,7 @@ public class DataSketchesSummaryLogger { replacement = local; final DoublesUnion aggregateValues = new DoublesUnionBuilder().build(); - local.map.forEach((localData, b) -> { - long stamp = localData.lock.writeLock(); - try { - aggregateValues.update(localData.successSketch); - localData.successSketch.reset(); - } finally { - localData.lock.unlockWrite(stamp); - } - }); + local.record(aggregateValues, null); values = aggregateValues.getResultAndReset(); } @@ -97,27 +77,4 @@ public class DataSketchesSummaryLogger { DoublesSketch s = values; return s != null ? s.getQuantile(quantile) : Double.NaN; } - - private static class LocalData { - private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); - private final StampedLock lock = new StampedLock(); - } - - private static class ThreadLocalAccessor { - private final Map<LocalData, Boolean> map = new ConcurrentHashMap<>(); - private final FastThreadLocal<LocalData> localData = new FastThreadLocal<LocalData>() { - - @Override - protected LocalData initialValue() throws Exception { - LocalData localData = new LocalData(); - map.put(localData, Boolean.TRUE); - return localData; - } - - @Override - protected void onRemoval(LocalData value) throws Exception { - map.remove(value); - } - }; - } -} \ No newline at end of file +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java new file mode 100644 index 00000000000..6a32ce5b905 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessor.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.yahoo.sketches.quantiles.DoublesSketch; +import com.yahoo.sketches.quantiles.DoublesSketchBuilder; +import com.yahoo.sketches.quantiles.DoublesUnion; +import io.netty.util.concurrent.FastThreadLocal; +import io.netty.util.concurrent.FastThreadLocalThread; +import java.lang.ref.WeakReference; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.StampedLock; +import org.jspecify.annotations.Nullable; + +class ThreadLocalAccessor { + + private final ConcurrentHashMap<LocalData, Boolean> map = new ConcurrentHashMap<>(); + private final FastThreadLocal<LocalData> localData = new FastThreadLocal<>() { + + @Override + protected LocalData initialValue() { + LocalData localData = new LocalData(Thread.currentThread()); + map.put(localData, Boolean.TRUE); + return localData; + } + + @Override + protected void onRemoval(LocalData value) { + map.remove(value); + } + }; + + void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion aggregateFail) { + map.keySet().forEach(key -> { + key.record(aggregateSuccess, aggregateFail); + if (key.shouldRemove()) { + map.remove(key); + } + }); + } + + LocalData getLocalData() { + return localData.get(); + } + + @VisibleForTesting + int getLocalDataCount() { + return map.keySet().size(); + } + + static class LocalData { + + private final DoublesSketch successSketch = new DoublesSketchBuilder().build(); + private final DoublesSketch failSketch = new DoublesSketchBuilder().build(); + private final StampedLock lock = new StampedLock(); + // Keep a weak reference to the owner thread so that we can remove the LocalData when the thread + // is not alive anymore or has been garbage collected. + // This reference isn't needed when the owner thread is a FastThreadLocalThread and will be null in that case. + // The removal is handled by FastThreadLocal#onRemoval when the owner thread is a FastThreadLocalThread. + private final WeakReference<Thread> ownerThreadReference; + + LocalData(Thread ownerThread) { + if (ownerThread instanceof FastThreadLocalThread) { + ownerThreadReference = null; + } else { + ownerThreadReference = new WeakReference<>(ownerThread); + } + } + + private boolean shouldRemove() { + if (ownerThreadReference == null) { + // the owner is a FastThreadLocalThread which handles the removal using FastThreadLocal#onRemoval + return false; + } else { + Thread ownerThread = ownerThreadReference.get(); + if (ownerThread == null) { + // the thread has already been garbage collected, LocalData should be removed + return true; + } else { + // the thread isn't alive anymore, LocalData should be removed + return !ownerThread.isAlive(); + } + } + } + + void record(DoublesUnion aggregateSuccess, @Nullable DoublesUnion aggregateFail) { + long stamp = lock.writeLock(); + try { + aggregateSuccess.update(successSketch); + successSketch.reset(); + if (aggregateFail != null) { + aggregateFail.update(failSketch); + failSketch.reset(); + } + } finally { + lock.unlockWrite(stamp); + } + } + + void updateSuccess(double value) { + long stamp = lock.readLock(); + try { + successSketch.update(value); + } finally { + lock.unlockRead(stamp); + } + } + + void updateFail(double value) { + long stamp = lock.readLock(); + try { + failSketch.update(value); + } finally { + lock.unlockRead(stamp); + } + } + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java new file mode 100644 index 00000000000..94c8337307d --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/metrics/ThreadLocalAccessorTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus.metrics; + +import static org.testng.Assert.assertEquals; +import com.yahoo.sketches.quantiles.DoublesUnion; +import io.netty.util.concurrent.FastThreadLocalThread; +import java.util.concurrent.Phaser; +import org.jspecify.annotations.Nullable; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ThreadLocalAccessorTest { + + @DataProvider + public static Object[][] provider() { + return new Object[][] { + // 1st element: whether the thread is a FastThreadLocalThread + // 2nd element: the 2nd argument passed to the `ThreadLocalAccessor#record` method + { true, DoublesUnion.builder().build() }, + { true, null }, + { false, DoublesUnion.builder().build() }, + { false, null }, + }; + } + + @Test(dataProvider = "provider") + public void testShouldRemoveLocalDataWhenOwnerThreadIsNotAlive( + boolean fastThreadLocalThread, @Nullable DoublesUnion aggregateFail) throws Exception { + // given a ThreadLocalAccessor instance + final var threadLocalAccessor = new ThreadLocalAccessor(); + DoublesUnion aggregateSuccess = DoublesUnion.builder().build(); + // using phaser to synchronize threads + Phaser phaser = new Phaser(2); + Thread thread = getThread(fastThreadLocalThread, () -> { + // Create a new LocalData instance for the current thread. + threadLocalAccessor.getLocalData(); + // sync point #1, wait and advance at the same time + phaser.arriveAndAwaitAdvance(); + // sync point #2, wait and advance at the same time + phaser.arriveAndAwaitAdvance(); + }); + // sync point #1, wait and advance at the same time + phaser.arriveAndAwaitAdvance(); + // and record is called + threadLocalAccessor.record(aggregateSuccess, aggregateFail); + // then LocalData should exist + assertEquals(threadLocalAccessor.getLocalDataCount(), 1); + + // when thread is not alive anymore + // sync point #2, wait and advance at the same time + phaser.arriveAndAwaitAdvance(); + // wait for thread to finish + thread.join(); + // and record is called + threadLocalAccessor.record(aggregateSuccess, aggregateFail); + // then LocalData should be removed + assertEquals(threadLocalAccessor.getLocalDataCount(), 0); + } + + @Test(dataProvider = "provider") + public void testThreadGc(boolean fastThreadLocalThread, @Nullable DoublesUnion aggregateFail) throws Exception { + final var accessor = new ThreadLocalAccessor(); + getThread(fastThreadLocalThread, accessor::getLocalData).join(); + System.gc(); + // FastThreadLocalThread removes the LocalData from the map when the thread finishes + assertEquals(accessor.getLocalDataCount(), fastThreadLocalThread ? 0 : 1); + accessor.record(DoublesUnion.builder().build(), aggregateFail); + assertEquals(accessor.getLocalDataCount(), 0); + } + + private static Thread getThread(boolean fastThreadLocalThread, Runnable runnable) { + final var thread = fastThreadLocalThread ? new FastThreadLocalThread(runnable) : new Thread(runnable); + thread.start(); // when LocalData is created + return thread; + } +}