This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a15bde173 RATIS-2164. LeakDetector has a race condition. (#1163)
a15bde173 is described below
commit a15bde173573eb3ec2dda90f0d99204bf7056eba
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Oct 16 16:49:24 2024 -0700
RATIS-2164. LeakDetector has a race condition. (#1163)
---
.../java/org/apache/ratis/util/LeakDetector.java | 122 +++++++---
.../ratis/util/ReferenceCountedLeakDetector.java | 248 ++++++++++++++++-----
.../apache/ratis/util/ReferenceCountedObject.java | 9 -
.../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 5 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 38 +++-
5 files changed, 320 insertions(+), 102 deletions(-)
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
index 82202f288..a6b2ec28b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/LeakDetector.java
@@ -22,10 +22,14 @@ import org.slf4j.LoggerFactory;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
/**
* Simple general resource leak detector using {@link ReferenceQueue} and
{@link java.lang.ref.WeakReference} to
@@ -55,13 +59,61 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public class LeakDetector {
private static final Logger LOG =
LoggerFactory.getLogger(LeakDetector.class);
+
+ private static class LeakTrackerSet {
+ private final Set<LeakTracker> set = Collections.newSetFromMap(new
HashMap<>());
+
+ synchronized boolean remove(LeakTracker tracker) {
+ return set.remove(tracker);
+ }
+
+ synchronized void removeExisting(LeakTracker tracker) {
+ final boolean removed = set.remove(tracker);
+ Preconditions.assertTrue(removed, () -> "Failed to remove existing " +
tracker);
+ }
+
+ synchronized LeakTracker add(Object referent, ReferenceQueue<Object>
queue, Supplier<String> leakReporter) {
+ final LeakTracker tracker = new LeakTracker(referent, queue,
this::removeExisting, leakReporter);
+ final boolean added = set.add(tracker);
+ Preconditions.assertTrue(added, () -> "Failed to add " + tracker + " for
" + referent);
+ return tracker;
+ }
+
+ synchronized int getNumLeaks(boolean throwException) {
+ if (set.isEmpty()) {
+ return 0;
+ }
+
+ int n = 0;
+ for (LeakTracker tracker : set) {
+ if (tracker.reportLeak() != null) {
+ n++;
+ }
+ }
+ if (throwException) {
+ assertNoLeaks(n);
+ }
+ return n;
+ }
+
+ synchronized void assertNoLeaks(int leaks) {
+ Preconditions.assertTrue(leaks == 0, () -> {
+ final int size = set.size();
+ return "#leaks = " + leaks + " > 0, #leaks " + (leaks == size? "==" :
"!=") + " set.size = " + size;
+ });
+ }
+ }
+
private static final AtomicLong COUNTER = new AtomicLong();
private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
- private final Set<LeakTracker> allLeaks = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+ /** All the {@link LeakTracker}s. */
+ private final LeakTrackerSet trackers = new LeakTrackerSet();
+ /** When a leak is discovered, a message is printed and added to this list.
*/
+ private final List<String> leakMessages = Collections.synchronizedList(new
ArrayList<>());
private final String name;
- public LeakDetector(String name) {
+ LeakDetector(String name) {
this.name = name + COUNTER.getAndIncrement();
}
@@ -80,8 +132,11 @@ public class LeakDetector {
LeakTracker tracker = (LeakTracker) queue.remove();
// Original resource already been GCed, if tracker is not closed yet,
// report a leak.
- if (allLeaks.remove(tracker)) {
- tracker.reportLeak();
+ if (trackers.remove(tracker)) {
+ final String leak = tracker.reportLeak();
+ if (leak != null) {
+ leakMessages.add(leak);
+ }
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -93,48 +148,51 @@ public class LeakDetector {
LOG.warn("Exiting leak detector {}.", name);
}
- public UncheckedAutoCloseable track(Object leakable, Runnable reportLeak) {
- // A rate filter can be put here to only track a subset of all objects,
e.g. 5%, 10%,
+ Runnable track(Object leakable, Supplier<String> reportLeak) {
+ // TODO: A rate filter can be put here to only track a subset of all
objects, e.g. 5%, 10%,
// if we have proofs that leak tracking impacts performance, or a single
LeakDetector
// thread can't keep up with the pace of object allocation.
// For now, it looks effective enough and let keep it simple.
- LeakTracker tracker = new LeakTracker(leakable, queue, allLeaks,
reportLeak);
- allLeaks.add(tracker);
- return tracker;
+ return trackers.add(leakable, queue, reportLeak)::remove;
}
- public void assertNoLeaks() {
- Preconditions.assertTrue(allLeaks.isEmpty(), this::allLeaksString);
- }
+ public void assertNoLeaks(int maxRetries, TimeDuration retrySleep) throws
InterruptedException {
+ synchronized (leakMessages) {
+ // leakMessages are all the leaks discovered so far.
+ Preconditions.assertTrue(leakMessages.isEmpty(),
+ () -> "#leaks = " + leakMessages.size() + "\n" + leakMessages);
+ }
- String allLeaksString() {
- if (allLeaks.isEmpty()) {
- return "allLeaks = <empty>";
+ for(int i = 0; i < maxRetries; i++) {
+ final int numLeaks = trackers.getNumLeaks(false);
+ if (numLeaks == 0) {
+ return;
+ }
+ LOG.warn("{}/{}) numLeaks == {} > 0, will wait and retry ...", i,
maxRetries, numLeaks);
+ retrySleep.sleep();
}
- allLeaks.forEach(LeakTracker::reportLeak);
- return "allLeaks.size = " + allLeaks.size();
+ trackers.getNumLeaks(true);
}
- private static final class LeakTracker extends WeakReference<Object>
implements UncheckedAutoCloseable {
- private final Set<LeakTracker> allLeaks;
- private final Runnable leakReporter;
+ private static final class LeakTracker extends WeakReference<Object> {
+ private final Consumer<LeakTracker> removeMethod;
+ private final Supplier<String> getLeakMessage;
+
LeakTracker(Object referent, ReferenceQueue<Object> referenceQueue,
- Set<LeakTracker> allLeaks, Runnable leakReporter) {
+ Consumer<LeakTracker> removeMethod, Supplier<String> getLeakMessage) {
super(referent, referenceQueue);
- this.allLeaks = allLeaks;
- this.leakReporter = leakReporter;
+ this.removeMethod = removeMethod;
+ this.getLeakMessage = getLeakMessage;
}
- /**
- * Called by the tracked resource when closing.
- */
- @Override
- public void close() {
- allLeaks.remove(this);
+ /** Called by the tracked resource when the object is completely released.
*/
+ void remove() {
+ removeMethod.accept(this);
}
- void reportLeak() {
- leakReporter.run();
+ /** @return the leak message if there is a leak; return null if there is
no leak. */
+ String reportLeak() {
+ return getLeakMessage.get();
}
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
index 32abe805f..acf6fb8cf 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java
@@ -20,7 +20,7 @@ package org.apache.ratis.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +49,7 @@ public final class ReferenceCountedLeakDetector {
private ReferenceCountedLeakDetector() {
}
- static synchronized void enable(boolean advanced) {
+ public static synchronized void enable(boolean advanced) {
FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE);
}
@@ -108,6 +108,10 @@ public final class ReferenceCountedLeakDetector {
return value;
}
+ final int getCount() {
+ return count.get();
+ }
+
@Override
public V retain() {
// n < 0: exception
@@ -138,85 +142,221 @@ public final class ReferenceCountedLeakDetector {
}
private static class SimpleTracing<T> extends Impl<T> {
- private final UncheckedAutoCloseable leakTracker;
+ private final LeakDetector leakDetector;
+ private final Class<?> valueClass;
+ private String valueString = null;
+ private Runnable removeMethod = null;
SimpleTracing(T value, Runnable retainMethod, Consumer<Boolean>
releaseMethod, LeakDetector leakDetector) {
super(value, retainMethod, releaseMethod);
- final Class<?> clazz = value.getClass();
- this.leakTracker = leakDetector.track(this,
- () -> LOG.warn("LEAK: A {} is not released properly",
clazz.getName()));
+ this.valueClass = value.getClass();
+ this.leakDetector = leakDetector;
+ }
+
+ String getTraceString(int count) {
+ return "(" + valueClass + ", count=" + count + ", value=" + valueString
+ ")";
+ }
+
+ /** @return the leak message if there is a leak; return null if there is
no leak. */
+ String logLeakMessage() {
+ final int count = getCount();
+ if (count == 0) { // never retain
+ return null;
+ }
+ final String message = "LEAK: " + getTraceString(count);
+ LOG.warn(message);
+ return message;
}
@Override
- public boolean release() {
- boolean released = super.release();
+ public synchronized T get() {
+ try {
+ return super.get();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to get: " +
getTraceString(getCount()), e);
+ }
+ }
+
+ @Override
+ public synchronized T retain() {
+ final T value;
+ try {
+ value = super.retain();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to retain: " +
getTraceString(getCount()), e);
+ }
+ if (getCount() == 1) { // this is the first retain
+ this.removeMethod = leakDetector.track(this, this::logLeakMessage);
+ this.valueString = value.toString();
+ }
+ return value;
+ }
+
+ @Override
+ public synchronized boolean release() {
+ final boolean released;
+ try {
+ released = super.release();
+ } catch (Exception e) {
+ throw new IllegalStateException("Failed to release: " +
getTraceString(getCount()), e);
+ }
+
if (released) {
- leakTracker.close();
+ Preconditions.assertNotNull(removeMethod, () -> "Not yet retained
(removeMethod == null): " + valueClass);
+ removeMethod.run();
}
return released;
}
}
- private static class AdvancedTracing<T> extends Impl<T> {
- private final UncheckedAutoCloseable leakTracker;
- private final List<StackTraceElement[]> retainsTraces;
- private final List<StackTraceElement[]> releaseTraces;
+ private static class AdvancedTracing<T> extends SimpleTracing<T> {
+ enum Op {CREATION, RETAIN, RELEASE, CURRENT}
- AdvancedTracing(T value, Runnable retainMethod, Consumer<Boolean>
releaseMethod, LeakDetector leakDetector) {
- super(value, retainMethod, releaseMethod);
+ static class Counts {
+ private final int refCount;
+ private final int retainCount;
+ private final int releaseCount;
+
+ Counts() {
+ this.refCount = 0;
+ this.retainCount = 0;
+ this.releaseCount = 0;
+ }
+
+ Counts(Op op, Counts previous) {
+ if (op == Op.RETAIN) {
+ this.refCount = previous.refCount + 1;
+ this.retainCount = previous.retainCount + 1;
+ this.releaseCount = previous.releaseCount;
+ } else if (op == Op.RELEASE) {
+ this.refCount = previous.refCount - 1;
+ this.retainCount = previous.retainCount;
+ this.releaseCount = previous.releaseCount + 1;
+ } else {
+ throw new IllegalStateException("Unexpected op: " + op);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "refCount=" + refCount
+ + ", retainCount=" + retainCount
+ + ", releaseCount=" + releaseCount;
+ }
+ }
+
+ static class TraceInfo {
+ private final int id;
+ private final Op op;
+ private final int previousRefCount;
+ private final Counts counts;
+
+ private final String threadInfo;
+ private final StackTraceElement[] stackTraces;
+ private final int newTraceElementIndex;
+
+ TraceInfo(int id, Op op, TraceInfo previous, int previousRefCount) {
+ this.id = id;
+ this.op = op;
+ this.previousRefCount = previousRefCount;
+ this.counts = previous == null? new Counts()
+ : op == Op.CURRENT ? previous.counts
+ : new Counts(op, previous.counts);
+
+ final Thread thread = Thread.currentThread();
+ this.threadInfo = "Thread_" + thread.getId() + ":" + thread.getName();
+ this.stackTraces = thread.getStackTrace();
+ this.newTraceElementIndex = previous == null? stackTraces.length - 1
+ : findFirstUnequalFromTail(this.stackTraces, previous.stackTraces);
+ }
+
+ static <T> int findFirstUnequalFromTail(T[] current, T[] previous) {
+ int c = current.length - 1;
+ for(int p = previous.length - 1; p >= 0; p--, c--) {
+ if (!previous[p].equals(current[c])) {
+ return c;
+ }
+ }
+ return -1;
+ }
+
+ private StringBuilder appendTo(StringBuilder b) {
+ b.append(op).append("_").append(id)
+ .append(": previousRefCount=").append(previousRefCount)
+ .append(", ").append(counts)
+ .append(", ").append(threadInfo).append("\n");
+ final int n = newTraceElementIndex + 1;
+ int line = 3;
+ for (; line <= n && line < stackTraces.length; line++) {
+ b.append(" ").append(stackTraces[line]).append("\n");
+ }
+ if (line < stackTraces.length) {
+ b.append(" ...\n");
+ }
+ return b;
+ }
+
+ @Override
+ public String toString() {
+ return appendTo(new StringBuilder()).toString();
+ }
+ }
- StackTraceElement[] createStrace =
Thread.currentThread().getStackTrace();
- final Class<?> clazz = value.getClass();
- final List<StackTraceElement[]> localRetainsTraces = new LinkedList<>();
- final List<StackTraceElement[]> localReleaseTraces = new LinkedList<>();
+ private final List<TraceInfo> traceInfos = new ArrayList<>();
+ private TraceInfo previous;
- this.leakTracker = leakDetector.track(this, () ->
- LOG.warn("LEAK: A {} is not released properly.\nCreation
trace:\n{}\n" +
- "Retain traces({}):\n{}\nRelease traces({}):\n{}",
- clazz.getName(), formatStackTrace(createStrace, 3),
- localRetainsTraces.size(), formatStackTraces(localRetainsTraces,
2),
- localReleaseTraces.size(), formatStackTraces(localReleaseTraces,
2)));
+ AdvancedTracing(T value, Runnable retainMethod, Consumer<Boolean>
releaseMethod, LeakDetector leakDetector) {
+ super(value, retainMethod, releaseMethod, leakDetector);
+ addTraceInfo(Op.CREATION, -1);
+ }
- this.retainsTraces = localRetainsTraces;
- this.releaseTraces = localReleaseTraces;
+ private synchronized TraceInfo addTraceInfo(Op op, int previousRefCount) {
+ final TraceInfo current = new TraceInfo(traceInfos.size(), op, previous,
previousRefCount);
+ traceInfos.add(current);
+ previous = current;
+ return current;
}
+
@Override
- public T retain() {
- T retain = super.retain();
- retainsTraces.add(Thread.currentThread().getStackTrace());
- return retain;
+ public synchronized T retain() {
+ final int previousRefCount = getCount();
+ final T retained = super.retain();
+ final TraceInfo info = addTraceInfo(Op.RETAIN, previousRefCount);
+ Preconditions.assertSame(getCount(), info.counts.refCount, "refCount");
+ return retained;
}
@Override
- public boolean release() {
- boolean released = super.release();
- if (released) {
- leakTracker.close();
- }
- releaseTraces.add(Thread.currentThread().getStackTrace());
+ public synchronized boolean release() {
+ final int previousRefCount = getCount();
+ final boolean released = super.release();
+ final TraceInfo info = addTraceInfo(Op.RELEASE, previousRefCount);
+ final int count = getCount();
+ final int expected = count == -1? 0 : count;
+ Preconditions.assertSame(expected, info.counts.refCount, "refCount");
return released;
}
- }
- private static String formatStackTrace(StackTraceElement[] stackTrace, int
startIdx) {
- final StringBuilder sb = new StringBuilder();
- for (int line = startIdx; line < stackTrace.length; line++) {
- sb.append(stackTrace[line]).append("\n");
+ @Override
+ synchronized String getTraceString(int count) {
+ return super.getTraceString(count) + getTraceInfosString();
}
- return sb.toString();
- }
- private static String formatStackTraces(List<StackTraceElement[]>
stackTraces, int startIdx) {
- final StringBuilder sb = new StringBuilder();
- stackTraces.forEach(stackTrace -> {
- if (sb.length() > 0) {
- sb.append("\n");
- }
- for (int line = startIdx; line < stackTrace.length; line++) {
- sb.append(stackTrace[line]).append("\n");
+ private String getTraceInfosString() {
+ final int n = traceInfos.size();
+ final StringBuilder b = new StringBuilder(n << 10).append("
#TraceInfos=").append(n);
+ TraceInfo last = null;
+ for (TraceInfo info : traceInfos) {
+ info.appendTo(b.append("\n"));
+ last = info;
}
- });
- return sb.toString();
+
+ // append current track info
+ final TraceInfo current = new TraceInfo(n, Op.CURRENT, last, getCount());
+ current.appendTo(b.append("\n"));
+
+ return b.toString();
+ }
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
index b2c53182d..1fc72c344 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java
@@ -182,13 +182,4 @@ public interface ReferenceCountedObject<T> {
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod,
Runnable releaseMethod) {
return wrap(value, retainMethod, ignored -> releaseMethod.run());
}
-
- static void enableLeakDetection() {
- ReferenceCountedLeakDetector.enable(false);
- }
-
- static void enableAdvancedLeakDetection() {
- ReferenceCountedLeakDetector.enable(true);
- }
-
}
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 47f9e1d4b..fe12e29f1 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -32,7 +32,7 @@ import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.util.NetUtils;
-import org.apache.ratis.util.ReferenceCountedObject;
+import org.apache.ratis.util.ReferenceCountedLeakDetector;
import org.junit.Assert;
import java.util.Optional;
@@ -51,7 +51,8 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
};
static {
- ReferenceCountedObject.enableLeakDetection();
+ // TODO move it to MiniRaftCluster for detecting non-gRPC cases
+ ReferenceCountedLeakDetector.enable(false);
}
public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 9a54700ec..86ebfa52c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -136,17 +136,27 @@ public abstract class MiniRaftCluster implements Closeable {
final StackTraceElement caller =
JavaUtils.getCallerStackTraceElement();
LOG.info("Running " + caller.getMethodName());
final CLUSTER cluster = newCluster(numServers, numListeners);
+ Throwable failed = null;
try {
if (startCluster) {
cluster.start();
}
testCase.accept(cluster);
- } catch(Exception t) {
+ } catch(Throwable t) {
LOG.info(cluster.printServers());
LOG.error("Failed " + caller, t);
+ failed = t;
throw t;
} finally {
- cluster.shutdown();
+ try {
+ cluster.shutdown();
+ } catch (Exception e) {
+ if (failed == null) {
+ throw e;
+ } else {
+ failed.addSuppressed(e);
+ }
+ }
}
}
@@ -847,10 +857,24 @@ public abstract class MiniRaftCluster implements Closeable {
final ExecutorService executor =
Executors.newFixedThreadPool(servers.size(), (t) ->
Daemon.newBuilder().setName("MiniRaftCluster-" +
THREAD_COUNT.incrementAndGet()).setRunnable(t).build());
getServers().forEach(proxy -> executor.submit(() ->
JavaUtils.runAsUnchecked(proxy::close)));
+ final int maxRetries = 30;
+ final TimeDuration retrySleep = TimeDuration.ONE_SECOND;
try {
executor.shutdown();
// just wait for a few seconds
- executor.awaitTermination(5, TimeUnit.SECONDS);
+ boolean terminated = false;
+
+ for(int i = 0; i < maxRetries && !terminated; ) {
+ terminated = executor.awaitTermination(retrySleep.getDuration(),
retrySleep.getUnit());
+ if (!terminated) {
+ i++;
+ if (i < maxRetries) {
+ LOG.warn("{}/{}: Not yet able to shutdown executor, will wait
again ...", i, maxRetries);
+ } else {
+ LOG.error("Failed to shutdown executor, some servers may be still
running:\n{}", printServers());
+ }
+ }
+ }
} catch (InterruptedException e) {
LOG.warn("shutdown interrupted", e);
Thread.currentThread().interrupt();
@@ -864,9 +888,13 @@ public abstract class MiniRaftCluster implements Closeable {
try {
RaftTestUtil.gc();
} catch (InterruptedException e) {
- LOG.info("gc interrupted.");
+ LOG.warn("gc interrupted.", e);
+ }
+ try {
+ ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks(maxRetries,
retrySleep);
+ } catch (InterruptedException e) {
+ LOG.warn("LeakDetector interrupted.", e);
}
- ReferenceCountedLeakDetector.getLeakDetector().assertNoLeaks();
}
/**