This is an automated email from the ASF dual-hosted git repository.
konstantinov pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2daa0f0ba0 Fix flaky ReadSpeculationTest: with TCM, the second replica
in a read plan can differ depending on the order in which nodes were added to
the cluster. To fix this, the NetworkTopologyProximity implementation is adjusted
using ByteBuddy to produce a predictable order of nodes to read from (the same
idea is used in the Python read repair dtests). Also, the dynamic snitch is
disabled to prevent it from re-ordering replicas in a read plan. Logging of the
actual configuration in Java dtests is added to s [...]
2daa0f0ba0 is described below
commit 2daa0f0ba0647ee76a2b3ffc349407751031d0dd
Author: Dmitry Konstantinov <[email protected]>
AuthorDate: Sun Feb 16 11:50:20 2025 +0000
Fix flaky ReadSpeculationTest: with TCM, the second replica in a read plan
can differ depending on the order in which nodes were added to the cluster.
To fix this, the NetworkTopologyProximity implementation is adjusted using
ByteBuddy to produce a predictable order of nodes to read from (the same idea
is used in the Python read repair dtests). Also, the dynamic snitch is disabled
to prevent it from re-ordering replicas in a read plan. Logging of the actual
configuration in Java dtests is added to simplify troubleshooting (the default
logic does not print it because logging is initialized later in tests).
Patch by Dmitry Konstantinov; reviewed by Brandon Williams, Stefan
Miklosovic for CASSANDRA-20251
---
.../cassandra/distributed/impl/Instance.java | 1 +
.../distributed/test/ReadSpeculationTest.java | 86 ++++++++++++++++++++--
2 files changed, 79 insertions(+), 8 deletions(-)
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index cf4b6cdd7a..b98b36c47c 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -713,6 +713,7 @@ public class Instance extends IsolatedExecutor implements
IInvokableInstance
startJmx();
LoggingSupportFactory.getLoggingSupport().onStartup();
logSystemInfo(inInstancelogger);
+ Config.log(DatabaseDescriptor.getRawConfig());
FileUtils.setFSErrorHandler(new DefaultFSErrorHandler());
DatabaseDescriptor.createAllDirectories();
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
index fce1de26ea..445315f343 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReadSpeculationTest.java
@@ -18,43 +18,92 @@
package org.apache.cassandra.distributed.test;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.ScheduledThreadPoolExecutorPlus;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IInstanceInitializer;
import org.apache.cassandra.distributed.impl.CoordinatorHelper;
import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.NetworkTopologyProximity;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.service.CassandraDaemon;
import org.apache.cassandra.transport.Dispatcher;
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
public class ReadSpeculationTest extends TestBaseImpl
{
+ private static final Logger logger =
LoggerFactory.getLogger(ReadSpeculationTest.class);
+ private static final String TABLE = "tbl";
+ private static final String PK_VALUE = "1";
@Test
public void speculateTest() throws Throwable
{
try (Cluster cluster = builder().withNodes(3)
+ .withConfig(config ->
config.set("dynamic_snitch", false))
+ .withInstanceInitializer(new
FixNodeOrderForReads())
.start())
{
- cluster.get(1).runOnInstance(() -> {
+ cluster.forEach(instance -> instance.runOnInstance(() -> {
// Disable updater since we will force time
((ScheduledThreadPoolExecutorPlus)
ScheduledExecutors.optionalTasks).remove(CassandraDaemon.SPECULATION_THRESHOLD_UPDATER);
- });
+ }));
cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE +
" WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + 3 +
"}");
- cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE +
".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH speculative_retry =
'2000ms';");
+ cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE +
"." + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH
speculative_retry = '2000ms';");
- // force speculation; rely on IP order
+ List<InetAddress> readPlanEndpoints =
cluster.get(1).applyOnInstance((none) -> {
+ Keyspace keyspace = Keyspace.openIfExists(KEYSPACE);
+ ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(TABLE);
+ DecoratedKey dk = cfs.decorateKey(bytes(PK_VALUE));
+ ReplicaPlan.ForTokenRead plan = ReplicaPlans.forRead(keyspace,
dk.getToken(), null,
+ QUORUM,
cfs.metadata().params.speculativeRetry);
+ return
plan.contacts().endpointList().stream().map(InetSocketAddress::getAddress).collect(Collectors.toList());
+ }, null);
+ logger.info("Replicas provided in a read plan contacts: {}",
readPlanEndpoints);
+ logger.info("Cluster instances: {}", cluster.stream().map(instance
-> instance.broadcastAddress().getAddress()).collect(Collectors.toList()));
+ int firstReplica = 0;
+ int secondReplica = 0;
+ for (int i = 1; i <= 3; i++)
+ {
+ if (match(cluster, i, readPlanEndpoints, 0))
+ firstReplica = i;
+ if (match(cluster, i, readPlanEndpoints, 1))
+ secondReplica = i;
+ }
+ logger.info("1st replica to read from: {}, 2nd replica: {}",
firstReplica, secondReplica);
+ Assert.assertEquals(1, firstReplica);
+ Assert.assertEquals(2, secondReplica);
+ // force speculation by dropping all messages sent to the 2nd read
replica
cluster.filters().allVerbs().from(1).to(2).drop();
@@ -157,12 +206,12 @@ public class ReadSpeculationTest extends TestBaseImpl
DatabaseDescriptor.setReadRpcTimeout(rpcTimeoutMs);
DatabaseDescriptor.setCQLStartTime(cqlStartTime);
- ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
long speculatedBefore = cfs.metric.speculativeRetries.getCount();
long before = System.nanoTime();
cfs.sampleReadLatencyMicros = speculationTimeoutMicros;
- CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " +
KEYSPACE + ".tbl WHERE pk = 1",
+ CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " +
KEYSPACE + "." + TABLE + " WHERE pk = " + PK_VALUE,
ConsistencyLevel.QUORUM,
ConsistencyLevel.QUORUM,
new
Dispatcher.RequestTime(before - enqueuedNsAgo,
@@ -180,14 +229,14 @@ public class ReadSpeculationTest extends TestBaseImpl
DatabaseDescriptor.setReadRpcTimeout(rpcTimeoutMs);
DatabaseDescriptor.setCQLStartTime(cqlStartTime);
- ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
long speculatedBefore = cfs.metric.speculativeRetries.getCount();
long before = System.nanoTime();
cfs.sampleReadLatencyMicros = speculationTimeoutMicros;
try
{
- CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " +
KEYSPACE + ".tbl WHERE pk = 1",
+ CoordinatorHelper.unsafeExecuteInternal("SELECT * FROM " +
KEYSPACE + "." + TABLE + " WHERE pk = " + PK_VALUE,
ConsistencyLevel.QUORUM,
ConsistencyLevel.QUORUM,
new
Dispatcher.RequestTime(before - enqueuedNsAgo,
@@ -206,4 +255,25 @@ public class ReadSpeculationTest extends TestBaseImpl
}
}
+ private static boolean match(Cluster cluster, int instanceId,
List<InetAddress> readCandidates, int positionInThePlan)
+ { // true when the broadcast address of cluster instance `instanceId` equals the read-plan endpoint at index `positionInThePlan`
+ return
cluster.get(instanceId).broadcastAddress().getAddress().equals(readCandidates.get(positionInThePlan));
+ }
+
+ public static class FixNodeOrderForReads implements IInstanceInitializer
+ { // instance initializer that uses ByteBuddy to replace NetworkTopologyProximity.sortedByProximity with the static method below
+ @Override
+ public void initialise(ClassLoader cl, ThreadGroup group, int node,
int generation)
+ { // rebase NetworkTopologyProximity, delegate sortedByProximity to this class, and load the rewritten class into `cl`
+ new ByteBuddy().rebase(NetworkTopologyProximity.class)
+
.method(named("sortedByProximity")).intercept(MethodDelegation.to(FixNodeOrderForReads.class))
+ .make()
+ .load(cl, ClassLoadingStrategy.Default.INJECTION);
+ }
+
+ public static <C extends ReplicaCollection<? extends C>> C
sortedByProximity(final InetAddressAndPort address, C replicas, @SuperCall
Callable<C> real) throws Exception
+ { // `address` and `real` (the intercepted original) are not used: replicas are returned sorted by their natural order
+ return replicas.sorted(java.util.Comparator.naturalOrder());
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]