This is an automated email from the ASF dual-hosted git repository.

konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2daa0f0ba0 Fix flaky ReadSpeculationTest: with TCM the second replica 
in a read plan can be different, it depends on an order of adding nodes to a 
cluster. To fix it NetworkTopologyProximity implementation is adjusted using 
ByteBuddy to make a predictable order of nodes to read (the same idea as in 
python read repair dtests is used). Also, dynamic snitch is disabled to avoid 
re-ordering of replicas in a read plan by it. Logging of actual configuration 
in java dtests is added to s [...]
2daa0f0ba0 is described below

commit 2daa0f0ba0647ee76a2b3ffc349407751031d0dd
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Sun Feb 16 11:50:20 2025 +0000

    Fix flaky ReadSpeculationTest: with TCM the second replica in a read plan 
can differ, because it depends on the order in which nodes were added to the 
cluster.
    To fix this, the NetworkTopologyProximity implementation is adjusted using 
ByteBuddy to produce a predictable order of nodes to read from (the same idea 
is used in the Python read repair dtests). Also, the dynamic snitch is disabled 
so that it does not re-order the replicas in a read plan. Logging of the actual 
configuration is added to the Java dtests to simplify troubleshooting (the 
default logic does not print it because logging is initialized later in tests).
    
    Patch by Dmitry Konstantinov; reviewed by Brandon Williams, Stefan 
Miklosovic for CASSANDRA-20251
---
 .../cassandra/distributed/impl/Instance.java       |  1 +
 .../distributed/test/ReadSpeculationTest.java      | 86 ++++++++++++++++++++--
 2 files changed, 79 insertions(+), 8 deletions(-)

diff --git 
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java 
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index cf4b6cdd7a..b98b36c47c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -713,6 +713,7 @@ public class Instance extends IsolatedExecutor implements 
IInvokableInstance
             startJmx();
         LoggingSupportFactory.getLoggingSupport().onStartup();
         logSystemInfo(inInstancelogger);
+        Config.log(DatabaseDescriptor.getRawConfig());
 
         FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
         DatabaseDescriptor.createAllDirectories();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
