This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 00fff3e In JVM dtests need to clean up after instance shutdown
00fff3e is described below
commit 00fff3ee6e6c0142529de621bcaeee5790a0c235
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Jan 11 16:09:06 2019 +0000
In JVM dtests need to clean up after instance shutdown
Followup - isolate executions for a given node to an executor owned by the
instance
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-14922
---
.../async/NettyStreamingMessageSender.java | 2 +-
.../org/apache/cassandra/utils/FBUtilities.java | 21 +++--
.../org/apache/cassandra/distributed/Instance.java | 66 ++++++++++----
.../cassandra/distributed/InvokableInstance.java | 100 ++++++++++++++++++---
.../apache/cassandra/distributed/TestCluster.java | 39 ++------
5 files changed, 158 insertions(+), 70 deletions(-)
diff --git
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 3fa80f5..8511a87 100644
---
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -511,7 +511,7 @@ public class NettyStreamingMessageSender implements
StreamingMessageSender
List<Future<Void>> futures = new
ArrayList<>(threadToChannelMap.size());
for (Channel channel : threadToChannelMap.values())
futures.add(channel.close());
- FBUtilities.waitOnFutures(futures, 10 * 1000);
+ FBUtilities.waitOnFutures(futures, 10, TimeUnit.SECONDS);
threadToChannelMap.clear();
fileTransferExecutor.shutdownNow();
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 8f30fce..f7aec8b 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -383,28 +383,37 @@ public class FBUtilities
public static <T> List<T> waitOnFutures(Iterable<? extends Future<?
extends T>> futures)
{
- return waitOnFutures(futures, -1);
+ return waitOnFutures(futures, -1, null);
}
/**
- * Block for a collection of futures, with an optional timeout for each
future.
+ * Block for a collection of futures, with optional timeout.
*
* @param futures
- * @param ms The number of milliseconds to wait on each future. If this
value is less than or equal to zero,
+ * @param timeout The number of units to wait in total. If this value is
less than or equal to zero,
* no timeout value will be passed to {@link Future#get()}.
+ * @param units The units of timeout.
*/
- public static <T> List<T> waitOnFutures(Iterable<? extends Future<?
extends T>> futures, long ms)
+ public static <T> List<T> waitOnFutures(Iterable<? extends Future<?
extends T>> futures, long timeout, TimeUnit units)
{
+ long endNanos = 0;
+ if (timeout > 0)
+ endNanos = System.nanoTime() + units.toNanos(timeout);
List<T> results = new ArrayList<>();
Throwable fail = null;
for (Future<? extends T> f : futures)
{
try
{
- if (ms <= 0)
+ if (endNanos == 0)
+ {
results.add(f.get());
+ }
else
- results.add(f.get(ms, TimeUnit.MILLISECONDS));
+ {
+ long waitFor = Math.max(1, endNanos - System.nanoTime());
+ results.add(f.get(waitFor, TimeUnit.NANOSECONDS));
+ }
}
catch (Throwable t)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java
b/test/distributed/org/apache/cassandra/distributed/Instance.java
index c68b961..0c4a260 100644
--- a/test/distributed/org/apache/cassandra/distributed/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
@@ -88,7 +89,7 @@ public class Instance extends InvokableInstance
public Instance(InstanceConfig config, ClassLoader classLoader)
{
- super(classLoader);
+ super("node-" + config.num, classLoader);
this.config = config;
}
@@ -337,16 +338,15 @@ public class Instance extends InvokableInstance
void shutdown()
{
- runOnInstance(() -> {
+ acceptsOnInstance((ExecutorService executor) -> {
Throwable error = null;
- error = runAndMergeThrowable(error,
+ error = parallelRun(error, executor,
+ Gossiper.instance::stop,
CompactionManager.instance::forceShutdown,
BatchlogManager.instance::shutdown,
HintsService.instance::shutdownBlocking,
CommitLog.instance::shutdownBlocking,
- Gossiper.instance::stop,
SecondaryIndexManager::shutdownExecutors,
- MessagingService.instance()::shutdown,
ColumnFamilyStore::shutdownFlushExecutor,
ColumnFamilyStore::shutdownPostFlushExecutor,
ColumnFamilyStore::shutdownReclaimExecutor,
@@ -354,42 +354,70 @@ public class Instance extends InvokableInstance
PendingRangeCalculatorService.instance::shutdownExecutor,
BufferPool::shutdownLocalCleaner,
Ref::shutdownReferenceReaper,
- StageManager::shutdownAndWait,
- SharedExecutorPool.SHARED::shutdown,
Memtable.MEMORY_POOL::shutdown,
ScheduledExecutors::shutdownAndWait,
- SSTableReader::shutdownBlocking);
-
- error = shutdownAndWait(error,
ActiveRepairService.repairCommandExecutor);
+ SSTableReader::shutdownBlocking,
+ () ->
shutdownAndWait(ActiveRepairService.repairCommandExecutor)
+ );
+ error = parallelRun(error, executor,
+ MessagingService.instance()::shutdown
+ );
+ error = parallelRun(error, executor,
+ StageManager::shutdownAndWait,
+ SharedExecutorPool.SHARED::shutdown
+ );
LoggerContext loggerContext = (LoggerContext)
LoggerFactory.getILoggerFactory();
loggerContext.stop();
Throwables.maybeFail(error);
- });
+ }).accept(isolatedExecutor);
+ super.shutdown();
}
- private static Throwable shutdownAndWait(Throwable existing,
ExecutorService executor)
+ private static void shutdownAndWait(ExecutorService executor)
{
- return runAndMergeThrowable(existing, () -> {
+ try
+ {
executor.shutdownNow();
executor.awaitTermination(20, TimeUnit.SECONDS);
- assert executor.isTerminated() && executor.isShutdown() : executor;
- });
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ assert executor.isTerminated() && executor.isShutdown() : executor;
}
- private static Throwable runAndMergeThrowable(Throwable existing,
ThrowingRunnable ... runnables)
+ private static Throwable parallelRun(Throwable accumulate, ExecutorService
runOn, ThrowingRunnable ... runnables)
{
+ List<Future<Throwable>> results = new ArrayList<>();
for (ThrowingRunnable runnable : runnables)
{
+ results.add(runOn.submit(() -> {
+ try
+ {
+ runnable.run();
+ return null;
+ }
+ catch (Throwable t)
+ {
+ return t;
+ }
+ }));
+ }
+ for (Future<Throwable> future : results)
+ {
try
{
- runnable.run();
+ Throwable t = future.get();
+ if (t != null)
+ throw t;
}
catch (Throwable t)
{
- existing = Throwables.merge(existing, t);
+ accumulate = Throwables.merge(accumulate, t);
}
}
- return existing;
+ return accumulate;
}
public static interface ThrowingRunnable
diff --git
a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
index f646ae1..9fb543d 100644
--- a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -27,18 +28,26 @@ import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.Throwables;
+
public abstract class InvokableInstance
{
+ protected final ExecutorService isolatedExecutor;
private final ClassLoader classLoader;
private final Method deserializeOnInstance;
- public InvokableInstance(ClassLoader classLoader)
+ public InvokableInstance(String name, ClassLoader classLoader)
{
+ this.isolatedExecutor = Executors.newCachedThreadPool(new
NamedThreadFactory(name, Thread.NORM_PRIORITY, classLoader, new
ThreadGroup(name)));
this.classLoader = classLoader;
try
{
@@ -50,31 +59,34 @@ public abstract class InvokableInstance
}
}
- public interface SerializableCallable<T> extends Callable<T>, Serializable
{ public T call(); }
- public <T> SerializableCallable<T> callsOnInstance(SerializableCallable<T>
call) { return (SerializableCallable<T>) transferOneObject(call); }
+ public interface CallableNoExcept<T> extends Callable<T> { public T
call(); }
+ public interface SerializableCallable<T> extends CallableNoExcept<T>,
Serializable { }
+ public <T> CallableNoExcept<T> callsOnInstance(SerializableCallable<T>
call) { return invokesOnExecutor((SerializableCallable<T>)
transferOneObject(call), isolatedExecutor); }
public <T> T callOnInstance(SerializableCallable<T> call) { return
callsOnInstance(call).call(); }
public interface SerializableRunnable extends Runnable, Serializable {}
- public SerializableRunnable runsOnInstance(SerializableRunnable run) {
return (SerializableRunnable) transferOneObject(run); }
+ public Runnable runsOnInstance(SerializableRunnable run) { return
invokesOnExecutor((SerializableRunnable) transferOneObject(run),
isolatedExecutor); }
public void runOnInstance(SerializableRunnable run) {
runsOnInstance(run).run(); }
public interface SerializableConsumer<T> extends Consumer<T>, Serializable
{}
- public <T> SerializableConsumer<T>
acceptsOnInstance(SerializableConsumer<T> consumer) { return
(SerializableConsumer<T>) transferOneObject(consumer); }
+ public <T> Consumer<T> acceptsOnInstance(SerializableConsumer<T> consumer)
{ return invokesOnExecutor((SerializableConsumer<T>)
transferOneObject(consumer), isolatedExecutor); }
public interface SerializableBiConsumer<T1, T2> extends BiConsumer<T1,
T2>, Serializable {}
- public <T1, T2> SerializableBiConsumer<T1, T2>
acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return
(SerializableBiConsumer<T1, T2>) transferOneObject(consumer); }
+ public <T1, T2> BiConsumer<T1, T2>
acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return
invokesOnExecutor((SerializableBiConsumer<T1, T2>) transferOneObject(consumer),
isolatedExecutor); }
public interface SerializableFunction<I, O> extends Function<I, O>,
Serializable {}
- public <I, O> SerializableFunction<I, O>
appliesOnInstance(SerializableFunction<I, O> f) { return
(SerializableFunction<I, O>) transferOneObject(f); }
+ public <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O>
f) { return invokesOnExecutor((SerializableFunction<I, O>)
transferOneObject(f), isolatedExecutor); }
public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1,
I2, O>, Serializable {}
- public <I1, I2, O> SerializableBiFunction<I1, I2, O>
appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return
(SerializableBiFunction<I1, I2, O>) transferOneObject(f); }
+ public <I1, I2, O> BiFunction<I1, I2, O>
appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return
invokesOnExecutor((SerializableBiFunction<I1, I2, O>) transferOneObject(f),
isolatedExecutor); }
- public interface SerializableTriFunction<I1, I2, I3, O> extends
Serializable
+ public interface TriFunction<I1, I2, I3, O>
{
O apply(I1 i1, I2 i2, I3 i3);
}
- public <I1, I2, I3, O> SerializableTriFunction<I1, I2, I3, O>
appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return
(SerializableTriFunction<I1, I2, I3, O>) transferOneObject(f); }
+ public interface SerializableTriFunction<I1, I2, I3, O> extends
Serializable, TriFunction<I1, I2, I3, O> { }
+
+ public <I1, I2, I3, O> TriFunction<I1, I2, I3, O>
appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return
invokesOnExecutor((SerializableTriFunction<I1, I2, I3, O>)
transferOneObject(f), isolatedExecutor); }
public interface InstanceFunction<I, O> extends
SerializableBiFunction<Instance, I, O> {}
@@ -130,4 +142,72 @@ public abstract class InvokableInstance
}
}
+ private static <V> CallableNoExcept<V>
invokesOnExecutor(SerializableCallable<V> callable, ExecutorService invokeOn)
+ {
+ return () -> {
+ try
+ {
+ return invokeOn.submit(callable).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwables.maybeFail(e.getCause());
+ throw new AssertionError();
+ }
+ };
+ }
+
+ private static Runnable invokesOnExecutor(SerializableRunnable runnable,
ExecutorService invokeOn)
+ {
+ return () -> {
+ try
+ {
+ invokeOn.submit(runnable).get();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (ExecutionException e)
+ {
+ Throwables.maybeFail(e.getCause());
+ throw new AssertionError();
+ }
+ };
+ }
+
+ private static <A> Consumer<A> invokesOnExecutor(SerializableConsumer<A>
consumer, ExecutorService invokeOn)
+ {
+ return (a) -> invokesOnExecutor(() -> consumer.accept(a),
invokeOn).run();
+ }
+
+ private static <A, B> BiConsumer<A, B>
invokesOnExecutor(SerializableBiConsumer<A, B> consumer, ExecutorService
invokeOn)
+ {
+ return (a, b) -> invokesOnExecutor(() -> consumer.accept(a, b),
invokeOn).run();
+ }
+
+ private static <A, B> Function<A, B>
invokesOnExecutor(SerializableFunction<A, B> f, ExecutorService invokeOn)
+ {
+ return (a) -> invokesOnExecutor(() -> f.apply(a), invokeOn).call();
+ }
+
+ private static <A, B, C> BiFunction<A, B, C>
invokesOnExecutor(SerializableBiFunction<A, B, C> f, ExecutorService invokeOn)
+ {
+ return (a, b) -> invokesOnExecutor(() -> f.apply(a, b),
invokeOn).call();
+ }
+
+ private static <A, B, C, D> SerializableTriFunction<A, B, C, D>
invokesOnExecutor(SerializableTriFunction<A, B, C, D> f, ExecutorService
invokeOn)
+ {
+ return (a, b, c) -> invokesOnExecutor(() -> f.apply(a, b, c),
invokeOn).call();
+ }
+
+ void shutdown()
+ {
+ isolatedExecutor.shutdownNow();
+ }
+
}
diff --git a/test/distributed/org/apache/cassandra/distributed/TestCluster.java
b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
index cc6cf81..5056661 100644
--- a/test/distributed/org/apache/cassandra/distributed/TestCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
@@ -31,22 +31,14 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import com.google.common.collect.Sets;
-import io.netty.util.concurrent.FastThreadLocal;
-import io.netty.util.concurrent.FastThreadLocalThread;
-import io.netty.util.internal.InternalThreadLocalMap;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
import org.apache.cassandra.db.Keyspace;
@@ -84,8 +76,6 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
*/
public class TestCluster implements AutoCloseable
{
- private final ExecutorService exec = Executors.newCachedThreadPool(new
NamedThreadFactory("cluster-async-tasks"));
-
private final File root;
private final List<Instance> instances;
private final Coordinator coordinator;
@@ -104,7 +94,7 @@ public class TestCluster implements AutoCloseable
void launch()
{
FBUtilities.waitOnFutures(instances.stream()
- .map(i -> exec.submit(() -> i.launch(this)))
+ .map(i -> i.isolatedExecutor.submit(() -> i.launch(this)))
.collect(Collectors.toList())
);
for (Instance instance : instances)
@@ -276,36 +266,17 @@ public class TestCluster implements AutoCloseable
}
@Override
- public void close() throws InterruptedException, TimeoutException,
ExecutionException
+ public void close()
{
List<Future<?>> futures = instances.stream()
- .map(i -> exec.submit(i::shutdown))
+ .map(i -> i.isolatedExecutor.submit(i::shutdown))
.collect(Collectors.toList());
// Make sure to only delete directory when threads are stopped
- Future combined = exec.submit(() -> {
- FBUtilities.waitOnFutures(futures);
- FileUtils.deleteRecursive(root);
- });
-
- combined.get(60, TimeUnit.SECONDS);
-
- exec.shutdownNow();
- exec.awaitTermination(10, TimeUnit.SECONDS);
+ FBUtilities.waitOnFutures(futures, 60, TimeUnit.SECONDS);
+ FileUtils.deleteRecursive(root);
//withThreadLeakCheck(futures);
- Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
- for (Thread thread : threadSet)
- {
- if (thread instanceof FastThreadLocalThread)
- ((FastThreadLocalThread)thread).setThreadLocalMap(null);
- }
-
- InternalThreadLocalMap.remove();
- InternalThreadLocalMap.destroy();
-
- FastThreadLocal.removeAll();
- FastThreadLocal.destroy();
System.gc();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]