This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new d55c9f6cbd CEP-15: (C*) Accord message processing should avoid being 
passed on to a Stage and run directly in the messageing handler
d55c9f6cbd is described below

commit d55c9f6cbdc77bacd59e5e51a830aab8758406ef
Author: David Capwell <[email protected]>
AuthorDate: Thu Apr 27 11:30:50 2023 -0700

    CEP-15: (C*) Accord message processing should avoid being passed on to a 
Stage and run directly in the messageing handler
    
    patch by David Capwell; reviewed by Ariel Weisberg, Benedict Elliott Smith 
for CASSANDRA-18364
---
 .build/build-accord.xml                            |  4 +-
 .../pre-commit/100-verify-submodules-pushed.sh     |  1 -
 modules/accord                                     |  2 +-
 .../org/apache/cassandra/concurrent/Stage.java     |  1 -
 src/java/org/apache/cassandra/config/Config.java   |  4 +-
 .../cassandra/config/DatabaseDescriptor.java       | 30 ++++-----
 .../cassandra/config/OptionaldPositiveInt.java     | 73 ++++++++++++++++++++++
 .../cassandra/config/YamlConfigurationLoader.java  | 26 +++++---
 src/java/org/apache/cassandra/net/Verb.java        | 30 ++++-----
 .../cassandra/service/accord/AccordCallback.java   | 21 ++++---
 .../service/accord/AccordCommandStore.java         | 33 +++-------
 .../service/accord/AccordCommandStores.java        |  5 +-
 .../service/accord/AccordMessageSink.java          | 23 ++++---
 .../cassandra/service/accord/AccordService.java    |  8 +--
 .../service/accord/async/AsyncOperation.java       |  2 +-
 .../accord/serializers/ReadDataSerializers.java    |  6 +-
 .../cassandra/service/accord/txn/TxnData.java      |  3 +
 .../org/apache/cassandra/utils/FBUtilities.java    |  9 +--
 .../cassandra/simulator/ClusterSimulation.java     |  7 +--
 .../AbstractPairOfSequencesPaxosSimulation.java    |  4 +-
 .../cassandra/simulator/paxos/PaxosSimulation.java | 32 ++++++++--
 .../simulator/systems/SimulatedAction.java         |  5 +-
 .../config/DatabaseDescriptorRefTest.java          |  1 +
 .../config/YamlConfigurationLoaderTest.java        | 51 +++++++++++++++
 .../service/accord/AccordMessageSinkTest.java      |  3 +-
 25 files changed, 264 insertions(+), 120 deletions(-)

diff --git a/.build/build-accord.xml b/.build/build-accord.xml
index eba85912d5..6fc716d2d0 100644
--- a/.build/build-accord.xml
+++ b/.build/build-accord.xml
@@ -27,8 +27,10 @@
         <arg value="publishToMavenLocal" />
         <arg value="-x" />
         <arg value="test" />
-        <arg value="-x" />
+        <!-- since so much development is done from this hook, by adding 
checkstyle and rat will avoid issues earlier -->
         <arg value="rat" />
+        <arg value="checkstyleMain" />
+        <arg value="checkstyleTest" />
         <arg value="-Paccord_group=org.apache.cassandra" />
         <arg value="-Paccord_artifactId=cassandra-accord" />
         <arg value="-Paccord_version=${version}" />
