This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 21a363becc8e2a2c1b3042b9544f1c47606f5bab Merge: 7a63cc2 b1efb8e Author: David Capwell <[email protected]> AuthorDate: Mon Oct 5 19:06:01 2020 -0700 Merge branch 'cassandra-3.11' into trunk build.xml | 2 +- .../distributed/impl/AbstractCluster.java | 37 ++++++++++++++++++++-- .../impl/DelegatingInvokableInstance.java | 1 + .../cassandra/distributed/impl/Instance.java | 3 +- .../cassandra/distributed/impl/InstanceConfig.java | 14 ++++++-- .../ShutdownException.java} | 18 ++++------- .../distributed/test/FailingRepairTest.java | 6 ++++ .../test/FullRepairCoordinatorFastTest.java | 1 + .../test/IncrementalRepairCoordinatorFastTest.java | 1 + .../distributed/test/NetworkTopologyTest.java | 15 +++++---- .../test/PreviewRepairCoordinatorFastTest.java | 2 ++ .../cassandra/distributed/test/StreamingTest.java | 5 ++- .../cassandra/distributed/upgrade/UpgradeTest.java | 6 +++- 13 files changed, 86 insertions(+), 25 deletions(-) diff --cc build.xml index 40729a6,82c35e9..6961cf9 --- a/build.xml +++ b/build.xml @@@ -582,14 -412,19 +582,14 @@@ <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/> <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/> <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/> - <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/> + <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/> - <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7"> - <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/> - <exclusion groupId="junit" artifactId="junit"/> - </dependency> - <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/> - <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2"> - <exclusion groupId="commons-logging" artifactId="commons-logging"/> - </dependency> - <dependency groupId="junit" artifactId="junit" version="4.6" /> + <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.23"/> + <dependency groupId="junit" artifactId="junit" version="4.12" /> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" /> + <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.4" /> + <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.5" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java index a28c935,a6a6336..b6f359a --- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java @@@ -31,9 -33,10 +33,11 @@@ import java.util.concurrent.Future import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; + import java.util.function.BiPredicate; import java.util.function.Consumer; + import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import com.google.common.collect.Sets; @@@ -61,9 
-64,11 +65,10 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.shared.InstanceClassLoader; import org.apache.cassandra.distributed.shared.MessageFilters; import org.apache.cassandra.distributed.shared.NetworkTopology; + import org.apache.cassandra.distributed.shared.ShutdownException; import org.apache.cassandra.distributed.shared.Versions; import org.apache.cassandra.io.util.FileUtils; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.SimpleCondition; @@@ -119,30 -122,12 +124,33 @@@ public abstract class AbstractCluster< // mutated by user-facing API private final MessageFilters filters; + private final INodeProvisionStrategy.Strategy nodeProvisionStrategy; private final BiConsumer<ClassLoader, Integer> instanceInitializer; + private final int datadirCount; + private volatile Thread.UncaughtExceptionHandler previousHandler = null; + private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null; + private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>(); - private volatile Thread.UncaughtExceptionHandler previousHandler = null; + /** + * Common builder, add methods that are applicable to both Cluster and Upgradable cluster here. + */ + public static abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>> + extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B> + { + private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces; + + public AbstractBuilder(Factory<I, C, B> factory) + { + super(factory); + } + + public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy) + { + this.nodeProvisionStrategy = nodeProvisionStrategy; + return (B) this; + } + } + protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance { @@@ -321,10 -299,14 +330,10 @@@ private InstanceConfig createInstanceConfig(int nodeNum) { - String ipPrefix = "127.0." 
+ subnet + "."; - String seedIp = ipPrefix + "1"; - String ipAddress = ipPrefix + nodeNum; + INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet); long token = tokenSupplier.token(nodeNum); - - NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology); - - InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount); + NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology); - InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token)); ++ InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token), datadirCount); if (configUpdater != null) configUpdater.accept(config); @@@ -651,10 -619,22 +660,22 @@@ handler.uncaughtException(thread, error); return; } + InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader(); get(cl.getInstanceId()).uncaughtException(thread, error); + + BiPredicate<Integer, Throwable> ignore = ignoreUncaughtThrowable; + I instance = get(cl.getInstanceId()); + if ((ignore == null || !ignore.test(cl.getInstanceId(), error)) && instance != null && !instance.isShutdown()) + uncaughtExceptions.add(error); + } + + @Override + public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreUncaughtThrowable) + { + this.ignoreUncaughtThrowable = ignoreUncaughtThrowable; } - + @Override public void close() { @@@ -671,22 -651,23 +692,34 @@@ FileUtils.deleteRecursive(root); Thread.setDefaultUncaughtExceptionHandler(previousHandler); previousHandler = null; - + checkAndResetUncaughtExceptions(); - + //checkForThreadLeaks(); //withThreadLeakCheck(futures); } + + @Override + public void checkAndResetUncaughtExceptions() + { + List<Throwable> drain = new ArrayList<>(uncaughtExceptions.size()); + uncaughtExceptions.removeIf(e -> { + drain.add(e); + return true; + }); + if (!drain.isEmpty()) + throw new ShutdownException(drain); + } + private void checkForThreadLeaks() + { + //This is an alternate version of the thread leak check that just checks to see if any threads are still alive + // with the context classloader. 
+ Set<Thread> threadSet = Thread.getAllStackTraces().keySet(); + threadSet.stream().filter(t->t.getContextClassLoader() instanceof InstanceClassLoader).forEach(t->{ + t.setContextClassLoader(null); + throw new RuntimeException("Unterminated thread detected " + t.getName() + " in group " + t.getThreadGroup().getName()); + }); + } + // We do not want this check to run every time until we fix problems with thread stops private void withThreadLeakCheck(List<Future<?>> futures) { diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java index 038698b,81c501c..03058dc --- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java @@@ -712,4 -801,4 +712,5 @@@ public class Instance extends IsolatedE } return accumulate; } - } + } ++ diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java index a8ed918,1f39f76..2bb486f --- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java @@@ -267,30 -261,32 +267,40 @@@ public class InstanceConfig implements return (String)params.get(name); } - public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp, int datadirCount) + public static InstanceConfig generate(int nodeNum, + INodeProvisionStrategy provisionStrategy, + NetworkTopology networkTopology, + File root, - String token) ++ String token, ++ int datadirCount) { return new InstanceConfig(nodeNum, networkTopology, - ipAddress, - ipAddress, - ipAddress, - ipAddress, - seedIp, + provisionStrategy.ipAddress(nodeNum), + provisionStrategy.ipAddress(nodeNum), + provisionStrategy.ipAddress(nodeNum), + provisionStrategy.ipAddress(nodeNum), + provisionStrategy.seedIp(), + provisionStrategy.seedPort(), String.format("%s/node%d/saved_caches", root, nodeNum), - new String[] { String.format("%s/node%d/data", root, nodeNum) }, + datadirs(datadirCount, root, nodeNum), String.format("%s/node%d/commitlog", root, nodeNum), String.format("%s/node%d/hints", root, nodeNum), String.format("%s/node%d/cdc", root, nodeNum), - token); + token, + provisionStrategy.storagePort(nodeNum), + provisionStrategy.nativeTransportPort(nodeNum)); } + private static String[] datadirs(int datadirCount, File root, int nodeNum) + { + String datadirFormat = String.format("%s/node%d/data%%d", root.getPath(), nodeNum); + String [] datadirs = new String[datadirCount]; + for (int i = 0; i < datadirs.length; i++) + datadirs[i] = String.format(datadirFormat, i); + return datadirs; + } + public InstanceConfig forVersion(Versions.Major major) { switch (major) diff --cc test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java index bad6d87,0000000..7feefa3 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java @@@ -1,348 -1,0 +1,354 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.impl.InstanceKiller; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; +import org.apache.cassandra.io.util.ChannelProxy; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; +import org.apache.cassandra.service.StorageService; + +@RunWith(Parameterized.class) +public class FailingRepairTest extends TestBaseImpl implements Serializable +{ + private static ICluster<IInvokableInstance> CLUSTER; + + private final Verb messageType; + private final RepairParallelism parallelism; + private final boolean withTracing; + private final SerializableRunnable setup; + + public FailingRepairTest(Verb messageType, RepairParallelism parallelism, boolean withTracing, SerializableRunnable setup) + { + this.messageType = messageType; + this.parallelism = parallelism; + this.withTracing = withTracing; + this.setup = setup; + } + + @Parameters(name = "{0}/{1}/{2}") + public static Collection<Object[]> messages() + { + List<Object[]> tests = new ArrayList<>(); + for 
(RepairParallelism parallelism : RepairParallelism.values()) + { + for (Boolean withTracing : Arrays.asList(Boolean.TRUE, Boolean.FALSE)) + { + tests.add(new Object[]{ Verb.VALIDATION_REQ, parallelism, withTracing, failingReaders(Verb.VALIDATION_REQ, parallelism, withTracing) }); + } + } + return tests; + } + + private static SerializableRunnable failingReaders(Verb type, RepairParallelism parallelism, boolean withTracing) + { + return () -> { + String cfName = getCfName(type, parallelism, withTracing); + ColumnFamilyStore cf = Keyspace.open(KEYSPACE).getColumnFamilyStore(cfName); + cf.forceBlockingFlush(); + Set<SSTableReader> remove = cf.getLiveSSTables(); + Set<SSTableReader> replace = new HashSet<>(); + if (type == Verb.VALIDATION_REQ) + { + for (SSTableReader r : remove) + replace.add(new FailingSSTableReader(r)); + } + else + { + throw new UnsupportedOperationException("verb: " + type); + } + cf.getTracker().removeUnsafe(remove); + cf.addSSTables(replace); + }; + } + + private static String getCfName(Verb type, RepairParallelism parallelism, boolean withTracing) + { + return type.name().toLowerCase() + "_" + parallelism.name().toLowerCase() + "_" + withTracing; + } + + @BeforeClass + public static void setupCluster() throws IOException + { + // streaming requires networking ATM + // streaming also requires gossip or isn't setup properly + CLUSTER = init(Cluster.build() + .withNodes(2) + .withConfig(c -> c.with(Feature.NETWORK) + .with(Feature.GOSSIP) + .set("disk_failure_policy", "die")) + .start()); ++ CLUSTER.setUncaughtExceptionsFilter((throwable) -> { ++ if (throwable.getClass().toString().contains("InstanceShutdown") || // can't check instanceof as it is thrown by a different classloader ++ throwable.getMessage() != null && throwable.getMessage().contains("Parent repair session with id")) ++ return true; ++ return false; ++ }); + } + + @AfterClass + public static void teardownCluster() throws Exception + { + if (CLUSTER != null) + CLUSTER.close(); + } + + @Before + public void cleanupState() + { + for (int i = 1; i <= CLUSTER.size(); i++) + CLUSTER.get(i).runOnInstance(InstanceKiller::clear); + } + + @Test(timeout = 10 * 60 * 1000) + public void testFailingMessage() throws IOException + { + final int replica = 1; + final int coordinator = 2; + String tableName = getCfName(messageType, parallelism, withTracing); + String fqtn = KEYSPACE + "." 
+ tableName; + + CLUSTER.schemaChange("CREATE TABLE " + fqtn + " (k INT, PRIMARY KEY (k))"); + + // create data which will NOT conflict + int lhsOffset = 10; + int rhsOffset = 20; + int limit = rhsOffset + (rhsOffset - lhsOffset); + + // set up data which is consistent on both sides + for (int i = 0; i < lhsOffset; i++) + CLUSTER.coordinator(replica) + .execute("INSERT INTO " + fqtn + " (k) VALUES (?)", ConsistencyLevel.ALL, i); + + // create data on LHS which does NOT exist in RHS + for (int i = lhsOffset; i < rhsOffset; i++) + CLUSTER.get(replica).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i); + + // create data on RHS which does NOT exist in LHS + for (int i = rhsOffset; i < limit; i++) + CLUSTER.get(coordinator).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i); + + // at this point, the two nodes should be out of sync, so confirm missing data + // node 1 + Object[][] node1Records = toRows(IntStream.range(0, rhsOffset)); + Object[][] node1Actuals = toNaturalOrder(CLUSTER.get(replica).executeInternal("SELECT k FROM " + fqtn)); + Assert.assertArrayEquals(node1Records, node1Actuals); + + // node 2 + Object[][] node2Records = toRows(IntStream.concat(IntStream.range(0, lhsOffset), IntStream.range(rhsOffset, limit))); + Object[][] node2Actuals = toNaturalOrder(CLUSTER.get(coordinator).executeInternal("SELECT k FROM " + fqtn)); + Assert.assertArrayEquals(node2Records, node2Actuals); + + // Inject the failure + CLUSTER.get(replica).runOnInstance(() -> setup.run()); + + // run a repair which is expected to fail + List<String> repairStatus = CLUSTER.get(coordinator).callOnInstance(() -> { + // need all ranges on the host + String ranges = StorageService.instance.getLocalAndPendingRanges(KEYSPACE).stream() + .map(r -> r.left + ":" + r.right) + .collect(Collectors.joining(",")); + Map<String, String> args = new HashMap<String, String>() + {{ + put(RepairOption.PARALLELISM_KEY, parallelism.getName()); + put(RepairOption.PRIMARY_RANGE_KEY, "false"); + put(RepairOption.INCREMENTAL_KEY, "false"); + put(RepairOption.TRACE_KEY, Boolean.toString(withTracing)); + put(RepairOption.PULL_REPAIR_KEY, "false"); + put(RepairOption.FORCE_REPAIR_KEY, "false"); + put(RepairOption.RANGES_KEY, ranges); + put(RepairOption.COLUMNFAMILIES_KEY, tableName); + }}; + int cmd = StorageService.instance.repairAsync(KEYSPACE, args); + Assert.assertFalse("repair return status was 0, expected non-zero return status, 0 indicates repair not submitted", cmd == 0); + List<String> status; + do + { + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + status = StorageService.instance.getParentRepairStatus(cmd); + } while (status == null || status.get(0).equals(ParentRepairStatus.IN_PROGRESS.name())); + + return status; + }); + Assert.assertEquals(repairStatus.toString(), ParentRepairStatus.FAILED, ParentRepairStatus.valueOf(repairStatus.get(0))); + + // it's possible that the coordinator gets the message that the replica failed before the replica completes + // shutting down; this then means that isKilled could be updated after the fact + IInvokableInstance replicaInstance = CLUSTER.get(replica); + while (replicaInstance.killAttempts() <= 0) + Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS); + + Assert.assertEquals("replica should be killed", 1, replicaInstance.killAttempts()); + Assert.assertEquals("coordinator should not be killed", 0, CLUSTER.get(coordinator).killAttempts()); + } + + private static Object[][] toNaturalOrder(Object[][] actuals) + { + // data is returned in
token order, so rather than try to be fancy and order expected in token order + // convert it to natural + int[] values = new int[actuals.length]; + for (int i = 0; i < values.length; i++) + values[i] = (Integer) actuals[i][0]; + Arrays.sort(values); + return toRows(IntStream.of(values)); + } + + private static Object[][] toRows(IntStream values) + { + return values + .mapToObj(v -> new Object[]{ v }) + .toArray(Object[][]::new); + } + + private static final class FailingSSTableReader extends ForwardingSSTableReader + { + + private FailingSSTableReader(SSTableReader delegate) + { + super(delegate); + } + + public ISSTableScanner getScanner() + { + return new FailingISSTableScanner(); + } + + public ISSTableScanner getScanner(Collection<Range<Token>> ranges) + { + return new FailingISSTableScanner(); + } + + public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator) + { + return new FailingISSTableScanner(); + } + + public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener) + { + return new FailingISSTableScanner(); + } + + public ChannelProxy getDataChannel() + { + throw new RuntimeException(); + } + + public String toString() + { + return "FailingSSTableReader[" + super.toString() + "]"; + } + } + + private static final class FailingISSTableScanner implements ISSTableScanner + { + public long getLengthInBytes() + { + return 0; + } + + public long getCompressedLengthInBytes() + { + return 0; + } + + public long getCurrentPosition() + { + return 0; + } + + public long getBytesScanned() + { + return 0; + } + + public Set<SSTableReader> getBackingSSTables() + { + return Collections.emptySet(); + } + + public TableMetadata metadata() + { + return null; + } + + public void close() + { + + } + + public boolean hasNext() + { + throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!"); + } + + public UnfilteredRowIterator next() + { + throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!"); + } + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java index e380985,0000000..0ce6f8d mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java @@@ -1,34 -1,0 +1,35 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism; +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType; + +@RunWith(Parameterized.class) +public class FullRepairCoordinatorFastTest extends RepairCoordinatorFast +{ + public FullRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications) + { + super(RepairType.FULL, parallelism, withNotifications); ++ CLUSTER.setUncaughtExceptionsFilter((instance, ex) -> true); + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java index 7a4c98e,0000000..4aa1cb4 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java @@@ -1,34 -1,0 +1,35 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism; +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType; + +@RunWith(Parameterized.class) +public class IncrementalRepairCoordinatorFastTest extends RepairCoordinatorFast +{ + public IncrementalRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications) + { + super(RepairType.INCREMENTAL, parallelism, withNotifications); ++ CLUSTER.setUncaughtExceptionsFilter((throwable) -> throwable.getMessage().contains("prepare fail")); + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java index bafef05,0000000..c94a700 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java @@@ -1,34 -1,0 +1,36 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism; +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType; + +@RunWith(Parameterized.class) +public class PreviewRepairCoordinatorFastTest extends RepairCoordinatorFast +{ + public PreviewRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications) + { + super(RepairType.PREVIEW, parallelism, withNotifications); ++ CLUSTER.setUncaughtExceptionsFilter((throwable) -> throwable.getMessage().contains("prepare fail") || ++ throwable.getMessage().contains("snapshot fail")); + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java index 956f21e,0000000..b68d268 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java @@@ -1,182 -1,0 +1,185 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test; + +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import com.google.common.annotations.VisibleForTesting; +import org.junit.Assert; +import org.junit.Test; + ++import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.streaming.StreamSession; +import org.apache.cassandra.streaming.messages.StreamMessage; + +import static org.apache.cassandra.distributed.api.Feature.NETWORK; +import static org.apache.cassandra.streaming.StreamSession.State.PREPARING; +import static org.apache.cassandra.streaming.StreamSession.State.STREAMING; +import static org.apache.cassandra.streaming.StreamSession.State.WAIT_COMPLETE; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_ACK; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYN; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYNACK; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.RECEIVED; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM; +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM_INIT; + +public class StreamingTest extends TestBaseImpl +{ + + private void testStreaming(int nodes, int replicationFactor, int rowCount, String compactionStrategy) throws Throwable + { - try (Cluster cluster = (Cluster) builder().withNodes(nodes).withConfig(config -> config.with(NETWORK)).start()) ++ try (Cluster cluster = builder().withNodes(nodes) ++ .withDataDirCount(1) // this test expects there to only be a single sstable to stream (with ddirs = 3, we get 3 sstables) ++ .withConfig(config -> config.with(NETWORK)).start()) + { + cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};"); + cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'true'}", KEYSPACE, compactionStrategy)); + + for (int i = 0 ; i < rowCount ; ++i) + { + for (int n = 1 ; n < nodes ; ++n) + cluster.get(n).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i)); + } + + cluster.get(nodes).executeInternal("TRUNCATE system.available_ranges;"); + { + Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE)); + Assert.assertEquals(0, results.length); + } + + // collect message and state + registerSink(cluster, nodes); + + cluster.get(nodes).runOnInstance(() -> StorageService.instance.rebuild(null, KEYSPACE, null, null)); + { + Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE)); + Assert.assertEquals(1000, results.length); + Arrays.sort(results, Comparator.comparingInt(a -> Integer.parseInt((String) a[0]))); + for (int i = 0 ; i < results.length ; ++i) + { + 
Assert.assertEquals(Integer.toString(i), results[i][0]); + Assert.assertEquals("value1", results[i][1]); + Assert.assertEquals("value2", results[i][2]); + } + } + } + } + + @Test + public void test() throws Throwable + { + testStreaming(2, 2, 1000, "LeveledCompactionStrategy"); + } + + public static void registerSink(Cluster cluster, int initiatorNodeId) + { + IInvokableInstance initiatorNode = cluster.get(initiatorNodeId); + InetSocketAddress initiator = initiatorNode.broadcastAddress(); + MessageStateSinkImpl initiatorSink = new MessageStateSinkImpl(); + + for (int node = 1; node <= cluster.size(); node++) + { + if (initiatorNodeId == node) + continue; + + IInvokableInstance followerNode = cluster.get(node); + InetSocketAddress follower = followerNode.broadcastAddress(); + + // verify on initiator's stream session + initiatorSink.messages(follower, Arrays.asList(PREPARE_SYNACK, STREAM, StreamMessage.Type.COMPLETE)); + initiatorSink.states(follower, Arrays.asList(PREPARING, STREAMING, WAIT_COMPLETE, StreamSession.State.COMPLETE)); + + // verify on follower's stream session + MessageStateSinkImpl followerSink = new MessageStateSinkImpl(); + followerSink.messages(initiator, Arrays.asList(STREAM_INIT, PREPARE_SYN, PREPARE_ACK, RECEIVED)); + followerSink.states(initiator, Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE)); + followerNode.runOnInstance(() -> StreamSession.sink = followerSink); + } + + cluster.get(initiatorNodeId).runOnInstance(() -> StreamSession.sink = initiatorSink); + } + + @VisibleForTesting + public static class MessageStateSinkImpl implements StreamSession.MessageStateSink, Serializable + { + // use enum ordinal instead of enum to work around the inter-jvm class loader issue, only classes defined in + // InstanceClassLoader#sharedClassNames are shareable between server jvm and test jvm + public final Map<InetAddress, Queue<Integer>> messageSink = new ConcurrentHashMap<>(); + public final Map<InetAddress, Queue<Integer>> stateTransitions = new ConcurrentHashMap<>(); + + public void messages(InetSocketAddress peer, List<StreamMessage.Type> messages) + { + messageSink.put(peer.getAddress(), messages.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new))); + } + + public void states(InetSocketAddress peer, List<StreamSession.State> states) + { + stateTransitions.put(peer.getAddress(), states.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new))); + } + + @Override + public void recordState(InetAddressAndPort from, StreamSession.State state) + { + Queue<Integer> states = stateTransitions.get(from.address); + if (states.peek() == null) + Assert.fail("Unexpected state " + state); + + int expected = states.poll(); + Assert.assertEquals(StreamSession.State.values()[expected], state); + } + + @Override + public void recordMessage(InetAddressAndPort from, StreamMessage.Type message) + { + if (message == StreamMessage.Type.KEEP_ALIVE) + return; + + Queue<Integer> messages = messageSink.get(from.address); + if (messages.peek() == null) + Assert.fail("Unexpected message " + message); + + int expected = messages.poll(); + Assert.assertEquals(StreamMessage.Type.values()[expected], message); + } + + @Override + public void onClose(InetAddressAndPort from) + { + Queue<Integer> states = stateTransitions.get(from.address); + Assert.assertTrue("Missing states: " + states, states.isEmpty()); + + Queue<Integer> messages = messageSink.get(from.address); + Assert.assertTrue("Missing messages: " + messages, messages.isEmpty()); + } + } +}
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java index 88dd0f9,5970992..b2af816 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java @@@ -32,8 -37,7 +32,12 @@@ public class UpgradeTest extends Upgrad { new TestCase() .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X) + .upgrade(Versions.Major.v30, Versions.Major.v3X, Versions.Major.v4) .setup((cluster) -> { ++ //TODO fix - this is a jvm-dtest bug where we get null when we want the NoPayload object ++ cluster.setUncaughtExceptionsFilter(t -> ++ t instanceof IllegalArgumentException ++ && t.getStackTrace()[0].getClassName().startsWith("org.apache.cassandra.net.NoPayload")); cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)"); @@@ -42,11 -46,44 +46,11 @@@ }) .runAfterClusterUpgrade((cluster) -> { assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - 1), - row(1, 1, 1), - row(1, 2, 2), - row(1, 3, 3)); - }).run(); - } - - @Test - public void mixedModePagingTest() throws Throwable - { - new TestCase() - .upgrade(Versions.Major.v22, Versions.Major.v30) - .nodes(2) - .nodesToUpgrade(2) - .setup((cluster) -> { - cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}"); - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage"); - for (int i = 0; i < 100; i++) - for (int j = 0; j < 200; j++) - cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j); - cluster.forEach((i) -> i.flush(KEYSPACE)); - for (int i = 0; i < 100; i++) - for (int j = 10; j < 30; j++) - cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j); - cluster.forEach((i) -> i.flush(KEYSPACE)); - }) - .runAfterClusterUpgrade((cluster) -> { - for (int i = 0; i < 100; i++) - { - for (int pageSize = 10; pageSize < 100; pageSize++) - { - Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", - ConsistencyLevel.ALL, - pageSize, i); - Assert.assertEquals(180, Iterators.size(res)); - } - } + ConsistencyLevel.ALL, + 1), + row(1, 1, 1), + row(1, 2, 2), + row(1, 3, 3)); }).run(); } --} ++} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
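Editor's note: the centerpiece of this merge is the uncaught-exception tracking added to AbstractCluster — throwables raised on any instance are collected, optionally filtered per test, and rethrown as a ShutdownException when the cluster closes. A minimal sketch of how a jvm-dtest can drive that API follows; Cluster.build(), setUncaughtExceptionsFilter, checkAndResetUncaughtExceptions, and ShutdownException are taken from the diff above, while the class name and the "expected failure" message matched by the predicate are illustrative placeholders, not part of the commit:

    import org.apache.cassandra.distributed.Cluster;

    public class UncaughtExceptionsFilterSketch
    {
        public static void main(String[] args) throws Throwable
        {
            try (Cluster cluster = Cluster.build().withNodes(2).start())
            {
                // Ignore throwables the test deliberately provokes on any node;
                // everything else reaching the cluster's default uncaught-exception
                // handler is recorded per instance while the instances run.
                cluster.setUncaughtExceptionsFilter((nodeId, throwable) ->
                    throwable.getMessage() != null && throwable.getMessage().contains("expected failure"));

                // ... run the workload under test here ...
            }
            // close() ends with checkAndResetUncaughtExceptions(): any recorded
            // throwable that was not filtered is drained and rethrown wrapped in
            // a ShutdownException, failing the test.
        }
    }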
