This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new dfc279a  Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest
dfc279a is described below

commit dfc279a22a5563ac7a832a586914d5410426e9b7
Author: David Capwell <[email protected]>
AuthorDate: Sat Mar 21 20:57:50 2020 -0700

    Fix flaky test o.a.c.d.test.*RepairCoordinatorFastTest
    
    patch by David Capwell; reviewed by Ekaterina Dimitrova and Benjamin Lerer
    for CASSANDRA-15650
---
 .../cassandra/concurrent/NamedThreadFactory.java   |   5 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java |   2 +-
 .../org/apache/cassandra/tools/RepairRunner.java   |  40 +-
 .../org/apache/cassandra/utils/Throwables.java     |  10 +
 .../cassandra/distributed/api/NodeToolResult.java  |  48 +-
 .../distributed/impl/IsolatedExecutor.java         |   8 +-
 ...=> FullRepairCoordinatorNeighbourDownTest.java} |   4 +-
 .../test/FullRepairCoordinatorTimeoutTest.java     |  16 +
 ...ementalRepairCoordinatorNeighbourDownTest.java} |   4 +-
 .../IncrementalRepairCoordinatorTimeoutTest.java   |  16 +
 ...PreviewRepairCoordinatorNeighbourDownTest.java} |   4 +-
 .../test/PreviewRepairCoordinatorTimeoutTest.java  |  16 +
 .../distributed/test/RepairCoordinatorFast.java    | 482 +++++++++++----------
 ...ow.java => RepairCoordinatorNeighbourDown.java} | 180 ++++----
 .../distributed/test/RepairCoordinatorTimeout.java |  67 +++
 .../org/apache/cassandra/utils/AssertUtil.java     | 128 ++++++
 16 files changed, 665 insertions(+), 365 deletions(-)

diff --git a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
index 7cc73bd..bcf686f 100644
--- a/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/NamedThreadFactory.java
@@ -36,7 +36,10 @@ public class NamedThreadFactory implements ThreadFactory
 {
     private static volatile String globalPrefix;
     public static void setGlobalPrefix(String prefix) { globalPrefix = prefix; }
-    public static String globalPrefix() { return globalPrefix == null ? "" : globalPrefix; }
+    public static String globalPrefix() {
+        String prefix = globalPrefix;
+        return prefix == null ? "" : prefix;
+    }
 
     public final String id;
     private final int priority;
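
The globalPrefix() change is a volatile double-read fix: the old one-liner read the volatile field twice, so the null check could pass and the return could still observe a different value written concurrently. Copying the field into a local guarantees both uses see the same read. A minimal sketch of the pattern (class and method names here are illustrative, not from the patch):

    class PrefixHolder
    {
        private static volatile String globalPrefix;

        // racy: two volatile reads; the value can change between the check and the return
        static String racyPrefix() { return globalPrefix == null ? "" : globalPrefix; }

        // safe: a single volatile read; both uses see the same value
        static String safePrefix()
        {
            String prefix = globalPrefix;
            return prefix == null ? "" : prefix;
        }
    }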
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 180b231..f9f2c1a 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -415,7 +415,7 @@ public class NodeProbe implements AutoCloseable
         }
         catch (Exception e)
         {
-            throw new IOException(e) ;
+            throw new IOException(e);
         }
         finally
         {
diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java
index d1b3409..593bc26 100644
--- a/src/java/org/apache/cassandra/tools/RepairRunner.java
+++ b/src/java/org/apache/cassandra/tools/RepairRunner.java
@@ -25,6 +25,9 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 
+import com.google.common.base.Throwables;
+
+import org.apache.cassandra.repair.messages.RepairOption;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageServiceMBean;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -40,7 +43,7 @@ public class RepairRunner extends JMXNotificationProgressListener
     private final StorageServiceMBean ssProxy;
     private final String keyspace;
     private final Map<String, String> options;
-    private final Condition condition = new SimpleCondition();
+    private final SimpleCondition condition = new SimpleCondition();
 
     private int cmd;
     private volatile Exception error;
@@ -59,8 +62,8 @@ public class RepairRunner extends JMXNotificationProgressListener
         if (cmd <= 0)
         {
             // repairAsync can only return 0 for replication factor 1.
-            String message = String.format("[%s] Replication factor is 1. No 
repair is needed for keyspace '%s'", format.format(System.currentTimeMillis()), 
keyspace);
-            out.println(message);
+            String message = String.format("Replication factor is 1. No repair 
is needed for keyspace '%s'", keyspace);
+            printMessage(message);
         }
         else
         {
@@ -69,6 +72,14 @@ public class RepairRunner extends JMXNotificationProgressListener
                 queryForCompletedRepair(String.format("After waiting for poll interval of %s seconds",
                                                       NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS));
             }
+            Exception error = this.error;
+            if (error == null)
+            {
+                // notifications are lossy so it's possible to see complete and not error; request latest state
+                // from the server
+                queryForCompletedRepair("condition satisfied");
+                error = this.error;
+            }
             if (error != null)
             {
                 throw error;
@@ -111,12 +122,12 @@ public class RepairRunner extends JMXNotificationProgressListener
     public void progress(String tag, ProgressEvent event)
     {
         ProgressEventType type = event.getType();
-        String message = String.format("[%s] %s", 
format.format(System.currentTimeMillis()), event.getMessage());
+        String message = event.getMessage();
         if (type == ProgressEventType.PROGRESS)
         {
             message = message + " (progress: " + (int) 
event.getProgressPercentage() + "%)";
         }
-        out.println(message);
+        printMessage(message);
         if (type == ProgressEventType.ERROR)
         {
             error = new RuntimeException(String.format("Repair job has failed 
with the error message: %s. " +
@@ -136,9 +147,9 @@ public class RepairRunner extends JMXNotificationProgressListener
         String queriedString = "queried for parent session status and";
         if (status == null)
         {
-            String message = String.format("[%s] %s %s couldn't find repair 
status for cmd: %s", triggeringCondition,
-                                           queriedString, 
format.format(System.currentTimeMillis()), cmd);
-            out.println(message);
+            String message = String.format("%s %s couldn't find repair status 
for cmd: %s", triggeringCondition,
+                                           queriedString, cmd);
+            printMessage(message);
         }
         else
         {
@@ -148,8 +159,8 @@ public class RepairRunner extends JMXNotificationProgressListener
             {
                 case COMPLETED:
                 case FAILED:
-                    out.println(String.format("[%s] %s %s discovered repair 
%s.",
-                                              
this.format.format(System.currentTimeMillis()), triggeringCondition,
+                    printMessage(String.format("%s %s discovered repair %s.",
+                                              triggeringCondition,
                                               queriedString, 
parentRepairStatus.name().toLowerCase()));
                     if (parentRepairStatus == 
ActiveRepairService.ParentRepairStatus.FAILED)
                     {
@@ -161,7 +172,7 @@ public class RepairRunner extends JMXNotificationProgressListener
                 case IN_PROGRESS:
                     break;
                 default:
-                    out.println(String.format("[%s] WARNING Encountered 
unexpected RepairRunnable.ParentRepairStatus: %s", System.currentTimeMillis(), 
parentRepairStatus));
+                    printMessage(String.format("WARNING Encountered unexpected 
RepairRunnable.ParentRepairStatus: %s", parentRepairStatus));
                     printMessages(messages);
                     break;
             }
@@ -172,7 +183,12 @@ public class RepairRunner extends JMXNotificationProgressListener
     {
         for (String message : messages)
         {
-            out.println(String.format("[%s] %s", 
this.format.format(System.currentTimeMillis()), message));
+            printMessage(message);
         }
     }
+
+    private void printMessage(String message)
+    {
+        out.println(String.format("[%s] %s", 
this.format.format(System.currentTimeMillis()), message));
+    }
 }
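
The RepairRunner change is the heart of the fix: JMX progress notifications are delivered best-effort, so the completion signal could arrive without the ERROR notification, and nodetool would report success for a failed repair. The runner now re-queries the server's authoritative parent-session status whenever the condition is satisfied without a recorded error. A simplified sketch of the await-then-verify shape (condensed from the hunks above, not the full class):

    private void awaitAndVerify() throws Exception
    {
        // wait for the JMX listener to signal completion
        if (!condition.await(NodeProbe.JMX_NOTIFICATION_POLL_INTERVAL_SECONDS, TimeUnit.SECONDS))
            queryForCompletedRepair("After waiting for poll interval");

        Exception error = this.error;
        if (error == null)
        {
            // notifications are lossy: COMPLETE can arrive without ERROR,
            // so ask the server for the final state before trusting success
            queryForCompletedRepair("condition satisfied");
            error = this.error;
        }
        if (error != null)
            throw error;
    }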
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 9c6da60..f727b5a 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -231,6 +231,16 @@ public final class Throwables
     }
 
     /**
+     * Throw the exception as an unchecked exception, wrapping it if it is a checked exception, else rethrowing as is.
+     */
+    public static RuntimeException throwAsUncheckedException(Throwable t)
+    {
+        if (t instanceof Error)
+            throw (Error) t;
+        throw unchecked(t);
+    }
+
+    /**
      * A shortcut for {@code unchecked(unwrapped(t))}. This is called "cleaned" because this basically removes the annoying cruft surrounding an exception :).
      */
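
throwAsUncheckedException is declared to return RuntimeException even though it never returns: Errors are rethrown untouched and everything else goes through unchecked(t). The return type exists so call sites can write `throw Throwables.throwAsUncheckedException(t);` and satisfy the compiler's reachability analysis. A hypothetical caller showing that shape (the Callable usage is illustrative, not from the patch):

    Object run(java.util.concurrent.Callable<Object> task)
    {
        try
        {
            return task.call();
        }
        catch (Throwable t)
        {
            // Errors pass through as-is; checked exceptions get wrapped exactly once
            throw Throwables.throwAsUncheckedException(t);
        }
    }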
diff --git a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
index 9ba1127..8f33ae5 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/NodeToolResult.java
@@ -19,11 +19,14 @@
 package org.apache.cassandra.distributed.api;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import javax.management.Notification;
 
+import com.google.common.base.Throwables;
 import org.junit.Assert;
 
 public class NodeToolResult
@@ -68,18 +71,22 @@ public class NodeToolResult
 
     public final class Asserts {
         public Asserts success() {
-            Assert.assertEquals("nodetool command " + commandAndArgs[0] + " 
was not successful", 0, rc);
+            if (rc != 0)
+                fail("was not successful");
             return this;
         }
 
         public Asserts failure() {
-            Assert.assertNotEquals("nodetool command " + commandAndArgs[0] + " 
was successful but not expected to be", 0, rc);
+            if (rc == 0)
+                fail("was successful but not expected to be");
             return this;
         }
 
-        public Asserts errorContains(String msg) {
+        public Asserts errorContains(String... messages) {
+            Assert.assertNotEquals("no error messages defined to check 
against", 0, messages.length);
             Assert.assertNotNull("No exception was found but expected one", 
error);
-            Assert.assertTrue("Error message '" + error.getMessage() + "' does 
not contain '" + msg + "'", error.getMessage().contains(msg));
+            if (!Stream.of(messages).anyMatch(msg -> 
error.getMessage().contains(msg)))
+                fail("Error message '" + error.getMessage() + "' does not 
contain any of " + Arrays.toString(messages));
             return this;
         }
 
@@ -91,7 +98,7 @@ public class NodeToolResult
                     return this;
                 }
             }
-            Assert.fail("Unable to locate message " + msg + " in 
notifications: " + notifications);
+            fail("Unable to locate message " + msg + " in notifications: " + 
NodeToolResult.toString(notifications));
             return this; // unreachable
         }
 
@@ -106,9 +113,38 @@ public class NodeToolResult
                     }
                 }
             }
-            Assert.fail("Unable to locate message '" + msg + "' in 
notifications: " + notifications);
+            fail("Unable to locate message '" + msg + "' in notifications: " + 
NodeToolResult.toString(notifications));
             return this; // unreachable
         }
+
+        private void fail(String message)
+        {
+            StringBuilder sb = new StringBuilder();
+            sb.append("nodetool command 
").append(Arrays.toString(commandAndArgs)).append(" 
").append(message).append("\n");
+            sb.append("Notifications:\n");
+            for (Notification n : notifications)
+                sb.append(NodeToolResult.toString(n)).append("\n");
+            if (error != null)
+                sb.append("Error:\n").append(Throwables.getStackTraceAsString(error)).append("\n");
+            throw new AssertionError(sb.toString());
+        }
+    }
+
+    private static String toString(Collection<Notification> notifications)
+    {
+        return notifications.stream().map(NodeToolResult::toString).collect(Collectors.joining(", "));
+    }
+
+    private static String toString(Notification notification)
+    {
+        ProgressEventType type = ProgressEventType.values()[notificationType(notification)];
+        String msg = notification.getMessage();
+        Object src = notification.getSource();
+        return "Notification{" +
+               "type=" + type +
+               ", src=" + src +
+               ", message=" + msg +
+               "}";
     }
 
     private static int notificationType(Notification n)
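
errorContains is widened from a single substring to varargs and now passes if any candidate matches, and all assertion failures route through the new fail helper, which dumps the full command, every notification, and the error's stack trace instead of only commandAndArgs[0]. A usage sketch matching the snapshotFailure test further down, where either message is legitimate depending on whether notifications were delivered:

    // either message is accepted; which one appears depends on notification delivery
    result.asserts()
          .failure()
          .errorContains("Could not create snapshot", "Some repair failed");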
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
index fc31fdf..0d8f96f 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/IsolatedExecutor.java
@@ -47,6 +47,7 @@ import ch.qos.logback.classic.LoggerContext;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.utils.ExecutorUtils;
+import org.apache.cassandra.utils.Throwables;
 
 public class IsolatedExecutor implements IIsolatedExecutor
 {
@@ -65,7 +66,7 @@ public class IsolatedExecutor implements IIsolatedExecutor
 
     public Future<Void> shutdown()
     {
-        isolatedExecutor.shutdown();
+        isolatedExecutor.shutdownNow();
 
         /* Use a thread pool with a core pool size of zero to terminate the thread as soon as possible
         ** so the instance class loader can be garbage collected.  Uses a custom thread factory
@@ -202,11 +203,12 @@ public class IsolatedExecutor implements IIsolatedExecutor
         }
         catch (InterruptedException e)
         {
-            throw new RuntimeException(e);
+            Thread.currentThread().interrupt();
+            throw Throwables.throwAsUncheckedException(e);
         }
         catch (ExecutionException e)
         {
-            throw new RuntimeException(e.getCause());
+            throw Throwables.throwAsUncheckedException(e.getCause());
         }
     }
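
Two small but meaningful changes here: shutdown() becomes shutdownNow() so queued tasks cannot keep the isolated instance's class loader alive, and the InterruptedException handler restores the thread's interrupt flag before rethrowing, the standard idiom so callers further up the stack can still observe the interrupt. A sketch of the resulting shape (method name illustrative):

    static <T> T awaitResult(java.util.concurrent.Future<T> future)
    {
        try
        {
            return future.get();
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            throw Throwables.throwAsUncheckedException(e);
        }
        catch (java.util.concurrent.ExecutionException e)
        {
            // unwrap so callers see the task's real failure, not the wrapper
            throw Throwables.throwAsUncheckedException(e.getCause());
        }
    }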
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
index d3904b3..1053925 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
 @RunWith(Parameterized.class)
-public class FullRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class FullRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
 {
-    public FullRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    public FullRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
     {
         super(RepairType.FULL, parallelism, withNotifications);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..d91cb5d
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class FullRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+    public FullRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.FULL, parallelism, withNotifications);
+    }
+}
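
Each concrete test class stays a thin shell: JUnit's Parameterized runner drives the matrix of repair parallelism and notification settings, while the repair type is fixed per subclass so every combination surfaces as its own class in CI. The parameter source lives in RepairCoordinatorBase, which is not part of this diff; presumably it resembles the following sketch (method name and iteration order are assumptions, not from the patch):

    // hypothetical provider inside RepairCoordinatorBase
    @Parameterized.Parameters(name = "{0}/{1}")
    public static Collection<Object[]> parameters()
    {
        List<Object[]> tests = new ArrayList<>();
        for (RepairParallelism parallelism : RepairParallelism.values())
            for (boolean withNotifications : new boolean[]{ true, false })
                tests.add(new Object[]{ parallelism, withNotifications });
        return tests;
    }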
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
index 7f9b35f..af17567 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
 @RunWith(Parameterized.class)
-public class IncrementalRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class IncrementalRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
 {
-    public IncrementalRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    public IncrementalRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
     {
         super(RepairType.INCREMENTAL, parallelism, withNotifications);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..0fdae57
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class IncrementalRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+    public IncrementalRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.INCREMENTAL, parallelism, withNotifications);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
similarity index 85%
rename from test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
index 2d52475..1926f9b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorSlowTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorNeighbourDownTest.java
@@ -25,9 +25,9 @@ import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParall
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 
 @RunWith(Parameterized.class)
-public class PreviewRepairCoordinatorSlowTest extends RepairCoordinatorSlow
+public class PreviewRepairCoordinatorNeighbourDownTest extends RepairCoordinatorNeighbourDown
 {
-    public PreviewRepairCoordinatorSlowTest(RepairParallelism parallelism, boolean withNotifications)
+    public PreviewRepairCoordinatorNeighbourDownTest(RepairParallelism parallelism, boolean withNotifications)
     {
         super(RepairType.PREVIEW, parallelism, withNotifications);
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java
new file mode 100644
index 0000000..8b90909
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorTimeoutTest.java
@@ -0,0 +1,16 @@
+package org.apache.cassandra.distributed.test;
+
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+
+@RunWith(Parameterized.class)
+public class PreviewRepairCoordinatorTimeoutTest extends RepairCoordinatorTimeout
+{
+    public PreviewRepairCoordinatorTimeoutTest(RepairParallelism parallelism, boolean withNotifications)
+    {
+        super(RepairType.PREVIEW, parallelism, withNotifications);
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
index b05706a..8d26b2e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorFast.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.time.Duration;
 import java.util.Set;
 
 import com.google.common.collect.Iterables;
@@ -43,6 +44,7 @@ import static org.apache.cassandra.distributed.test.DistributedRepairUtils.asser
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairSuccess;
 import static org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
 
 public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
 {
@@ -51,104 +53,113 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
         super(repairType, parallelism, withNotifications);
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void simple() {
         String table = tableName("simple");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY KEY 
(key))", KEYSPACE, table));
-        CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) VALUES 
(?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table);
-        result.asserts().success();
-        if (withNotifications)
-        {
-            result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                  .notificationContains(ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.SUCCESS, repairType 
!= RepairType.PREVIEW ? "Repair completed successfully": "Repair preview 
completed successfully")
-                  .notificationContains(ProgressEventType.COMPLETE, 
"finished");
-        }
-
-        if (repairType != RepairType.PREVIEW)
-        {
-            assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
-        }
-        else
-        {
-            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-        }
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, PRIMARY 
KEY (key))", KEYSPACE, table));
+            CLUSTER.coordinator(1).execute(format("INSERT INTO %s.%s (key) 
VALUES (?)", KEYSPACE, table), ConsistencyLevel.ANY, "some text");
 
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table);
+            result.asserts().success();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
+                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.SUCCESS, 
repairType != RepairType.PREVIEW ? "Repair completed successfully": "Repair 
preview completed successfully")
+                      .notificationContains(ProgressEventType.COMPLETE, 
"finished");
+            }
+
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairSuccess(CLUSTER, KEYSPACE, table);
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void missingKeyspace()
     {
-        // as of this moment the check is done in nodetool so the JMX notifications are not imporant
-        // nor is the history stored
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, "doesnotexist");
-        result.asserts()
-              .failure()
-              .errorContains("Keyspace [doesnotexist] does not exist.");
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            // as of this moment the check is done in nodetool so the JMX notifications are not important
+            // nor is the history stored
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, "doesnotexist");
+            result.asserts()
+                  .failure()
+                  .errorContains("Keyspace [doesnotexist] does not exist.");
 
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
 
-        assertParentRepairNotExist(CLUSTER, "doesnotexist");
+            assertParentRepairNotExist(CLUSTER, "doesnotexist");
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void missingTable()
     {
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, "doesnotexist");
-        result.asserts()
-              .failure();
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            String tableName = tableName("doesnotexist");
+            NodeToolResult result = repair(2, KEYSPACE, tableName);
             result.asserts()
-                  .errorContains("failed with error Unknown keyspace/cf pair 
(distributed_test_keyspace.doesnotexist)")
-                  // Start notification is ignored since this is checked 
during setup (aka before start)
-                  .notificationContains(ProgressEventType.ERROR, "failed with 
error Unknown keyspace/cf pair (distributed_test_keyspace.doesnotexist)")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished 
with error");
-        }
+                  .failure();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .errorContains("Unknown keyspace/cf pair 
(distributed_test_keyspace." + tableName + ")")
+                      // Start notification is ignored since this is checked 
during setup (aka before start)
+                      .notificationContains(ProgressEventType.ERROR, "failed 
with error Unknown keyspace/cf pair (distributed_test_keyspace." + tableName + 
")")
+                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, "doesnotexist");
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void noTablesToRepair()
     {
         // index CF currently don't support repair, so they get dropped when listed
         // this is done in this test to cause the keyspace to have 0 tables to repair, which causes repair to no-op
         // early and skip.
         String table = tableName("withindex");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-        CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s (value)", 
postfix(), KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        // if CF has a . in it, it is assumed to be a 2i which rejects repairs
-        NodeToolResult result = repair(2, KEYSPACE, table + ".value");
-        result.asserts().success();
-        if (withNotifications)
-        {
-            result.asserts()
-                  .notificationContains("Empty keyspace")
-                  .notificationContains("skipping repair: " + KEYSPACE)
-                  // Start notification is ignored since this is checked during setup (aka before start)
-                  .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
-                  .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
-        }
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+            CLUSTER.schemaChange(format("CREATE INDEX value_%s ON %s.%s 
(value)", postfix(), KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            // if CF has a . in it, it is assumed to be a 2i which rejects repairs
+            NodeToolResult result = repair(2, KEYSPACE, table + ".value");
+            result.asserts().success();
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains("Empty keyspace")
+                      .notificationContains("skipping repair: " + KEYSPACE)
+                      // Start notification is ignored since this is checked during setup (aka before start)
+                      .notificationContains(ProgressEventType.SUCCESS, "Empty keyspace") // will fail since success isn't returned; only complete
+                      .notificationContains(ProgressEventType.COMPLETE, "finished"); // will fail since it doesn't do this
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table + ".value");
 
-        // this is actually a SKIP and not a FAILURE, so shouldn't increment
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            // this is actually a SKIP and not a FAILURE, so shouldn't increment
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void intersectingRange()
     {
         // this test exists to show that this case will cause repair to finish; success or failure isn't imporant
@@ -156,229 +167,238 @@ public abstract class RepairCoordinatorFast extends RepairCoordinatorBase
         // repair to fail but it didn't, this would be fine and this test should be updated to reflect the new
         // semantic
         String table = tableName("intersectingrange");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-
-        //TODO dtest api for this?
-        LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
-            Set<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
-            Range<Token> range = Iterables.getFirst(ranges, null);
-            long left = (long) range.left.getTokenValue();
-            long right = (long) range.right.getTokenValue();
-            return new LongTokenRange(left, right);
-        });
-        LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table,
-                                       "--start-token", Long.toString(intersectingRange.minExclusive),
-                                       "--end-token", Long.toString(intersectingRange.maxInclusive));
-        result.asserts()
-              .failure()
-              .errorContains("Requested range " + intersectingRange + " 
intersects a local range (" + tokenRange + ") but is not fully contained in 
one");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            //TODO dtest api for this?
+            LongTokenRange tokenRange = CLUSTER.get(2).callOnInstance(() -> {
+                Set<Range<Token>> ranges = 
StorageService.instance.getLocalReplicas(KEYSPACE).ranges();
+                Range<Token> range = Iterables.getFirst(ranges, null);
+                long left = (long) range.left.getTokenValue();
+                long right = (long) range.right.getTokenValue();
+                return new LongTokenRange(left, right);
+            });
+            LongTokenRange intersectingRange = new LongTokenRange(tokenRange.maxInclusive - 7, tokenRange.maxInclusive + 7);
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table,
+                                           "--start-token", Long.toString(intersectingRange.minExclusive),
+                                           "--end-token", Long.toString(intersectingRange.maxInclusive));
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                  .notificationContains(ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "Requested 
range " + intersectingRange + " intersects a local range (" + tokenRange + ") 
but is not fully contained in one")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished 
with error");
-        }
+                  .failure()
+                  .errorContains("Requested range " + intersectingRange + " 
intersects a local range (" + tokenRange + ") but is not fully contained in 
one");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
+                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, 
"Requested range " + intersectingRange + " intersects a local range (" + 
tokenRange + ") but is not fully contained in one")
+                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void unknownHost()
     {
         String table = tableName("unknownhost");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", 
"thisreally.should.not.exist.apache.org");
-        result.asserts()
-              .failure()
-              .errorContains("Unknown host specified 
thisreally.should.not.exist.apache.org");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", 
"thisreally.should.not.exist.apache.org");
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                  .notificationContains(ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "Unknown host 
specified thisreally.should.not.exist.apache.org")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished 
with error");
-        }
+                  .failure()
+                  .errorContains("Unknown host specified 
thisreally.should.not.exist.apache.org");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
+                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "Unknown 
host specified thisreally.should.not.exist.apache.org")
+                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void desiredHostNotCoordinator()
     {
         // current limitation is that the coordinator must be apart of the repair, so as long as that exists this test
         // verifies that the validation logic will termniate the repair properly
         String table = tableName("desiredhostnotcoordinator");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", 
"localhost");
-        result.asserts()
-              .failure()
-              .errorContains("The current host must be part of the repair");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(2, KEYSPACE, table, "--in-hosts", 
"localhost");
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                  .notificationContains(ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "The current 
host must be part of the repair")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished 
with error");
-        }
+                  .failure()
+                  .errorContains("The current host must be part of the 
repair");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
+                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, "The 
current host must be part of the repair")
+                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void onlyCoordinator()
     {
         // this is very similar to ::desiredHostNotCoordinator but has the difference that the only host to do repair
         // is the coordinator
         String table = tableName("onlycoordinator");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-
-        long repairExceptions = getRepairExceptions(CLUSTER, 2);
-        NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", 
"localhost");
-        result.asserts()
-              .failure()
-              .errorContains("Specified hosts [localhost] do not share range");
-        if (withNotifications)
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 2);
+            NodeToolResult result = repair(1, KEYSPACE, table, "--in-hosts", 
"localhost");
             result.asserts()
-                  .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                  .notificationContains(ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                  .notificationContains(ProgressEventType.ERROR, "Specified 
hosts [localhost] do not share range")
-                  .notificationContains(ProgressEventType.COMPLETE, "finished 
with error");
-        }
+                  .failure()
+                  .errorContains("Specified hosts [localhost] do not share 
range");
+            if (withNotifications)
+            {
+                result.asserts()
+                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
+                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
+                      .notificationContains(ProgressEventType.ERROR, 
"Specified hosts [localhost] do not share range")
+                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
+            }
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        //TODO should this be marked as fail to match others?  Should they not be marked?
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+            //TODO should this be marked as fail to match others?  Should they not be marked?
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 2));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void replicationFactorOne()
     {
         // In the case of rf=1 repair fails to create a cmd handle so node tool exists early
         String table = tableName("one");
-        // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
-        // does not fail
-        CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS replicationfactor 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
-        CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s (key 
text, value text, PRIMARY KEY (key))", table));
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            // since cluster is shared and this test gets called multiple times, need "IF NOT EXISTS" so the second+ attempt
+            // does not fail
+            CLUSTER.schemaChange("CREATE KEYSPACE IF NOT EXISTS 
replicationfactor WITH replication = {'class': 'SimpleStrategy', 
'replication_factor': 1};");
+            CLUSTER.schemaChange(format("CREATE TABLE replicationfactor.%s 
(key text, value text, PRIMARY KEY (key))", table));
 
-        long repairExceptions = getRepairExceptions(CLUSTER, 1);
-        NodeToolResult result = repair(1, "replicationfactor", table);
-        result.asserts()
-              .success();
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, "replicationfactor", table);
+            result.asserts()
+                  .success();
 
-        assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
 
-        Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+            Assert.assertEquals(repairExceptions, getRepairExceptions(CLUSTER, 1));
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void prepareFailure()
     {
         String table = tableName("preparefailure");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-        IMessageFilters.Filter filter = 
CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
-            throw new RuntimeException("prepare fail");
-        })).drop();
-        try
-        {
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure()
-                  .errorContains("Got negative replies from endpoints");
-            if (withNotifications)
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+            IMessageFilters.Filter filter = 
CLUSTER.verbs(Verb.PREPARE_MSG).messagesMatching(of(m -> {
+                throw new RuntimeException("prepare fail");
+            })).drop();
+            try
             {
+                long repairExceptions = getRepairExceptions(CLUSTER, 1);
+                NodeToolResult result = repair(1, KEYSPACE, table);
                 result.asserts()
-                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(ProgressEventType.ERROR, "Got 
negative replies from endpoints")
-                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
-            }
-
-            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
-            if (repairType != RepairType.PREVIEW)
-            {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+                      .failure()
+                      .errorContains("Got negative replies from endpoints");
+                if (withNotifications)
+                {
+                    result.asserts()
+                          .notificationContains(ProgressEventType.START, "Starting repair command")
+                          .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                          .notificationContains(ProgressEventType.ERROR, "Got negative replies from endpoints")
+                          .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+                }
+
+                Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+                if (repairType != RepairType.PREVIEW)
+                {
+                    assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Got negative replies from endpoints");
+                }
+                else
+                {
+                    assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+                }
             }
-            else
+            finally
             {
-                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+                filter.off();
             }
-        }
-        finally
-        {
-            filter.off();
-        }
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void snapshotFailure()
     {
         Assume.assumeFalse("incremental does not do snapshot", repairType == 
RepairType.INCREMENTAL);
         Assume.assumeFalse("Parallel repair does not perform snapshots", 
parallelism == RepairParallelism.PARALLEL);
 
         String table = tableName("snapshotfailure");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-        IMessageFilters.Filter filter = 
CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
-            throw new RuntimeException("snapshot fail");
-        })).drop();
-        try
-        {
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure();
-            if (withNotifications)
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+            IMessageFilters.Filter filter = 
CLUSTER.verbs(Verb.SNAPSHOT_MSG).messagesMatching(of(m -> {
+                throw new RuntimeException("snapshot fail");
+            })).drop();
+            try
             {
+                long repairExceptions = getRepairExceptions(CLUSTER, 1);
+                NodeToolResult result = repair(1, KEYSPACE, table);
                 result.asserts()
-                      .errorContains("Could not create snapshot")
-                      .notificationContains(ProgressEventType.START, "Starting 
repair command")
-                      .notificationContains(ProgressEventType.START, 
"repairing keyspace " + KEYSPACE + " with repair options")
-                      .notificationContains(ProgressEventType.ERROR, "Could 
not create snapshot ")
-                      .notificationContains(ProgressEventType.COMPLETE, 
"finished with error");
-            }
-            else
-            {
-                // Right now coordination doesn't propgate the first exception, so we only know "there exists a issue".
-                // With notifications on nodetool will see the error then complete, so the cmd state (what nodetool
-                // polls on) is ignored.  With notifications off, the poll await fails and queries cmd state, and that
-                // will have the below error.
-                // NOTE: this isn't desireable, would be good to propgate
-                result.asserts()
-                      .errorContains("Some repair failed");
+                      .failure()
+                      // Right now coordination doesn't propagate the first exception, so we only know "there exists an issue".
+                      // With notifications on, nodetool will see the error then complete, so the cmd state (what nodetool
+                      // polls on) is ignored.  With notifications off or dropped, the poll await fails and queries cmd
+                      // state, and that will have the below error.
+                      // NOTE: this isn't desirable; it would be good to propagate
+                      .errorContains("Could not create snapshot", "Some repair 
failed");
+                if (withNotifications)
+                {
+                    result.asserts()
+                          .notificationContains(ProgressEventType.START, "Starting repair command")
+                          .notificationContains(ProgressEventType.START, "repairing keyspace " + KEYSPACE + " with repair options")
+                          .notificationContains(ProgressEventType.ERROR, "Could not create snapshot ")
+                          .notificationContains(ProgressEventType.COMPLETE, "finished with error");
+                }
+
+                Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
+                if (repairType != RepairType.PREVIEW)
+                {
+                    assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+                }
+                else
+                {
+                    assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+                }
             }
-
-            Assert.assertEquals(repairExceptions + 1, getRepairExceptions(CLUSTER, 1));
-            if (repairType != RepairType.PREVIEW)
+            finally
             {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, table, "Could not create snapshot");
+                filter.off();
             }
-            else
-            {
-                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-            }
-        }
-        finally
-        {
-            filter.off();
-        }
+        });
     }
 }
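
Every test body above moves from `@Test(timeout = 1 * 60 * 1000)` to an explicit assertTimeoutPreemptively(Duration.ofMinutes(1), () -> { ... }) from the new AssertUtil class, so the budget covers only the test body and a wedged repair fails with a controlled AssertionError instead of whatever the runner's timeout machinery produces. The 128-line AssertUtil.java itself is not reproduced in this email, so the following is only an assumption-labeled sketch of the technique, not the committed implementation (the real helper presumably accepts a lambda that may throw checked exceptions and enriches the failure with thread state):

    import java.time.Duration;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    final class AssertUtilSketch
    {
        // run the body on a worker thread; fail preemptively when the budget expires
        static void assertTimeoutPreemptively(Duration timeout, Runnable body)
        {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            try
            {
                Future<?> future = executor.submit(body);
                try
                {
                    future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
                }
                catch (TimeoutException e)
                {
                    future.cancel(true); // interrupt the stuck body instead of leaking it
                    throw new AssertionError("execution timed out after " + timeout, e);
                }
                catch (ExecutionException e)
                {
                    throw new AssertionError(e.getCause()); // surface the body's own failure
                }
                catch (InterruptedException e)
                {
                    Thread.currentThread().interrupt();
                    throw new AssertionError(e);
                }
            }
            finally
            {
                executor.shutdownNow();
            }
        }
    }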
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
similarity index 52%
rename from test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
rename to test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
index 7be8ed1..db01b13 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorSlow.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorNeighbourDown.java
@@ -19,17 +19,17 @@
 package org.apache.cassandra.distributed.test;
 
 import java.net.UnknownHostException;
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import 
org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
@@ -43,128 +43,107 @@ import static 
org.apache.cassandra.distributed.api.IMessageFilters.Matcher.of;
 import static 
org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
 import static 
org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
 import static 
org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
 
-public abstract class RepairCoordinatorSlow extends RepairCoordinatorBase
+public abstract class RepairCoordinatorNeighbourDown extends 
RepairCoordinatorBase
 {
-    public RepairCoordinatorSlow(RepairType repairType, RepairParallelism 
parallelism, boolean withNotifications)
+    public RepairCoordinatorNeighbourDown(RepairType repairType, 
RepairParallelism parallelism, boolean withNotifications)
     {
         super(repairType, parallelism, withNotifications);
     }
 
-    @Test(timeout = 1 * 60 * 1000)
-    public void prepareRPCTimeout()
+    @Before
+    public void beforeTest()
     {
-        String table = tableName("preparerpctimeout");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-        IMessageFilters.Filter filter = CLUSTER.verbs(Verb.PREPARE_MSG).drop();
-        try
-        {
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure()
-                  .errorContains("Got negative replies from endpoints 
[127.0.0.2:7012]");
-            if (withNotifications)
+        CLUSTER.filters().reset();
+        CLUSTER.forEach(i -> {
+            try
             {
-                result.asserts()
-                      
.notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair 
command")
-                      
.notificationContains(NodeToolResult.ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                      
.notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative 
replies from endpoints [127.0.0.2:7012]")
-                      
.notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with 
error");
+                i.startup();
             }
-
-            if (repairType != RepairType.PREVIEW)
+            catch (IllegalStateException e)
             {
-                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, 
table, "Got negative replies from endpoints [127.0.0.2:7012]");
+                // ignore, node wasn't down
             }
-            else
-            {
-                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-            }
-
-            Assert.assertEquals(repairExceptions + 1, 
getRepairExceptions(CLUSTER, 1));
-        }
-        finally
-        {
-            filter.off();
-        }
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
-    public void neighbourDown() throws InterruptedException, ExecutionException
+    @Test
+    public void neighbourDown()
     {
         String table = tableName("neighbourdown");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-        Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
-        String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> 
FBUtilities.getBroadcastAddressAndPort().toString());
-        try
-        {
-            // wait for the node to stop
-            shutdownFuture.get();
-            // wait for the failure detector to detect this
-            CLUSTER.get(1).runOnInstance(() -> {
-                InetAddressAndPort neighbor;
-                try
-                {
-                    neighbor = InetAddressAndPort.getByName(downNodeAddress);
-                }
-                catch (UnknownHostException e)
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+            Future<Void> shutdownFuture = CLUSTER.get(2).shutdown();
+            String downNodeAddress = CLUSTER.get(2).callOnInstance(() -> 
FBUtilities.getBroadcastAddressAndPort().toString());
+            try
+            {
+                // wait for the node to stop
+                shutdownFuture.get();
+                // wait for the failure detector to detect this
+                CLUSTER.get(1).runOnInstance(() -> {
+                    InetAddressAndPort neighbor;
+                    try
+                    {
+                        neighbor = 
InetAddressAndPort.getByName(downNodeAddress);
+                    }
+                    catch (UnknownHostException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    while (FailureDetector.instance.isAlive(neighbor))
+                        Uninterruptibles.sleepUninterruptibly(500, 
TimeUnit.MILLISECONDS);
+                });
+
+                long repairExceptions = getRepairExceptions(CLUSTER, 1);
+                NodeToolResult result = repair(1, KEYSPACE, table);
+                result.asserts()
+                      .failure()
+                      .errorContains("Endpoint not alive");
+                if (withNotifications)
                 {
-                    throw new RuntimeException(e);
+                    result.asserts()
+                          
.notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair 
command")
+                          
.notificationContains(NodeToolResult.ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
+                          
.notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not 
alive")
+                          
.notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with 
error");
                 }
-                while (FailureDetector.instance.isAlive(neighbor))
-                    Uninterruptibles.sleepUninterruptibly(500, 
TimeUnit.MILLISECONDS);
-            });
 
-            long repairExceptions = getRepairExceptions(CLUSTER, 1);
-            NodeToolResult result = repair(1, KEYSPACE, table);
-            result.asserts()
-                  .failure()
-                  .errorContains("Endpoint not alive");
-            if (withNotifications)
+                Assert.assertEquals(repairExceptions + 1, 
getRepairExceptions(CLUSTER, 1));
+            }
+            finally
             {
-                result.asserts()
-                      
.notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair 
command")
-                      
.notificationContains(NodeToolResult.ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
-                      
.notificationContains(NodeToolResult.ProgressEventType.ERROR, "Endpoint not 
alive")
-                      
.notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with 
error");
+                CLUSTER.get(2).startup();
             }
 
-            Assert.assertEquals(repairExceptions + 1, 
getRepairExceptions(CLUSTER, 1));
-        }
-        finally
-        {
-            CLUSTER.get(2).startup();
-        }
-
-        // make sure to call outside of the try/finally so the node is up so 
we can actually query
-        if (repairType != RepairType.PREVIEW)
-        {
-            assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, 
table, "Endpoint not alive");
-        }
-        else
-        {
-            assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
-        }
+            // make sure to run these checks outside of the try/finally, so the node is back up and we can actually query
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, 
table, "Endpoint not alive");
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+        });
     }
 
-    @Test(timeout = 1 * 60 * 1000)
+    @Test
     public void validationParticipentCrashesAndComesBack()
     {
         // Test what happens when a participant restarts in the middle of 
validation
         // Currently this isn't recoverable but could be.
         // TODO since this is a real restart, how would I test "long pause"? 
Can't send SIGSTOP since same process
         String table = tableName("validationparticipentcrashesandcomesback");
-        CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value text, 
PRIMARY KEY (key))", KEYSPACE, table));
-        AtomicReference<Future<Void>> participantShutdown = new 
AtomicReference<>();
-        IMessageFilters.Filter filter = 
CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
-            // the nice thing about this is that this lambda is "capturing" 
and not "transfer", what this means is that
-            // this lambda isn't serialized and any object held isn't copied.
-            participantShutdown.set(CLUSTER.get(2).shutdown());
-            return true; // drop it so this node doesn't reply before shutdown.
-        })).drop();
-        try
-        {
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+            AtomicReference<Future<Void>> participantShutdown = new 
AtomicReference<>();
+            CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
+                // the nice thing about this is that this lambda is "capturing" rather than "transferring":
+                // it isn't serialized, so any object it holds isn't copied.
+                participantShutdown.set(CLUSTER.get(2).shutdown());
+                return true; // drop it so this node doesn't reply before 
shutdown.
+            })).drop();
             // since nodetool is blocking, need to handle participantShutdown 
in the background
             CompletableFuture<Void> recovered = CompletableFuture.runAsync(() 
-> {
                 try {
@@ -215,15 +194,6 @@ public abstract class RepairCoordinatorSlow extends 
RepairCoordinatorBase
             {
                 assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
             }
-        }
-        finally
-        {
-            filter.off();
-            try {
-                CLUSTER.get(2).startup();
-            } catch (Exception e) {
-                // if you call startup twice it is allowed to fail, so ignore 
it... hope this didn't brike the other tests =x
-            }
-        }
+        });
     }
 }
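
The messagesMatching callback above works because in-JVM dtest filter lambdas are "capturing" rather than serialized, so they can run test-side effects such as storing the shutdown future. A minimal sketch of the same trick, assuming the shared CLUSTER and the statically imported of(...) matcher used above:

    AtomicReference<Future<Void>> shutdown = new AtomicReference<>();
    CLUSTER.verbs(Verb.VALIDATION_REQ).to(2).messagesMatching(of(m -> {
        // executes in the test JVM, not inside the instance, so the reference is shared
        shutdown.compareAndSet(null, CLUSTER.get(2).shutdown());
        return true; // drop the message so node2 never replies before shutting down
    })).drop();

compareAndSet (rather than set) would guard against shutting the node down twice if the verb were sent more than once; the test above expects only a single validation request.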
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
 
b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
new file mode 100644
index 0000000..7a08187
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/RepairCoordinatorTimeout.java
@@ -0,0 +1,67 @@
+package org.apache.cassandra.distributed.test;
+
+import java.time.Duration;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import 
org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
+import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
+import org.apache.cassandra.net.Verb;
+
+import static java.lang.String.format;
+import static 
org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairFailedWithMessageContains;
+import static 
org.apache.cassandra.distributed.test.DistributedRepairUtils.assertParentRepairNotExist;
+import static 
org.apache.cassandra.distributed.test.DistributedRepairUtils.getRepairExceptions;
+import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;
+
+public abstract class RepairCoordinatorTimeout extends RepairCoordinatorBase
+{
+    public RepairCoordinatorTimeout(RepairType repairType, RepairParallelism 
parallelism, boolean withNotifications)
+    {
+        super(repairType, parallelism, withNotifications);
+    }
+
+    @Before
+    public void beforeTest()
+    {
+        CLUSTER.filters().reset();
+    }
+
+    @Test
+    public void prepareRPCTimeout()
+    {
+        String table = tableName("preparerpctimeout");
+        assertTimeoutPreemptively(Duration.ofMinutes(1), () -> {
+            CLUSTER.schemaChange(format("CREATE TABLE %s.%s (key text, value 
text, PRIMARY KEY (key))", KEYSPACE, table));
+            CLUSTER.verbs(Verb.PREPARE_MSG).drop();
+
+            long repairExceptions = getRepairExceptions(CLUSTER, 1);
+            NodeToolResult result = repair(1, KEYSPACE, table);
+            result.asserts()
+                  .failure()
+                  .errorContains("Got negative replies from endpoints 
[127.0.0.2:7012]");
+            if (withNotifications)
+            {
+                result.asserts()
+                      
.notificationContains(NodeToolResult.ProgressEventType.START, "Starting repair 
command")
+                      
.notificationContains(NodeToolResult.ProgressEventType.START, "repairing 
keyspace " + KEYSPACE + " with repair options")
+                      
.notificationContains(NodeToolResult.ProgressEventType.ERROR, "Got negative 
replies from endpoints [127.0.0.2:7012]")
+                      
.notificationContains(NodeToolResult.ProgressEventType.COMPLETE, "finished with 
error");
+            }
+
+            if (repairType != RepairType.PREVIEW)
+            {
+                assertParentRepairFailedWithMessageContains(CLUSTER, KEYSPACE, 
table, "Got negative replies from endpoints [127.0.0.2:7012]");
+            }
+            else
+            {
+                assertParentRepairNotExist(CLUSTER, KEYSPACE, table);
+            }
+
+            Assert.assertEquals(repairExceptions + 1, 
getRepairExceptions(CLUSTER, 1));
+        });
+    }
+}
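
Note the filter lifecycle change: the old tests captured an IMessageFilters.Filter and called filter.off() in a finally block, while the rewritten tests install drops without keeping a handle and rely on the @Before hook's CLUSTER.filters().reset() for cleanup. A minimal sketch of that lifecycle, assuming the same shared CLUSTER:

    @Before
    public void beforeTest()
    {
        CLUSTER.filters().reset(); // clear any drop rules a previous test left behind
    }

    @Test
    public void dropSketch()
    {
        CLUSTER.verbs(Verb.PREPARE_MSG).drop(); // no matching off(); reset() handles it
        // ... run repair and assert it fails with "Got negative replies ..." ...
    }

This keeps a drop rule from leaking into later tests even when an assertion fails mid-test.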
diff --git a/test/unit/org/apache/cassandra/utils/AssertUtil.java 
b/test/unit/org/apache/cassandra/utils/AssertUtil.java
new file mode 100644
index 0000000..4d35ede
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/AssertUtil.java
@@ -0,0 +1,128 @@
+package org.apache.cassandra.utils;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+
+public final class AssertUtil
+{
+    private AssertUtil()
+    {
+
+    }
+
+    /**
+     * Launches the input in another thread and throws an assertion failure if it takes longer than the defined timeout.
+     *
+     * An attempt to halt the thread uses an interrupt, which only works if the underlying logic respects it.
+     *
+     * The assert message will contain the stack traces at the time of the timeout, grouped by threads sharing a common trace.
+     */
+    public static void assertTimeoutPreemptively(Duration timeout, Executable 
fn)
+    {
+        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+        assertTimeoutPreemptively(caller, timeout, () -> {
+            fn.execute();
+            return null;
+        });
+    }
+
+    /**
+     * Launches the input in another thread and throws an assertion failure if it takes longer than the defined timeout.
+     *
+     * An attempt to halt the thread uses an interrupt, which only works if the underlying logic respects it.
+     *
+     * The assert message will contain the stack traces at the time of the timeout, grouped by threads sharing a common trace.
+     */
+    public static <T> T assertTimeoutPreemptively(Duration timeout, 
ThrowingSupplier<T> supplier)
+    {
+        StackTraceElement caller = Thread.currentThread().getStackTrace()[2];
+        return assertTimeoutPreemptively(caller, timeout, supplier);
+    }
+
+    private static <T> T assertTimeoutPreemptively(StackTraceElement caller, 
Duration timeout, ThrowingSupplier<T> supplier)
+    {
+        String[] split = caller.getClassName().split("\\.");
+        String simpleClassName = split[split.length - 1];
+        ExecutorService executorService = 
Executors.newSingleThreadExecutor(new NamedThreadFactory("TimeoutTest-" + 
simpleClassName + "#" + caller.getMethodName()));
+        try
+        {
+            Future<T> future = executorService.submit(() -> {
+                try
+                {
+                    return supplier.get();
+                }
+                catch (Throwable throwable)
+                {
+                    throw Throwables.throwAsUncheckedException(throwable);
+                }
+            });
+
+            long timeoutInNanos = timeout.toNanos();
+            try
+            {
+                return future.get(timeoutInNanos, TimeUnit.NANOSECONDS);
+            }
+            catch (TimeoutException ex)
+            {
+                future.cancel(true);
+                Map<Thread, StackTraceElement[]> threadDump = 
Thread.getAllStackTraces();
+                StringBuilder sb = new StringBuilder("execution timed out 
after ").append(TimeUnit.NANOSECONDS.toMillis(timeoutInNanos)).append(" ms\n");
+                Multimap<List<StackTraceElement>, Thread> groupCommonThreads = 
HashMultimap.create();
+                for (Map.Entry<Thread, StackTraceElement[]> e : 
threadDump.entrySet())
+                    groupCommonThreads.put(Arrays.asList(e.getValue()), 
e.getKey());
+
+                for (Map.Entry<List<StackTraceElement>, Collection<Thread>> e 
: groupCommonThreads.asMap().entrySet())
+                {
+                    sb.append("Threads: ");
+                    Joiner.on(", ").appendTo(sb, 
e.getValue().stream().map(Thread::getName).iterator());
+                    sb.append("\n");
+                    for (StackTraceElement elem : e.getKey())
+                        
sb.append("\t").append(elem.getClassName()).append(".").append(elem.getMethodName()).append("[").append(elem.getLineNumber()).append("]\n");
+                    sb.append("\n");
+                }
+                throw new AssertionError(sb.toString());
+            }
+            catch (InterruptedException e)
+            {
+                Thread.currentThread().interrupt();
+                throw Throwables.throwAsUncheckedException(e);
+            }
+            catch (ExecutionException ex)
+            {
+                throw Throwables.throwAsUncheckedException(ex.getCause());
+            }
+            catch (Throwable ex)
+            {
+                throw Throwables.throwAsUncheckedException(ex);
+            }
+        }
+        finally
+        {
+            executorService.shutdownNow();
+        }
+    }
+
+    public interface ThrowingSupplier<T>
+    {
+        T get() throws Throwable;
+    }
+
+    public interface Executable
+    {
+        void execute() throws Throwable;
+    }
+}
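
A hedged usage sketch of the new helper; the example class and its body are illustrative only, while the two overloads are the ones added above. On timeout the worker thread is interrupted via Future.cancel(true) and the AssertionError message carries every thread's stack, with identical traces printed once:

    import java.time.Duration;

    import static org.apache.cassandra.utils.AssertUtil.assertTimeoutPreemptively;

    public class AssertUtilExample
    {
        public static void main(String[] args)
        {
            // Executable overload: fails with a grouped thread dump if the body overruns.
            assertTimeoutPreemptively(Duration.ofSeconds(5), () -> {
                Thread.sleep(100); // sleep respects interrupt, so cancel(true) can stop it
            });

            // ThrowingSupplier overload: the computed value is returned on success.
            int sum = assertTimeoutPreemptively(Duration.ofSeconds(5), () -> 1 + 2);
            System.out.println(sum); // prints 3
        }
    }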


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
