This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit d1fbca392aa5b2232f82970a0c87cad3e6eacab8
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 30 16:54:36 2023 +0100

    [CEP-21] Rewrite o.a.c.distributed.test.SchemaTest
    
    patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov
    for CASSANDRA-18459
---
 .../cassandra/distributed/impl/Instance.java       |   1 +
 .../distributed/impl/TestChangeListener.java       | 119 +++++++++++++++++
 .../cassandra/distributed/shared/ClusterUtils.java | 134 +++++++++++++++++++
 .../cassandra/distributed/test/SchemaTest.java     | 148 ++++++++++++++-------
 .../distributed/test/log/ConsistentLeaveTest.java  |  19 ++-
 .../distributed/test/log/ConsistentMoveTest.java   |  22 ++-
 .../distributed/test/log/FailedLeaveTest.java      |   2 +
 .../distributed/test/log/FuzzTestBase.java         |  66 ---------
 .../log/InProgressSequenceCoordinationTest.java    |   1 +
 .../distributed/test/log/ResumableStartupTest.java |   1 +
 .../test/ring/ConsistentBootstrapTest.java         |  29 ++--
 .../distributed/test/schema/SchemaTest.java        |  55 --------
 12 files changed, 389 insertions(+), 208 deletions(-)

diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 40451380ea..9468692790 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -696,6 +696,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
                                 registerOutboundFilter(cluster);
         });
         QueryProcessor.registerStatementInvalidatingListener();
+        TestChangeListener.register();
 
         // We need to persist this as soon as possible after startup checks.
         // This should be the first write to SystemKeyspace (CASSANDRA-11742)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/TestChangeListener.java
 
b/test/distributed/org/apache/cassandra/distributed/impl/TestChangeListener.java
new file mode 100644
index 0000000000..b73e9a9b8f
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/TestChangeListener.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.impl;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Predicate;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.listeners.ChangeListener;
+import org.apache.cassandra.utils.concurrent.WaitQueue;
+
+public class TestChangeListener implements ChangeListener
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(TestChangeListener.class);
+    public static final TestChangeListener instance = new TestChangeListener();
+
+    public static void register()
+    {
+        logger.debug("Registered TestChangeListener");
+        ClusterMetadataService.instance().log().addListener(instance);
+    }
+
+    private final List<Predicate<Epoch>> preCommitPredicates = new 
ArrayList<>();
+    private final List<Predicate<Epoch>> postCommitPredicates = new 
ArrayList<>();
+    private final WaitQueue waiters = WaitQueue.newWaitQueue();
+
+    @Override
+    public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next)
+    {
+        Iterator<Predicate<Epoch>> iter = preCommitPredicates.iterator();
+        while (iter.hasNext())
+        {
+            if (iter.next().test(next.epoch))
+            {
+                logger.debug("Epoch matches pre-commit predicate, pausing");
+                pause();
+                iter.remove();
+            }
+        }
+    }
+
+    @Override
+    public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next)
+    {
+        Iterator<Predicate<Epoch>> iter = postCommitPredicates.iterator();
+        while (iter.hasNext())
+        {
+            if (iter.next().test(next.epoch))
+            {
+                logger.debug("Epoch matches post-commit predicate, pausing");
+                pause();
+                iter.remove();
+            }
+        }
+    }
+
+    public void pauseBefore(Epoch epoch, Runnable onMatch)
+    {
+        logger.debug("Requesting pause before enacting {}", epoch);
+        preCommitPredicates.add((e) -> {
+            if (e.is(epoch))
+            {
+                onMatch.run();
+                return true;
+            }
+            return false;
+        });
+    }
+
+    public void pauseAfter(Epoch epoch, Runnable onMatch)
+    {
+        logger.debug("Requesting pause after enacting {}", epoch);
+        postCommitPredicates.add((e) -> {
+            if (e.is(epoch))
+            {
+                onMatch.run();
+                return true;
+            }
+            return false;
+        });
+    }
+
+    public void pause()
+    {
+        WaitQueue.Signal signal = waiters.register();
+        logger.debug("Log follower is paused, waiting...");
+        signal.awaitUninterruptibly();
+        logger.debug("Resumed log follower...");
+    }
+
+    public void unpause()
+    {
+        logger.debug("Unpausing log follower");
+        waiters.signalAll();
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java 
b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
index b28fe90685..a1a23ea211 100644
--- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.distributed.shared;
 
+import java.io.Serializable;
 import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.security.Permission;
@@ -31,10 +32,12 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
+import java.util.function.BiPredicate;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.regex.Matcher;
@@ -61,6 +64,8 @@ import org.apache.cassandra.distributed.impl.AbstractCluster;
 import org.apache.cassandra.distributed.impl.InstanceConfig;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.VersionedValue;
+import org.apache.cassandra.distributed.impl.TestChangeListener;
+import org.apache.cassandra.distributed.test.log.TestProcessor;
 import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
@@ -72,11 +77,14 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.Transformation;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.ownership.PlacementForRange;
 import org.apache.cassandra.tools.SystemExitException;
 import org.apache.cassandra.utils.Isolated;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
 import static 
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
@@ -413,6 +421,132 @@ public class ClusterUtils
         ClusterUtils.waitForCMSToQuiesce(cluster, 
getClusterMetadataVersion(leader), ignored);
     }
 
