This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f3a4f344537b868ff89bab8952f18a3817281221 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Tue Oct 8 12:40:30 2024 +0100 ExclusiveSyncPoints should always wait for a simple quorum --- modules/accord | 2 +- .../cassandra/service/accord/AccordMessageSink.java | 1 - .../service/accord/repair/RepairSyncPointAdapter.java | 2 +- .../service/accord/repair/RequiredResponseTracker.java | 4 ++-- .../cassandra/distributed/test/accord/AccordLoadTest.java | 15 +++++++++++++-- 5 files changed, 17 insertions(+), 7 deletions(-) diff --git a/modules/accord b/modules/accord index aa70db4b72..d24d8f5d86 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit aa70db4b72f74e370dc8f3ec01697d6296cbdaa0 +Subproject commit d24d8f5d866783b71205631944cfecf9a63d4f0c diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java index 621e84735a..22fe8f5ad2 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java +++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java @@ -56,7 +56,6 @@ import org.apache.cassandra.net.ResponseContext; import org.apache.cassandra.net.Verb; import static accord.messages.MessageType.Kind.REMOTE; -import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; public class AccordMessageSink implements MessageSink diff --git a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java index 58c9f4b65f..767e57fd9f 100644 --- a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java +++ b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java @@ -61,7 +61,7 @@ public class RepairSyncPointAdapter<U extends Unseekable> extends CoordinationAd public void execute(Node node, Topologies all, FullRoute<?> route, ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, BiConsumer<? super SyncPoint<U>, Throwable> callback) { RequiredResponseTracker tracker = new RequiredResponseTracker(requiredResponses, all); - ExecuteSyncPoint.ExecuteBlocking<U> execute = new ExecuteSyncPoint.ExecuteBlocking<>(node, new SyncPoint<U>(txnId, deps, (FullRoute<U>) route), tracker, executeAt); + ExecuteSyncPoint.ExecuteBlocking<U> execute = new ExecuteSyncPoint.ExecuteBlocking<>(node, new SyncPoint<>(txnId, deps, (FullRoute<U>) route), tracker, executeAt); execute.addCallback(callback); execute.start(); } diff --git a/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java index 130e914969..ac2651dcd3 100644 --- a/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java +++ b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java @@ -21,9 +21,9 @@ package org.apache.cassandra.service.accord.repair; import java.util.HashSet; import java.util.Set; -import accord.coordinate.tracking.AbstractSimpleTracker; import accord.coordinate.tracking.RequestStatus; import accord.coordinate.tracking.ShardTracker; +import accord.coordinate.tracking.SimpleTracker; import accord.local.Node; import accord.topology.Shard; import accord.topology.Topologies; @@ -32,7 +32,7 @@ import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange; import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success; -public class RequiredResponseTracker extends AbstractSimpleTracker<RequiredResponseTracker.RequiredResponseShardTracker> +public class RequiredResponseTracker extends SimpleTracker<RequiredResponseTracker.RequiredResponseShardTracker> { public static class RequiredResponseShardTracker extends ShardTracker { diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java index 4120c921b1..8fc86c3fdc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.IMessage; import org.apache.cassandra.distributed.api.IMessageFilters; @@ -61,7 +62,7 @@ public class AccordLoadTest extends AccordTestBase public static void setUp() throws IOException { CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis()))); - AccordTestBase.setupCluster(builder -> builder, 3); + AccordTestBase.setupCluster(builder -> builder.withConfig(config -> config.with(Feature.values())), 3); } @Ignore @@ -92,6 +93,7 @@ public class AccordLoadTest extends AccordTestBase final int repairInterval = 3000; final int compactionInterval = 3000; final int flushInterval = 1000; + final int restartInterval = 10_000; final int batchSizeLimit = 1000; final long batchTime = TimeUnit.SECONDS.toNanos(10); final int concurrency = 100; @@ -101,6 +103,7 @@ public class AccordLoadTest extends AccordTestBase long nextRepairAt = repairInterval; long nextCompactionAt = compactionInterval; long nextFlushAt = flushInterval; + long nextRestartAt = restartInterval; final BitSet initialised = new BitSet(); Random random = new Random(); @@ -168,7 +171,6 @@ public class AccordLoadTest extends AccordTestBase ((AccordService) AccordService.instance()).journal().checkAllCommands(); }); }); - } if ((nextFlushAt -= batchSize) <= 0) @@ -181,6 +183,15 @@ public class AccordLoadTest extends AccordTestBase })); } + if ((nextRestartAt -= batchSize) <= 0) + { + nextRestartAt += flushInterval; + int nodeIdx = random.nextInt(cluster.size()); + System.out.printf("restarting node %d...\n", nodeIdx); + cluster.get(nodeIdx).shutdown().get(); + cluster.get(nodeIdx).startup(); + } + final Date date = new Date(); System.out.printf("%tT rate: %.2f/s (%d total)\n", date, (((float)batchSizeLimit * 1000) / NANOSECONDS.toMillis(System.nanoTime() - batchStart)), batchSize); System.out.printf("%tT percentiles: %d %d %d %d\n", date, histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, histogram.percentile(.75)/1000, histogram.percentile(1)/1000); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
