This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 00fff3e  In JVM dtests need to clean up after instance shutdown
00fff3e is described below

commit 00fff3ee6e6c0142529de621bcaeee5790a0c235
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Jan 11 16:09:06 2019 +0000

    In JVM dtests need to clean up after instance shutdown
    
    Followup - isolate executions for a given node to an executor owned by the instance
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-14922
---
 .../async/NettyStreamingMessageSender.java         |   2 +-
 .../org/apache/cassandra/utils/FBUtilities.java    |  21 +++--
 .../org/apache/cassandra/distributed/Instance.java |  66 ++++++++++----
 .../cassandra/distributed/InvokableInstance.java   | 100 ++++++++++++++++++---
 .../apache/cassandra/distributed/TestCluster.java  |  39 ++------
 5 files changed, 158 insertions(+), 70 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
index 3fa80f5..8511a87 100644
--- 
a/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
+++ 
b/src/java/org/apache/cassandra/streaming/async/NettyStreamingMessageSender.java
@@ -511,7 +511,7 @@ public class NettyStreamingMessageSender implements 
StreamingMessageSender
         List<Future<Void>> futures = new 
ArrayList<>(threadToChannelMap.size());
         for (Channel channel : threadToChannelMap.values())
             futures.add(channel.close());
-        FBUtilities.waitOnFutures(futures, 10 * 1000);
+        FBUtilities.waitOnFutures(futures, 10, TimeUnit.SECONDS);
         threadToChannelMap.clear();
         fileTransferExecutor.shutdownNow();
 
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index 8f30fce..f7aec8b 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -383,28 +383,37 @@ public class FBUtilities
 
     public static <T> List<T> waitOnFutures(Iterable<? extends Future<? 
extends T>> futures)
     {
-        return waitOnFutures(futures, -1);
+        return waitOnFutures(futures, -1, null);
     }
 
     /**
-     * Block for a collection of futures, with an optional timeout for each 
future.
+     * Block for a collection of futures, with optional timeout.
      *
      * @param futures
-     * @param ms The number of milliseconds to wait on each future. If this 
value is less than or equal to zero,
+     * @param timeout The number of units to wait in total. If this value is 
less than or equal to zero,
      *           no tiemout value will be passed to {@link Future#get()}.
+     * @param units The units of timeout.
      */