+    public static Callable<Void> pauseBeforeEnacting(IInvokableInstance 
instance, Epoch epoch)
+    {
+        return pauseBeforeEnacting(instance, epoch, 10, TimeUnit.SECONDS);
+    }
+
+    protected static Callable<Void> pauseBeforeEnacting(IInvokableInstance 
instance,
+                                                        Epoch epoch,
+                                                        long wait,
+                                                        TimeUnit waitUnit)
+    {
+        return instance.callOnInstance(() -> {
+            TestChangeListener listener = TestChangeListener.instance;
+            AsyncPromise<?> promise = new AsyncPromise<>();
+            listener.pauseBefore(epoch, () -> promise.setSuccess(null));
+            return () -> {
+                try
+                {
+                    promise.get(wait, waitUnit);
+                    return null;
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e);
+                }
+            };
+        });
+    }
+
+    public static Callable<Void> pauseAfterEnacting(IInvokableInstance 
instance, Epoch epoch)
+    {
+        return pauseAfterEnacting(instance, epoch, 10, TimeUnit.SECONDS);
+    }
+
+    protected static Callable<Void> pauseAfterEnacting(IInvokableInstance 
instance,
+                                                       Epoch epoch,
+                                                       long wait,
+                                                       TimeUnit waitUnit)
+    {
+        return instance.callOnInstance(() -> {
+            TestChangeListener listener = TestChangeListener.instance;
+            AsyncPromise<?> promise = new AsyncPromise<>();
+            listener.pauseAfter(epoch, () -> promise.setSuccess(null));
+            return () -> {
+                try
+                {
+                    promise.get(wait, waitUnit);
+                    return null;
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e);
+                }
+            };
+        });
+    }
+
+    public static Callable<Epoch> pauseBeforeCommit(IInvokableInstance 
cmsInstance, SerializablePredicate<Transformation> predicate)
+    {
+        Callable<Long> remoteCallable = cmsInstance.callOnInstance(() -> {
+            TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
+            AsyncPromise<Epoch> promise = new AsyncPromise<>();
+            processor.pauseIf(predicate, () -> 
promise.setSuccess(ClusterMetadata.current().epoch));
+            return () -> {
+                try
+                {
+                    return promise.get(30, TimeUnit.SECONDS).getEpoch();
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e);
+                }
+            };
+        });
+        return () -> Epoch.create(remoteCallable.call());
+
+    }
+
+    public static Callable<Epoch> getSequenceAfterCommit(IInvokableInstance 
cmsInstance,
+                                                         
SerializableBiPredicate<Transformation, Commit.Result> predicate)
+    {
+        Callable<Long> remoteCallable = cmsInstance.callOnInstance(() -> {
+            TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
+
+            AsyncPromise<Epoch> promise = new AsyncPromise<>();
+            processor.registerCommitPredicate((event, result) -> {
+                if (predicate.test(event, result))
+                {
+                    
promise.setSuccess(result.success().replication.latestEpoch());
+                    return true;
+                }
+
+                return false;
+            });
+            return () -> {
+                try
+                {
+                    return promise.get(30, TimeUnit.SECONDS).getEpoch();
+                }
+                catch (Throwable e)
+                {
+                    throw new RuntimeException(e);
+                }
+            };
+        });
+
+        return () -> Epoch.create(remoteCallable.call());
+    }
+
+    public static void unpauseCommits(IInvokableInstance instance)
+    {
+        instance.runOnInstance(() -> {
+            TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
+            processor.unpause();
+        });
+    }
+
+    public static void unpauseEnactment(IInvokableInstance instance)
+    {
+        instance.runOnInstance(() -> TestChangeListener.instance.unpause());
+    }
+
+    public static interface SerializablePredicate<T> extends Predicate<T>, 
Serializable
+    {}
+
+    public static interface SerializableBiPredicate<T1, T2> extends 
BiPredicate<T1, T2>, Serializable {}
+
     private static class ClusterMetadataVersion
     {
         public final int node;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
index 3d54e53712..4104ea7f02 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.util.concurrent.Callable;
+
 import org.junit.Test;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
@@ -27,10 +29,17 @@ import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.transformations.AlterSchema;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionFactory;
 
 import static java.time.Duration.ofSeconds;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseAfterEnacting;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeEnacting;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.unpauseEnactment;
 import static 
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
 import static org.junit.Assert.assertTrue;
 
@@ -46,15 +55,40 @@ public class SchemaTest extends TestBaseImpl
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int,  primary key (pk, ck))");
             String name = "aaa";
