This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 9a1bb62822806ea947236b2f5491ecb3669fabde
Merge: 98e798f b5d0626
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Jan 17 14:09:10 2022 +0100

    Merge branch 'cassandra-3.11' into cassandra-4.0

 CHANGES.txt                                        |   1 +
 .../locator/AbstractReplicationStrategy.java       |  92 ++++++++++++-----
 .../apache/cassandra/locator/TokenMetadata.java    |   9 +-
 .../distributed/test/ring/BootstrapTest.java       |   1 +
 .../test/ring/ReadsDuringBootstrapTest.java        | 114 +++++++++++++++++++++
 .../locator/AbstractReplicationStrategyTest.java   |  45 ++++++++
 6 files changed, 236 insertions(+), 26 deletions(-)

diff --cc CHANGES.txt
index 0896d56,5e8213f..d9303f8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,36 -1,17 +1,37 @@@
 -3.11.12
 - * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA-17028)
 +4.0.2
 + * Don't block gossip when clearing repair snapshots (CASSANDRA-17168)
 + * Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160)
 + * Update ant-junit to version 1.10.12 (CASSANDRA-17218)
 + * Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308)
 + * Fix disk failure triggered when enabling FQL on an unclean directory (CASSANDRA-17136)
 + * Fixed broken classpath when multiple jars in build directory (CASSANDRA-17129)
 + * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072)
 + * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141)
 + * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132)
 + * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052)
 + * Fix multiple full sources can be selected unexpectedly for bootstrap streaming (CASSANDRA-16945)
 + * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131)
 + * Add backward compatibility for CQLSSTableWriter Date fields (CASSANDRA-17117)
 + * Push initial client connection messages to trace (CASSANDRA-17038)
 + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997)
 + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965)
 + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966)
 + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987)
 + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872)
 + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153)
 + * Include SASI components to snapshots (CASSANDRA-15134)
 + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938)
 + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888)
 + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783)
 + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898)
 + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900)
 + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 +Merged from 3.11:
   * Add key validation to sstablescrub (CASSANDRA-16969)
   * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851)
 - * Include SASI components to snapshots (CASSANDRA-15134)
   * Make assassinate more resilient to missing tokens (CASSANDRA-16847)
 - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854)
 - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135)
 - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835)
 - * Fix ant-junit dependency issue (CASSANDRA-16827)
 - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072)
 - * Avoid sending CDC column if not enabled (CASSANDRA-16770)
  Merged from 3.0:
+  * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673)
   * Fix abort when window resizing during cqlsh COPY (CASSANDRA-15230)
   * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898)
   * Fix rare NPE caused by batchlog replay / node decommission races (CASSANDRA-17049)
diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 7891895,f4dc3b6..909a7f6
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@@ -23,9 -24,11 +23,10 @@@ import java.lang.reflect.Method
  import java.util.Collection;
  import java.util.Collections;
  import java.util.Map;
+ import java.util.concurrent.atomic.AtomicReference;
  
 -import com.google.common.annotations.VisibleForTesting;
 -import com.google.common.collect.HashMultimap;
 -import com.google.common.collect.Multimap;
 +import com.google.common.base.Preconditions;
 +
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -51,13 -53,12 +52,10 @@@ public abstract class AbstractReplicati
  {
      private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class);
  
 -    @VisibleForTesting
 -    final String keyspaceName;
 -    private Keyspace keyspace;
      public final Map<String, String> configOptions;
 +    protected final String keyspaceName;
      private final TokenMetadata tokenMetadata;
- 
-     // track when the token range changes, signaling we need to invalidate our endpoint cache
-     private volatile long lastInvalidatedVersion = 0;
- 
 -    private final ReplicaCache<Token, ArrayList<InetAddress>> replicas = new ReplicaCache<>();
++    private final ReplicaCache<Token, EndpointsForRange> replicas = new ReplicaCache<>();
      public IEndpointSnitch snitch;
  
      protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
@@@ -68,28 -70,12 +66,11 @@@
          this.snitch = snitch;
          this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions;
          this.keyspaceName = keyspaceName;
 -        // lazy-initialize keyspace itself since we don't create them until after the replication strategies
      }
  
-     private final Map<Token, EndpointsForRange> cachedReplicas = new NonBlockingHashMap<>();
- 
-     public EndpointsForRange getCachedReplicas(Token t)
 -    private ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t)
++    public EndpointsForRange getCachedReplicas(long ringVersion, Token t)
      {
-         long lastVersion = tokenMetadata.getRingVersion();
- 
-         if (lastVersion > lastInvalidatedVersion)
-         {
-             synchronized (this)
-             {
-                 if (lastVersion > lastInvalidatedVersion)
-                 {
-                     logger.trace("clearing cached endpoints");
-                     cachedReplicas.clear();
-                     lastInvalidatedVersion = lastVersion;
-                 }
-             }
-         }
- 
-         return cachedReplicas.get(t);
+         return replicas.get(ringVersion, t);
      }
  
      /**
@@@ -99,33 -85,22 +80,34 @@@
      * @param searchPosition the position the natural endpoints are requested for
       * @return a copy of the natural endpoints for the given token
       */
 -    public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition)
 +    public EndpointsForToken getNaturalReplicasForToken(RingPosition<?> searchPosition)
 +    {
 +        return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken());
 +    }
 +
 +    public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition)
      {
          Token searchToken = searchPosition.getToken();
+         long currentRingVersion = tokenMetadata.getRingVersion();
          Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken);
-         EndpointsForRange endpoints = getCachedReplicas(keyToken);
 -        ArrayList<InetAddress> endpoints = getCachedEndpoints(currentRingVersion, keyToken);
++        EndpointsForRange endpoints = getCachedReplicas(currentRingVersion, keyToken);
          if (endpoints == null)
          {
              TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap();
              // if our cache got invalidated, it's possible there is a new token to account for too
              keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken);
 -            endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm));
 +            endpoints = calculateNaturalReplicas(searchToken, tm);
-             cachedReplicas.put(keyToken, endpoints);
+             replicas.put(tm.getRingVersion(), keyToken, endpoints);
          }
  
 -        return new ArrayList<>(endpoints);
 +        return endpoints;
 +    }
 +
 +    public Replica getLocalReplicaFor(RingPosition<?> searchPosition)
 +    {
 +        return getNaturalReplicas(searchPosition)
 +               .byEndpoint()
 +               .get(FBUtilities.getBroadcastAddressAndPort());
      }
  
      /**
@@@ -457,4 -317,65 +439,64 @@@
                  throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName));
          }
      }
+ 
 -    @VisibleForTesting
 -    public static class ReplicaCache<K, V>
++    static class ReplicaCache<K, V>
+     {
+         private final AtomicReference<ReplicaHolder<K, V>> cachedReplicas = new AtomicReference<>(new ReplicaHolder<>(0, 4));
+ 
+         V get(long ringVersion, K keyToken)
+         {
+             ReplicaHolder<K, V> replicaHolder = maybeClearAndGet(ringVersion);
+             if (replicaHolder == null)
+                 return null;
+ 
+             return replicaHolder.replicas.get(keyToken);
+         }
+ 
+         void put(long ringVersion, K keyToken, V endpoints)
+         {
+             ReplicaHolder<K, V> current = maybeClearAndGet(ringVersion);
+             if (current != null)
+             {
+             // if we have the same ringVersion but already know about the keyToken, the endpoints should be the same
+                 current.replicas.putIfAbsent(keyToken, endpoints);
+             }
+         }
+ 
+         ReplicaHolder<K, V> maybeClearAndGet(long ringVersion)
+         {
+             ReplicaHolder<K, V> current = cachedReplicas.get();
+             if (ringVersion == current.ringVersion)
+                 return current;
+             else if (ringVersion < current.ringVersion) // things have already moved on
+                 return null;
+ 
+             // If the ring version has changed, create a fresh replica holder and try to replace the current one.
+             // This may race with other threads that have the same new ring version; one will win and the losers
+             // will be garbage collected.
+             ReplicaHolder<K, V> cleaned = new ReplicaHolder<>(ringVersion, current.replicas.size());
+             cachedReplicas.compareAndSet(current, cleaned);
+ 
+             // A new ring version may have come along while making the new holder, so re-check the
+             // reference and return the holder if the ring version is still the same; otherwise return
+             // null as there is no point in using it.
+             current = cachedReplicas.get();
+             if (ringVersion == current.ringVersion)
+                 return current;
+             else
+                 return null;
+         }
+     }
+ 
+     static class ReplicaHolder<K, V>
+     {
+         private final long ringVersion;
+         private final NonBlockingHashMap<K, V> replicas;
+ 
+         ReplicaHolder(long ringVersion, int expectedEntries)
+         {
+             this.ringVersion = ringVersion;
+             this.replicas = new NonBlockingHashMap<>(expectedEntries);
+         }
+     }
  }
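
The ReplicaCache above replaces the old synchronized clear-on-version-bump logic with a single AtomicReference swap: lookups and inserts are only honoured when the caller's ring version matches the holder's, and a newer version installs a fresh holder via compareAndSet, so a reader racing with a ring change either sees a matching holder or gets null and recomputes. A minimal, self-contained sketch of the same pattern (illustrative names only, with ConcurrentHashMap standing in for NonBlockingHashMap):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicReference;

    // Version-keyed cache: the whole map is swapped atomically when the version advances,
    // so entries computed against an older ring are never returned.
    final class VersionedCache<K, V>
    {
        private static final class Holder<K, V>
        {
            final long version;
            final Map<K, V> entries = new ConcurrentHashMap<>();
            Holder(long version) { this.version = version; }
        }

        private final AtomicReference<Holder<K, V>> current = new AtomicReference<>(new Holder<>(0));

        V get(long version, K key)
        {
            Holder<K, V> h = holderFor(version);
            return h == null ? null : h.entries.get(key);
        }

        void put(long version, K key, V value)
        {
            Holder<K, V> h = holderFor(version);
            if (h != null)
                h.entries.putIfAbsent(key, value); // same version + same key => same value
        }

        // Returns the holder matching the requested version, installing a fresh one if the
        // version moved forward; returns null when the requested version is already stale.
        private Holder<K, V> holderFor(long version)
        {
            Holder<K, V> h = current.get();
            if (version == h.version)
                return h;
            if (version < h.version)
                return null;                                 // caller is behind; don't cache
            current.compareAndSet(h, new Holder<>(version)); // losers of this race are GC'd
            h = current.get();
            return version == h.version ? h : null;          // re-check in case another bump won
        }
    }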
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index ab21045,0221187..108e218
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -120,16 -123,13 +120,21 @@@ public class TokenMetadat
               DatabaseDescriptor.getPartitioner());
      }
  
 -    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
 +    public TokenMetadata(IEndpointSnitch snitch)
 +    {
 +        this(SortedBiMultiValMap.create(),
 +             HashBiMap.create(),
 +             Topology.builder(() -> snitch).build(),
 +             DatabaseDescriptor.getPartitioner());
 +    }
 +
 +    private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner)
      {
+         this(tokenToEndpointMap, endpointsMap, topology, partitioner, 0);
+     }
+ 
 -    private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion)
++    private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion)
+     {
          this.tokenToEndpointMap = tokenToEndpointMap;
          this.topology = topology;
          this.partitioner = partitioner;
@@@ -645,10 -646,11 +651,11 @@@
          lock.readLock().lock();
          try
          {
 -            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
 +            return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap),
                                       HashBiMap.create(endpointToHostIdMap),
                                       topology,
-                                      partitioner);
+                                      partitioner,
+                                      ringVersion);
          }
          finally
          {
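
The ringVersion parameter threaded through the private TokenMetadata constructor above matters because getNaturalReplicas stores results under tm.getRingVersion() of the cached-only copy; if cloning reset the version to 0, the version-keyed cache could never be populated once the live ring had moved past version 0. In terms of the VersionedCache sketch above (illustrative values only):

    VersionedCache<String, String> cache = new VersionedCache<>();
    long liveVersion = 5;                            // the ring has already advanced
    cache.get(liveVersion, "tokenA");                // first lookup installs a holder for version 5

    cache.put(0, "tokenA", "replicaSet");            // a copy that reset its version to 0: put is dropped
    // cache.get(liveVersion, "tokenA") is still null

    cache.put(liveVersion, "tokenA", "replicaSet");  // a copy that kept its version: put is stored
    // cache.get(liveVersion, "tokenA") now returns "replicaSet"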
diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
index 065ef0e,0000000..52d0f16
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java
@@@ -1,148 -1,0 +1,149 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test.ring;
 +
 +import java.util.Map;
 +import java.util.stream.Collectors;
 +import java.util.stream.IntStream;
 +
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.ICluster;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.TokenSupplier;
 +import org.apache.cassandra.distributed.shared.NetworkTopology;
 +import org.apache.cassandra.distributed.test.TestBaseImpl;
 +
 +import static java.util.Arrays.asList;
 +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap;
 +import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom;
 +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
 +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +
 +public class BootstrapTest extends TestBaseImpl
 +{
 +    @Test
 +    public void bootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            cluster.run(asList(pullSchemaFrom(cluster.get(1)),
 +                               bootstrap()),
 +                        newInstance.config().num());
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state",
 +                                    100L,
 +                                    e.getValue().longValue());
 +        }
 +    }
 +
 +    @Test
 +    public void readWriteDuringBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            IInstanceConfig config = cluster.newInstanceConfig();
 +            IInvokableInstance newInstance = cluster.bootstrap(config);
 +            withProperty("cassandra.join_ring", false,
 +                         () -> newInstance.startup(cluster));
 +
 +            cluster.forEach(statusToBootstrap(newInstance));
 +
 +            populate(cluster,0, 100);
 +
 +            Assert.assertEquals(100, newInstance.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length);
 +        }
 +    }
 +
 +    @Test
 +    public void autoBootstrapTest() throws Throwable
 +    {
 +        int originalNodeCount = 2;
 +        int expandedNodeCount = originalNodeCount + 1;
 +
 +        try (Cluster cluster = builder().withNodes(originalNodeCount)
 +                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
 +                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
 +                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
 +                                        .start())
 +        {
 +            populate(cluster,0, 100);
 +            bootstrapAndJoinNode(cluster);
 +
 +            for (Map.Entry<Integer, Long> e : count(cluster).entrySet())
 +                Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L);
 +        }
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to)
 +    {
 +        populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM);
 +    }
 +
 +    public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl)
 +    {
 +        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " 
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + 
"};");
 +        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl 
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
 +        for (int i = from; i < to; i++)
 +        {
 +            cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)",
 +                                               cl,
 +                                               i, i, i);
 +        }
 +    }
 +
 +    public static Map<Integer, Long> count(ICluster cluster)
 +    {
 +        return IntStream.rangeClosed(1, cluster.size())
 +                        .boxed()
 +                        .collect(Collectors.toMap(nodeId -> nodeId,
 +                                                  nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0]));
 +    }
++
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
index 0000000,0000000..4898479
new file mode 100644
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java
@@@ -1,0 -1,0 +1,114 @@@
++/*
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements.  See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership.  The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License.  You may obtain a copy of the License at
++ *
++ *     http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.cassandra.distributed.test.ring;
++
++import java.io.IOException;
++import java.util.concurrent.Callable;
++import java.util.concurrent.CountDownLatch;
++import java.util.concurrent.ExecutionException;
++import java.util.concurrent.ExecutorService;
++import java.util.concurrent.Executors;
++import java.util.concurrent.Future;
++import java.util.concurrent.TimeoutException;
++import java.util.concurrent.atomic.AtomicBoolean;
++
++import org.junit.Test;
++
++import net.bytebuddy.ByteBuddy;
++import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
++import net.bytebuddy.implementation.MethodDelegation;
++import net.bytebuddy.implementation.bind.annotation.FieldValue;
++import net.bytebuddy.implementation.bind.annotation.SuperCall;
++import org.apache.cassandra.dht.Token;
++import org.apache.cassandra.distributed.Cluster;
++import org.apache.cassandra.distributed.api.ConsistencyLevel;
++import org.apache.cassandra.distributed.api.TokenSupplier;
++import org.apache.cassandra.distributed.shared.NetworkTopology;
++import org.apache.cassandra.distributed.test.TestBaseImpl;
++import org.apache.cassandra.locator.AbstractReplicationStrategy;
++import org.apache.cassandra.locator.EndpointsForRange;
++
++import static net.bytebuddy.matcher.ElementMatchers.named;
++import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
++import static org.apache.cassandra.distributed.api.Feature.NETWORK;
++
++public class ReadsDuringBootstrapTest extends TestBaseImpl
++{
++    @Test
++    public void readsDuringBootstrapTest() throws IOException, ExecutionException, InterruptedException, TimeoutException
++    {
++        int originalNodeCount = 3;
++        int expandedNodeCount = originalNodeCount + 1;
++        ExecutorService es = Executors.newSingleThreadExecutor();
++        try (Cluster cluster = builder().withNodes(originalNodeCount)
++                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount))
++                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
++                                        .withConfig(config -> config.with(NETWORK, GOSSIP)
++                                                                    .set("read_request_timeout_in_ms", Integer.MAX_VALUE)
++                                                                    .set("request_timeout_in_ms", Integer.MAX_VALUE))
++                                        .withInstanceInitializer(BB::install)
++                                        .start())
++        {
++            String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?");
++            cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"));
++            cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int PRIMARY KEY)"));
++            cluster.get(1).runOnInstance(() -> BB.block.set(true));
++            Future<?> read = es.submit(() -> cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3));
++            long mark = cluster.get(1).logs().mark();
++            bootstrapAndJoinNode(cluster);
++            cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4");
++            cluster.get(1).runOnInstance(() -> BB.block.set(false));
++            // populate cache
++            for (int i = 0; i < 10; i++)
++                cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, i);
++            cluster.get(1).runOnInstance(() -> BB.latch.countDown());
++            read.get();
++        }
++        finally
++        {
++            es.shutdown();
++        }
++    }
++
++    public static class BB
++    {
++        public static final AtomicBoolean block = new AtomicBoolean();
++        public static final CountDownLatch latch = new CountDownLatch(1);
++        private static void install(ClassLoader cl, Integer instanceId)
++        {
++            if (instanceId != 1)
++                return;
++            new ByteBuddy().rebase(AbstractReplicationStrategy.class)
++                           .method(named("getCachedReplicas"))
++                           .intercept(MethodDelegation.to(BB.class))
++                           .make()
++                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
++        }
++
++        public static EndpointsForRange getCachedReplicas(long ringVersion, Token t,
++                                                          @FieldValue("keyspaceName") String keyspaceName,
++                                                          @SuperCall Callable<EndpointsForRange> zuper) throws Exception
++        {
++            if (keyspaceName.equals(KEYSPACE) && block.get())
++                latch.await();
++            return zuper.call();
++        }
++    }
++
++}
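
ReadsDuringBootstrapTest reproduces the race by pausing getCachedReplicas on node 1 behind a latch while the new node joins, then releasing it once the stale read is in flight. A standalone sketch of that "pause a method with a latch" pattern follows; the Target, compute and PauseInterceptor names are hypothetical, not Cassandra code, and where the committed test rebases AbstractReplicationStrategy in the instance initializer before the class is loaded, this sketch subclasses the target so it runs in a plain JVM:

    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;

    import net.bytebuddy.ByteBuddy;
    import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
    import net.bytebuddy.implementation.MethodDelegation;
    import net.bytebuddy.implementation.bind.annotation.SuperCall;

    import static net.bytebuddy.matcher.ElementMatchers.named;

    public class PauseSketch
    {
        public static class Target
        {
            public String compute() { return "result"; }
        }

        public static class PauseInterceptor
        {
            public static final CountDownLatch latch = new CountDownLatch(1);

            public static String compute(@SuperCall Callable<String> zuper) throws Exception
            {
                latch.await();       // hold the call until the test releases it
                return zuper.call(); // then run the original method
            }
        }

        public static void main(String[] args) throws Exception
        {
            // Generate a subclass of Target whose compute() is delegated to PauseInterceptor
            Target paused = new ByteBuddy().subclass(Target.class)
                                           .method(named("compute"))
                                           .intercept(MethodDelegation.to(PauseInterceptor.class))
                                           .make()
                                           .load(Target.class.getClassLoader(), ClassLoadingStrategy.Default.WRAPPER)
                                           .getLoaded()
                                           .getDeclaredConstructor()
                                           .newInstance();

            Thread reader = new Thread(() -> System.out.println(paused.compute())); // blocks on the latch
            reader.start();
            // ... perform the state change under test (in the committed test: bootstrap the new node) ...
            PauseInterceptor.latch.countDown();                                      // release the paused call
            reader.join();
        }
    }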

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
