This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new fcea0b6fd8 CASSANDRA-19633 Replaced node is stuck in a loop calculating ranges fcea0b6fd8 is described below commit fcea0b6fd8aa6685e7a7e4be5bcde0ee87efc75f Author: Bereng <berenguerbl...@gmail.com> AuthorDate: Thu Apr 10 13:33:33 2025 +0200 CASSANDRA-19633 Replaced node is stuck in a loop calculating ranges --- .../config/CassandraRelevantProperties.java | 9 ++- .../org/apache/cassandra/dht/RangeStreamer.java | 9 ++- .../org/apache/cassandra/dht/BootStrapperTest.java | 86 ++++++++++++++++++---- 3 files changed, 84 insertions(+), 20 deletions(-) diff --git a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index 1757768897..c5286f17fa 100644 --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@ -259,8 +259,13 @@ public enum CassandraRelevantProperties /** * Number of replicas required to store batchlog for atomicity, only accepts values of 1 or 2. */ - REQUIRED_BATCHLOG_REPLICA_COUNT("cassandra.batchlog.required_replica_count", "2") - ; + REQUIRED_BATCHLOG_REPLICA_COUNT("cassandra.batchlog.required_replica_count", "2"), + + /** + * Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs, especially + * with vnodes.
+ */ + SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"); CassandraRelevantProperties(String key, String defaultVal) { diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java b/src/java/org/apache/cassandra/dht/RangeStreamer.java index dda6863153..42bcea1998 100644 --- a/src/java/org/apache/cassandra/dht/RangeStreamer.java +++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java @@ -41,6 +41,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.gms.FailureDetector; @@ -331,8 +332,12 @@ public class RangeStreamer Multimap<InetAddressAndPort, FetchReplica> workMap; //Only use the optimized strategy if we don't care about strict sources, have a replication factor > 1, and no - //transient replicas. + //transient replicas, and the calculation has not been intentionally skipped.
+ if (CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.getBoolean() || + useStrictSource || + strat == null || + strat.getReplicationFactor().allReplicas == 1 || + strat.getReplicationFactor().hasTransientReplicas()) { workMap = convertPreferredEndpointsToWorkMap(fetchMap); } diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index 05d42cf32c..4b2a56a2d2 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.dht; import java.net.UnknownHostException; import java.util.List; import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Predicate; import com.google.common.base.Predicates; @@ -28,8 +29,10 @@ import com.google.common.collect.Multimap; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.junit.runner.RunWith; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.RangeStreamer.FetchReplica; @@ -42,16 +45,35 @@ import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamOperation; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; - +@RunWith(BMUnitRunner.class) public class BootStrapperTest { static IPartitioner oldPartitioner; - static Predicate<Replica> originalAlivePredicate = RangeStreamer.ALIVE_PREDICATE; + public static AtomicBoolean nonOptimizationHit 
= new AtomicBoolean(false); + public static AtomicBoolean optimizationHit = new AtomicBoolean(false); + private static final IFailureDetector mockFailureDetector = new IFailureDetector() + { + public boolean isAlive(InetAddressAndPort ep) + { + return true; + } + + public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } + public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } + public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } + public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } + public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } + public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } + }; + @BeforeClass public static void setup() throws ConfigurationException { @@ -83,6 +105,52 @@ public class BootStrapperTest } } + @Test + @BMRules(rules = { @BMRule(name = "Make sure the non-optimized path is picked up for some operations", + targetClass = "org.apache.cassandra.dht.RangeStreamer", + targetMethod = "convertPreferredEndpointsToWorkMap(EndpointsByReplica)", + action = "org.apache.cassandra.dht.BootStrapperTest.nonOptimizationHit.set(true)"), + @BMRule(name = "Make sure the optimized path is picked up for some operations", + targetClass = "org.apache.cassandra.dht.RangeStreamer", + targetMethod = "getOptimizedWorkMap(EndpointsByReplica,Collection,String)", + action = "org.apache.cassandra.dht.BootStrapperTest.optimizationHit.set(true)") }) + public void testStreamingCandidatesOptmizationSkip() throws UnknownHostException + { + testSkipStreamingCandidatesOptmizationFeatureFlag(true, true, false); + testSkipStreamingCandidatesOptmizationFeatureFlag(false, true, true); + } + + private void testSkipStreamingCandidatesOptmizationFeatureFlag(boolean 
disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit) throws UnknownHostException + { + try + { + nonOptimizationHit.set(false); + optimizationHit.set(false); + CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.setBoolean(disableOptimization); + + for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) + { + StorageService ss = StorageService.instance; + TokenMetadata tmd = ss.getTokenMetadata(); + + generateFakeEndpoints(10); + Token myToken = tmd.partitioner.getRandomToken(); + InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); + + assertEquals(10, tmd.sortedTokens().size()); + RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1); + s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); + } + + assertEquals(nonOptimizedPathHit, nonOptimizationHit.get()); + assertEquals(optimizedPathHit, optimizationHit.get()); + } + finally + { + CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.reset(); + } + } + private RangeStreamer testSourceTargetComputation(String keyspaceName, int numOldNodes, int replicationFactor) throws UnknownHostException { StorageService ss = StorageService.instance; @@ -93,20 +161,6 @@ public class BootStrapperTest InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); assertEquals(numOldNodes, tmd.sortedTokens().size()); - IFailureDetector mockFailureDetector = new IFailureDetector() - { - public boolean isAlive(InetAddressAndPort ep) - { - return true; - } - - public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - public void 
registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } - public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } - public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - }; RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1); assertNotNull(Keyspace.open(keyspaceName)); s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org