-            cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + 
".tbl ADD " + name + " list<int>");
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1, v2) values (?,1,1,1)", 1);
-            selectSilent(cluster, name);
+            // have the CMS node pause directly before committing the ALTER 
TABLE so we can infer the next epoch
+            Callable<Epoch> beforeCommit = pauseBeforeCommit(cluster.get(1), 
(e) -> e instanceof AlterSchema);
+            new Thread(() -> {
+                cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE 
+ ".tbl ADD " + name + " list<int>");
+                cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1);
+            }).start();
+
+            Epoch targetEpoch = beforeCommit.call().nextEpoch();
+            // pause the replica immediately before and after enacting the 
ALTER TABLE stmt
+            Callable<?> beforeEnactedOnReplica = 
pauseBeforeEnacting(cluster.get(2), targetEpoch);
+            Callable<?> afterEnactedOnReplica = 
pauseAfterEnacting(cluster.get(2), targetEpoch);
+            // unpause the CMS node and allow it to commit and replicate the 
ALTER TABLE
+            unpauseCommits(cluster.get(1));
+
+            // Wait for the replica to signal that it has paused before 
enacting the schema change
+            // then execute the query and assert that a schema disagreement 
error was triggered
+            beforeEnactedOnReplica.call();
+            selectExpectingError(cluster, name);
+
+            // unpause the replica and wait until it notifies that it has 
enacted the schema change
+            unpauseEnactment(cluster.get(2));
+            afterEnactedOnReplica.call();
+            unpauseEnactment(cluster.get(2));
 
             cluster.get(2).flush(KEYSPACE);
-            cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE + 
".tbl ADD " + name + " list<int>");
+            // now that the replica has enacted the alter table, an attempt to 
repeat it should be rejected
+            alterTableExpectingError(cluster.get(2), name);
+            // bouncing the replica should be safe as SSTables aren't loaded 
until the log replay is complete
+            // and the schema is in it's most up to date state
             cluster.get(2).shutdown().get();
             cluster.get(2).startup();
+            cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl 
WHERE pk = ?"), ConsistencyLevel.ALL, 1);
             cluster.get(2).forceCompact(KEYSPACE, "tbl");
+            cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl 
WHERE pk = ?"), ConsistencyLevel.ALL, 1);
         }
     }
 
@@ -65,21 +99,50 @@ public class SchemaTest extends TestBaseImpl
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int,  primary key (pk, ck))");
             String name = "v10";
-            cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + 
".tbl ADD " + name + " list<int>");
-            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1, v2) values (?,1,1,1)", 1);
-            selectSilent(cluster, name);
+
+            // have the CMS node pause directly before committing the ALTER 
TABLE so we can infer the next epoch
+            Callable<Epoch> beforeCommit = pauseBeforeCommit(cluster.get(1), 
(e) -> e instanceof AlterSchema);
+            new Thread(() -> {
+                cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE 
+ ".tbl ADD " + name + " list<int>");
+                cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + 
".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1);
+            }).start();
+            Epoch targetEpoch = beforeCommit.call().nextEpoch();
+
+            // pause the replica immediately before and after enacting the 
ALTER TABLE stmt
+            Callable<?> beforeEnactedOnReplica = 
pauseBeforeEnacting(cluster.get(2), targetEpoch);
+            Callable<?> afterEnactedOnReplica = 
pauseAfterEnacting(cluster.get(2), targetEpoch);
+            // unpause the CMS node and allow it to commit and replicate the 
ALTER TABLE
+            unpauseCommits(cluster.get(1));
+
+            // Wait for the replica to signal that it has paused before 
enacting the schema change
+            // then execute the query and assert that a schema disagreement 
error was triggered
+            beforeEnactedOnReplica.call();
+            selectExpectingError(cluster, name);
+
+            // unpause the replica and wait until it notifies that it has 
enacted the schema change
+            unpauseEnactment(cluster.get(2));
+            afterEnactedOnReplica.call();
+            unpauseEnactment(cluster.get(2));
+
             cluster.get(2).flush(KEYSPACE);
