szetszwo commented on code in PR #1023: URL: https://github.com/apache/ratis/pull/1023#discussion_r1471690348
########## ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java: ########## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * A utility to detect leaks from @{@link ReferenceCountedObject}. + */ +final class ReferenceCountedLeakDetector { + private static final Logger LOG = LoggerFactory.getLogger(ReferenceCountedLeakDetector.class); + // Leak detection is turned off by default. 
+ + private static final AtomicReference<Mode> FACTORY = new AtomicReference<>(Mode.NONE); + private static final Supplier<LeakDetector> SUPPLIER + = MemoizedSupplier.valueOf(() -> new LeakDetector(FACTORY.get().name()).start()); + + static Factory getFactory() { + return FACTORY.get(); + } + + static LeakDetector getLeakDetector() { + return SUPPLIER.get(); + } + + private ReferenceCountedLeakDetector() { + } + + static synchronized void enable(boolean advanced) { + FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE); + } + + interface Factory { + <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod); + } + + private enum Mode implements Factory { + /** Leak detector is not enable in production to avoid performance impacts. */ + NONE { + public <V> ReferenceCountedObject<V> create(V value, Runnable retainMethod, Consumer<Boolean> releaseMethod) { Review Comment: Add `@Override`. ########## ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java: ########## @@ -75,4 +85,26 @@ protected void blockQueueAndSetDelay(String leaderId, int delayMs) RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection, leaderId, delayMs, getTimeoutMax()); } + + @Override + public void shutdown() { + super.shutdown(); + // GC to ensure leak detection work. + System.gc(); Review Comment: `System.gc()` is only a best-effort request; it does not guarantee that a collection actually runs. The second answer of https://stackoverflow.com/questions/1481178/how-to-force-garbage-collection-in-java suggests using a `WeakReference` and a loop to wait until a GC has really happened. That seems like a good idea. ```java static void gc() throws InterruptedException { // use WeakReference to detect gc Object obj = new Object(); final WeakReference<Object> weakRef = new WeakReference<>(obj); obj = null; // loop until gc has completed.
for (int i = 0; weakRef.get() != null; i++) { LOG.info("gc {}", i); System.gc(); Thread.sleep(100); } } ``` ########## ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java: ########## @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.ref.ReferenceQueue; +import java.lang.ref.WeakReference; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simple general resource leak detector using {@link ReferenceQueue} and {@link java.lang.ref.WeakReference} to + * observe resource object life-cycle and assert proper resource closure before they are GCed. + * + * <p> + * Example usage: + * + * <pre> {@code + * class MyResource implements AutoClosable { + * static final LeakDetector LEAK_DETECTOR = new LeakDetector("MyResource"); + * + * private final UncheckedAutoCloseable leakTracker = LEAK_DETECTOR.track(this, () -> { + * // report leaks, don't refer to the original object (MyResource) here. 
+ * System.out.println("MyResource is not closed before being discarded."); + * }); + * + * @Override + * public void close() { + * // proper resources cleanup... + * // inform tracker that this object is closed properly. + * leakTracker.close(); + * } + * } + * + * }</pre> + */ +public class LeakDetector { + private static final Logger LOG = LoggerFactory.getLogger(LeakDetector.class); + private static final AtomicLong COUNTER = new AtomicLong(); + private final ReferenceQueue<Object> queue = new ReferenceQueue<>(); + private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final String name; + + public LeakDetector(String name) { + this.name = name + COUNTER.getAndIncrement(); + } + + LeakDetector start() { + Thread t = new Thread(this::run); + t.setName(LeakDetector.class.getSimpleName() + "-" + name); + t.setDaemon(true); + LOG.info("Starting leak detector thread {}.", name); + t.start(); + return this; + } + + private void run() { + while (true) { + try { + LeakTracker tracker = (LeakTracker) queue.remove(); + // Original resource already been GCed, if tracker is not closed yet, + // report a leak. + if (allLeaks.remove(tracker)) { + tracker.reportLeak(); + } + } catch (InterruptedException e) { + LOG.warn("Thread interrupted, exiting.", e); + break; + } + } + + LOG.warn("Exiting leak detector {}.", name); + } + + public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) { + // A rate filter can be put here to only track a subset of all objects, e.g. 5%, 10%, + // if we have proofs that leak tracking impacts performance, or a single LeakDetector + // thread can't keep up with the pace of object allocation. + // For now, it looks effective enough and let keep it simple. + LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks, reportLeak); + allLeaks.add(tracker); + return tracker; + } + Review Comment: Let's add an `assertNoLeaks` method and use it in the test. 
```java public void assertNoLeaks() { Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString); } String allLeaksString() { if (allLeaks.isEmpty()) { return "allLeaks = <empty>"; } allLeaks.forEach(LeakTracker::reportLeak); return "allLeaks.size = " + allLeaks.size(); } ``` ########## ratis-grpc/src/main/java/org/apache/ratis/grpc/metrics/ZeroCopyMetrics.java: ########## @@ -55,4 +56,21 @@ public void onReleasedMessage(AbstractMessage ignored) { releasedMessages.inc(); } + @VisibleForTesting + public long zeroCopyMessages() { + return zeroCopyMessages.getCount(); + } + + @VisibleForTesting + public long nonZeroCopyMessages() { + return nonZeroCopyMessages.getCount(); + } + + @VisibleForTesting + public long releasedMessages() { + return releasedMessages.getCount(); + } + + public void reset() { + } Review Comment: Let's remove this unused method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
