This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit d1fbca392aa5b2232f82970a0c87cad3e6eacab8 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Thu Mar 30 16:54:36 2023 +0100 [CEP-21] Rewrite o.a.c.distributed.test.SchemaTest patch by Sam Tunnicliffe; reviewed by Marcus Eriksson and Alex Petrov for CASSANDRA-18459 --- .../cassandra/distributed/impl/Instance.java | 1 + .../distributed/impl/TestChangeListener.java | 119 +++++++++++++++++ .../cassandra/distributed/shared/ClusterUtils.java | 134 +++++++++++++++++++ .../cassandra/distributed/test/SchemaTest.java | 148 ++++++++++++++------- .../distributed/test/log/ConsistentLeaveTest.java | 19 ++- .../distributed/test/log/ConsistentMoveTest.java | 22 ++- .../distributed/test/log/FailedLeaveTest.java | 2 + .../distributed/test/log/FuzzTestBase.java | 66 --------- .../log/InProgressSequenceCoordinationTest.java | 1 + .../distributed/test/log/ResumableStartupTest.java | 1 + .../test/ring/ConsistentBootstrapTest.java | 29 ++-- .../distributed/test/schema/SchemaTest.java | 55 -------- 12 files changed, 389 insertions(+), 208 deletions(-) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 40451380ea..9468692790 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@ -696,6 +696,7 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance registerOutboundFilter(cluster); }); QueryProcessor.registerStatementInvalidatingListener(); + TestChangeListener.register(); // We need to persist this as soon as possible after startup checks. // This should be the first write to SystemKeyspace (CASSANDRA-11742) diff --git a/test/distributed/org/apache/cassandra/distributed/impl/TestChangeListener.java b/test/distributed/org/apache/cassandra/distributed/impl/TestChangeListener.java new file mode 100644 index 0000000000..b73e9a9b8f --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/impl/TestChangeListener.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.impl; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.function.Predicate; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.listeners.ChangeListener; +import org.apache.cassandra.utils.concurrent.WaitQueue; + +public class TestChangeListener implements ChangeListener +{ + private static final Logger logger = LoggerFactory.getLogger(TestChangeListener.class); + public static final TestChangeListener instance = new TestChangeListener(); + + public static void register() + { + logger.debug("Registered TestChangeListener"); + ClusterMetadataService.instance().log().addListener(instance); + } + + private final List<Predicate<Epoch>> preCommitPredicates = new ArrayList<>(); + private final List<Predicate<Epoch>> postCommitPredicates = new ArrayList<>(); + private final WaitQueue waiters = WaitQueue.newWaitQueue(); + + @Override + public void notifyPreCommit(ClusterMetadata prev, ClusterMetadata next) + { + Iterator<Predicate<Epoch>> iter = preCommitPredicates.iterator(); + while (iter.hasNext()) + { + if (iter.next().test(next.epoch)) + { + logger.debug("Epoch matches pre-commit predicate, pausing"); + pause(); + iter.remove(); + } + } + } + + @Override + public void notifyPostCommit(ClusterMetadata prev, ClusterMetadata next) + { + Iterator<Predicate<Epoch>> iter = postCommitPredicates.iterator(); + while (iter.hasNext()) + { + if (iter.next().test(next.epoch)) + { + logger.debug("Epoch matches post-commit predicate, pausing"); + pause(); + iter.remove(); + } + } + } + + public void pauseBefore(Epoch epoch, Runnable onMatch) + { + logger.debug("Requesting pause before enacting {}", epoch); + preCommitPredicates.add((e) -> { + if (e.is(epoch)) + { + onMatch.run(); + return true; + } + return false; + }); + } + + public void pauseAfter(Epoch epoch, Runnable onMatch) + { + logger.debug("Requesting pause after enacting {}", epoch); + postCommitPredicates.add((e) -> { + if (e.is(epoch)) + { + onMatch.run(); + return true; + } + return false; + }); + } + + public void pause() + { + WaitQueue.Signal signal = waiters.register(); + logger.debug("Log follower is paused, waiting..."); + signal.awaitUninterruptibly(); + logger.debug("Resumed log follower..."); + } + + public void unpause() + { + logger.debug("Unpausing log follower"); + waiters.signalAll(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java index b28fe90685..a1a23ea211 100644 --- a/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java +++ b/test/distributed/org/apache/cassandra/distributed/shared/ClusterUtils.java @@ -18,6 +18,7 @@ package org.apache.cassandra.distributed.shared; +import java.io.Serializable; import java.lang.reflect.Field; import java.net.InetSocketAddress; import java.security.Permission; @@ -31,10 +32,12 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.function.BiConsumer; +import java.util.function.BiPredicate; import java.util.function.Consumer; import 
java.util.function.Predicate; import java.util.regex.Matcher; @@ -61,6 +64,8 @@ import org.apache.cassandra.distributed.impl.AbstractCluster; import org.apache.cassandra.distributed.impl.InstanceConfig; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.distributed.impl.TestChangeListener; +import org.apache.cassandra.distributed.test.log.TestProcessor; import org.apache.cassandra.io.util.File; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; @@ -72,11 +77,14 @@ import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.Transformation; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.ownership.PlacementForRange; import org.apache.cassandra.tools.SystemExitException; import org.apache.cassandra.utils.Isolated; +import org.apache.cassandra.utils.concurrent.AsyncPromise; import org.apache.cassandra.utils.concurrent.CountDownLatch; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; @@ -413,6 +421,132 @@ public class ClusterUtils ClusterUtils.waitForCMSToQuiesce(cluster, getClusterMetadataVersion(leader), ignored); } + public static Callable<Void> pauseBeforeEnacting(IInvokableInstance instance, Epoch epoch) + { + return pauseBeforeEnacting(instance, epoch, 10, TimeUnit.SECONDS); + } + + protected static Callable<Void> pauseBeforeEnacting(IInvokableInstance instance, + Epoch epoch, + long wait, + TimeUnit waitUnit) + { + return instance.callOnInstance(() -> { + TestChangeListener listener = TestChangeListener.instance; + AsyncPromise<?> promise = new AsyncPromise<>(); + listener.pauseBefore(epoch, () -> promise.setSuccess(null)); + return () -> { + try + { + promise.get(wait, waitUnit); + return null; + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + }; + }); + } + + public static Callable<Void> pauseAfterEnacting(IInvokableInstance instance, Epoch epoch) + { + return pauseAfterEnacting(instance, epoch, 10, TimeUnit.SECONDS); + } + + protected static Callable<Void> pauseAfterEnacting(IInvokableInstance instance, + Epoch epoch, + long wait, + TimeUnit waitUnit) + { + return instance.callOnInstance(() -> { + TestChangeListener listener = TestChangeListener.instance; + AsyncPromise<?> promise = new AsyncPromise<>(); + listener.pauseAfter(epoch, () -> promise.setSuccess(null)); + return () -> { + try + { + promise.get(wait, waitUnit); + return null; + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + }; + }); + } + + public static Callable<Epoch> pauseBeforeCommit(IInvokableInstance cmsInstance, SerializablePredicate<Transformation> predicate) + { + Callable<Long> remoteCallable = cmsInstance.callOnInstance(() -> { + TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); + AsyncPromise<Epoch> promise = new AsyncPromise<>(); + processor.pauseIf(predicate, () -> promise.setSuccess(ClusterMetadata.current().epoch)); + return () -> { + try + { + return promise.get(30, TimeUnit.SECONDS).getEpoch(); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + }; + }); + return () -> Epoch.create(remoteCallable.call()); + 
+ } + + public static Callable<Epoch> getSequenceAfterCommit(IInvokableInstance cmsInstance, + SerializableBiPredicate<Transformation, Commit.Result> predicate) + { + Callable<Long> remoteCallable = cmsInstance.callOnInstance(() -> { + TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); + + AsyncPromise<Epoch> promise = new AsyncPromise<>(); + processor.registerCommitPredicate((event, result) -> { + if (predicate.test(event, result)) + { + promise.setSuccess(result.success().replication.latestEpoch()); + return true; + } + + return false; + }); + return () -> { + try + { + return promise.get(30, TimeUnit.SECONDS).getEpoch(); + } + catch (Throwable e) + { + throw new RuntimeException(e); + } + }; + }); + + return () -> Epoch.create(remoteCallable.call()); + } + + public static void unpauseCommits(IInvokableInstance instance) + { + instance.runOnInstance(() -> { + TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); + processor.unpause(); + }); + } + + public static void unpauseEnactment(IInvokableInstance instance) + { + instance.runOnInstance(() -> TestChangeListener.instance.unpause()); + } + + public static interface SerializablePredicate<T> extends Predicate<T>, Serializable + {} + + public static interface SerializableBiPredicate<T1, T2> extends BiPredicate<T1, T2>, Serializable {} + private static class ClusterMetadataVersion { public final int node; diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java index 3d54e53712..4104ea7f02 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java @@ -18,6 +18,8 @@ package org.apache.cassandra.distributed.test; +import java.util.concurrent.Callable; + import org.junit.Test; import org.apache.cassandra.config.CassandraRelevantProperties; @@ -27,10 +29,17 @@ import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.transformations.AlterSchema; import org.awaitility.Awaitility; import org.awaitility.core.ConditionFactory; import static java.time.Duration.ofSeconds; +import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseAfterEnacting; +import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeEnacting; +import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits; +import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseEnactment; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; import static org.junit.Assert.assertTrue; @@ -46,15 +55,40 @@ public class SchemaTest extends TestBaseImpl { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, primary key (pk, ck))"); String name = "aaa"; - cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1); - selectSilent(cluster, name); 
+ // have the CMS node pause directly before committing the ALTER TABLE so we can infer the next epoch + Callable<Epoch> beforeCommit = pauseBeforeCommit(cluster.get(1), (e) -> e instanceof AlterSchema); + new Thread(() -> { + cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1); + }).start(); + + Epoch targetEpoch = beforeCommit.call().nextEpoch(); + // pause the replica immediately before and after enacting the ALTER TABLE stmt + Callable<?> beforeEnactedOnReplica = pauseBeforeEnacting(cluster.get(2), targetEpoch); + Callable<?> afterEnactedOnReplica = pauseAfterEnacting(cluster.get(2), targetEpoch); + // unpause the CMS node and allow it to commit and replicate the ALTER TABLE + unpauseCommits(cluster.get(1)); + + // Wait for the replica to signal that it has paused before enacting the schema change + // then execute the query and assert that a schema disagreement error was triggered + beforeEnactedOnReplica.call(); + selectExpectingError(cluster, name); + + // unpause the replica and wait until it notifies that it has enacted the schema change + unpauseEnactment(cluster.get(2)); + afterEnactedOnReplica.call(); + unpauseEnactment(cluster.get(2)); cluster.get(2).flush(KEYSPACE); - cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); + // now that the replica has enacted the alter table, an attempt to repeat it should be rejected + alterTableExpectingError(cluster.get(2), name); + // bouncing the replica should be safe as SSTables aren't loaded until the log replay is complete + // and the schema is in it's most up to date state cluster.get(2).shutdown().get(); cluster.get(2).startup(); + cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ConsistencyLevel.ALL, 1); cluster.get(2).forceCompact(KEYSPACE, "tbl"); + cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ConsistencyLevel.ALL, 1); } } @@ -65,21 +99,50 @@ public class SchemaTest extends TestBaseImpl { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, primary key (pk, ck))"); String name = "v10"; - cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1); - selectSilent(cluster, name); + + // have the CMS node pause directly before committing the ALTER TABLE so we can infer the next epoch + Callable<Epoch> beforeCommit = pauseBeforeCommit(cluster.get(1), (e) -> e instanceof AlterSchema); + new Thread(() -> { + cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1); + }).start(); + Epoch targetEpoch = beforeCommit.call().nextEpoch(); + + // pause the replica immediately before and after enacting the ALTER TABLE stmt + Callable<?> beforeEnactedOnReplica = pauseBeforeEnacting(cluster.get(2), targetEpoch); + Callable<?> afterEnactedOnReplica = pauseAfterEnacting(cluster.get(2), targetEpoch); + // unpause the CMS node and allow it to commit and replicate the ALTER TABLE + unpauseCommits(cluster.get(1)); + + // Wait for the replica to signal that it has paused before enacting the schema change + // then execute the query and assert that a schema 
disagreement error was triggered + beforeEnactedOnReplica.call(); + selectExpectingError(cluster, name); + + // unpause the replica and wait until it notifies that it has enacted the schema change + unpauseEnactment(cluster.get(2)); + afterEnactedOnReplica.call(); + unpauseEnactment(cluster.get(2)); + cluster.get(2).flush(KEYSPACE); - cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); + // now that the replica has enacted the alter table, an attempt to repeat it should be rejected + alterTableExpectingError(cluster.get(2), name); cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2, " + name + ") values (?,1,1,1,[1])", 1); cluster.get(2).flush(KEYSPACE); cluster.get(2).forceCompact(KEYSPACE, "tbl"); + + // bouncing the replica should be safe as SSTables aren't loaded until the log replay is complete + // and the schema is in it's most up to date state cluster.get(2).shutdown().get(); cluster.get(2).startup(); + + cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ConsistencyLevel.ALL, 1); cluster.get(2).forceCompact(KEYSPACE, "tbl"); + cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ConsistencyLevel.ALL, 1); } } - private void selectSilent(Cluster cluster, String name) + private void selectExpectingError(Cluster cluster, String name) { try { @@ -99,26 +162,42 @@ public class SchemaTest extends TestBaseImpl } } + private void alterTableExpectingError(IInvokableInstance instance, String name) + { + try + { + instance.schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>"); + } + catch (Exception e) + { + boolean causeIsColumnExists = false; + Throwable cause = e; + while (cause != null) + { + if (cause.getMessage() != null && cause.getMessage().contains("Column with name '" + name + "' already exists")) + causeIsColumnExists = true; + cause = cause.getCause(); + } + assertTrue(causeIsColumnExists); + } + } + /** - * The purpose of this test is to verify manual schema reset functinality. + * The original purpose of this test was to verify manual schema reset functionality, but with schema updates being + * serialized in the cluster metadata log local schema reset no longer makes sense so the assertions have been + * modified to verify that schema changes are correctly propagated to down nodes once they come back up. * <p> * There is a 2-node cluster and a TABLE_ONE created. The schema version is agreed on both nodes. Then the 2nd node * is shutdown. We introduce a disagreement by dropping TABLE_ONE and creating TABLE_TWO on the 1st node. Therefore, * the 1st node has a newer schema version with TABLE_TWO, while the shutdown 2nd node has older schema version with * TABLE_ONE. * <p> - * At this point, if we just started the 2nd node, it would sync its schema by getting fresh mutations from the 1st - * node which would result in both nodes having only the definition of TABLE_TWO. - * <p> - * However, before starting the 2nd node the schema is reset on the 1st node, so the 1st node will discard its local - * schema whenever it manages to fetch a schema definition from some other node (the 2nd node in this case). - * It is expected to end up with both nodes having only the definition of TABLE_ONE. + * At this point, if we just start the 2nd node, it would sync its schema by getting the transformations that it + * missed while down, which would result in both nodes having only the definition of TABLE_TWO. 
* <p> - * In the second phase of the test we simply break the schema on the 1st node and call reset to fetch the schema - * definition it from the 2nd node. */ @Test - public void schemaReset() throws Throwable + public void schemaPropagationToDownNode() throws Throwable { CassandraRelevantProperties.MIGRATION_DELAY.setLong(10000); CassandraRelevantProperties.SCHEMA_PULL_INTERVAL_MS.setLong(10000); @@ -137,43 +216,16 @@ public class SchemaTest extends TestBaseImpl .allMatch(e -> e.equals(getBroadcastAddressAndPort())); })); - // when there is no node to fetch the schema from, reset local schema should immediately fail -// Assertions.assertThatExceptionOfType(RuntimeException.class).isThrownBy(() -> { -// cluster.get(1).runOnInstance(() -> Schema.instance.resetLocalSchema()); -// }).withMessageContaining("Cannot reset local schema when there are no other live nodes"); - // now, let's make a disagreement, the shutdown node 2 has a definition of TABLE_ONE, while the running // node 1 will have a definition of TABLE_TWO cluster.coordinator(1).execute(String.format("DROP TABLE %s.%s", KEYSPACE, TABLE_ONE), ConsistencyLevel.ONE); cluster.coordinator(1).execute(String.format("CREATE TABLE %s.%s (pk INT PRIMARY KEY, v TEXT)", KEYSPACE, TABLE_TWO), ConsistencyLevel.ONE); await(30).until(() -> checkTablesPropagated(cluster.get(1), false, true)); - // Schema.resetLocalSchema is guarded by some conditions which would not let us reset schema if there is no - // live node in the cluster, therefore we simply call SchemaUpdateHandler.clear (this is the only real thing - // being done by Schema.resetLocalSchema under the hood) -// SerializableCallable<Boolean> clear = () -> Schema.instance.updateHandler.clear().awaitUninterruptibly(1, TimeUnit.MINUTES); -// Future<Boolean> clear1 = cluster.get(1).asyncCallsOnInstance(clear).call(); -// assertFalse(clear1.isDone()); - // when the 2nd node is started, schema should be back in sync cluster.get(2).startup(); -// await(30).until(() -> clear1.isDone() && clear1.get()); - - // this proves that reset schema works on the 1st node - the most recent change should be discarded because - // it receives the schema from the 2nd node and applies it on empty schema - await(60).until(() -> checkTablesPropagated(cluster.get(1), true, false)); - - // now let's break schema locally and let it be reset -// cluster.get(1).runOnInstance(() -> Schema.instance.getLocalKeyspaces() -// .get(SchemaConstants.SCHEMA_KEYSPACE_NAME) -// .get().tables.forEach(t -> ColumnFamilyStore.getIfExists(t.keyspace, t.name).truncateBlockingWithoutSnapshot())); - - // when schema is removed and there is a node to fetch it from, the 1st node should immediately restore it -// cluster.get(1).runOnInstance(() -> Schema.instance.resetLocalSchema()); - // note that we should not wait for this to be true because resetLocalSchema is blocking - // and after successfully completing it, the schema should be already back in sync - assertTrue(checkTablesPropagated(cluster.get(1), true, false)); - assertTrue(checkTablesPropagated(cluster.get(2), true, false)); + assertTrue(checkTablesPropagated(cluster.get(1), false, true)); + assertTrue(checkTablesPropagated(cluster.get(2), false, true)); } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java index b502fbbb48..0dd2c887fc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java +++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentLeaveTest.java @@ -38,18 +38,20 @@ import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.fuzz.HarryHelper; import org.apache.cassandra.distributed.fuzz.InJVMTokenAwareVisitorExecutor; import org.apache.cassandra.distributed.fuzz.InJvmSut; -import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicationFactor; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.transformations.PrepareLeave; import static org.apache.cassandra.distributed.shared.ClusterUtils.getClusterMetadataVersion; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits; +import static org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce; import static org.junit.Assert.assertFalse; public class ConsistentLeaveTest extends FuzzTestBase @@ -70,7 +72,7 @@ public class ConsistentLeaveTest extends FuzzTestBase { IInvokableInstance cmsInstance = cluster.get(1); IInvokableInstance leavingInstance = cluster.get(2); - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); configBuilder.setSUT(() -> new InJvmSut(cluster)); Run run = configBuilder.build().createRun(); @@ -79,7 +81,7 @@ public class ConsistentLeaveTest extends FuzzTestBase " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};", ConsistencyLevel.ALL); cluster.coordinator(1).execute(run.schemaSpec.compile().cql(), ConsistencyLevel.ALL); - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); QuiescentLocalStateChecker model = new QuiescentLocalStateChecker(run, ReplicationFactor.fullOnly(2)); Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new); @@ -103,20 +105,17 @@ public class ConsistentLeaveTest extends FuzzTestBase model.validateAll(); // Make sure there can be only one FinishLeave in flight - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); // set expectation of finish leave & retrieve the sequence when it gets committed Epoch currentEpoch = getClusterMetadataVersion(cmsInstance); Callable<Epoch> finishedLeaving = getSequenceAfterCommit(cmsInstance, (e, r) -> e instanceof PrepareLeave.FinishLeave && r.isSuccess()); - cmsInstance.runOnInstance(() -> { - TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); - processor.unpause(); - }); + unpauseCommits(cmsInstance); Epoch nextEpoch = finishedLeaving.call(); Assert.assertEquals(String.format("Epoch %s should have immediately superseded epoch %s.", nextEpoch, currentEpoch), nextEpoch.getEpoch(), currentEpoch.getEpoch() + 1); // wait for the cluster to all witness the finish join event - ClusterUtils.waitForCMSToQuiesce(cluster, nextEpoch); + waitForCMSToQuiesce(cluster, nextEpoch); assertGossipStatus(cluster, 
leavingInstance.config().num(), "LEFT"); diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java index 425cd7a62c..d046fc5021 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ConsistentMoveTest.java @@ -41,17 +41,19 @@ import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.fuzz.HarryHelper; import org.apache.cassandra.distributed.fuzz.InJVMTokenAwareVisitorExecutor; import org.apache.cassandra.distributed.fuzz.InJvmSut; -import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.Gossiper; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicationFactor; import org.apache.cassandra.service.StorageService; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.transformations.PrepareMove; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits; +import static org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -73,7 +75,7 @@ public class ConsistentMoveTest extends FuzzTestBase { IInvokableInstance cmsInstance = cluster.get(1); IInvokableInstance movingInstance = cluster.get(2); - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); configBuilder.setSUT(() -> new InJvmSut(cluster)); Run run = configBuilder.build().createRun(); @@ -82,7 +84,7 @@ public class ConsistentMoveTest extends FuzzTestBase " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 2};", ConsistencyLevel.ALL); cluster.coordinator(1).execute(run.schemaSpec.compile().cql(), ConsistencyLevel.ALL); - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); FuzzTestBase.QuiescentLocalStateChecker model = new FuzzTestBase.QuiescentLocalStateChecker(run, ReplicationFactor.fullOnly(2)); Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new); @@ -92,7 +94,7 @@ public class ConsistentMoveTest extends FuzzTestBase model.validateAll(); // Make sure there can be only one FinishLeave in flight - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); Callable<?> pending = pauseBeforeCommit(cmsInstance, (e) -> e instanceof PrepareMove.FinishMove); new Thread(() -> { @@ -103,15 +105,11 @@ public class ConsistentMoveTest extends FuzzTestBase assertGossipStatus(cluster, movingInstance.config().num(), "MOVING"); + // wait for the cluster to all witness the finish join event Callable<Epoch> finishedMoving = getSequenceAfterCommit(cmsInstance, (e, r) -> e instanceof PrepareMove.FinishMove && r.isSuccess()); - cmsInstance.runOnInstance(() -> { - TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); - processor.unpause(); - }); - + 
unpauseCommits(cmsInstance); Epoch nextEpoch = finishedMoving.call(); - // wait for the cluster to all witness the finish join event - ClusterUtils.waitForCMSToQuiesce(cluster, nextEpoch); + waitForCMSToQuiesce(cluster, nextEpoch); // Streaming for unbootstrap has finished, any rows from the first batch should have been transferred // from the leaving node to the new replicas. Continue to write at ONE, replication of these rows will diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java index b7509ff40f..a22eb09167 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FailedLeaveTest.java @@ -45,6 +45,7 @@ import org.apache.cassandra.distributed.fuzz.HarryHelper; import org.apache.cassandra.distributed.fuzz.InJVMTokenAwareVisitorExecutor; import org.apache.cassandra.distributed.fuzz.InJvmSut; import org.apache.cassandra.distributed.shared.ClusterUtils; +import org.apache.cassandra.distributed.shared.ClusterUtils.SerializableBiPredicate; import org.apache.cassandra.locator.ReplicationFactor; import org.apache.cassandra.tcm.Commit; import org.apache.cassandra.tcm.Epoch; @@ -58,6 +59,7 @@ import static net.bytebuddy.matcher.ElementMatchers.named; import static org.apache.cassandra.distributed.shared.ClusterUtils.cancelInProgressSequences; import static org.apache.cassandra.distributed.shared.ClusterUtils.decommission; import static org.apache.cassandra.distributed.shared.ClusterUtils.getClusterMetadataVersion; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit; public class FailedLeaveTest extends FuzzTestBase { diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java index f86b800ae7..02f6b3eb91 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/FuzzTestBase.java @@ -19,14 +19,10 @@ package org.apache.cassandra.distributed.test.log; -import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; -import java.util.function.BiPredicate; -import java.util.function.Predicate; import org.junit.BeforeClass; @@ -38,19 +34,12 @@ import harry.model.sut.SystemUnderTest; import harry.operations.CompiledStatement; import harry.operations.Query; import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IIsolatedExecutor; import org.apache.cassandra.distributed.fuzz.HarryHelper; import org.apache.cassandra.distributed.fuzz.InJvmSut; -import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.test.ExecUtil; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.locator.ReplicationFactor; -import org.apache.cassandra.tcm.Commit; -import org.apache.cassandra.tcm.Transformation; -import org.apache.cassandra.tcm.ClusterMetadataService; -import org.apache.cassandra.tcm.Epoch; -import org.apache.cassandra.utils.concurrent.AsyncPromise; import static harry.model.SelectHelper.resultSetToRow; import static 
org.apache.cassandra.distributed.api.Feature.GOSSIP; @@ -74,58 +63,6 @@ public class FuzzTestBase extends TestBaseImpl .set("request_timeout", String.format("%dms", TimeUnit.MINUTES.toMillis(10)))); } - protected static Callable<Void> pauseBeforeCommit(IInvokableInstance cmsInstance, SerializablePredicate<Transformation> predicate) - { - return cmsInstance.callOnInstance(() -> { - TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); - AsyncPromise<?> promise = new AsyncPromise<>(); - processor.pauseIf(predicate, () -> promise.setSuccess(null)); - return () -> { - try - { - promise.get(30, TimeUnit.SECONDS); - return null; - } - catch (Throwable e) - { - throw new RuntimeException(e); - } - }; - }); - } - - // todo; assumes period = 1 - protected static Callable<Epoch> getSequenceAfterCommit(IInvokableInstance cmsInstance, - SerializableBiPredicate<Transformation, Commit.Result> predicate) - { - Callable<Long> remoteCallable = cmsInstance.callOnInstance(() -> { - TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); - - AsyncPromise<Epoch> promise = new AsyncPromise<>(); - processor.registerCommitPredicate((event, result) -> { - if (predicate.test(event, result)) - { - promise.setSuccess(result.success().replication.latestEpoch()); - return true; - } - - return false; - }); - return () -> { - try - { - return ClusterUtils.encode(promise.get(30, TimeUnit.SECONDS)); - } - catch (Throwable e) - { - throw new RuntimeException(e); - } - }; - }); - - return () -> ClusterUtils.decode(remoteCallable.call()); - } - public static IIsolatedExecutor.SerializableRunnable toRunnable(ExecUtil.ThrowingSerializableRunnable runnable) { return () -> { @@ -202,7 +139,4 @@ public class FuzzTestBase extends TestBaseImpl } } } - - public static interface SerializablePredicate<T> extends Predicate<T>, Serializable {} - public static interface SerializableBiPredicate<T1, T2> extends BiPredicate<T1, T2>, Serializable {} } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java index 0cd3046d57..3d1b4c0102 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/InProgressSequenceCoordinationTest.java @@ -46,6 +46,7 @@ import static org.apache.cassandra.config.CassandraRelevantProperties.PROGRESS_B import static org.apache.cassandra.distributed.Constants.KEY_DTEST_API_STARTUP_FAILURE_AS_SHUTDOWN; import static org.apache.cassandra.distributed.Constants.KEY_DTEST_FULL_STARTUP; import static org.apache.cassandra.distributed.shared.ClusterUtils.addInstance; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit; import static org.apache.cassandra.net.Verb.TCM_REPLICATION; import static org.apache.cassandra.tcm.sequences.InProgressSequences.SequenceState.BLOCKED; import static org.apache.cassandra.tcm.sequences.InProgressSequences.SequenceState.CONTINUING; diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java index 83f8d77c89..c1a31776b5 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ResumableStartupTest.java @@ -43,6 +43,7 @@ import org.apache.cassandra.service.StorageService; import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; import static org.apache.cassandra.distributed.shared.ClusterUtils.getClusterMetadataVersion; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit; public class ResumableStartupTest extends FuzzTestBase { diff --git a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java index 8f11c02553..d523a32ce5 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ConsistentBootstrapTest.java @@ -37,14 +37,16 @@ import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.TokenSupplier; import org.apache.cassandra.distributed.fuzz.HarryHelper; import org.apache.cassandra.distributed.fuzz.InJvmSut; -import org.apache.cassandra.distributed.shared.ClusterUtils; import org.apache.cassandra.distributed.shared.NetworkTopology; import org.apache.cassandra.distributed.test.log.FuzzTestBase; -import org.apache.cassandra.distributed.test.log.TestProcessor; -import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.transformations.PrepareJoin; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getSequenceAfterCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit; +import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits; +import static org.apache.cassandra.distributed.shared.ClusterUtils.waitForCMSToQuiesce; + public class ConsistentBootstrapTest extends FuzzTestBase { private static int WRITES = 2000; @@ -63,7 +65,7 @@ public class ConsistentBootstrapTest extends FuzzTestBase .start()) { IInvokableInstance cmsInstance = cluster.get(1); - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); configBuilder.setSUT(() -> new InJvmSut(cluster)); Run run = configBuilder.build().createRun(); @@ -71,7 +73,7 @@ public class ConsistentBootstrapTest extends FuzzTestBase " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", ConsistencyLevel.ALL); cluster.coordinator(1).execute(run.schemaSpec.compile().cql(), ConsistencyLevel.ALL); - ClusterUtils.waitForCMSToQuiesce(cluster, cluster.get(1)); + waitForCMSToQuiesce(cluster, cluster.get(1)); Visitor visitor = new LoggingVisitor(run, MutatingRowVisitor::new); QuiescentLocalStateChecker model = new QuiescentLocalStateChecker(run); System.out.println("Starting write phase..."); @@ -86,7 +88,7 @@ public class ConsistentBootstrapTest extends FuzzTestBase .set(Constants.KEY_DTEST_FULL_STARTUP, true); IInvokableInstance newInstance = cluster.bootstrap(config); - // Prime the DPS node to pause before the finish join event is committed + // Prime the CMS node to pause before the finish join event is committed Callable<?> pending = pauseBeforeCommit(cmsInstance, (e) -> e instanceof PrepareJoin.FinishJoin); new Thread(() -> newInstance.startup()).start(); pending.call(); @@ -102,25 +104,18 @@ public class ConsistentBootstrapTest 
extends FuzzTestBase catch (Throwable t) { // Unpause, since otherwise validation exception will prevent graceful shutdown - cmsInstance.runOnInstance(() -> { - TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); - processor.unpause(); - }); + unpauseCommits(cmsInstance); throw t; } // Make sure there can be only one FinishJoin in flight - ClusterUtils.waitForCMSToQuiesce(cluster, cmsInstance); + waitForCMSToQuiesce(cluster, cmsInstance); // set expectation of finish join & retrieve the sequence when it gets committed Callable<Epoch> bootstrapVisible = getSequenceAfterCommit(cmsInstance, (e, r) -> e instanceof PrepareJoin.FinishJoin && r.isSuccess()); - cmsInstance.runOnInstance(() -> { - TestProcessor processor = (TestProcessor) ((ClusterMetadataService.SwitchableProcessor) ClusterMetadataService.instance().processor()).delegate(); - processor.unpause(); - }); - // wait for the cluster to all witness the finish join event - ClusterUtils.waitForCMSToQuiesce(cluster, bootstrapVisible.call()); + unpauseCommits(cmsInstance); + waitForCMSToQuiesce(cluster, bootstrapVisible.call()); for (int i = 0; i < WRITES; i++) visitor.visit(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/schema/SchemaTest.java b/test/distributed/org/apache/cassandra/distributed/test/schema/SchemaTest.java deleted file mode 100644 index 0661537e72..0000000000 --- a/test/distributed/org/apache/cassandra/distributed/test/schema/SchemaTest.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.cassandra.distributed.test.schema; - -import org.junit.Test; - -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.api.ConsistencyLevel; -import org.apache.cassandra.distributed.test.TestBaseImpl; - -import static org.apache.cassandra.distributed.api.Feature.GOSSIP; -import static org.apache.cassandra.distributed.api.Feature.NETWORK; - -public class SchemaTest extends TestBaseImpl -{ - @Test - public void bootstrapTest() throws Throwable - { - try (Cluster cluster = builder().withNodes(3) - .withConfig(config -> config.with(NETWORK, GOSSIP)) - .start()) - { - System.out.println("ABOUT TO CREATE KEYSPACE"); - // TODO: consistency level will become irrelevant - cluster.coordinator(2).execute("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};", - ConsistencyLevel.ALL); - - cluster.coordinator(2).execute("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int);", - ConsistencyLevel.ALL); - - System.out.println("STARTING READING"); - for (int i = 1; i <= 3; i++) - { - cluster.coordinator(i).execute("SELECT * FROM " + KEYSPACE + ".tbl;", - ConsistencyLevel.ALL); - } - } - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
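
For context, the sketch below condenses how the helpers introduced in this patch (TestChangeListener together with the pauseBeforeCommit, pauseBeforeEnacting, pauseAfterEnacting, unpauseCommits and unpauseEnactment additions to ClusterUtils) fit together, mirroring the flow of the rewritten SchemaTest. It is a minimal illustration rather than part of the commit: the class name, test name, table and column are invented, the plain two-node cluster configuration is an assumption, and the assertions a real test would make at each pause point are only indicated in comments.

package org.apache.cassandra.distributed.test;

import java.util.concurrent.Callable;

import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.tcm.Epoch;
import org.apache.cassandra.tcm.transformations.AlterSchema;

import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseAfterEnacting;
import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeCommit;
import static org.apache.cassandra.distributed.shared.ClusterUtils.pauseBeforeEnacting;
import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseCommits;
import static org.apache.cassandra.distributed.shared.ClusterUtils.unpauseEnactment;

// Illustrative only: this class and test are not part of the patch.
public class PausedSchemaChangeExample extends TestBaseImpl
{
    @Test
    public void pauseAroundAlterTable() throws Throwable
    {
        try (Cluster cluster = init(builder().withNodes(2).start()))
        {
            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int PRIMARY KEY, v int)");

            // Pause the CMS node (node 1) just before it commits the next AlterSchema,
            // so the epoch that will enact it can be inferred up front.
            Callable<Epoch> beforeCommit = pauseBeforeCommit(cluster.get(1), e -> e instanceof AlterSchema);
            new Thread(() -> cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int")).start();
            Epoch targetEpoch = beforeCommit.call().nextEpoch();

            // Register pause points on the replica (node 2) immediately before and
            // after it enacts that epoch, then let the CMS node commit and replicate.
            Callable<?> beforeEnacted = pauseBeforeEnacting(cluster.get(2), targetEpoch);
            Callable<?> afterEnacted = pauseAfterEnacting(cluster.get(2), targetEpoch);
            unpauseCommits(cluster.get(1));

            // Block until node 2 reaches the pre-enactment pause; assertions about
            // divergent schema between the two nodes would go here.
            beforeEnacted.call();

            // Release node 2, wait for it to report the change enacted, then release
            // the post-enactment pause as well; post-conditions would be checked here.
            unpauseEnactment(cluster.get(2));
            afterEnacted.call();
            unpauseEnactment(cluster.get(2));
        }
    }
}

Note that pauseBeforeCommit and the enactment helpers register their predicate on the target instance immediately and hand back a Callable; calling it later blocks (with a timeout) until the matching commit or enactment occurs, which is what lets a test learn the target epoch before any node has enacted it.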