-            cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE + 
".tbl ADD " + name + " list<int>");
+            // now that the replica has enacted the alter table, an attempt to 
repeat it should be rejected
+            alterTableExpectingError(cluster.get(2), name);
             cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1, v2, " + name + ") values (?,1,1,1,[1])", 1);
             cluster.get(2).flush(KEYSPACE);
             cluster.get(2).forceCompact(KEYSPACE, "tbl");
+
+            // bouncing the replica should be safe as SSTables aren't loaded 
until the log replay is complete
+            // and the schema is in it's most up to date state
             cluster.get(2).shutdown().get();
             cluster.get(2).startup();
+
+            cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl 
WHERE pk = ?"), ConsistencyLevel.ALL, 1);
             cluster.get(2).forceCompact(KEYSPACE, "tbl");
+            cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl 
WHERE pk = ?"), ConsistencyLevel.ALL, 1);
         }
     }
 
-    private void selectSilent(Cluster cluster, String name)
+    private void selectExpectingError(Cluster cluster, String name)
     {
         try
         {
@@ -99,26 +162,42 @@ public class SchemaTest extends TestBaseImpl
         }
     }
 
+    private void alterTableExpectingError(IInvokableInstance instance, String 
name)
+    {
+        try
+        {
+            instance.schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl 
ADD " + name + " list<int>");
+        }
+        catch (Exception e)
+        {
+            boolean causeIsColumnExists = false;
+            Throwable cause = e;
+            while (cause != null)
+            {
+                if (cause.getMessage() != null && 
cause.getMessage().contains("Column with name '" + name + "' already exists"))
+                    causeIsColumnExists = true;
+                cause = cause.getCause();
+            }
+            assertTrue(causeIsColumnExists);
+        }
+    }
+
     /**
-     * The purpose of this test is to verify manual schema reset functinality.
+     * The original purpose of this test was to verify manual schema reset 
functionality, but with schema updates being
+     * serialized in the cluster metadata log local schema reset no longer 
makes sense so the assertions have been
+     * modified to verify that schema changes are correctly propagated to down 
nodes once they come back up.
      * <p>
      * There is a 2-node cluster and a TABLE_ONE created. The schema version 
is agreed on both nodes. Then the 2nd node
      * is shutdown. We introduce a disagreement by dropping TABLE_ONE and 
creating TABLE_TWO on the 1st node. Therefore,
      * the 1st node has a newer schema version with TABLE_TWO, while the 
shutdown 2nd node has older schema version with
      * TABLE_ONE.
      * <p>
-     * At this point, if we just started the 2nd node, it would sync its 
schema by getting fresh mutations from the 1st
-     * node which would result in both nodes having only the definition of 
TABLE_TWO.
-     * <p>
-     * However, before starting the 2nd node the schema is reset on the 1st 
node, so the 1st node will discard its local
-     * schema whenever it manages to fetch a schema definition from some other 
node (the 2nd node in this case).
-     * It is expected to end up with both nodes having only the definition of 
TABLE_ONE.
+     * At this point, if we just start the 2nd node, it would sync its schema 
by getting the transformations that it
+     * missed while down, which would result in both nodes having only the 
definition of TABLE_TWO.
      * <p>
-     * In the second phase of the test we simply break the schema on the 1st 
node and call reset to fetch the schema
-     * definition it from the 2nd node.
      */
     @Test
-    public void schemaReset() throws Throwable
+    public void schemaPropagationToDownNode() throws Throwable
     {
         CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000);
         CassandraRelevantProperties.SCHEMA_PULL_INTERVAL_MS.setLong(10000);
@@ -137,43 +216,16 @@ public class SchemaTest extends TestBaseImpl
                                         .allMatch(e -> 
e.equals(getBroadcastAddressAndPort()));
             }));
 
