This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f3a4f344537b868ff89bab8952f18a3817281221
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Oct 8 12:40:30 2024 +0100

    ExclusiveSyncPoints should always wait for a simple quorum
---
 modules/accord                                            |  2 +-
 .../cassandra/service/accord/AccordMessageSink.java       |  1 -
 .../service/accord/repair/RepairSyncPointAdapter.java     |  2 +-
 .../service/accord/repair/RequiredResponseTracker.java    |  4 ++--
 .../cassandra/distributed/test/accord/AccordLoadTest.java | 15 +++++++++++++--
 5 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/modules/accord b/modules/accord
index aa70db4b72..d24d8f5d86 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit aa70db4b72f74e370dc8f3ec01697d6296cbdaa0
+Subproject commit d24d8f5d866783b71205631944cfecf9a63d4f0c
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index 621e84735a..22fe8f5ad2 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -56,7 +56,6 @@ import org.apache.cassandra.net.ResponseContext;
 import org.apache.cassandra.net.Verb;
 
 import static accord.messages.MessageType.Kind.REMOTE;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 public class AccordMessageSink implements MessageSink
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
 
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
index 58c9f4b65f..767e57fd9f 100644
--- 
a/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
+++ 
b/src/java/org/apache/cassandra/service/accord/repair/RepairSyncPointAdapter.java
@@ -61,7 +61,7 @@ public class RepairSyncPointAdapter<U extends Unseekable> 
extends CoordinationAd
     public void execute(Node node, Topologies all, FullRoute<?> route, 
ExecutePath path, TxnId txnId, Txn txn, Timestamp executeAt, Deps deps, 
BiConsumer<? super SyncPoint<U>, Throwable> callback)
     {
         RequiredResponseTracker tracker = new 
RequiredResponseTracker(requiredResponses, all);
-        ExecuteSyncPoint.ExecuteBlocking<U> execute = new 
ExecuteSyncPoint.ExecuteBlocking<>(node, new SyncPoint<U>(txnId, deps, 
(FullRoute<U>) route), tracker, executeAt);
+        ExecuteSyncPoint.ExecuteBlocking<U> execute = new 
ExecuteSyncPoint.ExecuteBlocking<>(node, new SyncPoint<>(txnId, deps, 
(FullRoute<U>) route), tracker, executeAt);
         execute.addCallback(callback);
         execute.start();
     }
diff --git 
a/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
 
b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
index 130e914969..ac2651dcd3 100644
--- 
a/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
+++ 
b/src/java/org/apache/cassandra/service/accord/repair/RequiredResponseTracker.java
@@ -21,9 +21,9 @@ package org.apache.cassandra.service.accord.repair;
 import java.util.HashSet;
 import java.util.Set;
 
-import accord.coordinate.tracking.AbstractSimpleTracker;
 import accord.coordinate.tracking.RequestStatus;
 import accord.coordinate.tracking.ShardTracker;
+import accord.coordinate.tracking.SimpleTracker;
 import accord.local.Node;
 import accord.topology.Shard;
 import accord.topology.Topologies;
@@ -32,7 +32,7 @@ import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Fail;
 import static 
accord.coordinate.tracking.AbstractTracker.ShardOutcomes.NoChange;
 import static accord.coordinate.tracking.AbstractTracker.ShardOutcomes.Success;
 
-public class RequiredResponseTracker extends 
AbstractSimpleTracker<RequiredResponseTracker.RequiredResponseShardTracker>
+public class RequiredResponseTracker extends 
SimpleTracker<RequiredResponseTracker.RequiredResponseShardTracker>
 {
     public static class RequiredResponseShardTracker extends ShardTracker
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
index 4120c921b1..8fc86c3fdc 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordLoadTest.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
@@ -61,7 +62,7 @@ public class AccordLoadTest extends AccordTestBase
     public static void setUp() throws IOException
     {
         
CassandraRelevantProperties.SIMULATOR_STARTED.setString(Long.toString(MILLISECONDS.toSeconds(currentTimeMillis())));
-        AccordTestBase.setupCluster(builder -> builder, 3);
+        AccordTestBase.setupCluster(builder -> builder.withConfig(config -> 
config.with(Feature.values())), 3);
     }
 
     @Ignore
@@ -92,6 +93,7 @@ public class AccordLoadTest extends AccordTestBase
                  final int repairInterval = 3000;
                  final int compactionInterval = 3000;
                  final int flushInterval = 1000;
+                 final int restartInterval = 10_000;
                  final int batchSizeLimit = 1000;
                  final long batchTime = TimeUnit.SECONDS.toNanos(10);
                  final int concurrency = 100;
@@ -101,6 +103,7 @@ public class AccordLoadTest extends AccordTestBase
                  long nextRepairAt = repairInterval;
                  long nextCompactionAt = compactionInterval;
                  long nextFlushAt = flushInterval;
+                 long nextRestartAt = restartInterval;
                  final BitSet initialised = new BitSet();
 
                  Random random = new Random();
@@ -168,7 +171,6 @@ public class AccordLoadTest extends AccordTestBase
                                  ((AccordService) 
AccordService.instance()).journal().checkAllCommands();
                              });
                          });
-
                      }
 
                      if ((nextFlushAt -= batchSize) <= 0)
@@ -181,6 +183,15 @@ public class AccordLoadTest extends AccordTestBase
                          }));
                      }
 
+                     if ((nextRestartAt -= batchSize) <= 0)
+                     {
+                         nextRestartAt += flushInterval;
+                         int nodeIdx = random.nextInt(cluster.size());
+                         System.out.printf("restarting node %d...\n", nodeIdx);
+                         cluster.get(nodeIdx).shutdown().get();
+                         cluster.get(nodeIdx).startup();
+                     }
+
                      final Date date = new Date();
                      System.out.printf("%tT rate: %.2f/s (%d total)\n", date, 
(((float)batchSizeLimit * 1000) / NANOSECONDS.toMillis(System.nanoTime() - 
batchStart)), batchSize);
                      System.out.printf("%tT percentiles: %d %d %d %d\n", date, 
histogram.percentile(.25)/1000, histogram.percentile(.5)/1000, 
histogram.percentile(.75)/1000, histogram.percentile(1)/1000);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to