index fce1de26ea..445315f343 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
@@ -18,43 +18,92 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.ScheduledThreadPoolExecutorPlus;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceInitializer;
 import org.apache.cassandra.distributed.impl.CoordinatorHelper;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyProximity;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.service.CassandraDaemon;
 import org.apache.cassandra.transport.Dispatcher;
 
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
 import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
 
 public class ReadSpeculationTest extends TestBaseImpl
 {
+    private static final Logger logger = 
LoggerFactory.getLogger(ReadSpeculationTest.class);
+    private static final String TABLE = "tbl";
+    private static final String PK_VALUE = "1";
 
     @Test
     public void speculateTest() throws Throwable
     {
         try (Cluster cluster = builder().withNodes(3)
+                                        .withConfig(config -> 
config.set("dynamic_snitch", false))
+                                        .withInstanceInitializer(new 
FixNodeOrderForReads())
                                         .start())
         {
-            cluster.get(1).runOnInstance(() -> {
+            cluster.forEach(instance -> instance.runOnInstance(() -> {
                 // Disable updater since we will force time
                 ((ScheduledThreadPoolExecutorPlus) 
ScheduledExecutors.optionalTasks).remove(CassandraDaemon.SPECULATION_THRESHOLD_UPDATER);
-            });
+            }));
             cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + 
" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 + 
"}");
-            cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + 
".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH speculative_retry = 
'2000ms';");
+            cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + 
"." + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH 
speculative_retry = '2000ms';");
 
-            // force speculation; rely on IP order
+            List<InetAddress> readPlanEndpoints = 
cluster.get(1).applyOnInstance((none) -> {
+                Keyspace keyspace = Keyspace.openIfExists(KEYSPACE);
+                ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(TABLE);
+                DecoratedKey dk = cfs.decorateKey(bytes(PK_VALUE));
+                ReplicaPlan.ForTokenRead plan = ReplicaPlans.forRead(keyspace, 
dk.getToken(), null,
+                                                                     QUORUM, 
cfs.metadata().params.speculativeRetry);
+                return 
plan.contacts().endpointList().stream().map(InetSocketAddress::getAddress).collect(Collectors.toList());
+            }, null);
+            logger.info("Replicas provided in a read plan contacts: {}", 
readPlanEndpoints);
+            logger.info("Cluster instances: {}", cluster.stream().map(instance 
-> instance.broadcastAddress().getAddress()).collect(Collectors.toList()));
+            int firstReplica = 0;
+            int secondReplica = 0;
+            for (int i = 1; i <= 3; i++)
+            {
+                if (match(cluster, i, readPlanEndpoints, 0))
+                    firstReplica = i;
+                if (match(cluster, i, readPlanEndpoints, 1))
+                    secondReplica = i;
+            }
+            logger.info("1st replica to read from: {}, 2nd replica: {}", 
firstReplica, secondReplica);
+            Assert.assertEquals(1, firstReplica);
+            Assert.assertEquals(2, secondReplica);
+            // force speculation by dropping all messages sent to the 2nd read 
replica
             cluster.filters().allVerbs().from(1).to(2).drop();
 
 
@@ -157,12 +206,12 @@ public class ReadSpeculationTest extends TestBaseImpl
             DatabaseDescriptor.setReadRpcTimeout(rpcTimeoutMs);
             DatabaseDescriptor.setCQLStartTime(cqlStartTime);
 
-            ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+            ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
             long speculatedBefore = cfs.metric.speculativeRetries.getCount();
             long before = System.nanoTime();
             cfs.sampleReadLatencyMicros = speculationTimeoutMicros;
 
-            CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+            CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " + 
KEYSPACE + "." + TABLE + " WHERE pk = " + PK_VALUE,
                                                     ConsistencyLevel.QUORUM,
                                                     ConsistencyLevel.QUORUM,
                                                     new 
Dispatcher.RequestTime(before - enqueuedNsAgo,
@@ -180,14 +229,14 @@ public class ReadSpeculationTest extends TestBaseImpl
             DatabaseDescriptor.setReadRpcTimeout(rpcTimeoutMs);
             DatabaseDescriptor.setCQLStartTime(cqlStartTime);
 
-            ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+            ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
             long speculatedBefore = cfs.metric.speculativeRetries.getCount();
             long before = System.nanoTime();
             cfs.sampleReadLatencyMicros = speculationTimeoutMicros;
 
             try
             {
-                CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " + 
KEYSPACE + ".tbl WHERE pk = 1",
+                CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " + 
KEYSPACE + "." + TABLE + " WHERE pk = " + PK_VALUE,
                                                         
ConsistencyLevel.QUORUM,
                                                         
ConsistencyLevel.QUORUM,
                                                         new 
Dispatcher.RequestTime(before - enqueuedNsAgo,
@@ -206,4 +255,25 @@ public class ReadSpeculationTest extends TestBaseImpl
         }
     }
 
+    private static boolean match(Cluster cluster, int instanceId, 
List<InetAddress> readCandidates, int positionInThePlan)
+    {
+        return 
cluster.get(instanceId).broadcastAddress().getAddress().equals(readCandidates.get(positionInThePlan));
+    }
+
+    public static class FixNodeOrderForReads implements IInstanceInitializer
+    {
+        @Override
+        public void initialise(ClassLoader cl, ThreadGroup group, int node, 
int generation)
+        {
+            new ByteBuddy().rebase(NetworkTopologyProximity.class)
+                           
.method(named("sortedByProximity")).intercept(MethodDelegation.to(FixNodeOrderForReads.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static <C extends ReplicaCollection<? extends C>> C 
sortedByProximity(final InetAddressAndPort address, C replicas, @SuperCall 
Callable<C> real) throws Exception
+        {
+            return replicas.sorted(java.util.Comparator.naturalOrder());
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to