-            // when there is no node to fetch the schema from, reset local 
schema should immediately fail
-//            
Assertions.assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> {
-//                cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
-//            }).withMessageContaining("Cannot reset local schema when there 
are no other live nodes");
-
             // now, let's make a disagreement, the shutdown node 2 has a 
definition of TABLE_ONE, while the running
             // node 1 will have a definition of TABLE_TWO
             cluster.coordinator(1).execute(String.format("DROP TABLE %s.%s", 
KEYSPACE, TABLE_ONE), ConsistencyLevel.ONE);
             cluster.coordinator(1).execute(String.format("CREATE TABLE %s.%s 
(pk INT PRIMARY KEY, v TEXT)", KEYSPACE, TABLE_TWO), ConsistencyLevel.ONE);
             await(30).until(() -> checkTablesPropagated(cluster.get(1), false, 
true));
 
-            // Schema.resetLocalSchema is guarded by some conditions which 
would not let us reset schema if there is no
-            // live node in the cluster, therefore we simply call 
SchemaUpdateHandler.clear (this is the only real thing
-            // being done by Schema.resetLocalSchema under the hood)
-//            SerializableCallable<Boolean> clear = () -> 
Schema.instance.updateHandler.clear().awaitUninterruptibly(1, TimeUnit.MINUTES);
-//            Future<Boolean> clear1 = 
cluster.get(1).asyncCallsOnInstance(clear).call();
-//            assertFalse(clear1.isDone());
-
             // when the 2nd node is started, schema should be back in sync
             cluster.get(2).startup();
-//            await(30).until(() -> clear1.isDone() && clear1.get());
-
-            // this proves that reset schema works on the 1st node - the most 
recent change should be discarded because
-            // it receives the schema from the 2nd node and applies it on 
empty schema
-            await(60).until(() -> checkTablesPropagated(cluster.get(1), true, 
false));
-
-            // now let's break schema locally and let it be reset
-//            cluster.get(1).runOnInstance(() -> 
Schema.instance.getLocalKeyspaces()
-//                                                              
.get(SchemaConstants.SCHEMA_KEYSPACE_NAME)
-//                                                              
.get().tables.forEach(t -> ColumnFamilyStore.getIfExists(t.keyspace, 
t.name).truncateBlockingWithoutSnapshot()));
-
-            // when schema is removed and there is a node to fetch it from, 
the 1st node should immediately restore it
-//            cluster.get(1).runOnInstance(() -> 
Schema.instance.resetLocalSchema());
-            // note that we should not wait for this to be true because 
resetLocalSchema is blocking
-            // and after successfully completing it, the schema should be 
already back in sync
-            assertTrue(checkTablesPropagated(cluster.get(1), true, false));
-            assertTrue(checkTablesPropagated(cluster.get(2), true, false));
+            assertTrue(checkTablesPropagated(cluster.get(1), false, true));
+            assertTrue(checkTablesPropagated(cluster.get(2), false, true));
         }
     }
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java
index b502fbbb48..0dd2c887fc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java
@@ -38,18 +38,20 @@ import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.fuzz.HarryHelper;
 import org.apache.cassandra.distributed.fuzz.InJVMTokenAwareVisitorExecutor;
 import org.apache.cassandra.distributed.fuzz.InJvmSut;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicationFactor;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.transformations.PrepareLeave;
 
 import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getClusterMetadataVersion;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
 import static org.junit.Assert.assertFalse;
 
 public class ConsistentLeaveTest extends FuzzTestBase
@@ -70,7 +72,7 @@ public class ConsistentLeaveTest extends FuzzTestBase
         {
             IInvokableInstance cmsInstance = cluster.get(1);
             IInvokableInstance leavingInstance = cluster.get(2);
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
 
             configBuilder.setSUT(() -> new InJvmSut(cluster));
             Run run = configBuilder.build().createRun();
@@ -79,7 +81,7 @@ public class ConsistentLeaveTest extends FuzzTestBase
                                            " WITH replication = {'class': 
'SimpleStrategy', 'replication_factor' : 2};",
                                            ConsistencyLevel.ALL);
             cluster.coordinator(1).execute(run.schemaSpec.compile().cql(), 
ConsistencyLevel.ALL);
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
 
             QuiescentLocalStateChecker model = new 
QuiescentLocalStateChecker(run, ReplicationFactor.fullOnly(2));
             Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
@@ -103,20 +105,17 @@ public class ConsistentLeaveTest extends FuzzTestBase
             model.validateAll();
 
             // Make sure there can be only one FinishLeave in flight
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
             // set expectation of finish leave & retrieve the sequence when it 
gets committed
             Epoch currentEpoch = getClusterMetadataVersion(cmsInstance);
             Callable<Epoch> finishedLeaving = 
