This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9a1bb62822806ea947236b2f5491ecb3669fabde Merge: 98e798f b5d0626 Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Jan 17 14:09:10 2022 +0100 Merge branch 'cassandra-3.11' into cassandra-4.0 CHANGES.txt | 1 + .../locator/AbstractReplicationStrategy.java | 92 ++++++++++++----- .../apache/cassandra/locator/TokenMetadata.java | 9 +- .../distributed/test/ring/BootstrapTest.java | 1 + .../test/ring/ReadsDuringBootstrapTest.java | 114 +++++++++++++++++++++ .../locator/AbstractReplicationStrategyTest.java | 45 ++++++++ 6 files changed, 236 insertions(+), 26 deletions(-) diff --cc CHANGES.txt index 0896d56,5e8213f..d9303f8 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,36 -1,17 +1,37 @@@ -3.11.12 - * Upgrade snakeyaml to 1.26 in 3.11 (CASSANDRA=17028) +4.0.2 + * Don't block gossip when clearing repair snapshots (CASSANDRA-17168) + * Deduplicate warnings for deprecated parameters (changed names) (CASSANDRA-17160) + * Update ant-junit to version 1.10.12 (CASSANDRA-17218) + * Add droppable tombstone metrics to nodetool tablestats (CASSANDRA-16308) + * Fix disk failure triggered when enabling FQL on an unclean directory (CASSANDRA-17136) + * Fixed broken classpath when multiple jars in build directory (CASSANDRA-17129) + * DebuggableThreadPoolExecutor does not propagate client warnings (CASSANDRA-17072) + * internode_send_buff_size_in_bytes and internode_recv_buff_size_in_bytes have new names. Backward compatibility with the old names added (CASSANDRA-17141) + * Remove unused configuration parameters from cassandra.yaml (CASSANDRA-17132) + * Queries performed with NODE_LOCAL consistency level do not update request metrics (CASSANDRA-17052) + * Fix multiple full sources can be select unexpectedly for bootstrap streaming (CASSANDRA-16945) + * Fix cassandra.yaml formatting of parameters (CASSANDRA-17131) + * Add backward compatibility for CQLSSTableWriter Date fields (CASSANDRA-17117) + * Push initial client connection messages to trace (CASSANDRA-17038) + * Correct the internode message timestamp if sending node has wrapped (CASSANDRA-16997) + * Avoid race causing us to return null in RangesAtEndpoint (CASSANDRA-16965) + * Avoid rewriting all sstables during cleanup when transient replication is enabled (CASSANDRA-16966) + * Prevent CQLSH from failure on Python 3.10 (CASSANDRA-16987) + * Avoid trying to acquire 0 permits from the rate limiter when taking snapshot (CASSANDRA-16872) + * Upgrade Caffeine to 2.5.6 (CASSANDRA-15153) + * Include SASI components to snapshots (CASSANDRA-15134) + * Fix missed wait latencies in the output of `nodetool tpstats -F` (CASSANDRA-16938) + * Remove all the state pollution between tests in SSTableReaderTest (CASSANDRA-16888) + * Delay auth setup until after gossip has settled to avoid unavailables on startup (CASSANDRA-16783) + * Fix clustering order logic in CREATE MATERIALIZED VIEW (CASSANDRA-16898) + * org.apache.cassandra.db.rows.ArrayCell#unsharedHeapSizeExcludingData includes data twice (CASSANDRA-16900) + * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) +Merged from 3.11: * Add key validation to ssstablescrub (CASSANDRA-16969) * Update Jackson from 2.9.10 to 2.12.5 (CASSANDRA-16851) - * Include SASI components to snapshots (CASSANDRA-15134) * Make assassinate more resilient to missing tokens (CASSANDRA-16847) - * Exclude Jackson 1.x transitive dependency of hadoop* provided dependencies (CASSANDRA-16854) - * Validate SASI tokenizer options before adding index to schema (CASSANDRA-15135) - * Fixup scrub output when no data post-scrub and clear up old use of row, which really means partition (CASSANDRA-16835) - * Fix ant-junit dependency issue (CASSANDRA-16827) - * Reduce thread contention in CommitLogSegment and HintsBuffer (CASSANDRA-16072) - * Avoid sending CDC column if not enabled (CASSANDRA-16770) Merged from 3.0: + * Avoid race in AbstractReplicationStrategy endpoint caching (CASSANDRA-16673) * Fix abort when window resizing during cqlsh COPY (CASSANDRA-15230) * Fix slow keycache load which blocks startup for tables with many sstables (CASSANDRA-14898) * Fix rare NPE caused by batchlog replay / node decomission races (CASSANDRA-17049) diff --cc src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 7891895,f4dc3b6..909a7f6 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@@ -23,9 -24,11 +23,10 @@@ import java.lang.reflect.Method import java.util.Collection; import java.util.Collections; import java.util.Map; + import java.util.concurrent.atomic.AtomicReference; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; +import com.google.common.base.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -51,13 -53,12 +52,10 @@@ public abstract class AbstractReplicati { private static final Logger logger = LoggerFactory.getLogger(AbstractReplicationStrategy.class); - @VisibleForTesting - final String keyspaceName; - private Keyspace keyspace; public final Map<String, String> configOptions; + protected final String keyspaceName; private final TokenMetadata tokenMetadata; - - // track when the token range changes, signaling we need to invalidate our endpoint cache - private volatile long lastInvalidatedVersion = 0; - - private final ReplicaCache<Token, ArrayList<InetAddress>> replicas = new ReplicaCache<>(); ++ private final ReplicaCache<Token, EndpointsForRange> replicas = new ReplicaCache<>(); public IEndpointSnitch snitch; protected AbstractReplicationStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) @@@ -68,28 -70,12 +66,11 @@@ this.snitch = snitch; this.configOptions = configOptions == null ? Collections.<String, String>emptyMap() : configOptions; this.keyspaceName = keyspaceName; - // lazy-initialize keyspace itself since we don't create them until after the replication strategies } - private final Map<Token, EndpointsForRange> cachedReplicas = new NonBlockingHashMap<>(); - - public EndpointsForRange getCachedReplicas(Token t) - private ArrayList<InetAddress> getCachedEndpoints(long ringVersion, Token t) ++ public EndpointsForRange getCachedReplicas(long ringVersion, Token t) { - long lastVersion = tokenMetadata.getRingVersion(); - - if (lastVersion > lastInvalidatedVersion) - { - synchronized (this) - { - if (lastVersion > lastInvalidatedVersion) - { - logger.trace("clearing cached endpoints"); - cachedReplicas.clear(); - lastInvalidatedVersion = lastVersion; - } - } - } - - return cachedReplicas.get(t); + return replicas.get(ringVersion, t); } /** @@@ -99,33 -85,22 +80,34 @@@ * @param searchPosition the position the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public ArrayList<InetAddress> getNaturalEndpoints(RingPosition searchPosition) + public EndpointsForToken getNaturalReplicasForToken(RingPosition<?> searchPosition) + { + return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken()); + } + + public EndpointsForRange getNaturalReplicas(RingPosition<?> searchPosition) { Token searchToken = searchPosition.getToken(); + long currentRingVersion = tokenMetadata.getRingVersion(); Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); - EndpointsForRange endpoints = getCachedReplicas(keyToken); - ArrayList<InetAddress> endpoints = getCachedEndpoints(currentRingVersion, keyToken); ++ EndpointsForRange endpoints = getCachedReplicas(currentRingVersion, keyToken); if (endpoints == null) { TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap(); // if our cache got invalidated, it's possible there is a new token to account for too keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); - endpoints = new ArrayList<InetAddress>(calculateNaturalEndpoints(searchToken, tm)); + endpoints = calculateNaturalReplicas(searchToken, tm); - cachedReplicas.put(keyToken, endpoints); + replicas.put(tm.getRingVersion(), keyToken, endpoints); } - return new ArrayList<>(endpoints); + return endpoints; + } + + public Replica getLocalReplicaFor(RingPosition<?> searchPosition) + { + return getNaturalReplicas(searchPosition) + .byEndpoint() + .get(FBUtilities.getBroadcastAddressAndPort()); } /** @@@ -457,4 -317,65 +439,64 @@@ throw new ConfigurationException(String.format("Unrecognized strategy option {%s} passed to %s for keyspace %s", key, getClass().getSimpleName(), keyspaceName)); } } + - @VisibleForTesting - public static class ReplicaCache<K, V> ++ static class ReplicaCache<K, V> + { + private final AtomicReference<ReplicaHolder<K, V>> cachedReplicas = new AtomicReference<>(new ReplicaHolder<>(0, 4)); + + V get(long ringVersion, K keyToken) + { + ReplicaHolder<K, V> replicaHolder = maybeClearAndGet(ringVersion); + if (replicaHolder == null) + return null; + + return replicaHolder.replicas.get(keyToken); + } + + void put(long ringVersion, K keyToken, V endpoints) + { + ReplicaHolder<K, V> current = maybeClearAndGet(ringVersion); + if (current != null) + { + // if we have the same ringVersion, but already know about the keyToken the endpoints should be the same + current.replicas.putIfAbsent(keyToken, endpoints); + } + } + + ReplicaHolder<K, V> maybeClearAndGet(long ringVersion) + { + ReplicaHolder<K, V> current = cachedReplicas.get(); + if (ringVersion == current.ringVersion) + return current; + else if (ringVersion < current.ringVersion) // things have already moved on + return null; + + // If ring version has changed, create a fresh replica holder and try to replace the current one. + // This may race with other threads that have the same new ring version and one will win and the loosers + // will be garbage collected + ReplicaHolder<K, V> cleaned = new ReplicaHolder<>(ringVersion, current.replicas.size()); + cachedReplicas.compareAndSet(current, cleaned); + + // A new ring version may have come along while making the new holder, so re-check the + // reference and return the ring version if the same, otherwise return null as there is no point + // in using it. + current = cachedReplicas.get(); + if (ringVersion == current.ringVersion) + return current; + else + return null; + } + } + + static class ReplicaHolder<K, V> + { + private final long ringVersion; + private final NonBlockingHashMap<K, V> replicas; + + ReplicaHolder(long ringVersion, int expectedEntries) + { + this.ringVersion = ringVersion; + this.replicas = new NonBlockingHashMap<>(expectedEntries); + } + } } diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java index ab21045,0221187..108e218 --- a/src/java/org/apache/cassandra/locator/TokenMetadata.java +++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java @@@ -120,16 -123,13 +120,21 @@@ public class TokenMetadat DatabaseDescriptor.getPartitioner()); } - private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner) + public TokenMetadata(IEndpointSnitch snitch) + { + this(SortedBiMultiValMap.create(), + HashBiMap.create(), + Topology.builder(() -> snitch).build(), + DatabaseDescriptor.getPartitioner()); + } + + private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner) { + this(tokenToEndpointMap, endpointsMap, topology, partitioner, 0); + } + - private TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, BiMap<InetAddress, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion) ++ private TokenMetadata(BiMultiValMap<Token, InetAddressAndPort> tokenToEndpointMap, BiMap<InetAddressAndPort, UUID> endpointsMap, Topology topology, IPartitioner partitioner, long ringVersion) + { this.tokenToEndpointMap = tokenToEndpointMap; this.topology = topology; this.partitioner = partitioner; @@@ -645,10 -646,11 +651,11 @@@ lock.readLock().lock(); try { - return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp), + return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap), HashBiMap.create(endpointToHostIdMap), topology, - partitioner); + partitioner, + ringVersion); } finally { diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java index 065ef0e,0000000..52d0f16 mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/BootstrapTest.java @@@ -1,148 -1,0 +1,149 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.ring; + +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.ICluster; +import org.apache.cassandra.distributed.api.IInstanceConfig; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.distributed.shared.NetworkTopology; +import org.apache.cassandra.distributed.test.TestBaseImpl; + +import static java.util.Arrays.asList; +import static org.apache.cassandra.distributed.action.GossipHelper.bootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.pullSchemaFrom; +import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap; +import static org.apache.cassandra.distributed.action.GossipHelper.withProperty; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public class BootstrapTest extends TestBaseImpl +{ + @Test + public void bootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + populate(cluster,0, 100); + + IInstanceConfig config = cluster.newInstanceConfig(); + IInvokableInstance newInstance = cluster.bootstrap(config); + withProperty("cassandra.join_ring", false, + () -> newInstance.startup(cluster)); + + cluster.forEach(statusToBootstrap(newInstance)); + + cluster.run(asList(pullSchemaFrom(cluster.get(1)), + bootstrap()), + newInstance.config().num()); + + for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) + Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", + 100L, + e.getValue().longValue()); + } + } + + @Test + public void readWriteDuringBootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + IInstanceConfig config = cluster.newInstanceConfig(); + IInvokableInstance newInstance = cluster.bootstrap(config); + withProperty("cassandra.join_ring", false, + () -> newInstance.startup(cluster)); + + cluster.forEach(statusToBootstrap(newInstance)); + + populate(cluster,0, 100); + + Assert.assertEquals(100, newInstance.executeInternal("SELECT *FROM " + KEYSPACE + ".tbl").length); + } + } + + @Test + public void autoBootstrapTest() throws Throwable + { + int originalNodeCount = 2; + int expandedNodeCount = originalNodeCount + 1; + + try (Cluster cluster = builder().withNodes(originalNodeCount) + .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) + .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) + .withConfig(config -> config.with(NETWORK, GOSSIP)) + .start()) + { + populate(cluster,0, 100); + bootstrapAndJoinNode(cluster); + + for (Map.Entry<Integer, Long> e : count(cluster).entrySet()) + Assert.assertEquals("Node " + e.getKey() + " has incorrect row state", e.getValue().longValue(), 100L); + } + } + + public static void populate(ICluster cluster, int from, int to) + { + populate(cluster, from, to, 1, 3, ConsistencyLevel.QUORUM); + } + + public static void populate(ICluster cluster, int from, int to, int coord, int rf, ConsistencyLevel cl) + { + cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};"); + cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + for (int i = from; i < to; i++) + { + cluster.coordinator(coord).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", + cl, + i, i, i); + } + } + + public static Map<Integer, Long> count(ICluster cluster) + { + return IntStream.rangeClosed(1, cluster.size()) + .boxed() + .collect(Collectors.toMap(nodeId -> nodeId, + nodeId -> (Long) cluster.get(nodeId).executeInternal("SELECT count(*) FROM " + KEYSPACE + ".tbl")[0][0])); + } ++ +} diff --cc test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java index 0000000,0000000..4898479 new file mode 100644 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/ring/ReadsDuringBootstrapTest.java @@@ -1,0 -1,0 +1,114 @@@ ++/* ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.cassandra.distributed.test.ring; ++ ++import java.io.IOException; ++import java.util.concurrent.Callable; ++import java.util.concurrent.CountDownLatch; ++import java.util.concurrent.ExecutionException; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++import java.util.concurrent.Future; ++import java.util.concurrent.TimeoutException; ++import java.util.concurrent.atomic.AtomicBoolean; ++ ++import org.junit.Test; ++ ++import net.bytebuddy.ByteBuddy; ++import net.bytebuddy.dynamic.loading.ClassLoadingStrategy; ++import net.bytebuddy.implementation.MethodDelegation; ++import net.bytebuddy.implementation.bind.annotation.FieldValue; ++import net.bytebuddy.implementation.bind.annotation.SuperCall; ++import org.apache.cassandra.dht.Token; ++import org.apache.cassandra.distributed.Cluster; ++import org.apache.cassandra.distributed.api.ConsistencyLevel; ++import org.apache.cassandra.distributed.api.TokenSupplier; ++import org.apache.cassandra.distributed.shared.NetworkTopology; ++import org.apache.cassandra.distributed.test.TestBaseImpl; ++import org.apache.cassandra.locator.AbstractReplicationStrategy; ++import org.apache.cassandra.locator.EndpointsForRange; ++ ++import static net.bytebuddy.matcher.ElementMatchers.named; ++import static org.apache.cassandra.distributed.api.Feature.GOSSIP; ++import static org.apache.cassandra.distributed.api.Feature.NETWORK; ++ ++public class ReadsDuringBootstrapTest extends TestBaseImpl ++{ ++ @Test ++ public void readsDuringBootstrapTest() throws IOException, ExecutionException, InterruptedException, TimeoutException ++ { ++ int originalNodeCount = 3; ++ int expandedNodeCount = originalNodeCount + 1; ++ ExecutorService es = Executors.newSingleThreadExecutor(); ++ try (Cluster cluster = builder().withNodes(originalNodeCount) ++ .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(expandedNodeCount)) ++ .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0")) ++ .withConfig(config -> config.with(NETWORK, GOSSIP) ++ .set("read_request_timeout_in_ms", Integer.MAX_VALUE) ++ .set("request_timeout_in_ms", Integer.MAX_VALUE)) ++ .withInstanceInitializer(BB::install) ++ .start()) ++ { ++ String query = withKeyspace("SELECT * FROM %s.tbl WHERE id = ?"); ++ cluster.schemaChange(withKeyspace("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};")); ++ cluster.schemaChange(withKeyspace("CREATE TABLE %s.tbl (id int PRIMARY KEY)")); ++ cluster.get(1).runOnInstance(() -> BB.block.set(true)); ++ Future<?> read = es.submit(() -> cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, 3)); ++ long mark = cluster.get(1).logs().mark(); ++ bootstrapAndJoinNode(cluster); ++ cluster.get(1).logs().watchFor(mark, "New node /127.0.0.4"); ++ cluster.get(1).runOnInstance(() -> BB.block.set(false)); ++ // populate cache ++ for (int i = 0; i < 10; i++) ++ cluster.coordinator(1).execute(query, ConsistencyLevel.QUORUM, i); ++ cluster.get(1).runOnInstance(() -> BB.latch.countDown()); ++ read.get(); ++ } ++ finally ++ { ++ es.shutdown(); ++ } ++ } ++ ++ public static class BB ++ { ++ public static final AtomicBoolean block = new AtomicBoolean(); ++ public static final CountDownLatch latch = new CountDownLatch(1); ++ private static void install(ClassLoader cl, Integer instanceId) ++ { ++ if (instanceId != 1) ++ return; ++ new ByteBuddy().rebase(AbstractReplicationStrategy.class) ++ .method(named("getCachedReplicas")) ++ .intercept(MethodDelegation.to(BB.class)) ++ .make() ++ .load(cl, ClassLoadingStrategy.Default.INJECTION); ++ } ++ ++ public static EndpointsForRange getCachedReplicas(long ringVersion, Token t, ++ @FieldValue("keyspaceName") String keyspaceName, ++ @SuperCall Callable<EndpointsForRange> zuper) throws Exception ++ { ++ if (keyspaceName.equals(KEYSPACE) && block.get()) ++ latch.await(); ++ return zuper.call(); ++ } ++ } ++ ++} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