-    public static <T> List<T> waitOnFutures(Iterable<? extends Future<? 
extends T>> futures, long ms)
+    public static <T> List<T> waitOnFutures(Iterable<? extends Future<? 
extends T>> futures, long timeout, TimeUnit units)
     {
+        long endNanos = 0;
+        if (timeout > 0)
+            endNanos = System.nanoTime() + units.toNanos(timeout);
         List<T> results = new ArrayList<>();
         Throwable fail = null;
         for (Future<? extends T> f : futures)
         {
             try
             {
-                if (ms <= 0)
+                if (endNanos == 0)
+                {
                     results.add(f.get());
+                }
                 else
-                    results.add(f.get(ms, TimeUnit.MILLISECONDS));
+                {
+                    long waitFor = Math.max(1, endNanos - System.nanoTime());
+                    results.add(f.get(waitFor, TimeUnit.NANOSECONDS));
+                }
             }
             catch (Throwable t)
             {
diff --git a/test/distributed/org/apache/cassandra/distributed/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/Instance.java
index c68b961..0c4a260 100644
--- a/test/distributed/org/apache/cassandra/distributed/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/Instance.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
 
@@ -88,7 +89,7 @@ public class Instance extends InvokableInstance
 
     public Instance(InstanceConfig config, ClassLoader classLoader)
     {
-        super(classLoader);
+        super("node-" + config.num, classLoader);
         this.config = config;
     }
 
@@ -337,16 +338,15 @@ public class Instance extends InvokableInstance
 
     void shutdown()
     {
-        runOnInstance(() -> {
+        acceptsOnInstance((ExecutorService executor) -> {
             Throwable error = null;
-            error = runAndMergeThrowable(error,
+            error = parallelRun(error, executor,
+                    Gossiper.instance::stop,
                     CompactionManager.instance::forceShutdown,
                     BatchlogManager.instance::shutdown,
                     HintsService.instance::shutdownBlocking,
                     CommitLog.instance::shutdownBlocking,
-                    Gossiper.instance::stop,
                     SecondaryIndexManager::shutdownExecutors,
-                    MessagingService.instance()::shutdown,
                     ColumnFamilyStore::shutdownFlushExecutor,
                     ColumnFamilyStore::shutdownPostFlushExecutor,
                     ColumnFamilyStore::shutdownReclaimExecutor,
@@ -354,42 +354,70 @@ public class Instance extends InvokableInstance
                     PendingRangeCalculatorService.instance::shutdownExecutor,
                     BufferPool::shutdownLocalCleaner,
                     Ref::shutdownReferenceReaper,
-                    StageManager::shutdownAndWait,
-                    SharedExecutorPool.SHARED::shutdown,
                     Memtable.MEMORY_POOL::shutdown,
                     ScheduledExecutors::shutdownAndWait,
-                    SSTableReader::shutdownBlocking);
-
-            error = shutdownAndWait(error, 
ActiveRepairService.repairCommandExecutor);
+                    SSTableReader::shutdownBlocking,
+                    () -> 
shutdownAndWait(ActiveRepairService.repairCommandExecutor)
+            );
+            error = parallelRun(error, executor,
+                                MessagingService.instance()::shutdown
+            );
+            error = parallelRun(error, executor,
+                                StageManager::shutdownAndWait,
+                                SharedExecutorPool.SHARED::shutdown
+            );
             LoggerContext loggerContext = (LoggerContext) 
LoggerFactory.getILoggerFactory();
             loggerContext.stop();
             Throwables.maybeFail(error);
-        });
+        }).accept(isolatedExecutor);
+        super.shutdown();
     }
 
-    private static Throwable shutdownAndWait(Throwable existing, 
ExecutorService executor)
+    private static void shutdownAndWait(ExecutorService executor)
     {
-        return runAndMergeThrowable(existing, () -> {
+        try
+        {
             executor.shutdownNow();
             executor.awaitTermination(20, TimeUnit.SECONDS);
-            assert executor.isTerminated() && executor.isShutdown() : executor;
-        });
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        assert executor.isTerminated() && executor.isShutdown() : executor;
     }
 
-    private static Throwable runAndMergeThrowable(Throwable existing, 
ThrowingRunnable ... runnables)
+    private static Throwable parallelRun(Throwable accumulate, ExecutorService 
runOn, ThrowingRunnable ... runnables)
     {
+        List<Future<Throwable>> results = new ArrayList<>();
         for (ThrowingRunnable runnable : runnables)
         {
+            results.add(runOn.submit(() -> {
+                try
+                {
+                    runnable.run();
+                    return null;
+                }
+                catch (Throwable t)
+                {
+                    return t;
+                }
+            }));
+        }
+        for (Future<Throwable> future : results)
+        {
             try
             {
-                runnable.run();
+                Throwable t = future.get();
+                if (t != null)
+                    throw t;
             }
             catch (Throwable t)
             {
-                existing = Throwables.merge(existing, t);
+                accumulate = Throwables.merge(accumulate, t);
             }
         }
-        return existing;
+        return accumulate;
     }
 
     public static interface ThrowingRunnable
diff --git 
a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java 
b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
index f646ae1..9fb543d 100644
--- a/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
+++ b/test/distributed/org/apache/cassandra/distributed/InvokableInstance.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.distributed;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
@@ -27,18 +28,26 @@ import java.io.Serializable;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.utils.Throwables;
+
 public abstract class InvokableInstance
 {
+    protected final ExecutorService isolatedExecutor;
     private final ClassLoader classLoader;
     private final Method deserializeOnInstance;
 
-    public InvokableInstance(ClassLoader classLoader)
+    public InvokableInstance(String name, ClassLoader classLoader)
     {
+        this.isolatedExecutor = Executors.newCachedThreadPool(new 
NamedThreadFactory(name, Thread.NORM_PRIORITY, classLoader, new 
ThreadGroup(name)));
         this.classLoader = classLoader;
         try
         {
@@ -50,31 +59,34 @@ public abstract class InvokableInstance
         }
     }
 
-    public interface SerializableCallable<T> extends Callable<T>, Serializable 
{ public T call(); }
-    public <T> SerializableCallable<T> callsOnInstance(SerializableCallable<T> 
call) { return (SerializableCallable<T>) transferOneObject(call); }
+    public interface CallableNoExcept<T> extends Callable<T> { public T 
call(); }
+    public interface SerializableCallable<T> extends CallableNoExcept<T>, 
Serializable { }
+    public <T> CallableNoExcept<T> callsOnInstance(SerializableCallable<T> 
call) { return invokesOnExecutor((SerializableCallable<T>) 
transferOneObject(call), isolatedExecutor); }
     public <T> T callOnInstance(SerializableCallable<T> call) { return 
callsOnInstance(call).call(); }
 
     public interface SerializableRunnable extends Runnable, Serializable {}
-    public SerializableRunnable runsOnInstance(SerializableRunnable run) { 
return (SerializableRunnable) transferOneObject(run); }
+    public Runnable runsOnInstance(SerializableRunnable run) { return 
invokesOnExecutor((SerializableRunnable) transferOneObject(run), 
isolatedExecutor); }
     public void runOnInstance(SerializableRunnable run) { 
runsOnInstance(run).run(); }
 
     public interface SerializableConsumer<T> extends Consumer<T>, Serializable 
{}
-    public <T> SerializableConsumer<T> 
acceptsOnInstance(SerializableConsumer<T> consumer) { return 
(SerializableConsumer<T>) transferOneObject(consumer); }
+    public <T> Consumer<T> acceptsOnInstance(SerializableConsumer<T> consumer) 
{ return invokesOnExecutor((SerializableConsumer<T>) 
transferOneObject(consumer), isolatedExecutor); }
 
     public interface SerializableBiConsumer<T1, T2> extends BiConsumer<T1, 
T2>, Serializable {}
-    public <T1, T2> SerializableBiConsumer<T1, T2> 
acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return 
(SerializableBiConsumer<T1, T2>) transferOneObject(consumer); }
+    public <T1, T2> BiConsumer<T1, T2> 
acceptsOnInstance(SerializableBiConsumer<T1, T2> consumer) { return 
invokesOnExecutor((SerializableBiConsumer<T1, T2>) transferOneObject(consumer), 
isolatedExecutor); }
 
     public interface SerializableFunction<I, O> extends Function<I, O>, 
Serializable {}
-    public <I, O> SerializableFunction<I, O> 
appliesOnInstance(SerializableFunction<I, O> f) { return 
(SerializableFunction<I, O>) transferOneObject(f); }
+    public <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> 
f) { return invokesOnExecutor((SerializableFunction<I, O>) 
transferOneObject(f), isolatedExecutor); }
 
     public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, 
I2, O>, Serializable {}
-    public <I1, I2, O> SerializableBiFunction<I1, I2, O> 
appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return 
(SerializableBiFunction<I1, I2, O>) transferOneObject(f); }
+    public <I1, I2, O> BiFunction<I1, I2, O> 
appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return 
invokesOnExecutor((SerializableBiFunction<I1, I2, O>) transferOneObject(f), 
isolatedExecutor); }
 
-    public interface SerializableTriFunction<I1, I2, I3, O> extends 
Serializable
+    public interface TriFunction<I1, I2, I3, O>
     {
         O apply(I1 i1, I2 i2, I3 i3);
     }
-    public <I1, I2, I3, O> SerializableTriFunction<I1, I2, I3, O> 
appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return 
(SerializableTriFunction<I1, I2, I3, O>) transferOneObject(f); }
+    public interface SerializableTriFunction<I1, I2, I3, O> extends 
Serializable, TriFunction<I1, I2, I3, O> { }
+
+    public <I1, I2, I3, O> TriFunction<I1, I2, I3, O> 
appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return 
invokesOnExecutor((SerializableTriFunction<I1, I2, I3, O>) 
transferOneObject(f), isolatedExecutor); }
 
     public interface InstanceFunction<I, O> extends 