getSequenceAfterCommit(cmsInstance, (e, r) -> e instanceof 
PrepareLeave.FinishLeave && r.isSuccess());
-            cmsInstance.runOnInstance(() -> {
-                TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
-                processor.unpause();
-            });
+            unpauseCommits(cmsInstance);
             Epoch nextEpoch = finishedLeaving.call();
             Assert.assertEquals(String.format("Epoch %s should have 
immediately superseded epoch %s.", nextEpoch, currentEpoch),
                                 nextEpoch.getEpoch(), currentEpoch.getEpoch() 
+ 1);
 
             // wait for the cluster to all witness the finish join event
-            ClusterUtils.waitForCMSToQuiesce(cluster, nextEpoch);
+            waitForCMSToQuiesce(cluster, nextEpoch);
 
             assertGossipStatus(cluster, leavingInstance.config().num(), 
"LEFT");
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java
index 425cd7a62c..d046fc5021 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java
@@ -41,17 +41,19 @@ import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.fuzz.HarryHelper;
 import org.apache.cassandra.distributed.fuzz.InJVMTokenAwareVisitorExecutor;
 import org.apache.cassandra.distributed.fuzz.InJvmSut;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicationFactor;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.transformations.PrepareMove;
 
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -73,7 +75,7 @@ public class ConsistentMoveTest extends FuzzTestBase
         {
             IInvokableInstance cmsInstance = cluster.get(1);
             IInvokableInstance movingInstance = cluster.get(2);
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
 
             configBuilder.setSUT(() -> new InJvmSut(cluster));
             Run run = configBuilder.build().createRun();
@@ -82,7 +84,7 @@ public class ConsistentMoveTest extends FuzzTestBase
                                            " WITH replication = {'class': 
'SimpleStrategy', 'replication_factor' : 2};",
                                            ConsistencyLevel.ALL);
             cluster.coordinator(1).execute(run.schemaSpec.compile().cql(), 
ConsistencyLevel.ALL);
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
 
             FuzzTestBase.QuiescentLocalStateChecker model = new 
FuzzTestBase.QuiescentLocalStateChecker(run, ReplicationFactor.fullOnly(2));
             Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
