This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 21a363becc8e2a2c1b3042b9544f1c47606f5bab
Merge: 7a63cc2 b1efb8e
Author: David Capwell <[email protected]>
AuthorDate: Mon Oct 5 19:06:01 2020 -0700

    Merge branch 'cassandra-3.11' into trunk

 build.xml                                          |  2 +-
 .../distributed/impl/AbstractCluster.java          | 37 ++++++++++++++++++++--
 .../impl/DelegatingInvokableInstance.java          |  1 +
 .../cassandra/distributed/impl/Instance.java       |  3 +-
 .../cassandra/distributed/impl/InstanceConfig.java | 14 ++++++--
 .../ShutdownException.java}                        | 18 ++++-------
 .../distributed/test/FailingRepairTest.java        |  6 ++++
 .../test/FullRepairCoordinatorFastTest.java        |  1 +
 .../test/IncrementalRepairCoordinatorFastTest.java |  1 +
 .../distributed/test/NetworkTopologyTest.java      | 15 +++++----
 .../test/PreviewRepairCoordinatorFastTest.java     |  2 ++
 .../cassandra/distributed/test/StreamingTest.java  |  5 ++-
 .../cassandra/distributed/upgrade/UpgradeTest.java |  6 +++-
 13 files changed, 86 insertions(+), 25 deletions(-)

diff --cc build.xml
index 40729a6,82c35e9..6961cf9
--- a/build.xml
+++ b/build.xml
@@@ -582,14 -412,19 +582,14 @@@
            <dependency groupId="com.fasterxml.jackson.core" 
artifactId="jackson-annotations" version="2.9.10"/>
            <dependency groupId="com.googlecode.json-simple" 
artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" 
version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="0.3.0"/>
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="${jamm.version}"/>
  
 -          <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
 -          <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" 
version="0.9.2">
 -               <exclusion groupId="commons-logging" 
artifactId="commons-logging"/>
 -          </dependency>
 -          <dependency groupId="junit" artifactId="junit" version="4.6" />
 +          <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.23"/>
 +          <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" 
version="0.25" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" 
artifactId="java-allocation-instrumenter" 
version="${allocation-instrumenter.version}" />
-           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.4" />
+           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.5" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
diff --cc test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index a28c935,a6a6336..b6f359a
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@@ -31,9 -33,10 +33,11 @@@ import java.util.concurrent.Future
  import java.util.concurrent.TimeUnit;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.function.BiConsumer;
+ import java.util.function.BiPredicate;
  import java.util.function.Consumer;
+ import java.util.function.Predicate;
  import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
  import java.util.stream.Stream;
  
  import com.google.common.collect.Sets;
@@@ -61,9 -64,11 +65,10 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.shared.InstanceClassLoader;
  import org.apache.cassandra.distributed.shared.MessageFilters;
  import org.apache.cassandra.distributed.shared.NetworkTopology;
+ import org.apache.cassandra.distributed.shared.ShutdownException;
  import org.apache.cassandra.distributed.shared.Versions;
  import org.apache.cassandra.io.util.FileUtils;
 -import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.concurrent.SimpleCondition;
  
@@@ -119,30 -122,12 +124,33 @@@ public abstract class AbstractCluster<
  
      // mutated by user-facing API
      private final MessageFilters filters;
 +    private final INodeProvisionStrategy.Strategy nodeProvisionStrategy;
      private final BiConsumer<ClassLoader, Integer> instanceInitializer;
+     private final int datadirCount;
 +    private volatile Thread.UncaughtExceptionHandler previousHandler = null;
+     private volatile BiPredicate<Integer, Throwable> ignoreUncaughtThrowable = null;
+     private final List<Throwable> uncaughtExceptions = new CopyOnWriteArrayList<>();
  
 -    private volatile Thread.UncaughtExceptionHandler previousHandler = null;
 +    /**
 +     * Common builder, add methods that are applicable to both Cluster and Upgradable cluster here.
 +     */
 +    public static abstract class AbstractBuilder<I extends IInstance, C extends ICluster, B extends AbstractBuilder<I, C, B>>
 +        extends org.apache.cassandra.distributed.shared.AbstractBuilder<I, C, B>
 +    {
 +        private INodeProvisionStrategy.Strategy nodeProvisionStrategy = INodeProvisionStrategy.Strategy.MultipleNetworkInterfaces;
 +
 +        public AbstractBuilder(Factory<I, C, B> factory)
 +        {
 +            super(factory);
 +        }
 +
 +        public B withNodeProvisionStrategy(INodeProvisionStrategy.Strategy nodeProvisionStrategy)
 +        {
 +            this.nodeProvisionStrategy = nodeProvisionStrategy;
 +            return (B) this;
 +        }
 +    }
 +
  
     protected class Wrapper extends DelegatingInvokableInstance implements IUpgradeableInstance
     {
      {
@@@ -321,10 -299,14 +330,10 @@@
  
      private InstanceConfig createInstanceConfig(int nodeNum)
      {
 -        String ipPrefix = "127.0." + subnet + ".";
 -        String seedIp = ipPrefix + "1";
 -        String ipAddress = ipPrefix + nodeNum;
 +        INodeProvisionStrategy provisionStrategy = nodeProvisionStrategy.create(subnet);
          long token = tokenSupplier.token(nodeNum);
 -
 -        NetworkTopology topology = NetworkTopology.build(ipPrefix, broadcastPort, nodeIdTopology);
 -
 -        InstanceConfig config = InstanceConfig.generate(nodeNum, ipAddress, topology, root, String.valueOf(token), seedIp, datadirCount);
 +        NetworkTopology topology = buildNetworkTopology(provisionStrategy, nodeIdTopology);
-         InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token));
++        InstanceConfig config = InstanceConfig.generate(nodeNum, provisionStrategy, topology, root, Long.toString(token), datadirCount);
          if (configUpdater != null)
              configUpdater.accept(config);
  
@@@ -651,10 -619,22 +660,22 @@@
                  handler.uncaughtException(thread, error);
              return;
          }
+ 
          InstanceClassLoader cl = (InstanceClassLoader) thread.getContextClassLoader();
          get(cl.getInstanceId()).uncaughtException(thread, error);
+ 
+         BiPredicate<Integer, Throwable> ignore = ignoreUncaughtThrowable;
+         I instance = get(cl.getInstanceId());
+         if ((ignore == null || !ignore.test(cl.getInstanceId(), error)) && instance != null && !instance.isShutdown())
+             uncaughtExceptions.add(error);
+     }
+ 
+     @Override
+     public void setUncaughtExceptionsFilter(BiPredicate<Integer, Throwable> ignoreUncaughtThrowable)
+     {
+         this.ignoreUncaughtThrowable = ignoreUncaughtThrowable;
      }
 -    
 +
      @Override
      public void close()
      {
@@@ -671,22 -651,23 +692,34 @@@
              FileUtils.deleteRecursive(root);
          Thread.setDefaultUncaughtExceptionHandler(previousHandler);
          previousHandler = null;
- 
+         checkAndResetUncaughtExceptions();
 -
 +        //checkForThreadLeaks();
          //withThreadLeakCheck(futures);
      }
+ 
+     @Override
+     public void checkAndResetUncaughtExceptions()
+     {
+         List<Throwable> drain = new ArrayList<>(uncaughtExceptions.size());
+         uncaughtExceptions.removeIf(e -> {
+             drain.add(e);
+             return true;
+         });
+         if (!drain.isEmpty())
+             throw new ShutdownException(drain);
+     }
  
 +    private void checkForThreadLeaks()
 +    {
 +        //This is an alternate version of the thread leak check that just checks to see if any threads are still alive
 +        // with the context classloader.
 +        Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
 +        threadSet.stream().filter(t->t.getContextClassLoader() instanceof InstanceClassLoader).forEach(t->{
 +            t.setContextClassLoader(null);
 +            throw new RuntimeException("Unterminated thread detected " + t.getName() + " in group " + t.getThreadGroup().getName());
 +        });
 +    }
 +
     // We do not want this check to run every time until we fix problems with thread stops
      private void withThreadLeakCheck(List<Future<?>> futures)
      {
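
For readers of this diff: the new hooks above let a test install a filter for uncaught throwables it expects, then assert that nothing unexpected leaked. A minimal, hypothetical sketch of the intended flow, assuming a two-node cluster and an illustrative "expected failure" message (neither is part of this commit):

    // Hedged usage sketch of the API introduced in this merge.
    try (Cluster cluster = Cluster.build().withNodes(2).start())
    {
        // BiPredicate form: first argument is the instance id, second the throwable.
        cluster.setUncaughtExceptionsFilter((instanceId, throwable) ->
            throwable.getMessage() != null && throwable.getMessage().contains("expected failure"));

        // ... exercise the cluster ...

        // Drains the accumulated uncaught throwables; throws ShutdownException
        // wrapping any the filter did not ignore. Also invoked from close().
        cluster.checkAndResetUncaughtExceptions();
    }
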
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index 038698b,81c501c..03058dc
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@@ -712,4 -801,4 +712,5 @@@ public class Instance extends IsolatedE
          }
          return accumulate;
      }
- }
+ }
++
diff --cc test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
index a8ed918,1f39f76..2bb486f
--- a/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/InstanceConfig.java
@@@ -267,30 -261,32 +267,40 @@@ public class InstanceConfig implements 
          return (String)params.get(name);
      }
  
 -    public static InstanceConfig generate(int nodeNum, String ipAddress, NetworkTopology networkTopology, File root, String token, String seedIp, int datadirCount)
 +    public static InstanceConfig generate(int nodeNum,
 +                                          INodeProvisionStrategy provisionStrategy,
 +                                          NetworkTopology networkTopology,
 +                                          File root,
-                                           String token)
++                                          String token,
++                                          int datadirCount)
      {
          return new InstanceConfig(nodeNum,
                                    networkTopology,
 -                                  ipAddress,
 -                                  ipAddress,
 -                                  ipAddress,
 -                                  ipAddress,
 -                                  seedIp,
 +                                  provisionStrategy.ipAddress(nodeNum),
 +                                  provisionStrategy.ipAddress(nodeNum),
 +                                  provisionStrategy.ipAddress(nodeNum),
 +                                  provisionStrategy.ipAddress(nodeNum),
 +                                  provisionStrategy.seedIp(),
 +                                  provisionStrategy.seedPort(),
                                    String.format("%s/node%d/saved_caches", 
root, nodeNum),
-                                   new String[] { 
String.format("%s/node%d/data", root, nodeNum) },
+                                   datadirs(datadirCount, root, nodeNum),
                                    String.format("%s/node%d/commitlog", root, 
nodeNum),
                                    String.format("%s/node%d/hints", root, 
nodeNum),
                                    String.format("%s/node%d/cdc", root, 
nodeNum),
 -                                  token);
 +                                  token,
 +                                  provisionStrategy.storagePort(nodeNum),
 +                                  provisionStrategy.nativeTransportPort(nodeNum));
      }
  
+     private static String[] datadirs(int datadirCount, File root, int nodeNum)
+     {
+         String datadirFormat = String.format("%s/node%d/data%%d", root.getPath(), nodeNum);
+         String [] datadirs = new String[datadirCount];
+         for (int i = 0; i < datadirs.length; i++)
+             datadirs[i] = String.format(datadirFormat, i);
+         return datadirs;
+     }
+ 
      public InstanceConfig forVersion(Versions.Major major)
      {
          switch (major)
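
A side note on the datadirs helper above: the %%d escapes the percent sign, so the first String.format emits a per-node template that the loop then fills in with the directory index. A worked sketch, assuming root.getPath() returns "/tmp/dtest" and nodeNum is 1:

    // Stage 1: build the template; %%d survives as a literal %d.
    String datadirFormat = String.format("%s/node%d/data%%d", "/tmp/dtest", 1);
    // datadirFormat == "/tmp/dtest/node1/data%d"

    // Stage 2: expand per data directory, e.g. for datadirCount == 3:
    //   /tmp/dtest/node1/data0
    //   /tmp/dtest/node1/data1
    //   /tmp/dtest/node1/data2
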
diff --cc test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
index bad6d87,0000000..7feefa3
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FailingRepairTest.java
@@@ -1,348 -1,0 +1,354 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.IOException;
 +import java.io.Serializable;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import com.google.common.util.concurrent.Uninterruptibles;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +import org.junit.runners.Parameterized.Parameters;
 +
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.api.Feature;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor.SerializableRunnable;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.impl.InstanceKiller;
 +import org.apache.cassandra.io.sstable.CorruptSSTableException;
 +import org.apache.cassandra.io.sstable.ISSTableScanner;
 +import org.apache.cassandra.io.sstable.format.ForwardingSSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReader;
 +import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
 +import org.apache.cassandra.io.util.ChannelProxy;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
 +import org.apache.cassandra.service.StorageService;
 +
 +@RunWith(Parameterized.class)
 +public class FailingRepairTest extends TestBaseImpl implements Serializable
 +{
 +    private static ICluster<IInvokableInstance> CLUSTER;
 +
 +    private final Verb messageType;
 +    private final RepairParallelism parallelism;
 +    private final boolean withTracing;
 +    private final SerializableRunnable setup;
 +
 +    public FailingRepairTest(Verb messageType, RepairParallelism parallelism, boolean withTracing, SerializableRunnable setup)
 +    {
 +        this.messageType = messageType;
 +        this.parallelism = parallelism;
 +        this.withTracing = withTracing;
 +        this.setup = setup;
 +    }
 +
 +    @Parameters(name = "{0}/{1}/{2}")
 +    public static Collection<Object[]> messages()
 +    {
 +        List<Object[]> tests = new ArrayList<>();
 +        for (RepairParallelism parallelism : RepairParallelism.values())
 +        {
 +            for (Boolean withTracing : Arrays.asList(Boolean.TRUE, Boolean.FALSE))
 +            {
 +                tests.add(new Object[]{ Verb.VALIDATION_REQ, parallelism, withTracing, failingReaders(Verb.VALIDATION_REQ, parallelism, withTracing) });
 +            }
 +        }
 +        return tests;
 +    }
 +
 +    private static SerializableRunnable failingReaders(Verb type, RepairParallelism parallelism, boolean withTracing)
 +    {
 +        return () -> {
 +            String cfName = getCfName(type, parallelism, withTracing);
 +            ColumnFamilyStore cf = Keyspace.open(KEYSPACE).getColumnFamilyStore(cfName);
 +            cf.forceBlockingFlush();
 +            Set<SSTableReader> remove = cf.getLiveSSTables();
 +            Set<SSTableReader> replace = new HashSet<>();
 +            if (type == Verb.VALIDATION_REQ)
 +            {
 +                for (SSTableReader r : remove)
 +                    replace.add(new FailingSSTableReader(r));
 +            }
 +            else
 +            {
 +                throw new UnsupportedOperationException("verb: " + type);
 +            }
 +            cf.getTracker().removeUnsafe(remove);
 +            cf.addSSTables(replace);
 +        };
 +    }
 +
 +    private static String getCfName(Verb type, RepairParallelism parallelism, boolean withTracing)
 +    {
 +        return type.name().toLowerCase() + "_" + parallelism.name().toLowerCase() + "_" + withTracing;
 +    }
 +
 +    @BeforeClass
 +    public static void setupCluster() throws IOException
 +    {
 +        // streaming requires networking ATM
 +        // streaming also requires gossip or isn't setup properly
 +        CLUSTER = init(Cluster.build()
 +                              .withNodes(2)
 +                              .withConfig(c -> c.with(Feature.NETWORK)
 +                                             .with(Feature.GOSSIP)
 +                                             .set("disk_failure_policy", "die"))
 +                              .start());
++        CLUSTER.setUncaughtExceptionsFilter((throwable) -> {
++            if (throwable.getClass().toString().contains("InstanceShutdown") || // can't check instanceof as it is thrown by a different classloader
++                throwable.getMessage() != null && throwable.getMessage().contains("Parent repair session with id"))
++                return true;
++            return false;
++        });
 +    }
 +
 +    @AfterClass
 +    public static void teardownCluster() throws Exception
 +    {
 +        if (CLUSTER != null)
 +            CLUSTER.close();
 +    }
 +
 +    @Before
 +    public void cleanupState()
 +    {
 +        for (int i = 1; i <= CLUSTER.size(); i++)
 +            CLUSTER.get(i).runOnInstance(InstanceKiller::clear);
 +    }
 +
 +    @Test(timeout = 10 * 60 * 1000)
 +    public void testFailingMessage() throws IOException
 +    {
 +        final int replica = 1;
 +        final int coordinator = 2;
 +        String tableName = getCfName(messageType, parallelism, withTracing);
 +        String fqtn = KEYSPACE + "." + tableName;
 +
 +        CLUSTER.schemaChange("CREATE TABLE " + fqtn + " (k INT, PRIMARY KEY (k))");
 +
 +        // create data which will NOT conflict
 +        int lhsOffset = 10;
 +        int rhsOffset = 20;
 +        int limit = rhsOffset + (rhsOffset - lhsOffset);
 +
 +        // setup data which is consistent on both sides
 +        for (int i = 0; i < lhsOffset; i++)
 +            CLUSTER.coordinator(replica)
 +                   .execute("INSERT INTO " + fqtn + " (k) VALUES (?)", ConsistencyLevel.ALL, i);
 +
 +        // create data on LHS which does NOT exist in RHS
 +        for (int i = lhsOffset; i < rhsOffset; i++)
 +            CLUSTER.get(replica).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i);
 +
 +        // create data on RHS which does NOT exist in LHS
 +        for (int i = rhsOffset; i < limit; i++)
 +            CLUSTER.get(coordinator).executeInternal("INSERT INTO " + fqtn + " (k) VALUES (?)", i);
 +
 +        // at this point, the two nodes should be out of sync, so confirm missing data
 +        // node 1
 +        Object[][] node1Records = toRows(IntStream.range(0, rhsOffset));
 +        Object[][] node1Actuals = toNaturalOrder(CLUSTER.get(replica).executeInternal("SELECT k FROM " + fqtn));
 +        Assert.assertArrayEquals(node1Records, node1Actuals);
 +
 +        // node 2
 +        Object[][] node2Records = toRows(IntStream.concat(IntStream.range(0, lhsOffset), IntStream.range(rhsOffset, limit)));
 +        Object[][] node2Actuals = toNaturalOrder(CLUSTER.get(coordinator).executeInternal("SELECT k FROM " + fqtn));
 +        Assert.assertArrayEquals(node2Records, node2Actuals);
 +
 +        // Inject the failure
 +        CLUSTER.get(replica).runOnInstance(() -> setup.run());
 +
 +        // run a repair which is expected to fail
 +        List<String> repairStatus = CLUSTER.get(coordinator).callOnInstance(() -> {
 +            // need all ranges on the host
 +            String ranges = StorageService.instance.getLocalAndPendingRanges(KEYSPACE).stream()
 +                                                   .map(r -> r.left + ":" + r.right)
 +                                                   .collect(Collectors.joining(","));
 +            Map<String, String> args = new HashMap<String, String>()
 +            {{
 +                put(RepairOption.PARALLELISM_KEY, parallelism.getName());
 +                put(RepairOption.PRIMARY_RANGE_KEY, "false");
 +                put(RepairOption.INCREMENTAL_KEY, "false");
 +                put(RepairOption.TRACE_KEY, Boolean.toString(withTracing));
 +                put(RepairOption.PULL_REPAIR_KEY, "false");
 +                put(RepairOption.FORCE_REPAIR_KEY, "false");
 +                put(RepairOption.RANGES_KEY, ranges);
 +                put(RepairOption.COLUMNFAMILIES_KEY, tableName);
 +            }};
 +            int cmd = StorageService.instance.repairAsync(KEYSPACE, args);
 +            Assert.assertFalse("repair return status was 0, expected non-zero return status, 0 indicates repair not submitted", cmd == 0);
 +            List<String> status;
 +            do
 +            {
 +                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
 +                status = StorageService.instance.getParentRepairStatus(cmd);
 +            } while (status == null || status.get(0).equals(ParentRepairStatus.IN_PROGRESS.name()));
 +
 +            return status;
 +        });
 +        Assert.assertEquals(repairStatus.toString(), ParentRepairStatus.FAILED, ParentRepairStatus.valueOf(repairStatus.get(0)));
 +
 +        // it's possible that the coordinator gets the message that the replica failed before the replica completes
 +        // shutting down; this then means that isKilled could be updated after the fact
 +        IInvokableInstance replicaInstance = CLUSTER.get(replica);
 +        while (replicaInstance.killAttempts() <= 0)
 +            Uninterruptibles.sleepUninterruptibly(50, TimeUnit.MILLISECONDS);
 +
 +        Assert.assertEquals("replica should be killed", 1, replicaInstance.killAttempts());
 +        Assert.assertEquals("coordinator should not be killed", 0, CLUSTER.get(coordinator).killAttempts());
 +    }
 +
 +    private static Object[][] toNaturalOrder(Object[][] actuals)
 +    {
 +        // data is returned in token order, so rather than try to be fancy and order expected in token order
 +        // convert it to natural
 +        int[] values = new int[actuals.length];
 +        for (int i = 0; i < values.length; i++)
 +            values[i] = (Integer) actuals[i][0];
 +        Arrays.sort(values);
 +        return toRows(IntStream.of(values));
 +    }
 +
 +    private static Object[][] toRows(IntStream values)
 +    {
 +        return values
 +               .mapToObj(v -> new Object[]{ v })
 +               .toArray(Object[][]::new);
 +    }
 +
 +    private static final class FailingSSTableReader extends ForwardingSSTableReader
 +    {
 +
 +        private FailingSSTableReader(SSTableReader delegate)
 +        {
 +            super(delegate);
 +        }
 +
 +        public ISSTableScanner getScanner()
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ISSTableScanner getScanner(Collection<Range<Token>> ranges)
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ISSTableScanner getScanner(Iterator<AbstractBounds<PartitionPosition>> rangeIterator)
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener)
 +        {
 +            return new FailingISSTableScanner();
 +        }
 +
 +        public ChannelProxy getDataChannel()
 +        {
 +            throw new RuntimeException();
 +        }
 +
 +        public String toString()
 +        {
 +            return "FailingSSTableReader[" + super.toString() + "]";
 +        }
 +    }
 +
 +    private static final class FailingISSTableScanner implements ISSTableScanner
 +    {
 +        public long getLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCompressedLengthInBytes()
 +        {
 +            return 0;
 +        }
 +
 +        public long getCurrentPosition()
 +        {
 +            return 0;
 +        }
 +
 +        public long getBytesScanned()
 +        {
 +            return 0;
 +        }
 +
 +        public Set<SSTableReader> getBackingSSTables()
 +        {
 +            return Collections.emptySet();
 +        }
 +
 +        public TableMetadata metadata()
 +        {
 +            return null;
 +        }
 +
 +        public void close()
 +        {
 +
 +        }
 +
 +        public boolean hasNext()
 +        {
 +            throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!");
 +        }
 +
 +        public UnfilteredRowIterator next()
 +        {
 +            throw new CorruptSSTableException(new IOException("Test commands it"), "mahahahaha!");
 +        }
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
index e380985,0000000..0ce6f8d
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/FullRepairCoordinatorFastTest.java
@@@ -1,34 -1,0 +1,35 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +
 +@RunWith(Parameterized.class)
 +public class FullRepairCoordinatorFastTest extends RepairCoordinatorFast
 +{
 +    public FullRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
 +    {
 +        super(RepairType.FULL, parallelism, withNotifications);
++        CLUSTER.setUncaughtExceptionsFilter((instance, ex) -> true);
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
index 7a4c98e,0000000..4aa1cb4
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncrementalRepairCoordinatorFastTest.java
@@@ -1,34 -1,0 +1,35 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +
 +@RunWith(Parameterized.class)
 +public class IncrementalRepairCoordinatorFastTest extends RepairCoordinatorFast
 +{
 +    public IncrementalRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
 +    {
 +        super(RepairType.INCREMENTAL, parallelism, withNotifications);
++        CLUSTER.setUncaughtExceptionsFilter((throwable) -> throwable.getMessage().contains("prepare fail"));
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
index bafef05,0000000..c94a700
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairCoordinatorFastTest.java
@@@ -1,34 -1,0 +1,36 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import org.junit.runner.RunWith;
 +import org.junit.runners.Parameterized;
 +
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairParallelism;
 +import org.apache.cassandra.distributed.test.DistributedRepairUtils.RepairType;
 +
 +@RunWith(Parameterized.class)
 +public class PreviewRepairCoordinatorFastTest extends RepairCoordinatorFast
 +{
 +    public PreviewRepairCoordinatorFastTest(RepairParallelism parallelism, boolean withNotifications)
 +    {
 +        super(RepairType.PREVIEW, parallelism, withNotifications);
++        CLUSTER.setUncaughtExceptionsFilter((throwable) -> throwable.getMessage().contains("prepare fail") ||
++                                                           throwable.getMessage().contains("snapshot fail"));
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
index 956f21e,0000000..b68d268
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/StreamingTest.java
@@@ -1,182 -1,0 +1,185 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test;
 +
 +import java.io.Serializable;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.util.Arrays;
 +import java.util.Comparator;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Queue;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
++import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.streaming.StreamSession;
 +import org.apache.cassandra.streaming.messages.StreamMessage;
 +
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.apache.cassandra.streaming.StreamSession.State.PREPARING;
 +import static org.apache.cassandra.streaming.StreamSession.State.STREAMING;
 +import static org.apache.cassandra.streaming.StreamSession.State.WAIT_COMPLETE;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_ACK;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYN;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.PREPARE_SYNACK;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.RECEIVED;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM;
 +import static org.apache.cassandra.streaming.messages.StreamMessage.Type.STREAM_INIT;
 +
 +public class StreamingTest extends TestBaseImpl
 +{
 +
 +    private void testStreaming(int nodes, int replicationFactor, int rowCount, String compactionStrategy) throws Throwable
 +    {
-         try (Cluster cluster = (Cluster) builder().withNodes(nodes).withConfig(config -> config.with(NETWORK)).start())
++        try (Cluster cluster = builder().withNodes(nodes)
++                                        .withDataDirCount(1) // this test expects there to only be a single sstable to stream (with ddirs = 3, we get 3 sstables)
++                                        .withConfig(config -> config.with(NETWORK)).start())
 +        {
 +            cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
 +            cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'true'}", KEYSPACE, compactionStrategy));
 +
 +            for (int i = 0 ; i < rowCount ; ++i)
 +            {
 +                for (int n = 1 ; n < nodes ; ++n)
 +                    cluster.get(n).executeInternal(String.format("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');", KEYSPACE), Integer.toString(i));
 +            }
 +
 +            cluster.get(nodes).executeInternal("TRUNCATE system.available_ranges;");
 +            {
 +                Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
 +                Assert.assertEquals(0, results.length);
 +            }
 +
 +            // collect message and state
 +            registerSink(cluster, nodes);
 +
 +            cluster.get(nodes).runOnInstance(() -> StorageService.instance.rebuild(null, KEYSPACE, null, null));
 +            {
 +                Object[][] results = cluster.get(nodes).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE));
 +                Assert.assertEquals(1000, results.length);
 +                Arrays.sort(results, Comparator.comparingInt(a -> Integer.parseInt((String) a[0])));
 +                for (int i = 0 ; i < results.length ; ++i)
 +                {
 +                    Assert.assertEquals(Integer.toString(i), results[i][0]);
 +                    Assert.assertEquals("value1", results[i][1]);
 +                    Assert.assertEquals("value2", results[i][2]);
 +                }
 +            }
 +        }
 +    }
 +
 +    @Test
 +    public void test() throws Throwable
 +    {
 +        testStreaming(2, 2, 1000, "LeveledCompactionStrategy");
 +    }
 +
 +    public static void registerSink(Cluster cluster, int initiatorNodeId)
 +    {
 +        IInvokableInstance initiatorNode = cluster.get(initiatorNodeId);
 +        InetSocketAddress initiator = initiatorNode.broadcastAddress();
 +        MessageStateSinkImpl initiatorSink = new MessageStateSinkImpl();
 +
 +        for (int node = 1; node <= cluster.size(); node++)
 +        {
 +            if (initiatorNodeId == node)
 +                continue;
 +
 +            IInvokableInstance followerNode = cluster.get(node);
 +            InetSocketAddress follower = followerNode.broadcastAddress();
 +
 +            // verify on initiator's stream session
 +            initiatorSink.messages(follower, Arrays.asList(PREPARE_SYNACK, STREAM, StreamMessage.Type.COMPLETE));
 +            initiatorSink.states(follower, Arrays.asList(PREPARING, STREAMING, WAIT_COMPLETE, StreamSession.State.COMPLETE));
 +
 +            // verify on follower's stream session
 +            MessageStateSinkImpl followerSink = new MessageStateSinkImpl();
 +            followerSink.messages(initiator, Arrays.asList(STREAM_INIT, PREPARE_SYN, PREPARE_ACK, RECEIVED));
 +            followerSink.states(initiator,  Arrays.asList(PREPARING, STREAMING, StreamSession.State.COMPLETE));
 +            followerNode.runOnInstance(() -> StreamSession.sink = followerSink);
 +        }
 +
 +        cluster.get(initiatorNodeId).runOnInstance(() -> StreamSession.sink = initiatorSink);
 +    }
 +
 +    @VisibleForTesting
 +    public static class MessageStateSinkImpl implements StreamSession.MessageStateSink, Serializable
 +    {
 +        // use enum ordinal instead of enum to work around the inter-jvm class loader issue; only classes defined in
 +        // InstanceClassLoader#sharedClassNames are shareable between server jvm and test jvm
 +        public final Map<InetAddress, Queue<Integer>> messageSink = new ConcurrentHashMap<>();
 +        public final Map<InetAddress, Queue<Integer>> stateTransitions = new ConcurrentHashMap<>();
 +
 +        public void messages(InetSocketAddress peer, List<StreamMessage.Type> messages)
 +        {
 +            messageSink.put(peer.getAddress(), messages.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new)));
 +        }
 +
 +        public void states(InetSocketAddress peer, List<StreamSession.State> states)
 +        {
 +            stateTransitions.put(peer.getAddress(), states.stream().map(Enum::ordinal).collect(Collectors.toCollection(LinkedList::new)));
 +        }
 +
 +        @Override
 +        public void recordState(InetAddressAndPort from, StreamSession.State state)
 +        {
 +            Queue<Integer> states = stateTransitions.get(from.address);
 +            if (states.peek() == null)
 +                Assert.fail("Unexpected state " + state);
 +
 +            int expected = states.poll();
 +            Assert.assertEquals(StreamSession.State.values()[expected], state);
 +        }
 +
 +        @Override
 +        public void recordMessage(InetAddressAndPort from, StreamMessage.Type message)
 +        {
 +            if (message == StreamMessage.Type.KEEP_ALIVE)
 +                return;
 +
 +            Queue<Integer> messages = messageSink.get(from.address);
 +            if (messages.peek() == null)
 +                Assert.fail("Unexpected message " + message);
 +
 +            int expected = messages.poll();
 +            Assert.assertEquals(StreamMessage.Type.values()[expected], message);
 +        }
 +
 +        @Override
 +        public void onClose(InetAddressAndPort from)
 +        {
 +            Queue<Integer> states = stateTransitions.get(from.address);
 +            Assert.assertTrue("Missing states: " + states, states.isEmpty());
 +
 +            Queue<Integer> messages = messageSink.get(from.address);
 +            Assert.assertTrue("Missing messages: " + messages, messages.isEmpty());
 +        }
 +    }
 +}
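
The enum-ordinal bookkeeping in MessageStateSinkImpl above exists because the server instance and the test runner load StreamSession.State and StreamMessage.Type through different classloaders, so enum instances from one side fail equals/instanceof checks on the other, while a plain int crosses safely. A minimal sketch of the round trip; the local State enum here is a stand-in for the real classes:

    import java.util.LinkedList;
    import java.util.Queue;

    public class OrdinalSinkSketch
    {
        enum State { PREPARING, STREAMING, COMPLETE } // stand-in for StreamSession.State

        public static void main(String[] args)
        {
            // Producer side: record ordinals, never enum instances.
            Queue<Integer> expected = new LinkedList<>();
            expected.add(State.PREPARING.ordinal());

            // Consumer side: decode through values(), as recordState(...) does above.
            State decoded = State.values()[expected.poll()];
            System.out.println(decoded); // PREPARING
        }
    }
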
diff --cc test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
index 88dd0f9,5970992..b2af816
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/UpgradeTest.java
@@@ -32,8 -37,7 +32,12 @@@ public class UpgradeTest extends Upgrad
      {
          new TestCase()
          .upgrade(Versions.Major.v22, Versions.Major.v30, Versions.Major.v3X)
 +        .upgrade(Versions.Major.v30, Versions.Major.v3X, Versions.Major.v4)
          .setup((cluster) -> {
++            //TODO fix - this is a jvm-dtest bug where we get null when we want the NoPayload object
++            cluster.setUncaughtExceptionsFilter(t ->
++                                                t instanceof IllegalArgumentException
++                                                && t.getStackTrace()[0].getClassName().startsWith("org.apache.cassandra.net.NoPayload"));
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
  
             cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
@@@ -42,11 -46,44 +46,11 @@@
          })
          .runAfterClusterUpgrade((cluster) -> {
             assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                                          ConsistencyLevel.ALL,
 -                                                                          1),
 -                                           row(1, 1, 1),
 -                                           row(1, 2, 2),
 -                                           row(1, 3, 3));
 -        }).run();
 -    }
 -
 -    @Test
 -    public void mixedModePagingTest() throws Throwable
 -    {
 -        new TestCase()
 -        .upgrade(Versions.Major.v22, Versions.Major.v30)
 -        .nodes(2)
 -        .nodesToUpgrade(2)
 -        .setup((cluster) -> {
 -            cluster.schemaChange("ALTER KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
 -            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) with compact storage");
 -            for (int i = 0; i < 100; i++)
 -                for (int j = 0; j < 200; j++)
 -                    cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, 1)", ConsistencyLevel.ALL, i, j);
 -            cluster.forEach((i) -> i.flush(KEYSPACE));
 -            for (int i = 0; i < 100; i++)
 -                for (int j = 10; j < 30; j++)
 -                    cluster.coordinator(2).execute("DELETE FROM " + KEYSPACE + ".tbl where pk=? and ck=?", ConsistencyLevel.ALL, i, j);
 -            cluster.forEach((i) -> i.flush(KEYSPACE));
 -        })
 -        .runAfterClusterUpgrade((cluster) -> {
 -            for (int i = 0; i < 100; i++)
 -            {
 -                for (int pageSize = 10; pageSize < 100; pageSize++)
 -                {
 -                    Iterator<Object[]> res = cluster.coordinator(1).executeWithPaging("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
 -                                                                                      ConsistencyLevel.ALL,
 -                                                                                      pageSize, i);
 -                    Assert.assertEquals(180, Iterators.size(res));
 -                }
 -            }
 +                                                      ConsistencyLevel.ALL,
 +                                                      1),
 +                       row(1, 1, 1),
 +                       row(1, 2, 2),
 +                       row(1, 3, 3));
          }).run();
      }
--}
++}