SerializableBiFunction<Instance, I, O> {}
 
@@ -130,4 +142,72 @@ public abstract class InvokableInstance
         }
     }
 
+    private static <V> CallableNoExcept<V> 
invokesOnExecutor(SerializableCallable<V> callable, ExecutorService invokeOn)
+    {
+        return () -> {
+            try
+            {
+                return invokeOn.submit(callable).get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                Throwables.maybeFail(e.getCause());
+                throw new AssertionError();
+            }
+        };
+    }
+
+    private static Runnable invokesOnExecutor(SerializableRunnable runnable, 
ExecutorService invokeOn)
+    {
+        return () -> {
+            try
+            {
+                invokeOn.submit(runnable).get();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            catch (ExecutionException e)
+            {
+                Throwables.maybeFail(e.getCause());
+                throw new AssertionError();
+            }
+        };
+    }
+
+    private static <A> Consumer<A> invokesOnExecutor(SerializableConsumer<A> 
consumer, ExecutorService invokeOn)
+    {
+        return (a) -> invokesOnExecutor(() -> consumer.accept(a), 
invokeOn).run();
+    }
+
+    private static <A, B> BiConsumer<A, B> 
invokesOnExecutor(SerializableBiConsumer<A, B> consumer, ExecutorService 
invokeOn)
+    {
+        return (a, b) -> invokesOnExecutor(() -> consumer.accept(a, b), 
invokeOn).run();
+    }
+
+    private static <A, B> Function<A, B> 
invokesOnExecutor(SerializableFunction<A, B> f, ExecutorService invokeOn)
+    {
+        return (a) -> invokesOnExecutor(() -> f.apply(a), invokeOn).call();
+    }
+
+    private static <A, B, C> BiFunction<A, B, C> 
invokesOnExecutor(SerializableBiFunction<A, B, C> f, ExecutorService invokeOn)
+    {
+        return (a, b) -> invokesOnExecutor(() -> f.apply(a, b), 
invokeOn).call();
+    }
+
+    private static <A, B, C, D> SerializableTriFunction<A, B, C, D> 
invokesOnExecutor(SerializableTriFunction<A, B, C, D> f, ExecutorService 
invokeOn)
+    {
+        return (a, b, c) -> invokesOnExecutor(() -> f.apply(a, b, c), 
invokeOn).call();
+    }
+
+    void shutdown()
+    {
+        isolatedExecutor.shutdownNow();
+    }
+
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/TestCluster.java 
b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
index cc6cf81..5056661 100644
--- a/test/distributed/org/apache/cassandra/distributed/TestCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/TestCluster.java
@@ -31,22 +31,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
 import com.google.common.collect.Sets;
 
-import io.netty.util.concurrent.FastThreadLocal;
-import io.netty.util.concurrent.FastThreadLocalThread;
-import io.netty.util.internal.InternalThreadLocalMap;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
@@ -84,8 +76,6 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
  */
 public class TestCluster implements AutoCloseable
 {
-    private final ExecutorService exec = Executors.newCachedThreadPool(new 
NamedThreadFactory("cluster-async-tasks"));
-
     private final File root;
     private final List<Instance> instances;
     private final Coordinator coordinator;
@@ -104,7 +94,7 @@ public class TestCluster implements AutoCloseable
     void launch()
     {
         FBUtilities.waitOnFutures(instances.stream()
-                .map(i -> exec.submit(() -> i.launch(this)))
+                .map(i -> i.isolatedExecutor.submit(() -> i.launch(this)))
                 .collect(Collectors.toList())
         );
         for (Instance instance : instances)
@@ -276,36 +266,17 @@ public class TestCluster implements AutoCloseable
     }
 
     @Override
-    public void close() throws InterruptedException, TimeoutException, 
ExecutionException
+    public void close()
     {
         List<Future<?>> futures = instances.stream()
-                .map(i -> exec.submit(i::shutdown))
+                .map(i -> i.isolatedExecutor.submit(i::shutdown))
                 .collect(Collectors.toList());
 
         // Make sure to only delete directory when threads are stopped
-        Future combined = exec.submit(() -> {
-            FBUtilities.waitOnFutures(futures);
-            FileUtils.deleteRecursive(root);
-        });
-
-        combined.get(60, TimeUnit.SECONDS);
-
-        exec.shutdownNow();
-        exec.awaitTermination(10, TimeUnit.SECONDS);
+        FBUtilities.waitOnFutures(futures, 60, TimeUnit.SECONDS);
+        FileUtils.deleteRecursive(root);
 
         //withThreadLeakCheck(futures);
-        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
-        for (Thread thread : threadSet)
-        {
-            if (thread instanceof FastThreadLocalThread)
-                ((FastThreadLocalThread)thread).setThreadLocalMap(null);
-        }
-
-        InternalThreadLocalMap.remove();
-        InternalThreadLocalMap.destroy();
-
-        FastThreadLocal.removeAll();
-        FastThreadLocal.destroy();
         System.gc();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to