@@ -92,7 +94,7 @@ public class ConsistentMoveTest extends FuzzTestBase
             model.validateAll();
 
             // Make sure there can be only one FinishLeave in flight
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
 
             Callable<?> pending = pauseBeforeCommit(cmsInstance, (e) -> e 
instanceof PrepareMove.FinishMove);
             new Thread(() -> {
@@ -103,15 +105,11 @@ public class ConsistentMoveTest extends FuzzTestBase
 
             assertGossipStatus(cluster, movingInstance.config().num(), 
"MOVING");
 
+            // wait for the cluster to all witness the finish join event
             Callable<Epoch> finishedMoving = 
getSequenceAfterCommit(cmsInstance, (e, r) -> e instanceof 
PrepareMove.FinishMove && r.isSuccess());
-            cmsInstance.runOnInstance(() -> {
-                TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
-                processor.unpause();
-            });
-
+            unpauseCommits(cmsInstance);
             Epoch nextEpoch = finishedMoving.call();
-            // wait for the cluster to all witness the finish join event
-            ClusterUtils.waitForCMSToQuiesce(cluster, nextEpoch);
+            waitForCMSToQuiesce(cluster, nextEpoch);
 
             // Streaming for unbootstrap has finished, any rows from the first 
batch should have been transferred
             // from the leaving node to the new replicas. Continue to write at 
ONE, replication of these rows will
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
index b7509ff40f..a22eb09167 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.distributed.fuzz.HarryHelper;
 import org.apache.cassandra.distributed.fuzz.InJVMTokenAwareVisitorExecutor;
 import org.apache.cassandra.distributed.fuzz.InJvmSut;
 import org.apache.cassandra.distributed.shared.ClusterUtils;
+import 
org.apache.cassandra.distributed.shared.ClusterUtils.SerializableBiPredicate;
 import org.apache.cassandra.locator.ReplicationFactor;
 import org.apache.cassandra.tcm.Commit;
 import org.apache.cassandra.tcm.Epoch;
@@ -58,6 +59,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
 import static 
org.apache.cassandra.distributed.shared.ClusterUtils.cancelInProgressSequences;
 import static 
org.apache.cassandra.distributed.shared.ClusterUtils.decommission;
 import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getClusterMetadataVersion;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit;
 
 public class FailedLeaveTest extends FuzzTestBase
 {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java 
b/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java
index f86b800ae7..02f6b3eb91 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java
@@ -19,14 +19,10 @@
 package org.apache.cassandra.distributed.test.log;
 
 
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiPredicate;
-import java.util.function.Predicate;
 
 import org.junit.BeforeClass;
 
@@ -38,19 +34,12 @@ import harry.model.sut.SystemUnderTest;
 import harry.operations.CompiledStatement;
 import harry.operations.Query;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.fuzz.HarryHelper;
 import org.apache.cassandra.distributed.fuzz.InJvmSut;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.test.ExecUtil;
 import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.locator.ReplicationFactor;
-import org.apache.cassandra.tcm.Commit;
-import org.apache.cassandra.tcm.Transformation;
-import org.apache.cassandra.tcm.ClusterMetadataService;
-import org.apache.cassandra.tcm.Epoch;
-import org.apache.cassandra.utils.concurrent.AsyncPromise;
 
 import static harry.model.SelectHelper.resultSetToRow;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
@@ -74,58 +63,6 @@ public class FuzzTestBase extends TestBaseImpl
                                .set("request_timeout", String.format("%dms", 
TimeUnit.MINUTES.toMillis(10))));
     }
 
-    protected static Callable<Void> pauseBeforeCommit(IInvokableInstance 
cmsInstance, SerializablePredicate<Transformation> predicate)
-    {
-        return cmsInstance.callOnInstance(() -> {
-            TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
-            AsyncPromise<?> promise = new AsyncPromise<>();
-            processor.pauseIf(predicate, () -> promise.setSuccess(null));
-            return () -> {
-                try
-                {
-                    promise.get(30, TimeUnit.SECONDS);
-                    return null;
-                }
-                catch (Throwable e)
-                {
-                    throw new RuntimeException(e);
-                }
-            };
-        });
-    }
-
-    // todo; assumes period = 1
-    protected static Callable<Epoch> getSequenceAfterCommit(IInvokableInstance 
cmsInstance,
-                                                            
SerializableBiPredicate<Transformation, Commit.Result> predicate)
-    {
-        Callable<Long> remoteCallable = cmsInstance.callOnInstance(() -> {
-            TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
-
-            AsyncPromise<Epoch> promise = new AsyncPromise<>();
-            processor.registerCommitPredicate((event, result) -> {
-                if (predicate.test(event, result))
-                {
-                    
promise.setSuccess(result.success().replication.latestEpoch());
-                    return true;
-                }
-
-                return false;
-            });
-            return () -> {
-                try
-                {
-                    return ClusterUtils.encode(promise.get(30, 
TimeUnit.SECONDS));
-                }
-                catch (Throwable e)
-                {
-                    throw new RuntimeException(e);
-                }
-            };
-        });
-
-        return () -> ClusterUtils.decode(remoteCallable.call());
-    }
-
     public static IIsolatedExecutor.SerializableRunnable 
toRunnable(ExecUtil.ThrowingSerializableRunnable runnable)
     {
         return () -> {
@@ -202,7 +139,4 @@ public class FuzzTestBase extends TestBaseImpl
             }
         }
     }
-
-    public static interface SerializablePredicate<T> extends Predicate<T>, 
Serializable {}
-    public static interface SerializableBiPredicate<T1, T2> extends 
BiPredicate<T1, T2>, Serializable {}
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
index 0cd3046d57..3d1b4c0102 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java
@@ -46,6 +46,7 @@ import static 
org.apache.cassandra.config.CassandraRelevantProperties.PROGRESS_B
 import static 
org.apache.cassandra.distributed.Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN;
 import static 
org.apache.cassandra.distributed.Constants.KEY_DTEST_FULL_STARTUP;
 import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit;
 import static org.apache.cassandra.net.Verb.TCM_REPLICATION;
 import static 
org.apache.cassandra.tcm.sequences.InProgressSequences.SequenceState.BLOCKED;
 import static 
org.apache.cassandra.tcm.sequences.InProgressSequences.SequenceState.CONTINUING;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java
index 83f8d77c89..c1a31776b5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java
@@ -43,6 +43,7 @@ import org.apache.cassandra.service.StorageService;
 
 import static 
