This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new fcea0b6fd8 CASSANDRA-19633 Replaced node is stuck in a loop 
calculating ranges
fcea0b6fd8 is described below

commit fcea0b6fd8aa6685e7a7e4be5bcde0ee87efc75f
Author: Bereng <berenguerbl...@gmail.com>
AuthorDate: Thu Apr 10 13:33:33 2025 +0200

    CASSANDRA-19633 Replaced node is stuck in a loop calculating ranges
---
 .../config/CassandraRelevantProperties.java        |  9 ++-
 .../org/apache/cassandra/dht/RangeStreamer.java    |  9 ++-
 .../org/apache/cassandra/dht/BootStrapperTest.java | 86 ++++++++++++++++++----
 3 files changed, 84 insertions(+), 20 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java 
b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index 1757768897..c5286f17fa 100644
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@ -259,8 +259,13 @@ public enum CassandraRelevantProperties
     /**
      * Number of replicas required to store batchlog for atomicity, only 
accepts values of 1 or 2.
      */
-    
REQUIRED_BATCHLOG_REPLICA_COUNT("cassandra.batchlog.required_replica_count", 
"2")
-    ;
+    
REQUIRED_BATCHLOG_REPLICA_COUNT("cassandra.batchlog.required_replica_count", 
"2"),
+
+    /**
+     * Do not try to calculate optimal streaming candidates. This can take a 
lot of time in some configs specially
+     * with vnodes.
+     */
+    
SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation",
 "false");
 
     CassandraRelevantProperties(String key, String defaultVal)
     {
diff --git a/src/java/org/apache/cassandra/dht/RangeStreamer.java 
b/src/java/org/apache/cassandra/dht/RangeStreamer.java
index dda6863153..42bcea1998 100644
--- a/src/java/org/apache/cassandra/dht/RangeStreamer.java
+++ b/src/java/org/apache/cassandra/dht/RangeStreamer.java
@@ -41,6 +41,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.gms.FailureDetector;
@@ -331,8 +332,12 @@ public class RangeStreamer
 
         Multimap<InetAddressAndPort, FetchReplica> workMap;
         //Only use the optimized strategy if we don't care about strict 
sources, have a replication factor > 1, and no
-        //transient replicas.
-        if (useStrictSource || strat == null || 
strat.getReplicationFactor().allReplicas == 1 || 
strat.getReplicationFactor().hasTransientReplicas())
+        //transient replicas or it is intentionally skipped.
+        if 
(CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.getBoolean()
 ||
+            useStrictSource ||
+            strat == null ||
+            strat.getReplicationFactor().allReplicas == 1 ||
+            strat.getReplicationFactor().hasTransientReplicas())
         {
             workMap = convertPreferredEndpointsToWorkMap(fetchMap);
         }
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java 
b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 05d42cf32c..4b2a56a2d2 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.dht;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
@@ -28,8 +29,10 @@ import com.google.common.collect.Multimap;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 
 import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
@@ -42,16 +45,35 @@ import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.StreamOperation;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
-
+@RunWith(BMUnitRunner.class)
 public class BootStrapperTest
 {
     static IPartitioner oldPartitioner;
-
     static Predicate<Replica> originalAlivePredicate = 
RangeStreamer.ALIVE_PREDICATE;
+    public static AtomicBoolean nonOptimizationHit = new AtomicBoolean(false);
+    public static AtomicBoolean optimizationHit = new AtomicBoolean(false);
+    private static final IFailureDetector mockFailureDetector = new 
IFailureDetector()
+    {
+        public boolean isAlive(InetAddressAndPort ep)
+        {
+            return true;
+        }
+
+        public void interpret(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
+        public void report(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
+        public void 
registerFailureDetectionEventListener(IFailureDetectionEventListener listener) 
{ throw new UnsupportedOperationException(); }
+        public void 
unregisterFailureDetectionEventListener(IFailureDetectionEventListener 
listener) { throw new UnsupportedOperationException(); }
+        public void remove(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
+        public void forceConviction(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
+    };
+
     @BeforeClass
     public static void setup() throws ConfigurationException
     {
@@ -83,6 +105,52 @@ public class BootStrapperTest
         }
     }
 
+    @Test
+    @BMRules(rules = { @BMRule(name = "Make sure the non-optimized path is 
picked up for some operations",
+                               targetClass = 
"org.apache.cassandra.dht.RangeStreamer",
+                               targetMethod = 
"convertPreferredEndpointsToWorkMap(EndpointsByReplica)",
+                               action = 
"org.apache.cassandra.dht.BootStrapperTest.nonOptimizationHit.set(true)"),
+                       @BMRule(name = "Make sure the optimized path is picked 
up for some operations",
+                               targetClass = 
"org.apache.cassandra.dht.RangeStreamer",
+                               targetMethod = 
"getOptimizedWorkMap(EndpointsByReplica,Collection,String)",
+                               action = 
"org.apache.cassandra.dht.BootStrapperTest.optimizationHit.set(true)") })
+    public void testStreamingCandidatesOptmizationSkip() throws 
UnknownHostException
+    {
+        testSkipStreamingCandidatesOptmizationFeatureFlag(true, true, false);
+        testSkipStreamingCandidatesOptmizationFeatureFlag(false, true, true);
+    }
+
+    private void testSkipStreamingCandidatesOptmizationFeatureFlag(boolean 
disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit) 
throws UnknownHostException
+    {
+        try
+        {
+            nonOptimizationHit.set(false);
+            optimizationHit.set(false);
+            
CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.setBoolean(disableOptimization);
+
+            for (String keyspaceName : 
Schema.instance.getNonLocalStrategyKeyspaces())
+            {
+                StorageService ss = StorageService.instance;
+                TokenMetadata tmd = ss.getTokenMetadata();
+
+                generateFakeEndpoints(10);
+                Token myToken = tmd.partitioner.getRandomToken();
+                InetAddressAndPort myEndpoint = 
InetAddressAndPort.getByName("127.0.0.1");
+
+                assertEquals(10, tmd.sortedTokens().size());
+                RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, 
StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new 
StreamStateStore(), mockFailureDetector, false, 1);
+                s.addRanges(keyspaceName, 
Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd,
 myToken, myEndpoint));
+            }
+
+            assertEquals(nonOptimizedPathHit, nonOptimizationHit.get());
+            assertEquals(optimizedPathHit, optimizationHit.get());
+        }
+        finally
+        {
+            
CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.reset();
+        }
+    }
+
     private RangeStreamer testSourceTargetComputation(String keyspaceName, int 
numOldNodes, int replicationFactor) throws UnknownHostException
     {
         StorageService ss = StorageService.instance;
@@ -93,20 +161,6 @@ public class BootStrapperTest
         InetAddressAndPort myEndpoint = 
InetAddressAndPort.getByName("127.0.0.1");
 
         assertEquals(numOldNodes, tmd.sortedTokens().size());
-        IFailureDetector mockFailureDetector = new IFailureDetector()
-        {
-            public boolean isAlive(InetAddressAndPort ep)
-            {
-                return true;
-            }
-
-            public void interpret(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
-            public void report(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
-            public void 
registerFailureDetectionEventListener(IFailureDetectionEventListener listener) 
{ throw new UnsupportedOperationException(); }
-            public void 
unregisterFailureDetectionEventListener(IFailureDetectionEventListener 
listener) { throw new UnsupportedOperationException(); }
-            public void remove(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
-            public void forceConviction(InetAddressAndPort ep) { throw new 
UnsupportedOperationException(); }
-        };
         RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, 
StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new 
StreamStateStore(), mockFailureDetector, false, 1);
         assertNotNull(Keyspace.open(keyspaceName));
         s.addRanges(keyspaceName, 
Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd,
 myToken, myEndpoint));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to