diff --git a/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh 
b/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
index c54099ac0f..aee8f658a1 100755
--- a/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
+++ b/.build/git/git-hooks/pre-commit/100-verify-submodules-pushed.sh
@@ -84,7 +84,6 @@ _main() {
       _log "\t\tgit config --local 
cassandra.pre-commit.verify-submodules.enabled false"
       _log "\tOr"
       _log "\t\tgit config --local 
cassandra.pre-commit.verify-submodule-${file}.enabled false"
-      set -x
       git_sub_dir="${file}/.git"
       branch="$(git config -f .gitmodules "submodule.${file}.branch")"
       [[ -z "${branch:-}" ]] && error "Submodule ${file} does not define a 
branch"
diff --git a/modules/accord b/modules/accord
index 08aaab6e33..8226b2d775 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 08aaab6e33d43406e0649146144e4df67648602a
+Subproject commit 8226b2d7759319d7a0b0c823ab09b4344c5423f7
diff --git a/src/java/org/apache/cassandra/concurrent/Stage.java 
b/src/java/org/apache/cassandra/concurrent/Stage.java
index 4a7552ab24..992c0c54f0 100644
--- a/src/java/org/apache/cassandra/concurrent/Stage.java
+++ b/src/java/org/apache/cassandra/concurrent/Stage.java
@@ -46,7 +46,6 @@ public enum Stage
     MUTATION          (true,  "MutationStage",         "request",  
DatabaseDescriptor::getConcurrentWriters,        
DatabaseDescriptor::setConcurrentWriters,        
Stage::multiThreadedLowSignalStage),
     COUNTER_MUTATION  (true,  "CounterMutationStage",  "request",  
DatabaseDescriptor::getConcurrentCounterWriters, 
DatabaseDescriptor::setConcurrentCounterWriters, 
Stage::multiThreadedLowSignalStage),
     VIEW_MUTATION     (true,  "ViewMutationStage",     "request",  
DatabaseDescriptor::getConcurrentViewWriters,    
DatabaseDescriptor::setConcurrentViewWriters,    
Stage::multiThreadedLowSignalStage),
-    ACCORD            (true,  "AccordStage",           "request",  
DatabaseDescriptor::getConcurrentAccordOps,      
DatabaseDescriptor::setConcurrentAccordOps,      
Stage::multiThreadedLowSignalStage),
     GOSSIP            (true,  "GossipStage",           "internal", () -> 1,    
                                     null,                                      
      Stage::singleThreadedStage),
     REQUEST_RESPONSE  (false, "RequestResponseStage",  "request",  
FBUtilities::getAvailableProcessors,             null,                          
                  Stage::multiThreadedLowSignalStage),
     ANTI_ENTROPY      (false, "AntiEntropyStage",      "internal", () -> 1,    
                                     null,                                      
      Stage::singleThreadedStage),
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index fbba2facf3..b674ceb92a 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -171,10 +171,9 @@ public class Config
 
     public int concurrent_reads = 32;
     public int concurrent_writes = 32;
-    public int concurrent_accord_operations = 32;
     public int concurrent_counter_writes = 32;
     public int concurrent_materialized_view_writes = 32;
-    public int available_processors = -1;
+    public OptionaldPositiveInt available_processors = new 
OptionaldPositiveInt(Integer.getInteger("cassandra.available_processors", 
OptionaldPositiveInt.UNDEFINED_VALUE));
 
     @Deprecated
     public Integer concurrent_replicates = null;
@@ -596,6 +595,7 @@ public class Config
     public volatile boolean use_statements_enabled = true;
 
     public boolean accord_transactions_enabled = false;
+    public OptionaldPositiveInt accord_shard_count = 
OptionaldPositiveInt.UNDEFINED;
 
     /**
      * Optionally disable asynchronous UDF execution.
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a48b034568..ee8d12b236 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -519,9 +519,6 @@ public class DatabaseDescriptor
         if (conf.concurrent_counter_writes < 2)
             throw new ConfigurationException("concurrent_counter_writes must 
be at least 2, but was " + conf.concurrent_counter_writes, false);
 
-        if (conf.concurrent_accord_operations < 1)
-            throw new ConfigurationException("concurrent_accord_operations 
must be at least 1, but was " + conf.concurrent_accord_operations, false);
-
         if (conf.concurrent_replicates != null)
             logger.warn("concurrent_replicates has been deprecated and should 
be removed from cassandra.yaml");
 
@@ -2092,20 +2089,6 @@ public class DatabaseDescriptor
         conf.concurrent_materialized_view_writes = 
concurrent_materialized_view_writes;
     }
 
-    public static int getConcurrentAccordOps()
-    {
-        return conf.concurrent_accord_operations;
-    }
-
-    public static void setConcurrentAccordOps(int concurrent_operations)
-    {
-        if (concurrent_operations < 0)
-        {
-            throw new IllegalArgumentException("Concurrent accord operations 
must be non-negative");
-        }
-        conf.concurrent_accord_operations = concurrent_operations;
-    }
-
     public static int getFlushWriters()
     {
         return conf.memtable_flush_writers;
@@ -2113,7 +2096,13 @@ public class DatabaseDescriptor
 
     public static int getAvailableProcessors()
     {
-        return conf == null ? -1 : conf.available_processors;
+        OptionaldPositiveInt ap = conf == null ? 
OptionaldPositiveInt.UNDEFINED : conf.available_processors;
+        return ap.or(Runtime.getRuntime()::availableProcessors);
+    }
+
+    public static void setAvailableProcessors(int value)
+    {
+        conf.available_processors = new OptionaldPositiveInt(value);
     }
 
     public static int getConcurrentCompactors()
@@ -4492,6 +4481,11 @@ public class DatabaseDescriptor
         conf.accord_transactions_enabled = b;
     }
 
+    public static int getAccordShardCount()
+    {
+        return 
conf.accord_shard_count.or(DatabaseDescriptor::getAvailableProcessors);
+    }
+
     public static boolean getForceNewPreparedStatementBehaviour()
     {
         return conf.force_new_prepared_statement_behaviour;
diff --git a/src/java/org/apache/cassandra/config/OptionaldPositiveInt.java 
b/src/java/org/apache/cassandra/config/OptionaldPositiveInt.java
new file mode 100644
index 0000000000..ea33b7af98
--- /dev/null
+++ b/src/java/org/apache/cassandra/config/OptionaldPositiveInt.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+import java.util.Objects;
+import java.util.function.IntSupplier;
+
+public class OptionaldPositiveInt
+{
+    public static final int UNDEFINED_VALUE = -1;
+    public static final OptionaldPositiveInt UNDEFINED = new 
OptionaldPositiveInt(UNDEFINED_VALUE);
+
+    private final int value;
+
+    public OptionaldPositiveInt(int value)
+    {
+        if (!(value == -1 || value >= 1))
+            throw new IllegalArgumentException(String.format("Only -1 
(undefined) and positive values are allowed; given %d", value));
+        this.value = value;
+    }
+
+    public boolean isDefined()
+    {
+        return value != UNDEFINED_VALUE;
+    }
+
+    public int or(int defaultValue)
+    {
+        return value == UNDEFINED_VALUE ? defaultValue : value;
+    }
+
+    public int or(IntSupplier defaultValue)
+    {
+        return value == UNDEFINED_VALUE ? defaultValue.getAsInt() : value;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        OptionaldPositiveInt that = (OptionaldPositiveInt) o;
+        return value == that.value;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(value);
+    }
+
+    @Override
+    public String toString()
+    {
+        return !isDefined() ? "null" : Integer.toString(value);
+    }
+}
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java 
b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 528accdb74..7606e86024 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -133,16 +133,7 @@ public class YamlConfigurationLoader implements 
ConfigurationLoader
                 throw new AssertionError(e);
             }
 
-            SafeConstructor constructor = new CustomConstructor(Config.class, 
Yaml.class.getClassLoader());
-            Map<Class<?>, Map<String, Replacement>> replacements = 
getNameReplacements(Config.class);
-            verifyReplacements(replacements, configBytes);
-            PropertiesChecker propertiesChecker = new 
PropertiesChecker(replacements);
-            constructor.setPropertyUtils(propertiesChecker);
-            Yaml yaml = new Yaml(constructor);
-            Config result = loadConfig(yaml, configBytes);
-            propertiesChecker.check();
-            maybeAddSystemProperties(result);
-            return result;
+            return loadConfig(configBytes);
         }
         catch (YAMLException e)
         {
@@ -150,6 +141,21 @@ public class YamlConfigurationLoader implements 
ConfigurationLoader
         }
     }
 
+    @VisibleForTesting
+    static Config loadConfig(byte[] configBytes)
+    {
+        SafeConstructor constructor = new CustomConstructor(Config.class, 
Yaml.class.getClassLoader());
+        Map<Class<?>, Map<String, Replacement>> replacements = 
getNameReplacements(Config.class);
+        verifyReplacements(replacements, configBytes);
+        PropertiesChecker propertiesChecker = new 
PropertiesChecker(replacements);
+        constructor.setPropertyUtils(propertiesChecker);
+        Yaml yaml = new Yaml(constructor);
+        Config result = loadConfig(yaml, configBytes);
+        propertiesChecker.check();
+        maybeAddSystemProperties(result);
+        return result;
+    }
+
     private static void maybeAddSystemProperties(Object obj)
     {
         if 
(CassandraRelevantProperties.CONFIG_ALLOW_SYSTEM_PROPERTIES.getBoolean())
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 15584b7e8f..781261e655 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -230,29 +230,29 @@ public enum Verb
     // accord
     ACCORD_SIMPLE_RSP               (119, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> EnumSerializer.simpleReply,           RESPONSE_HANDLER                   
                                         ),
     ACCORD_PREACCEPT_RSP            (121, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> PreacceptSerializers.reply,           RESPONSE_HANDLER                   
                                         ),
-    ACCORD_PREACCEPT_REQ            (120, P2, writeTimeout, ACCORD,            
 () -> PreacceptSerializers.request,         () -> 
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP          ),
+    ACCORD_PREACCEPT_REQ            (120, P2, writeTimeout, IMMEDIATE,         
 () -> PreacceptSerializers.request,         () -> 
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP          ),
     ACCORD_ACCEPT_RSP               (124, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> AcceptSerializers.reply,              RESPONSE_HANDLER                   
                                         ),
-    ACCORD_ACCEPT_REQ               (122, P2, writeTimeout, ACCORD,            
 () -> AcceptSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
-    ACCORD_ACCEPT_INVALIDATE_REQ    (123, P2, writeTimeout, ACCORD,            
 () -> AcceptSerializers.invalidate,         () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
+    ACCORD_ACCEPT_REQ               (122, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
+    ACCORD_ACCEPT_INVALIDATE_REQ    (123, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.invalidate,         () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
     ACCORD_READ_RSP                 (128, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ReadDataSerializers.reply,            RESPONSE_HANDLER                   
                                         ),
-    ACCORD_READ_REQ                 (127, P2, writeTimeout, ACCORD,            
 () -> ReadDataSerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP               ),
-    ACCORD_COMMIT_REQ               (125, P2, writeTimeout, ACCORD,            
 () -> CommitSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP               ),
-    ACCORD_COMMIT_INVALIDATE_REQ    (126, P2, writeTimeout, ACCORD,            
 () -> CommitSerializers.invalidate,         () -> 
AccordService.instance().verbHandler()                                ),
+    ACCORD_READ_REQ                 (127, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP               ),
+    ACCORD_COMMIT_REQ               (125, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_READ_RSP               ),
+    ACCORD_COMMIT_INVALIDATE_REQ    (126, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.invalidate,         () -> 
AccordService.instance().verbHandler()                                ),
     ACCORD_APPLY_RSP                (130, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ApplySerializers.reply,               RESPONSE_HANDLER                   
                                         ),
-    ACCORD_APPLY_REQ                (129, P2, writeTimeout, ACCORD,            
 () -> ApplySerializers.request,             () -> 
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP              ),
+    ACCORD_APPLY_REQ                (129, P2, writeTimeout, IMMEDIATE,         
 () -> ApplySerializers.request,             () -> 
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP              ),
     ACCORD_RECOVER_RSP              (132, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> RecoverySerializers.reply,            RESPONSE_HANDLER                   
                                         ),
-    ACCORD_RECOVER_REQ              (131, P2, writeTimeout, ACCORD,            
 () -> RecoverySerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP            ),
+    ACCORD_RECOVER_REQ              (131, P2, writeTimeout, IMMEDIATE,         
 () -> RecoverySerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP            ),
     ACCORD_BEGIN_INVALIDATE_RSP     (134, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> BeginInvalidationSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
-    ACCORD_BEGIN_INVALIDATE_REQ     (133, P2, writeTimeout, ACCORD,            
 () -> BeginInvalidationSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP   ),
+    ACCORD_BEGIN_INVALIDATE_REQ     (133, P2, writeTimeout, IMMEDIATE,         
 () -> BeginInvalidationSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP   ),
     ACCORD_WAIT_COMMIT_RSP          (136, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> WaitOnCommitSerializer.reply,         RESPONSE_HANDLER                   
                                         ),
-    ACCORD_WAIT_COMMIT_REQ          (135, P2, writeTimeout, ACCORD,            
 () -> WaitOnCommitSerializer.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP        ),
-    ACCORD_INFORM_OF_TXNID_REQ      (137, P2, writeTimeout, ACCORD,            
 () -> InformOfTxnIdSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
-    ACCORD_INFORM_HOME_DURABLE_REQ  (138, P2, writeTimeout, ACCORD,            
 () -> InformHomeDurableSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
-    ACCORD_INFORM_DURABLE_REQ       (139, P2, writeTimeout, ACCORD,            
 () -> InformDurableSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_WAIT_COMMIT_REQ          (135, P2, writeTimeout, IMMEDIATE,         
 () -> WaitOnCommitSerializer.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP        ),
+    ACCORD_INFORM_OF_TXNID_REQ      (137, P2, writeTimeout, IMMEDIATE,         
 () -> InformOfTxnIdSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_INFORM_HOME_DURABLE_REQ  (138, P2, writeTimeout, IMMEDIATE,         
 () -> InformHomeDurableSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_INFORM_DURABLE_REQ       (139, P2, writeTimeout, IMMEDIATE,         
 () -> InformDurableSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
     ACCORD_CHECK_STATUS_RSP         (141, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> CheckStatusSerializers.reply,         RESPONSE_HANDLER                   
                                         ),
-    ACCORD_CHECK_STATUS_REQ         (140, P2, writeTimeout, ACCORD,            
 () -> CheckStatusSerializers.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP       ),
+    ACCORD_CHECK_STATUS_REQ         (140, P2, writeTimeout, IMMEDIATE,         
 () -> CheckStatusSerializers.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_CHECK_STATUS_RSP       ),
     ACCORD_GET_DEPS_RSP             (143, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> GetDepsSerializers.reply,             RESPONSE_HANDLER                   
                                         ),
-    ACCORD_GET_DEPS_REQ             (142, P2, writeTimeout, ACCORD,            
 () -> GetDepsSerializers.request,           () -> 
AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP           ),
+    ACCORD_GET_DEPS_REQ             (142, P2, writeTimeout, IMMEDIATE,         
 () -> GetDepsSerializers.request,           () -> 
AccordService.instance().verbHandler(), ACCORD_GET_DEPS_RSP           ),
 
 
     // generic failure response
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCallback.java 
b/src/java/org/apache/cassandra/service/accord/AccordCallback.java
index 60b5d6988a..20ed9fad69 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCallback.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCallback.java
@@ -22,28 +22,29 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import accord.coordinate.Timeout;
+import accord.local.AgentExecutor;
 import accord.messages.Callback;
+import accord.messages.SafeCallback;
 import accord.messages.Reply;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.RequestCallback;
 
-class AccordCallback<T extends Reply> implements RequestCallback<T>
+class AccordCallback<T extends Reply> extends SafeCallback<T> implements 
RequestCallback<T>
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AccordCallback.class);
-    private final Callback<T> callback;
 
-    public AccordCallback(Callback<T> callback)
+    public AccordCallback(AgentExecutor executor, Callback<T> callback)
     {
-        this.callback = callback;
+        super(executor, callback);
     }
 
     @Override
     public void onResponse(Message<T> msg)
     {
         logger.debug("Received response {} from {}", msg.payload, msg.from());
-        callback.onSuccess(EndpointMapping.endpointToId(msg.from()), 
msg.payload);
+        success(EndpointMapping.endpointToId(msg.from()), msg.payload);
     }
 
     private static Throwable convertReason(RequestFailureReason reason)
@@ -56,9 +57,15 @@ class AccordCallback<T extends Reply> implements 
RequestCallback<T>
     @Override
     public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
     {
-        logger.debug("Received failure {} from {} for {}", failureReason, 
from, callback);
+        logger.debug("Received failure {} from {} for {}", failureReason, 
from, this);
         // TODO (now): we should distinguish timeout failures with some 
placeholder Exception
-        callback.onFailure(EndpointMapping.endpointToId(from), 
convertReason(failureReason));
+        failure(EndpointMapping.endpointToId(from), 
convertReason(failureReason));
+    }
+
+    @Override
+    public boolean trackLatencyForSnitch()
+    {
+        return true;
     }
 
     @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 70962298f4..1633dc7930 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -50,7 +50,7 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 
-public class AccordCommandStore implements CommandStore
+public class AccordCommandStore extends CommandStore
 {
     private static long getThreadId(ExecutorService executor)
     {
@@ -68,7 +68,6 @@ public class AccordCommandStore implements CommandStore
         }
     }
 
-    private final int id;
     private final long threadId;
     public final String loggingId;
     private final ExecutorService executor;
@@ -79,12 +78,6 @@ public class AccordCommandStore implements CommandStore
     private AccordSafeCommandStore current = null;
     private long lastSystemTimestampMicros = Long.MIN_VALUE;
 
-    private final NodeTimeService time;
-    private final Agent agent;
-    private final DataStore dataStore;
-    private final ProgressLog progressLog;
-    private final RangesForEpochHolder rangesForEpochHolder;
-
     public AccordCommandStore(int id,
                               NodeTimeService time,
                               Agent agent,
@@ -92,24 +85,20 @@ public class AccordCommandStore implements CommandStore
                               ProgressLog.Factory progressLogFactory,
                               RangesForEpochHolder rangesForEpoch)
     {
-        this.id = id;
-        this.time = time;
-        this.agent = agent;
-        this.dataStore = dataStore;
-        this.progressLog = progressLogFactory.create(this);
-        this.rangesForEpochHolder = rangesForEpoch;
+        super(id, time, agent, dataStore, progressLogFactory, rangesForEpoch);
         this.loggingId = String.format("[%s]", id);
         this.executor = 
executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + 
']');
         this.threadId = getThreadId(this.executor);
         this.stateCache = new AccordStateCache(8<<20);
         this.commandCache = stateCache.instance(TxnId.class, 
accord.local.Command.class, AccordSafeCommand::new, AccordObjectSizes::command);
         this.commandsForKeyCache = stateCache.instance(RoutableKey.class, 
CommandsForKey.class, AccordSafeCommandsForKey::new, 
AccordObjectSizes::commandsForKey);
+        executor.execute(() -> CommandStore.register(this));
     }
 
     @Override
-    public int id()
+    public boolean inStore()
     {
-        return id;
+        return Thread.currentThread().getId() == threadId;
     }
 
     public void setCacheSize(long bytes)
@@ -125,12 +114,12 @@ public class AccordCommandStore implements CommandStore
 
     public void checkInStoreThread()
     {
-        Invariants.checkState(Thread.currentThread().getId() == threadId);
+        Invariants.checkState(inStore());
     }
 
     public void checkNotInStoreThread()
     {
-        Invariants.checkState(Thread.currentThread().getId() != threadId);
+        Invariants.checkState(!inStore());
     }
 
     public ExecutorService executor()
@@ -197,13 +186,7 @@ public class AccordCommandStore implements CommandStore
 
     public DataStore dataStore()
     {
-        return dataStore;
-    }
-
-    @Override
-    public Agent agent()
-    {
-        return agent;
+        return store;
     }
 
     NodeTimeService time()
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
index 0708f092d2..45208e5c34 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java
@@ -25,14 +25,15 @@ import accord.local.CommandStores;
 import accord.local.NodeTimeService;
 import accord.local.ShardDistributor;
 import accord.topology.Topology;
+import accord.utils.RandomSource;
 
 public class AccordCommandStores extends CommandStores<AccordCommandStore>
 {
     private long cacheSize;
-    AccordCommandStores(NodeTimeService time, Agent agent, DataStore store,
+    AccordCommandStores(NodeTimeService time, Agent agent, DataStore store, 
RandomSource random,
                         ShardDistributor shardDistributor, ProgressLog.Factory 
progressLogFactory)
     {
-        super(time, agent, store, shardDistributor, progressLogFactory, 
AccordCommandStore::new);
+        super(time, agent, store, random, shardDistributor, 
progressLogFactory, AccordCommandStore::new);
         setCacheSize(maxCacheSize());
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index c7f1591e92..ff83f70756 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -20,14 +20,15 @@ package org.apache.cassandra.service.accord;
 
 import java.util.EnumMap;
 import java.util.Map;
-import java.util.Objects;
 
 import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import accord.api.Agent;
 import accord.api.MessageSink;
+import accord.local.AgentExecutor;
 import accord.local.Node;
 import accord.messages.Callback;
 import accord.messages.MessageType;
@@ -93,23 +94,25 @@ public class AccordMessageSink implements MessageSink
         return VerbMapping.instance.mapping.get(type);
     }
 
+    private final Agent agent;
     private final Messaging messaging;
 
-    public AccordMessageSink(Messaging messaging)
+    public AccordMessageSink(Agent agent, Messaging messaging)
     {
+        this.agent = agent;
         this.messaging = messaging;
     }
 
-    public AccordMessageSink()
+    public AccordMessageSink(Agent agent)
     {
-        this(MessagingService.instance());
+        this(agent, MessagingService.instance());
     }
 
     @Override
     public void send(Node.Id to, Request request)
     {
         Verb verb = getVerb(request.type());
-        Objects.requireNonNull(verb, "verb");
+        Preconditions.checkNotNull(verb, "Verb is null for type %s", 
request.type());
         Message<Request> message = Message.out(verb, request);
         InetAddressAndPort endpoint = getEndpoint(to);
         logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
@@ -117,14 +120,14 @@ public class AccordMessageSink implements MessageSink
     }
 
     @Override
-    public void send(Node.Id to, Request request, Callback callback)
+    public void send(Node.Id to, Request request, AgentExecutor executor, 
Callback callback)
     {
         Verb verb = getVerb(request.type());
-        Preconditions.checkArgument(verb != null);
+        Preconditions.checkNotNull(verb, "Verb is null for type %s", 
request.type());
         Message<Request> message = Message.out(verb, request);
         InetAddressAndPort endpoint = getEndpoint(to);
         logger.debug("Sending {} {} to {}", verb, message.payload, endpoint);
-        messaging.sendWithCallback(message, endpoint, new 
AccordCallback<>((Callback<Reply>) callback));
+        messaging.sendWithCallback(message, endpoint, new 
AccordCallback<>(executor, (Callback<Reply>) callback));
     }
 
     @Override
@@ -132,7 +135,9 @@ public class AccordMessageSink implements MessageSink
     {
         Message<?> replyTo = (Message<?>) replyContext;
         Message<?> replyMsg = replyTo.responseWith(reply);
-        Preconditions.checkArgument(replyMsg.verb() == getVerb(reply.type()));
+        Verb verb = getVerb(reply.type());
+        Preconditions.checkNotNull(verb, "Verb is null for type %s", 
reply.type());
+        Preconditions.checkArgument(replyMsg.verb() == verb, "Expected reply 
message with verb %s but got %s; reply type was %s", replyMsg.verb(), verb, 
reply.type());
         InetAddressAndPort endpoint = getEndpoint(replyingToNode);
         logger.debug("Replying {} {} to {}", replyMsg.verb(), 
replyMsg.payload, endpoint);
         messaging.send(replyMsg, endpoint);
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index a86cb70c53..d8b9f4d89b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -61,7 +61,6 @@ import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
 import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
@@ -134,7 +133,8 @@ public class AccordService implements IAccordService, 
Shutdownable
     {
         Node.Id localId = 
EndpointMapping.endpointToId(FBUtilities.getBroadcastAddressAndPort());
         logger.info("Starting accord with nodeId {}", localId);
-        this.messageSink = new AccordMessageSink();
+        AccordAgent agent = new AccordAgent();
+        this.messageSink = new AccordMessageSink(agent);
         this.configService = new AccordConfigurationService(localId);
         this.scheduler = new AccordScheduler();
         this.node = new Node(localId,
@@ -142,8 +142,8 @@ public class AccordService implements IAccordService, 
Shutdownable
                              configService,
                              AccordService::uniqueNow,
                              () -> null,
-                             new KeyspaceSplitter(new 
EvenSplit<>(getConcurrentAccordOps(), getPartitioner().accordSplitter())),
-                             new AccordAgent(),
+                             new KeyspaceSplitter(new 
EvenSplit<>(DatabaseDescriptor.getAccordShardCount(), 
getPartitioner().accordSplitter())),
+                             agent,
                              new DefaultRandom(),
                              scheduler,
                              SizeOfIntersectionSorter.SUPPLIER,
diff --git 
a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
index 2f2dd2ce6d..ce5bb125f1 100644
--- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
+++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java
@@ -302,7 +302,7 @@ public abstract class AsyncOperation<R> extends 
AsyncChains.Head<R> implements R
     {
         Invariants.checkArgument(this.callback == null);
         this.callback = callback;
-        commandStore.executor().submit(this);
+        commandStore.executor().execute(this);
     }
 
     private static Iterable<RoutableKey> toRoutableKeys(Seekables<?, ?> keys)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
index 4899316cc5..a344028b49 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializers.java
@@ -81,7 +81,7 @@ public class ReadDataSerializers
 
             out.writeByte(0);
             ReadOk readOk = (ReadOk) reply;
-            TxnData.serializer.serialize((TxnData) readOk.data, out, version);
+            TxnData.nullableSerializer.serialize((TxnData) readOk.data, out, 
version);
         }
 
         @Override
@@ -91,7 +91,7 @@ public class ReadDataSerializers
             if (id != 0)
                 return nacks[id - 1];
 
-            return new ReadOk(TxnData.serializer.deserialize(in, version));
+            return new ReadOk(TxnData.nullableSerializer.deserialize(in, 
version));
         }
 
         @Override
@@ -101,7 +101,7 @@ public class ReadDataSerializers
                 return TypeSizes.BYTE_SIZE;
 
             ReadOk readOk = (ReadOk) reply;
-            return TypeSizes.BYTE_SIZE + 
TxnData.serializer.serializedSize((TxnData) readOk.data, version);
+            return TypeSizes.BYTE_SIZE + 
TxnData.nullableSerializer.serializedSize((TxnData) readOk.data, version);
         }
     };
 }
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
index a095d8ba78..c3d8f6e18d 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java
@@ -40,6 +40,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.NullableSerializer;
 import org.apache.cassandra.utils.ObjectSizes;
 
 public class TxnData implements Data, Result, Iterable<FilteredPartition>
@@ -196,4 +197,6 @@ public class TxnData implements Data, Result, 
Iterable<FilteredPartition>
             return size;
         }
     };
+
+    public static final IVersionedSerializer<TxnData> nullableSerializer = 
NullableSerializer.wrap(serializer);
 }
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java 
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index c760659405..714c2c66f4 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -134,19 +134,14 @@ public class FBUtilities
 
     private static volatile String previousReleaseVersionString;
 
-    private static int availableProcessors = 
Integer.getInteger("cassandra.available_processors", 
DatabaseDescriptor.getAvailableProcessors());
-
     public static void setAvailableProcessors(int value)
     {
-        availableProcessors = value;
+        DatabaseDescriptor.setAvailableProcessors(value);
     }
 
     public static int getAvailableProcessors()
     {
-        if (availableProcessors > 0)
-            return availableProcessors;
-        else
-            return Runtime.getRuntime().availableProcessors();
+        return DatabaseDescriptor.getAvailableProcessors();
     }
 
     public static final int MAX_UNSIGNED_SHORT = 0xFFFF;
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java 
b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
index e8809c606c..ac3a773453 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ClusterSimulation.java
@@ -87,7 +87,6 @@ import org.apache.cassandra.simulator.utils.KindOfSequence;
 import org.apache.cassandra.simulator.utils.LongRange;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.Closeable;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Throwables;
 import org.apache.cassandra.utils.concurrent.Ref;
 import org.apache.cassandra.utils.memory.BufferPool;
@@ -544,7 +543,7 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
                    .set("concurrent_counter_writes", take(1, 4))
                    .set("concurrent_materialized_view_writes", take(1, 4))
                    .set("concurrent_reads", take(1, 4))
-                   .forceSet("available_processors", take(3, 4));
+                   .set("available_processors", take(3, 4));
         }
 
         // begin allocating for a new node
@@ -591,7 +590,7 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
             if (remaining * min <= allocationPool)
                 return min;
             if (times == remaining)
-                return allocationPool / remaining;
+                return Math.max(allocationPool / remaining, min);
             if (times + 1 == remaining)
                 return random.uniform(Math.max(min, (allocationPool - max) / 
times), Math.min(max, (allocationPool - min) / times));
 
@@ -602,7 +601,6 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
         }
     }
 
-
     public final RandomSource random;
     public final SimulatedSystems simulated;
     public final Cluster cluster;
@@ -720,7 +718,6 @@ public class ClusterSimulation<S extends Simulation> 
implements AutoCloseable
                              @Override
                              public void beforeStartup(IInstance i)
                              {
-                                 ((IInvokableInstance) 
i).unsafeAcceptOnThisThread(FBUtilities::setAvailableProcessors, 
i.config().getInt("available_processors"));
                                  ((IInvokableInstance) 
i).unsafeAcceptOnThisThread(IfInterceptibleThread::setThreadLocalRandomCheck, 
(LongConsumer) threadLocalRandomCheck);
 
                                  int num = i.config().num();
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
index 5a528468ea..844a8df368 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
@@ -209,6 +209,8 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
             public Action get()
             {
                 int[] primaryKeyIndex = consume(simulated.random, available);
+                if (primaryKeyIndex == null)
+                    return Actions.empty("All primary keys are taken, try 
again later");
                 long untilNanos = simulated.time.nanoTime() + 
SECONDS.toNanos(simulateKeyForSeconds.select(simulated.random));
                 int concurrency = 
withinKeyConcurrency.select(simulated.random);
                 Supplier<Action> supplier = factory.apply(simulated, 
primaryKeyIndex);
@@ -249,7 +251,7 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
     private int[] consume(RandomSource random, List<Integer> available)
     {
         if (available.isEmpty())
-            throw new AssertionError("available partitions are empty!");
+            return null;
         int numPartitions = available.size() == 1 || 
!allowMultiplePartitions() ? 1 : random.uniform(1, available.size());
         int[] partitions = new int[numPartitions];
         for (int counter = 0; counter < numPartitions; counter++)
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index 2b600f0062..e02cd848d5 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -286,19 +288,39 @@ public abstract class PaxosSimulation implements 
Simulation, ClusterActionListen
 
     private RuntimeException logAndThrow()
     {
-        Integer causedByPrimaryKey = null;
-        Throwable causedByThrowable = null;
+        class Violation
+        {
+            final int primaryKey;
+            final Throwable cause;
+
+            Violation(int primaryKey, Throwable cause)
+            {
+                this.primaryKey = primaryKey;
+                this.cause = cause;
+            }
+        }
+        List<Violation> violations = new ArrayList<>();
         for (Throwable t : simulated.failures.get())
         {
+            Integer causedByPrimaryKey;
             if (null != (causedByPrimaryKey = causedBy(t)))
             {
-                causedByThrowable = t;
+                violations.add(new Violation(causedByPrimaryKey, t));
                 break;
             }
         }
 
-        log(causedByPrimaryKey);
-        Throwable t = (causedByPrimaryKey != null) ? causedByThrowable : 
simulated.failures.get().get(0);
+        if (!violations.isEmpty())
+        {
+            AssertionError error = new AssertionError("History violations 
detected");
+            violations.forEach(v -> {
+                log(v.primaryKey);
+                error.addSuppressed(v.cause);
+            });
+            throw error;
+        }
+
+        Throwable t = simulated.failures.get().get(0);
         Throwables.throwIfUnchecked(t);
         throw new RuntimeException(t);
     }
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
 
b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
index aa0a400b54..9fd86547b0 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedAction.java
@@ -382,8 +382,11 @@ public abstract class SimulatedAction extends Action 
implements InterceptorOfCon
                     notify = from;
                 }
                 boolean isTimeout = deliver != FAILURE;
+                Executor notifierExecutor = notify.executorFor(verb.id);
+                if (notifierExecutor instanceof ImmediateExecutor)
+                    notifierExecutor = notify.executor();
                 InterceptedExecution.InterceptedTaskExecution failTask = new 
InterceptedRunnableExecution(
-                    (InterceptingExecutor) notify.executorFor(verb.id),
+                    (InterceptingExecutor) notifierExecutor,
                     () -> notify.unsafeApplyOnThisThread((socketAddress, id, 
innerIsTimeout) -> {
                         InetAddressAndPort address = 
InetAddressAndPort.getByAddress(socketAddress);
                         RequestCallbacks.CallbackInfo callback = 
instance().callbacks.remove(id, address);
diff --git 
a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
index 50eee33393..80522706d4 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorRefTest.java
@@ -137,6 +137,7 @@ public class DatabaseDescriptorRefTest
     "org.apache.cassandra.config.GuardrailsOptions$ConsistencyLevels",
     "org.apache.cassandra.config.GuardrailsOptions$TableProperties",
     "org.apache.cassandra.config.ParameterizedClass",
+    "org.apache.cassandra.config.OptionaldPositiveInt",
     "org.apache.cassandra.config.ReplicaFilteringProtectionOptions",
     "org.apache.cassandra.config.StartupChecksOptions",
     "org.apache.cassandra.config.SubnetGroups",
diff --git 
a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java 
b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
index 06be1dc209..a48eb82ed0 100644
--- a/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
+++ b/test/unit/org/apache/cassandra/config/YamlConfigurationLoaderTest.java
@@ -31,6 +31,9 @@ import java.util.function.Predicate;
 import com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
 import org.apache.cassandra.distributed.shared.WithProperties;
 import org.apache.cassandra.io.util.File;
 import org.yaml.snakeyaml.error.YAMLException;
@@ -359,6 +362,36 @@ public class YamlConfigurationLoaderTest
         assertThat(from("sstable_preemptive_open_interval_in_mb", 
-2).sstable_preemptive_open_interval).isNull();
     }
 
+    @Test
+    public void process()
+    {
+        for (Type type : Type.values())
+        {
+            Config c = fromType(type, "available_processors", 4);
+            assertThat(c.available_processors).isEqualTo(new 
OptionaldPositiveInt(4));
+            
assertThat(c.accord_shard_count).isEqualTo(OptionaldPositiveInt.UNDEFINED);
+
+            c = fromType(type, "available_processors", 3, 
"accord_shard_count", 1);
+            assertThat(c.available_processors).isEqualTo(new 
OptionaldPositiveInt(3));
+            assertThat(c.accord_shard_count).isEqualTo(new 
OptionaldPositiveInt(1));
+        }
+    }
+
+    private enum Type { MAP, YAML }
+
+    private static Config fromType(Type type, Object... values)
+    {
+        switch (type)
+        {
+            case MAP:
+                return from(values);
+            case YAML:
+                return fromYaml(values);
+            default:
+                throw new AssertionError("Unexpected type: " + type);
+        }
+    }
+
     private static Config from(Object... values)
     {
         assert values.length % 2 == 0 : "Map can only be created with an even 
number of inputs: given " + values.length;
@@ -368,6 +401,24 @@ public class YamlConfigurationLoaderTest
         return YamlConfigurationLoader.fromMap(builder.build(), Config.class);
     }
 
+    private static Config fromYaml(Object... values)
+    {
+        assert values.length % 2 == 0 : "Map can only be created with an even 
number of inputs: given " + values.length;
+        ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
+        for (int i = 0; i < values.length; i += 2)
+            builder.put((String) values[i], values[i + 1]);
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        try
+        {
+            byte[] bytes = mapper.writeValueAsBytes(builder.build());
+            return YamlConfigurationLoader.loadConfig(bytes);
+        }
+        catch (JsonProcessingException e)
+        {
+            throw new AssertionError("Unable to convert map to YAML", e);
+        }
+    }
+
     private static Config load(String path)
     {
         URL url = 
YamlConfigurationLoaderTest.class.getClassLoader().getResource(path);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index 82394eaed0..a1657bb38f 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.accord;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import accord.api.Agent;
 import accord.local.Node;
 import accord.messages.InformOfTxnId;
 import accord.messages.SimpleReply;
@@ -48,7 +49,7 @@ public class AccordMessageSinkTest
         SimpleReply reply = SimpleReply.Ok;
 
         Messaging messaging = Mockito.mock(Messaging.class);
-        AccordMessageSink sink = new AccordMessageSink(messaging);
+        AccordMessageSink sink = new 
AccordMessageSink(Mockito.mock(Agent.class), messaging);
         sink.reply(new Node.Id(1), req, reply);
 
         Mockito.verify(messaging).send(Mockito.any(), Mockito.any());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to