org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getClusterMetadataVersion;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit;
 
 public class ResumableStartupTest extends FuzzTestBase
 {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
index 8f11c02553..d523a32ce5 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java
@@ -37,14 +37,16 @@ import 
org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.TokenSupplier;
 import org.apache.cassandra.distributed.fuzz.HarryHelper;
 import org.apache.cassandra.distributed.fuzz.InJvmSut;
-import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.distributed.test.log.FuzzTestBase;
-import org.apache.cassandra.distributed.test.log.TestProcessor;
-import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.transformations.PrepareJoin;
 
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits;
+import static 
org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce;
+
 public class ConsistentBootstrapTest extends FuzzTestBase
 {
     private static int WRITES = 2000;
@@ -63,7 +65,7 @@ public class ConsistentBootstrapTest extends FuzzTestBase
                                         .start())
         {
             IInvokableInstance cmsInstance = cluster.get(1);
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
             configBuilder.setSUT(() -> new InJvmSut(cluster));
             Run run = configBuilder.build().createRun();
 
@@ -71,7 +73,7 @@ public class ConsistentBootstrapTest extends FuzzTestBase
                                            " WITH replication = {'class': 
'SimpleStrategy', 'replication_factor' : 3};",
                                            ConsistencyLevel.ALL);
             cluster.coordinator(1).execute(run.schemaSpec.compile().cql(), 
ConsistencyLevel.ALL);
-            ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1));
+            waitForCMSToQuiesce(cluster, cluster.get(1));
             Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new);
             QuiescentLocalStateChecker model = new 
QuiescentLocalStateChecker(run);
             System.out.println("Starting write phase...");
@@ -86,7 +88,7 @@ public class ConsistentBootstrapTest extends FuzzTestBase
                                             
.set(Constants.KEY_DTEST_FULL_STARTUP, true);
             IInvokableInstance newInstance = cluster.bootstrap(config);
 
-            // Prime the DPS node to pause before the finish join event is 
committed
+            // Prime the CMS node to pause before the finish join event is 
committed
             Callable<?> pending = pauseBeforeCommit(cmsInstance, (e) -> e 
instanceof PrepareJoin.FinishJoin);
             new Thread(() -> newInstance.startup()).start();
             pending.call();
@@ -102,25 +104,18 @@ public class ConsistentBootstrapTest extends FuzzTestBase
             catch (Throwable t)
             {
                 // Unpause, since otherwise validation exception will prevent 
graceful shutdown
-                cmsInstance.runOnInstance(() -> {
-                    TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
-                    processor.unpause();
-                });
+                unpauseCommits(cmsInstance);
                 throw t;
             }
 
             // Make sure there can be only one FinishJoin in flight
-            ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance);
+            waitForCMSToQuiesce(cluster, cmsInstance);
             // set expectation of finish join & retrieve the sequence when it 
gets committed
             Callable<Epoch> bootstrapVisible = 
getSequenceAfterCommit(cmsInstance, (e, r) -> e instanceof 
PrepareJoin.FinishJoin && r.isSuccess());
 
-            cmsInstance.runOnInstance(() -> {
-                TestProcessor processor = (TestProcessor) 
((ClusterMetadataService.SwitchableProcessor) 
ClusterMetadataService.instance().processor()).delegate();
-                processor.unpause();
-            });
-
             // wait for the cluster to all witness the finish join event
-            ClusterUtils.waitForCMSToQuiesce(cluster, bootstrapVisible.call());
+            unpauseCommits(cmsInstance);
+            waitForCMSToQuiesce(cluster, bootstrapVisible.call());
 
             for (int i = 0; i < WRITES; i++)
                 visitor.visit();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/schema/SchemaTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/schema/SchemaTest.java
deleted file mode 100644
index 0661537e72..0000000000
--- 
a/test/distributed/org/apache/cassandra/distributed/test/schema/SchemaTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.test.schema;
-
-import org.junit.Test;
-
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.test.TestBaseImpl;
-
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-public class SchemaTest extends TestBaseImpl
-{
-    @Test
-    public void bootstrapTest() throws Throwable
-    {
-        try (Cluster cluster = builder().withNodes(3)
-                                        .withConfig(config -> 
config.with(NETWORK, GOSSIP))
-                                        .start())
-        {
-            System.out.println("ABOUT TO CREATE KEYSPACE");
-            // TODO: consistency level will become irrelevant
-            cluster.coordinator(2).execute("CREATE KEYSPACE " + KEYSPACE + " 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};",
-                                           ConsistencyLevel.ALL);
-
-            cluster.coordinator(2).execute("CREATE TABLE " + KEYSPACE + ".tbl 
(pk int PRIMARY KEY, v int);",
-                                           ConsistencyLevel.ALL);
-
-            System.out.println("STARTING READING");
-            for (int i = 1; i <= 3; i++)
-            {
-                cluster.coordinator(i).execute("SELECT * FROM " + KEYSPACE + 
".tbl;",
-                                               ConsistencyLevel.ALL);
-            }
-        }
-    }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to