This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit cc6f89124c171d4aea6cd3852000c780f16b7e99
Author: David Capwell <[email protected]>
AuthorDate: Thu Jan 12 12:07:54 2023 -0800

    CASSANDRA-18154: CEP-15: Enhance returning SELECT to allow partition and clustering IN clauses to return multiple partitions/rows
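
    For context, an illustrative sketch of the query shape this change targets
    (the keyspace, table, and column names below are hypothetical, and the Accord
    transaction syntax that wraps such a returning SELECT is not shown in this patch):

        SELECT k, c, v
        FROM ks.tbl
        WHERE k IN (0, 1) AND c IN (0, 1);  -- may now return multiple partitions/rows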
---
 .build/cassandra-build-deps-template.xml           |   5 +
 .build/include-accord.sh                           |   2 +-
 .build/parent-pom-template.xml                     |  21 +
 ide/idea-iml-file.xml                              |   1 +
 ide/idea/workspace.xml                             |  35 +-
 .../metrics/AccordClientRequestMetrics.java        |  37 +-
 .../cassandra/service/accord/AccordCommand.java    |   2 +
 .../cassandra/service/accord/AccordService.java    |  27 +
 .../cassandra/utils/logging/ClassNameFilter.java   |  38 +-
 test/conf/logback-simulator.xml                    |  14 +
 .../org/apache/cassandra/distributed/api/Row.java  | 160 ++++++
 .../apache/cassandra/distributed/impl/Query.java   |  12 +-
 .../distributed/util/QueryResultUtil.java          |   2 +
 .../org/apache/cassandra/simulator/ActionList.java |   4 +
 .../AbstractPairOfSequencesPaxosSimulation.java    | 201 +++----
 .../cassandra/simulator/paxos/HistoryChecker.java  |  32 +-
 .../{Observation.java => HistoryValidator.java}    |  41 +-
 .../simulator/paxos/LinearizabilityValidator.java  |  83 +++
 .../simulator/paxos/LoggingHistoryValidator.java   |  73 +++
 .../cassandra/simulator/paxos/Observation.java     |  16 +-
 .../paxos/PairOfSequencesAccordSimulation.java     | 310 +++++------
 .../paxos/PairOfSequencesPaxosSimulation.java      | 165 +++++-
 .../cassandra/simulator/paxos/PaxosSimulation.java |  53 +-
 .../paxos/StrictSerializabilityValidator.java      | 112 ++++
 .../simulator/systems/SimulatedQuery.java          |   5 +-
 .../simulator/paxos/HistoryValidatorTest.java      | 592 +++++++++++++++++++++
 .../cassandra/service/accord/AccordTestUtils.java  |  14 +-
 .../org/apache/cassandra/utils/AssertionUtils.java |  46 +-
 28 files changed, 1724 insertions(+), 379 deletions(-)

diff --git a/.build/cassandra-build-deps-template.xml b/.build/cassandra-build-deps-template.xml
index 93387996ef..cfd2806fe7 100644
--- a/.build/cassandra-build-deps-template.xml
+++ b/.build/cassandra-build-deps-template.xml
@@ -127,5 +127,10 @@
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
+    <dependency>
+      <groupId>accord</groupId>
+      <artifactId>accord</artifactId>
+      <classifier>tests</classifier>
+    </dependency>
   </dependencies>
 </project>
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 2144e80b17..eabe4d204b 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
 accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='ad326d5df8d99d4799fa87de81482e3cb1fb92de'
+accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git a/.build/parent-pom-template.xml b/.build/parent-pom-template.xml
index c582933234..218065968d 100644
--- a/.build/parent-pom-template.xml
+++ b/.build/parent-pom-template.xml
@@ -679,6 +679,27 @@
         <artifactId>accord</artifactId>
         <version>1.0-SNAPSHOT</version>
       </dependency>
+      <dependency>
+        <groupId>accord</groupId>
+        <artifactId>accord</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
       <dependency>
         <groupId>io.airlift</groupId>
         <artifactId>airline</artifactId>
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index c0b55842fa..4c529892a4 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -39,6 +39,7 @@
             <sourceFolder url="file://$MODULE_DIR$/test/simulator/asm" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/simulator/bootstrap" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/simulator/main" isTestSource="true" />
+            <sourceFolder url="file://$MODULE_DIR$/test/simulator/test" isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" />
             <sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" />
             <excludeFolder url="file://$MODULE_DIR$/.idea" />
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index a89921cc77..9248a8f6c9 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -167,7 +167,40 @@
       <option name="MAIN_CLASS_NAME" value="" />
       <option name="METHOD_NAME" value="" />
       <option name="TEST_OBJECT" value="class" />
-      <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
+      <option name="VM_PARAMETERS" value="
+        -ea 
+        -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin 
+        -Djava.security.egd=file:/dev/urandom
+        -XX:-BackgroundCompilation
+        -XX:-TieredCompilation
+        -XX:ActiveProcessorCount=4
+        -XX:CICompilerCount=1
+        -XX:HeapDumpPath=build/test 
+        -XX:MaxMetaspaceSize=384M 
+        -XX:ReservedCodeCacheSize=256M
+        -XX:SoftRefLRUPolicyMSPerMB=0 
+        -XX:Tier4CompileThreshold=1000
+        -Xbootclasspath/a:$PROJECT_DIR$/build/test/lib/jars/simulator-bootstrap.jar
+        -Xmx8G
+        -Xss384k
+        -javaagent:$PROJECT_DIR$/build/test/lib/jars/simulator-asm.jar
+
+        -Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml 
+        -Dcassandra.debugrefcount=false
+        -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs 
+        -Dcassandra.memtable_row_overhead_computation_step=100
+        -Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true
+        -Dcassandra.ring_delay_ms=10000
+        -Dcassandra.skip_sync=true 
+        -Dcassandra.strict.runtime.checks=true 
+        -Dcassandra.test.simulator.determinismcheck=strict
+        -Dcassandra.test.sstableformatdevelopment=true
+        -Dcassandra.tolerate_sstable_size=true 
+        -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables
+        -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables 
+        -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml
+        -Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables 
+        " />
       <option name="PARAMETERS" value="" />
       <fork_mode value="class" />
       <option name="WORKING_DIRECTORY" value="" />
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java
similarity index 52%
copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
copy to src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java
index 546fd3179f..c95c3bd11f 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java
@@ -16,30 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.simulator.paxos;
+package org.apache.cassandra.metrics;
 
-class Observation implements Comparable<Observation>
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class AccordClientRequestMetrics extends ClientRequestMetrics
 {
-    final int id;
-    final Object[][] result;
-    final int start;
-    final int end;
+    public final Meter preempts;
+    public final Histogram keySize;
 
-    Observation(int id, Object[][] result, int start, int end)
+    public AccordClientRequestMetrics(String scope)
     {
-        this.id = id;
-        this.result = result;
-        this.start = start;
-        this.end = end;
+        super(scope);
+
+        preempts = Metrics.meter(factory.createMetricName("Preempts"));
+        keySize = Metrics.histogram(factory.createMetricName("KeySizeHistogram"), false);
     }
 
-    // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0
-    public int compareTo(Observation that)
+    @Override
+    public void release()
     {
-        if (this.end < that.start)
-            return -1;
-        if (that.end < this.start)
-            return 1;
-        return 0;
+        super.release();
+        Metrics.remove(factory.createMetricName("Preempts"));
+        Metrics.remove(factory.createMetricName("KeySizeHistogram"));
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 5b4a8c2e9b..8020b29ac7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -706,6 +706,8 @@ public class AccordCommand extends Command implements AccordState<TxnId>
         if (listener instanceof AccordCommandsForKey)
             return new ListenerProxy.CommandsForKeyListenerProxy(((AccordCommandsForKey) listener).key());
 
+        //TODO - Support accord.messages.Defer
+
         throw new RuntimeException("Unhandled non-transient listener: " + listener);
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 0135c741c3..5fee18e7fe 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.Result;
+import accord.coordinate.Preempted;
 import accord.coordinate.Timeout;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.metrics.AccordClientRequestMetrics;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.service.accord.api.AccordAgent;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.KeyspaceSplitter;
@@ -53,9 +55,13 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
 import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 public class AccordService implements Shutdownable
 {
+    public static final AccordClientRequestMetrics readMetrics = new AccordClientRequestMetrics("AccordRead");
+    public static final AccordClientRequestMetrics writeMetrics = new AccordClientRequestMetrics("AccordWrite");
+
     public final Node node;
     private final Shutdownable nodeShutdown;
     private final AccordMessageSink messageSink;
@@ -122,8 +128,11 @@ public class AccordService implements Shutdownable
      */
     public TxnData coordinate(Txn txn, ConsistencyLevel consistencyLevel)
     {
+        AccordClientRequestMetrics metrics = txn.isWrite() ? writeMetrics : readMetrics;
+        final long startNanos = nanoTime();
         try
         {
+            metrics.keySize.update(txn.keys().size());
             Future<Result> future = node.coordinate(txn);
             Result result = future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
             return (TxnData) result;
@@ -132,17 +141,35 @@ public class AccordService implements Shutdownable
         {
             Throwable cause = e.getCause();
             if (cause instanceof Timeout)
+            {
+                metrics.timeouts.mark();
+                throw throwTimeout(txn, consistencyLevel);
+            }
+            if (cause instanceof Preempted)
+            {
+                metrics.preempts.mark();
+                //TODO need to improve
+                // Coordinator "could" query the accord state to see what's going on, but that doesn't exist yet.
+                // Protocol also doesn't have a way to denote an "unknown" outcome, so using a timeout as the closest match
                 throw throwTimeout(txn, consistencyLevel);
+            }
+            metrics.failures.mark();
             throw new RuntimeException(cause);
         }
         catch (InterruptedException e)
         {
+            metrics.failures.mark();
             throw new UncheckedInterruptedException(e);
         }
         catch (TimeoutException e)
         {
+            metrics.timeouts.mark();
             throw throwTimeout(txn, consistencyLevel);
         }
+        finally
+        {
+            metrics.addNano(nanoTime() - startNanos);
+        }
     }
 
     private static RuntimeException throwTimeout(Txn txn, ConsistencyLevel consistencyLevel)
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
similarity index 54%
copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
copy to src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
index 546fd3179f..ef0ca2c5ca 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
@@ -16,30 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.simulator.paxos;
+package org.apache.cassandra.utils.logging;
 
-class Observation implements Comparable<Observation>
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.filter.AbstractMatcherFilter;
+import ch.qos.logback.core.spi.FilterReply;
+
+public class ClassNameFilter extends AbstractMatcherFilter<ILoggingEvent>
 {
-    final int id;
-    final Object[][] result;
-    final int start;
-    final int end;
+    String loggerName;
+
+    public void setLoggerName(String loggerName)
+    {
+        this.loggerName = loggerName;
+    }
 
-    Observation(int id, Object[][] result, int start, int end)
+    @Override
+    public FilterReply decide(ILoggingEvent event)
     {
-        this.id = id;
-        this.result = result;
-        this.start = start;
-        this.end = end;
+        if (!isStarted()) return FilterReply.NEUTRAL;
+        if (event.getLoggerName().equals(loggerName)) return onMatch;
+        return onMismatch;
     }
 
-    // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0
-    public int compareTo(Observation that)
+    @Override
+    public void start()
     {
-        if (this.end < that.start)
-            return -1;
-        if (that.end < this.start)
-            return 1;
-        return 0;
+        if (loggerName != null) super.start();
     }
 }
diff --git a/test/conf/logback-simulator.xml b/test/conf/logback-simulator.xml
index 3566779ea4..69def30afb 100644
--- a/test/conf/logback-simulator.xml
+++ b/test/conf/logback-simulator.xml
@@ -24,6 +24,19 @@
   <!-- Shutdown hook ensures that async appender flushes -->
   <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
+  <appender name="HISTORYLOG" class="ch.qos.logback.core.FileAppender">
+    <file>./build/test/logs/simulator/${run_start}-${run_seed}/history.log</file>
+    <encoder>
+      <pattern>%msg%n</pattern>
+    </encoder>
+    <immediateFlush>true</immediateFlush>
+    <filter class="org.apache.cassandra.utils.logging.ClassNameFilter">
+      <loggerName>org.apache.cassandra.simulator.paxos.LoggingHistoryValidator</loggerName>
+      <onMatch>ACCEPT</onMatch>
+      <onMismatch>DENY</onMismatch>
+    </filter>
+  </appender>
+
   <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
     <file>./build/test/logs/simulator/${run_start}-${run_seed}/${instance_id}/system.log</file>
     <encoder>
@@ -56,6 +69,7 @@
   <root level="INFO">
     <appender-ref ref="INSTANCEFILE" />
     <appender-ref ref="STDOUT" />
+    <appender-ref ref="HISTORYLOG" />
   </root>
 </configuration>
 
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Row.java b/test/distributed/org/apache/cassandra/distributed/api/Row.java
index 33272ed3d5..556a5716c0 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/Row.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/Row.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.api;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -85,6 +86,17 @@ public class Row
         return (T) results[index];
     }
 
+    public <T> T get(int index, T defaultValue)
+    {
+        checkAccess();
+        if (index < 0 || index >= results.length)
+            throw new NoSuchElementException("by index: " + index);
+        T result = (T) results[index];
+        if (result == null)
+            return defaultValue;
+        return result;
+    }
+
     public <T> T get(String name)
     {
         checkAccess();
@@ -94,66 +106,158 @@ public class Row
         return (T) results[idx];
     }
 
+    public <T> T get(String name, T defaultValue)
+    {
+        checkAccess();
+        int idx = findIndex(name);
+        if (idx == NOT_FOUND)
+            throw new NoSuchElementException("by name: " + name);
+        T result = (T) results[idx];
+        if (result == null)
+            return defaultValue;
+        return result;
+    }
+
+    public Boolean getBoolean(int index)
+    {
+        return get(index);
+    }
+
+    public Boolean getBoolean(int index, Boolean defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
+    public Boolean getBoolean(String name)
+    {
+        return get(name);
+    }
+
+    public Boolean getBoolean(String name, Boolean defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Short getShort(int index)
     {
         return get(index);
     }
 
+    public Short getShort(int index, Short defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Short getShort(String name)
     {
         return get(name);
     }
 
+    public Short getShort(String name, Short defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Integer getInteger(int index)
     {
         return get(index);
     }
 
+    public Integer getInteger(int index, Integer defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Integer getInteger(String name)
     {
         return get(name);
     }
 
+    public Integer getInteger(String name, Integer defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Long getLong(int index)
     {
         return get(index);
     }
 
+    public Long getLong(int index, Long defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Long getLong(String name)
     {
         return get(name);
     }
 
+    public Long getLong(String name, Long defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Float getFloat(int index)
     {
         return get(index);
     }
 
+    public Float getFloat(int index, Float defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Float getFloat(String name)
     {
         return get(name);
     }
 
+    public Float getFloat(String name, Float defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Double getDouble(int index)
     {
         return get(index);
     }
 
+    public Double getDouble(int index, Double defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Double getDouble(String name)
     {
         return get(name);
     }
 
+    public Double getDouble(String name, Double defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public String getString(int index)
     {
         return get(index);
     }
 
+    public String getString(int index, String defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public String getString(String name)
     {
         return get(name);
     }
 
+    public String getString(String name, String defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public UUID getUUID(int index)
     {
         Object uuid = get(index);
@@ -162,6 +266,14 @@ public class Row
         return (UUID) uuid;
     }
 
+    public UUID getUUID(int index, UUID defaultValue)
+    {
+        Object uuid = get(index, defaultValue);
+        if (uuid instanceof TimeUUID)
+            return ((TimeUUID) uuid).asUUID();
+        return (UUID) uuid;
+    }
+
     public UUID getUUID(String name)
     {
         Object uuid = get(name);
@@ -170,26 +282,74 @@ public class Row
         return (UUID) uuid;
     }
 
+    public UUID getUUID(String name, UUID defaultValue)
+    {
+        Object uuid = get(name, defaultValue);
+        if (uuid instanceof TimeUUID)
+            return ((TimeUUID) uuid).asUUID();
+        return (UUID) uuid;
+    }
+
     public Date getTimestamp(int index)
     {
         return get(index);
     }
 
+    public Date getTimestamp(int index, Date defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Date getTimestamp(String name)
     {
         return get(name);
     }
 
+    public Date getTimestamp(String name, Date defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public <T> Set<T> getSet(int index)
     {
         return get(index);
     }
 
+    public <T> Set<T> getSet(int index, Set<T> defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public <T> Set<T> getSet(String name)
     {
         return get(name);
     }
 
+    public <T> Set<T> getSet(String name, Set<T> defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
+    public <T> List<T> getList(int index)
+    {
+        return get(index);
+    }
+
+    public <T> List<T> getList(int index, List<T> defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
+    public <T> List<T> getList(String name)
+    {
+        return get(name);
+    }
+
+    public <T> List<T> getList(String name, List<T> defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     /**
      * Get the row as a array.
      */
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Query.java b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
index 823113f857..f40ebca904 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Query.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
@@ -37,15 +38,15 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
-public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]>
+public class Query implements IIsolatedExecutor.SerializableCallable<SimpleQueryResult>
 {
     private static final long serialVersionUID = 1L;
 
-    final String query;
+    public final String query;
     final long timestamp;
     final org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin;
     final org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin;
-    final Object[] boundValues;
+    public final Object[] boundValues;
 
     public Query(String query, long timestamp, org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin, org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin, Object[] boundValues)
     {
@@ -56,7 +57,8 @@ public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]>
         this.boundValues = boundValues;
     }
 
-    public Object[][] call()
+    @Override
+    public SimpleQueryResult call()
     {
         ConsistencyLevel commitConsistency = toCassandraCL(commitConsistencyOrigin);
         ConsistencyLevel serialConsistency = serialConsistencyOrigin == null ? null : toCassandraCL(serialConsistencyOrigin);
@@ -89,7 +91,7 @@ public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]>
         if (res != null)
             res.setWarnings(ClientWarn.instance.getWarnings());
 
-        return RowUtil.toQueryResult(res).toObjectArrays();
+        return RowUtil.toQueryResult(res);
     }
 
     public String toString()
diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
index 9d08f0a567..a9cfbb16a8 100644
--- a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -117,6 +117,7 @@ public class QueryResultUtil
     {
         StringBuilder sb = new StringBuilder();
         int rowNum = 1;
+        qr.mark();
         while (qr.hasNext())
         {
             sb.append("@ Row ").append(rowNum).append('\n');
@@ -129,6 +130,7 @@ public class QueryResultUtil
             }
             sb.append(table);
         }
+        qr.reset();
         return sb.toString();
     }
 
diff --git a/test/simulator/main/org/apache/cassandra/simulator/ActionList.java b/test/simulator/main/org/apache/cassandra/simulator/ActionList.java
index 64474e6184..2357684043 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ActionList.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ActionList.java
@@ -41,6 +41,10 @@ public class ActionList extends AbstractCollection<Action>
     public static ActionList empty() { return EMPTY; }
     public static ActionList of(Action action) { return new ActionList(new Action[] { action }); }
     public static ActionList of(Stream<Action> action) { return new ActionList(action.toArray(Action[]::new)); }
+    public static ActionList of(Stream<Action> action, Stream<Action>... actions)
+    {
+        return new ActionList(Stream.concat(action, Stream.of(actions).flatMap(a -> a)).toArray(Action[]::new));
+    }
     public static ActionList of(Collection<Action> actions) { return actions.isEmpty() ? EMPTY : new ActionList(actions.toArray(new Action[0])); }
     public static ActionList of(Action ... actions) { return new ActionList(actions); }
 
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
index 54a871bb45..5a528468ea 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
@@ -22,35 +22,43 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import javax.annotation.Nullable;
+import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.impl.FileLogAction;
 import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.simulator.Action;
 import org.apache.cassandra.simulator.ActionList;
-import org.apache.cassandra.simulator.ActionListener;
 import org.apache.cassandra.simulator.ActionPlan;
 import org.apache.cassandra.simulator.Actions;
 import org.apache.cassandra.simulator.Debug;
+import org.apache.cassandra.simulator.RandomSource;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
 import org.apache.cassandra.simulator.cluster.KeyspaceActions;
+import org.apache.cassandra.simulator.logging.RunStartDefiner;
+import org.apache.cassandra.simulator.logging.SeedDefiner;
 import org.apache.cassandra.simulator.systems.SimulatedActionTask;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.utils.IntRange;
+import org.apache.cassandra.utils.Pair;
 
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -59,7 +67,6 @@ import static org.apache.cassandra.simulator.Action.Modifiers.RELIABLE_NO_TIMEOU
 import static org.apache.cassandra.simulator.ActionSchedule.Mode.STREAM_LIMITED;
 import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_AND_STREAM_LIMITED;
 import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_LIMITED;
-import static org.apache.cassandra.simulator.Debug.EventType.PARTITION;
 
 @SuppressWarnings("unused")
 abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
@@ -77,7 +84,6 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
     final IntRange simulateKeyForSeconds;
     final ConsistencyLevel serialConsistency;
     final Debug debug;
-    final List<HistoryChecker> historyCheckers = new ArrayList<>();
     final AtomicInteger successfulReads = new AtomicInteger();
     final AtomicInteger successfulWrites = new AtomicInteger();
     final AtomicInteger failedReads = new AtomicInteger();
@@ -112,10 +118,71 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
 
     protected abstract String preInsertStmt();
 
-    abstract Operation verifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker);
-    abstract Operation nonVerifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker);
-    abstract Operation modifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker);
     abstract boolean joinAll();
+    boolean allowMultiplePartitions() { return false; }
+
+    abstract BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory();
+
+    protected Action checkErrorLogs(IInvokableInstance inst)
+    {
+        DatabaseDescriptor.clientInitialization();
+        return new Action("Error logs for node" + inst.config().num(), Action.Modifiers.NONE)
+        {
+            @Override
+            protected ActionList performSimple()
+            {
+                // can't use inst.logs as that runs in the class loader, which uses in-memory file system
+                String suite = new RunStartDefiner().getPropertyValue() + "-" + new SeedDefiner().getPropertyValue();
+                String instanceId = "node" + inst.config().num();
+                File logFile = new File(String.format("build/test/logs/simulator/%s/%s/system.log", suite, instanceId));
+                FileLogAction logs = new FileLogAction(logFile);
+
+                LogResult<List<String>> errors = logs.grepForErrors();
+                if (!errors.getResult().isEmpty())
+                {
+                    List<Pair<String, String>> errorsSeen = new ArrayList<>();
+                    for (String error : errors.getResult())
+                    {
+                        for (String line : error.split("\\n"))
+                        {
+                            line = line.trim();
+                            if (line.startsWith("ERROR")) continue;
+                            if (line.startsWith("at ")) continue;
+                            errorsSeen.add(Pair.create(line.split(":")[0], error));
+                            break;
+                        }
+                    }
+                    Class<? extends Throwable>[] expected = expectedExceptions();
+                    StringBuilder sb = new StringBuilder();
+                    for (Pair<String, String> pair : errorsSeen)
+                    {
+                        String name = pair.left;
+                        String exception = pair.right;
+                        Class<?> klass;
+                        try
+                        {
+                            klass = Class.forName(name);
+                        }
+                        catch (ClassNotFoundException e)
+                        {
+                            throw new RuntimeException(e);
+                        }
+
+                        if (!Stream.of(expected).anyMatch(e -> e.isAssignableFrom(klass)))
+                            sb.append("Unexpected exception:\n").append(exception).append('\n');
+                    }
+                    if (sb.length() > 0)
+                    {
+                        AssertionError error = new AssertionError("Saw errors in node" + inst.config().num() + ": " + sb);
+                        // this stacktrace isn't helpful, can be more confusing
+                        error.setStackTrace(new StackTraceElement[0]);
+                        throw error;
+                    }
+                }
+                return ActionList.empty();
+            }
+        };
+    }
 
     public ActionPlan plan()
     {
@@ -124,101 +191,27 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
 
         plan = plan.encapsulate(ActionPlan.setUpTearDown(
             ActionList.of(
-                cluster.stream().map(i -> simulated.run("Insert Partitions", i, executeForPrimaryKeys(preInsertStmt(), primaryKeys)))
-            ).andThen(
+                cluster.stream().map(i -> simulated.run("Insert Partitions", i, executeForPrimaryKeys(preInsertStmt(), primaryKeys))),
                 // TODO (now): this is temporary until we have correct epoch handling
-                ActionList.of(
-                    cluster.stream().map(i -> simulated.run("Create Accord Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
-                )
-//            ).andThen(
-//                // TODO (now): this is temporary until we have parameterisation of simulation
-//                ActionList.of(
-//                    cluster.stream().map(i -> simulated.run("Disable Accord Cache", i, () -> AccordService.instance.setCacheSize(0)))
-//                )
+                cluster.stream().map(i -> simulated.run("Create Accord Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
             ),
             ActionList.of(
+                cluster.stream().map(i -> checkErrorLogs(i)),
                 cluster.stream().map(i -> SimulatedActionTask.unsafeTask("Shutdown " + i.broadcastAddress(), RELIABLE, RELIABLE_NO_TIMEOUTS, simulated, i, i::shutdown))
             )
         ));
 
-        final int nodes = cluster.size();
-        for (int primaryKey : primaryKeys)
-            historyCheckers.add(new HistoryChecker(primaryKey));
-
-        List<Supplier<Action>> primaryKeyActions = new ArrayList<>();
-        for (int pki = 0 ; pki < primaryKeys.length ; ++pki)
-        {
-            int primaryKey = primaryKeys[pki];
-            HistoryChecker historyChecker = historyCheckers.get(pki);
-            Supplier<Action> supplier = new Supplier<Action>()
-            {
-                int i = 0;
-
-                @Override
-                public Action get()
-                {
-                    int node = simulated.random.uniform(1, nodes + 1);
-                    IInvokableInstance instance = cluster.get(node);
-                    switch (serialConsistency)
-                    {
-                        default: throw new AssertionError();
-                        case LOCAL_SERIAL:
-                            if (simulated.snitch.dcOf(node) > 0)
-                            {
-                                // perform some queries against these nodes but don't expect them to be linearizable
-                                return nonVerifying(i++, instance, primaryKey, historyChecker);
-                            }
-                        case SERIAL:
-                            return simulated.random.decide(readRatio)
-                                   ? verifying(i++, instance, primaryKey, historyChecker)
-                                   : modifying(i++, instance, primaryKey, historyChecker);
-                    }
-                }
-
-                @Override
-                public String toString()
-                {
-                    return Integer.toString(primaryKey);
-                }
-            };
-
-            final ActionListener listener = debug.debug(PARTITION, simulated.time, cluster, KEYSPACE, primaryKey);
-            if (listener != null)
-            {
-                Supplier<Action> wrap = supplier;
-                supplier = new Supplier<Action>()
-                {
-                    @Override
-                    public Action get()
-                    {
-                        Action action = wrap.get();
-                        action.register(listener);
-                        return action;
-                    }
-
-                    @Override
-                    public String toString()
-                    {
-                        return wrap.toString();
-                    }
-                };
-            }
-
-            primaryKeyActions.add(supplier);
-        }
+        BiFunction<SimulatedSystems, int[], Supplier<Action>> factory = actionFactory();
 
         List<Integer> available = IntStream.range(0, primaryKeys.length).boxed().collect(Collectors.toList());
         Action stream = Actions.infiniteStream(concurrency, new Supplier<Action>() {
             @Override
             public Action get()
             {
-                int i = simulated.random.uniform(0, available.size());
-                int next = available.get(i);
-                available.set(i, available.get(available.size() - 1));
-                available.remove(available.size() - 1);
+                int[] primaryKeyIndex = consume(simulated.random, available);
                 long untilNanos = simulated.time.nanoTime() + SECONDS.toNanos(simulateKeyForSeconds.select(simulated.random));
                 int concurrency = withinKeyConcurrency.select(simulated.random);
-                Supplier<Action> supplier = primaryKeyActions.get(next);
+                Supplier<Action> supplier = factory.apply(simulated, primaryKeyIndex);
                 // while this stream is finite, it participates in an infinite stream via its parent, so we want to permit termination while it's running
                 return Actions.infiniteStream(concurrency, new Supplier<Action>()
                 {
@@ -227,7 +220,7 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
                     {
                         if (simulated.time.nanoTime() >= untilNanos)
                         {
-                            available.add(next);
+                            IntStream.of(primaryKeyIndex).boxed().forEach(available::add);
                             return null;
                         }
                         return supplier.get();
@@ -253,7 +246,30 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
                                   .encapsulate(ActionPlan.interleave(singletonList(ActionList.of(stream))));
     }
 
-    private IIsolatedExecutor.SerializableRunnable executeForPrimaryKeys(String cql, int[] primaryKeys)
+    private int[] consume(RandomSource random, List<Integer> available)
+    {
+        if (available.isEmpty())
+            throw new AssertionError("available partitions are empty!");
+        int numPartitions = available.size() == 1 || !allowMultiplePartitions() ? 1 : random.uniform(1, available.size());
+        int[] partitions = new int[numPartitions];
+        for (int counter = 0; counter < numPartitions; counter++)
+        {
+            int idx = random.uniform(0, available.size());
+            int next = available.get(idx);
+            int last = available.get(available.size() - 1);
+            if (available.set(idx, last) != next)
+                throw new IllegalStateException("Expected to set " + last + " index " + idx + " but did not return " + next);
+            int removed = available.remove(available.size() - 1);
+            if (last != removed)
+                throw new IllegalStateException("Expected to remove " + last + " but removed " + removed);
+
+            partitions[counter] = next;
+        }
+        Arrays.sort(partitions);
+        return partitions;
+    }
+
+    IIsolatedExecutor.SerializableRunnable executeForPrimaryKeys(String cql, int[] primaryKeys)
     {
         return () -> {
             for (int primaryKey : primaryKeys)
@@ -273,13 +289,6 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
         return new PaxosRepairValidator(cluster, KEYSPACE, TABLE, id);
     }
 
-    @Override
-    void log(@Nullable Integer primaryKey)
-    {
-        if (primaryKey == null) historyCheckers.forEach(HistoryChecker::print);
-        else historyCheckers.stream().filter(h -> h.primaryKey == primaryKey).forEach(HistoryChecker::print);
-    }
-
     @Override
     public void run()
     {
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java
index d1e0771b1e..2465bf62cf 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java
@@ -127,13 +127,23 @@ class HistoryChecker
         return byId[id] = event;
     }
 
-    void witness(Observation witness, int[] witnessSequence, int start, int end)
+    private static int eventId(int[] witnessSequence, int eventPosition)
+    {
+        return eventPosition == 0 ? -1 : witnessSequence[eventPosition - 1];
+    }
+
+    void witness(Observation witness, int[] witnessSequence)
+    {
+        witness(witness.id, witnessSequence, witness.start, witness.end);
+    }
+
+    void witness(int id, int[] witnessSequence, int start, int end)
     {
         int eventPosition = witnessSequence.length;
-        int eventId = eventPosition == 0 ? -1 : witnessSequence[eventPosition - 1];
-        setById(witness.id, new Event(witness.id)).log.add(new VerboseWitness(witness.id, start, end, witnessSequence));
+        int eventId = eventId(witnessSequence, eventPosition);
+        setById(id, new Event(id)).log.add(new VerboseWitness(id, start, end, witnessSequence));
         Event event = get(eventPosition, eventId);
-        recordWitness(event, witness, witnessSequence);
+        recordWitness(event, id, start, end, witnessSequence);
         recordVisibleBy(event, end);
         recordVisibleUntil(event, start);
 
@@ -154,7 +164,7 @@ class HistoryChecker
                     }
                     else if (e.result)
                     {
-                        throw fail(primaryKey, "%d witnessed as absent by %d", e.eventId, witness.id);
+                        throw fail(primaryKey, "%d witnessed as absent by %d", e.eventId, id);
                     }
                 }
             }
@@ -181,16 +191,16 @@ class HistoryChecker
         }
     }
 
-    void recordWitness(Event event, Observation witness, int[] witnessSequence)
+    void recordWitness(Event event, int id, int start, int end, int[] witnessSequence)
     {
-        recordWitness(event, witness, witnessSequence.length, witnessSequence);
+        recordWitness(event, id, start, end, witnessSequence.length, witnessSequence);
     }
 
-    void recordWitness(Event event, Observation witness, int eventPosition, int[] witnessSequence)
+    void recordWitness(Event event, int id, int start, int end, int eventPosition, int[] witnessSequence)
     {
         while (true)
         {
-            event.log.add(new Witness(READ, witness.id, witness.start, witness.end));
+            event.log.add(new Witness(READ, id, start, end));
             if (event.witnessSequence != null)
             {
                 if (!Arrays.equals(event.witnessSequence, witnessSequence))
@@ -238,7 +248,7 @@ class HistoryChecker
             event.visibleUntil = visibleUntil;
             Event next = next(event);
             if (next != null && visibleUntil >= next.visibleBy)
-                throw fail(primaryKey, "%s %d not witnessed >= %d, but also witnessed <= %d", next.witnessSequence, next.eventId, event.visibleUntil, next.visibleBy);
+                throw fail(primaryKey, "%s+%d not witnessed >= %d, but also witnessed <= %d", next.witnessSequence, next.eventId, event.visibleUntil, next.visibleBy);
         }
     }
 
@@ -295,7 +305,7 @@ class HistoryChecker
             return null;
 
         // initialise the event, if necessary importing information from byId
-        return get(eventPosition, eventPosition == 0 ? -1 : event.witnessSequence[eventPosition - 1]);
+        return get(eventPosition, eventId(event.witnessSequence, eventPosition));
     }
 
     Event next(Event event)
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java
similarity index 57%
copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
copy to test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java
index 546fd3179f..282b16d3b1 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java
@@ -18,28 +18,35 @@
 
 package org.apache.cassandra.simulator.paxos;
 
-class Observation implements Comparable<Observation>
+import javax.annotation.Nullable;
+
+public interface HistoryValidator
 {
-    final int id;
-    final Object[][] result;
-    final int start;
-    final int end;
+    Checker witness(int start, int end);
+
+    void print(@Nullable Integer pk);
 
-    Observation(int id, Object[][] result, int start, int end)
+    interface Checker extends AutoCloseable
     {
-        this.id = id;
-        this.result = result;
-        this.start = start;
-        this.end = end;
+        void read(int pk, int id, int count, int[] seq);
+        void write(int pk, int id, boolean success);
+
+        default void writeSuccess(int pk, int id)
+        {
+            write(pk, id, true);
+        }
+
+        default void writeUnknownFailure(int pk, int id)
+        {
+            write(pk, id, false);
+        }
+
+        @Override
+        default void close() {}
     }
 
-    // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0
-    public int compareTo(Observation that)
+    interface Factory
     {
-        if (this.end < that.start)
-            return -1;
-        if (that.end < this.start)
-            return 1;
-        return 0;
+        HistoryValidator create(int[] partitions);
     }
 }
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java
new file mode 100644
index 0000000000..67c95a7378
--- /dev/null
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+
+import com.carrotsearch.hppc.IntObjectHashMap;
+import com.carrotsearch.hppc.IntObjectMap;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+
+public class LinearizabilityValidator implements HistoryValidator
+{
+    private final IntObjectMap<HistoryChecker> historyCheckers;
+
+    public LinearizabilityValidator(int[] primaryKeys)
+    {
+        historyCheckers = new IntObjectHashMap<>(primaryKeys.length);
+        for (int primaryKey : primaryKeys)
+            historyCheckers.put(primaryKey, new HistoryChecker(primaryKey));
+    }
+
+    @Override
+    public Checker witness(int start, int end)
+    {
+        return new Checker()
+        {
+            @Override
+            public void read(int pk, int id, int count, int[] seq)
+            {
+                get(pk).witness(id, seq, start, end);
+            }
+
+            @Override
+            public void write(int pk, int id, boolean success)
+            {
+                get(pk).applied(id, start, end, success);
+            }
+        };
+    }
+
+    @Override
+    public void print(@Nullable Integer pk)
+    {
+        if (pk == null) historyCheckers.values().forEach((Consumer<ObjectCursor<HistoryChecker>>) c -> c.value.print());
+        else historyCheckers.get(pk).print();
+    }
+
+    private HistoryChecker get(int pk)
+    {
+        HistoryChecker checker = historyCheckers.get(pk);
+        if (checker == null)
+            throw new NullPointerException("Unable to find checker for pk=" + pk);
+        return checker;
+    }
+
+    public static class Factory implements HistoryValidator.Factory
+    {
+        public static final Factory instance = new Factory();
+
+        @Override
+        public HistoryValidator create(int[] partitions)
+        {
+            return new LinearizabilityValidator(partitions);
+        }
+    }
+}
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java
new file mode 100644
index 0000000000..b39c3111ea
--- /dev/null
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class LoggingHistoryValidator implements HistoryValidator
+{
+    private static final Logger logger = LoggerFactory.getLogger(LoggingHistoryValidator.class);
+    private final HistoryValidator delegate;
+
+    public LoggingHistoryValidator(HistoryValidator delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public Checker witness(int start, int end)
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Witness(start=").append(start).append(", end=").append(end).append(")\n");
+        Checker sub = delegate.witness(start, end);
+        return new Checker()
+        {
+            @Override
+            public void read(int pk, int id, int count, int[] seq)
+            {
+                sb.append("\tread(pk=").append(pk).append(", id=").append(id).append(", count=").append(count).append(", seq=").append(Arrays.toString(seq)).append(")\n");
+                sub.read(pk, id, count, seq);
+            }
+
+            @Override
+            public void write(int pk, int id, boolean success)
+            {
+                sb.append("\twrite(pk=").append(pk).append(", id=").append(id).append(", success=").append(success).append(")\n");
+                sub.write(pk, id, success);
+            }
+
+            @Override
+            public void close()
+            {
+                logger.info(sb.toString());
+                sub.close();
+            }
+        };
+    }
+
+    @Override
+    public void print(@Nullable Integer pk)
+    {
+        delegate.print(pk);
+    }
+}
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
index 546fd3179f..41eb2c348d 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
@@ -18,14 +18,16 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+
 class Observation implements Comparable<Observation>
 {
     final int id;
-    final Object[][] result;
+    final SimpleQueryResult result;
     final int start;
     final int end;
 
-    Observation(int id, Object[][] result, int start, int end)
+    Observation(int id, SimpleQueryResult result, int start, int end)
     {
         this.id = id;
         this.result = result;
@@ -33,6 +35,16 @@ class Observation implements Comparable<Observation>
         this.end = end;
     }
 
+    boolean isSuccess()
+    {
+        return result != null;
+    }
+
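+    // a null result means the operation's outcome is unknown: it may or may not have been applied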
+    boolean isUnknownFailure()
+    {
+        return result == null;
+    }
+
     // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0
     public int compareTo(Observation that)
     {
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
index a9bbf7ca57..7724690bc5 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
@@ -22,153 +22,51 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.commons.lang3.ArrayUtils;
-import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.coordinate.Preempted;
-import accord.coordinate.Timeout;
-import accord.primitives.Txn;
+import com.carrotsearch.hppc.IntArrayList;
+import com.carrotsearch.hppc.IntHashSet;
+import com.carrotsearch.hppc.cursors.IntCursor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.QueryResults;
-import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.impl.Query;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.AccordService;
-import org.apache.cassandra.service.accord.AccordTestUtils;
-import org.apache.cassandra.service.accord.txn.TxnData;
-import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.simulator.Action;
 import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.utils.IntRange;
 
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
 import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.apache.cassandra.utils.AssertionUtils.hasCause;
+import static org.apache.cassandra.utils.AssertionUtils.isThrowableInstanceof;
 
 // TODO: the class hierarchy is a bit broken, but hard to untangle. Need to go Paxos->Consensus, probably.
 @SuppressWarnings("unused")
 public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxosSimulation
 {
     private static final Logger logger = LoggerFactory.getLogger(PairOfSequencesAccordSimulation.class);
-    private static final String SELECT = "SELECT pk, count, seq FROM  " + KEYSPACE + ".tbl WHERE pk = ?";
-
-    class VerifyingOperation extends Operation
-    {
-        final HistoryChecker historyChecker;
-        public VerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
-        {
-            super(primaryKey, id, instance, "SELECT", read(primaryKey));
-            this.historyChecker = historyChecker;
-        }
-
-        void verify(Observation outcome)
-        {
-            (outcome.result != null ? successfulReads : 
failedReads).incrementAndGet();
-
-            if (outcome.result == null)
-                return;
-
-            if (outcome.result.length != 1)
-                throw fail(primaryKey, "#result (%s) != 1", 
Arrays.toString(outcome.result));
-
-            Object[] row = outcome.result[0];
-            // first verify internally consistent
-            int count = row[1] == null ? 0 : (Integer) row[1];
-            int[] seq = Arrays.stream((row[2] == null ? "" : (String) 
row[2]).split(","))
-                               .filter(s -> !s.isEmpty())
-                               .mapToInt(Integer::parseInt)
-                               .toArray();
-
-            if (seq.length != count)
-                throw fail(primaryKey, "%d != #%s", count, seq);
-
-            historyChecker.witness(outcome, seq, outcome.start, outcome.end);
-        }
-    }
-
-    private static IIsolatedExecutor.SerializableCallable<Object[][]> read(int 
primaryKey)
-    {
-        return () -> {
-            String cql = "BEGIN TRANSACTION\n" + SELECT + ";\n" + "COMMIT 
TRANSACTION";
-            List<ByteBuffer> values = ImmutableList.of(bytes(primaryKey));
-            Txn txn = AccordTestUtils.createTxn(cql, 
QueryOptions.forInternalCalls(values));
-            // TODO (now): support complex columns
-            return execute(txn, "pk", "count", "seq");
-        };
-    }
-
-    private static IIsolatedExecutor.SerializableCallable<Object[][]> 
write(int id, int primaryKey)
-    {
-        return () -> {
-            String cql = "BEGIN TRANSACTION\n" + 
-                         "    " + SELECT + ";\n" +
-                         "    UPDATE " + KEYSPACE + ".tbl SET seq += '" + id + 
",' WHERE pk = ?;\n" +
-                         "    UPDATE " + KEYSPACE + ".tbl SET count += 1 WHERE 
pk = ?;\n" +
-                         "COMMIT TRANSACTION";
-            List<ByteBuffer> values = ImmutableList.of(bytes(primaryKey), 
bytes(primaryKey), bytes(primaryKey));
-            Txn txn = AccordTestUtils.createTxn(cql, 
QueryOptions.forInternalCalls(values));
-            return execute(txn, "pk", "count", "seq");
-        };
-    }
-
-    private static Object[][] execute(Txn txn, String ... columns)
-    {
-        try
-        {
-            TxnData result = (TxnData) 
AccordService.instance().node.coordinate(txn).get();
-            Assert.assertNotNull(result);
-            QueryResults.Builder builder = QueryResults.builder();
-            boolean addedHeader = false;
-
-            FilteredPartition partition = result.get(TxnDataName.returning());
-            TableMetadata metadata = partition.metadata();
-            builder.columns(columns);
-
-            ByteBuffer[] keyComponents = 
SelectStatement.getComponents(metadata, partition.partitionKey());
-
-            Row s = partition.staticRow();
-            if (!s.isEmpty())
-                append(metadata, keyComponents, s, builder, columns);
-
-            for (Row row : partition)
-                append(metadata, keyComponents, row, builder, columns);
-
-            return builder.build().toObjectArrays();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            if (e.getCause() instanceof Preempted)
-                return null;
-            if (e.getCause() instanceof Timeout)
-                return null;
-            if (e.getCause() instanceof RequestTimeoutException)
-                return null;
-            throw new AssertionError(e);
-        }
-    }
+    private static final String SELECT = "SELECT pk, count, seq FROM  " + KEYSPACE + ".tbl WHERE pk IN (%s);";
+    private static final String UPDATE = "UPDATE " + KEYSPACE + ".tbl SET count += 1, seq = seq + ? WHERE pk = ?;";
 
     private static void append(TableMetadata metadata, ByteBuffer[] 
keyComponents, Row row, QueryResults.Builder builder, String[] columnNames)
     {
@@ -218,38 +116,14 @@ public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxo
         builder.row(buffer);
     }
 
-    class NonVerifyingOperation extends Operation
+    @Override
+    void log(@Nullable Integer pk)
     {
-        public NonVerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
-        {
-            super(primaryKey, id, instance, "SELECT", read(primaryKey));
-        }
-
-        void verify(Observation outcome)
-        {
-        }
+        validator.print(pk);
     }
 
-    public class ModifyingOperation extends Operation
-    {
-        final HistoryChecker historyChecker;
-        public ModifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, int 
primaryKey, HistoryChecker historyChecker)
-        {
-            super(primaryKey, id, instance, "UPDATE", write(id, primaryKey));
-            this.historyChecker = historyChecker;
-        }
-
-        void verify(Observation outcome)
-        {
-            (outcome.result != null ? successfulWrites : 
failedWrites).incrementAndGet();
-            if (outcome.result != null)
-            {
-                if (outcome.result.length != 1)
-                    throw fail(primaryKey, "Accord Result: 1 != #%s", 
ArrayUtils.toString(outcome.result));
-            }
-            historyChecker.applied(outcome.id, outcome.start, outcome.end, 
outcome.result != null);
-        }
-    }
+    private final float writeRatio;
+    private final HistoryValidator validator;
 
     public PairOfSequencesAccordSimulation(SimulatedSystems simulated,
                                            Cluster cluster,
@@ -266,6 +140,8 @@ public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxo
               scheduler, debug,
               seed, primaryKeys,
               runForNanos, jitter);
+        this.writeRatio = 1F - readRatio;
+        validator = new LoggingHistoryValidator(new StrictSerializabilityValidator(primaryKeys));
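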
     }
 
     @Override
@@ -281,21 +157,145 @@ public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxo
     }
 
     @Override
-    Operation verifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    boolean allowMultiplePartitions() { return true; }
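+    // Unlike the Paxos simulation, which asserts exactly one partition per action, a single Accord
+    // transaction here may read and write several partitions at once.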
+
+    @Override
+    BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory()
     {
-        return new VerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
+        AtomicInteger id = new AtomicInteger(0);
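+        // Every generated operation gets a unique id; writes append "<id>," to the seq column, so a later
+        // read of (count, seq) recovers exactly which writes were applied and in what order.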
+
+        return (simulated, primaryKeyIndex) -> {
+            int[] primaryKeys = IntStream.of(primaryKeyIndex).map(i -> this.primaryKeys[i]).toArray();
+            return () -> accordAction(id.getAndIncrement(), simulated, primaryKeys);
+        };
     }
 
-    @Override
-    Operation nonVerifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    public class ReadWriteOperation extends Operation
     {
-        return new NonVerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
+        private final IntHashSet reads, writes;
+
+        public ReadWriteOperation(int id, int[] primaryKeys, IntHashSet reads, IntHashSet writes, IInvokableInstance instance)
+        {
+            super(primaryKeys, id, instance, "Accord", createQuery(id, reads, writes));
+            this.reads = reads;
+            this.writes = writes;
+        }
+
+        @Override
+        void verify(Observation outcome)
+        {
+            SimpleQueryResult result = outcome.result;
+            (result != null ? successfulWrites : 
failedWrites).incrementAndGet();
+            if (result != null)
+            {
+                IntHashSet seen = new IntHashSet();
+                // TODO: if a partition has no value we get an empty read, which never makes it into the QueryResult;
+                //       since we always run with the partitions pre-defined, this should be fine
+                try (HistoryValidator.Checker checker = validator.witness(outcome.start, outcome.end))
+                {
+                    while (result.hasNext())
+                    {
+                        org.apache.cassandra.distributed.api.Row row = 
result.next();
+
+                        int pk = row.getInteger("pk");
+                        int count = row.getInteger("count", 0);
+                        int[] seq = Arrays.stream(row.getString("seq", 
"").split(","))
+                                          .filter(s -> !s.isEmpty())
+                                          .mapToInt(Integer::parseInt)
+                                          .toArray();
+
+                        if (!seen.add(pk))
+                            throw new IllegalStateException("Duplicate 
partition key " + pk);
+                        // every partition was read, but not all were written 
to... need to verify each partition
+                        if (seq.length != count)
+                            throw fail(pk, "%d != #%s", count, seq);
+
+                        checker.read(pk, outcome.id, count, seq);
+                    }
+                    if (!seen.equals(reads))
+                        throw fail(0, "#result had %s partitions, but should 
have had %s", seen, reads);
+                    // handle writes
+                    for (IntCursor c : writes)
+                        checker.write(c.value, outcome.id, 
outcome.isSuccess());
+                }
+            }
+        }
     }
 
-    @Override
-    Operation modifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker)
+    private Action accordAction(int id, SimulatedSystems simulated, int[] partitions)
     {
-        return new ModifyingOperation(operationId, instance, ANY, serialConsistency, primaryKey, historyChecker);
+        IntArrayList reads = new IntArrayList();
+        IntArrayList writes = new IntArrayList();
+        for (int partition : partitions)
+        {
+            boolean added = false;
+            if (simulated.random.decide(readRatio))
+            {
+                reads.add(partition);
+                added = true;
+            }
+            if (simulated.random.decide(writeRatio))
+            {
+                writes.add(partition);
+                added = true;
+            }
+            if (!added)
+            {
+                // when the read-ratio check fails that implies a write, and when the write-ratio check
+                // fails that implies a read, so make this case a read/write.
+                // It's possible that both checks were true, also leading to a read/write; which is fine,
+                // this just makes sure every partition is consumed.
+                reads.add(partition);
+                writes.add(partition);
+            }
+        }
+
+        int node = simulated.random.uniform(1, cluster.size() + 1);
+        IInvokableInstance instance = cluster.get(node);
+        return new ReadWriteOperation(id, partitions, new IntHashSet(reads), 
new IntHashSet(writes), instance);
+    }
+
+    private int[] genReadOnly(SimulatedSystems simulated, int[] partitions)
+    {
+        IntArrayList readOnly = new IntArrayList();
+        for (int partition : partitions)
+        {
+            if (simulated.random.decide(readRatio))
+                readOnly.add(partition);
+        }
+        return readOnly.toArray();
+    }
+
+    private static Query createQuery(int id, IntHashSet reads, IntHashSet writes)
+    {
+        if (reads.isEmpty() && writes.isEmpty())
+            throw new IllegalArgumentException("Partitions are empty");
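+        // Illustrative shape of the generated transaction (assuming reads={1,2} and writes={3}):
+        //   BEGIN TRANSACTION
+        //       SELECT pk, count, seq FROM  <keyspace>.tbl WHERE pk IN (?, ?);
+        //       UPDATE <keyspace>.tbl SET count += 1, seq = seq + ? WHERE pk = ?;
+        //   COMMIT TRANSACTION
+        // with binds [1, 2, "<id>,", 3].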
+        List<Object> binds = new ArrayList<>();
+        StringBuilder sb = new StringBuilder();
+        sb.append("BEGIN TRANSACTION\n");
+        if (!reads.isEmpty())
+        {
+
+            sb.append("\t")
+              .append(String.format(SELECT, String.join(", ", IntStream.of(reads.toArray())
+                                                                        .mapToObj(i -> {
+                                                                            binds.add(i);
+                                                                            return "?";
+                                                                        })
+                                                                        .collect(Collectors.joining(", ")))))
+              .append('\n');
+        }
+
+        for (IntCursor c : writes)
+        {
+            sb.append('\t').append(UPDATE).append("\n");
+            binds.add(id + ",");
+            binds.add(c.value);
+        }
+
+        sb.append("COMMIT TRANSACTION");
+        return new Query(sb.toString(), 0, ConsistencyLevel.ANY, ConsistencyLevel.ANY, binds.toArray(new Object[0]));
     }
 
     @Override
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
index c1ccb6648e..b07b4a86cd 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
@@ -18,9 +18,13 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
@@ -29,9 +33,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.impl.Query;
+import org.apache.cassandra.simulator.Action;
+import org.apache.cassandra.simulator.ActionListener;
+import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
-import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.utils.IntRange;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,6 +49,7 @@ import static java.lang.Boolean.TRUE;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
+import static org.apache.cassandra.simulator.Debug.EventType.PARTITION;
 import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail;
 
 @SuppressWarnings("unused")
@@ -49,7 +59,7 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos
     private static final String UPDATE = "UPDATE " + KEYSPACE + ".tbl SET count = count + 1, seq1 = seq1 + ?, seq2 = seq2 + ? WHERE pk = ? IF EXISTS";
     private static final String SELECT = "SELECT pk, count, seq1, seq2 FROM  " + KEYSPACE + ".tbl WHERE pk = ?";
 
-    class VerifyingOperation extends Operation
+    class VerifyingOperation extends PaxosOperation
     {
         final HistoryChecker historyChecker;
         public VerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
@@ -60,23 +70,26 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
 
         void verify(Observation outcome)
         {
-            (outcome.result != null ? successfulReads : 
failedReads).incrementAndGet();
+            SimpleQueryResult result = outcome.result;
+            (result != null ? successfulReads : failedReads).incrementAndGet();
 
-            if (outcome.result == null)
+            if (result == null)
                 return;
 
-            if (outcome.result.length != 1)
-                throw fail(primaryKey, "#result (%s) != 1", 
Arrays.toString(outcome.result));
+            if (!result.hasNext())
+                throw fail(primaryKey, "#result: ([]) != 1");
+
+            // pk, count, seq1, seq2
+            Row row = result.next();
 
-            Object[] row = outcome.result[0];
             // first verify internally consistent
-            int count = row[1] == null ? 0 : (Integer) row[1];
-            int[] seq1 = Arrays.stream((row[2] == null ? "" : (String) 
row[2]).split(","))
+            int count = row.getInteger("count", 0);
+            int[] seq1 = Arrays.stream(row.getString("seq1", "").split(","))
                                .filter(s -> !s.isEmpty())
                                .mapToInt(Integer::parseInt)
                                .toArray();
-            int[] seq2 = ((List<Integer>) (row[3] == null ? emptyList() : 
row[3]))
-                         .stream().mapToInt(x -> x).toArray();
+
+            int[] seq2 = row.<Integer>getList("seq2", 
emptyList()).stream().mapToInt(x -> x).toArray();
 
             if (!Arrays.equals(seq1, seq2))
                 throw fail(primaryKey, "%s != %s", seq1, seq2);
@@ -84,11 +97,24 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
             if (seq1.length != count)
                 throw fail(primaryKey, "%d != #%s", count, seq1);
 
-            historyChecker.witness(outcome, seq1, outcome.start, outcome.end);
+            if (result.hasNext())
+                throw fail(primaryKey, "#result (%s) != 1", 
ArrayUtils.toString(result.toObjectArrays()));
+
+            historyChecker.witness(outcome, seq1);
+        }
+    }
+
+    private abstract class PaxosOperation extends Operation
+    {
+        final int primaryKey;
+        PaxosOperation(int primaryKey, int id, IInvokableInstance instance, String idString, String query, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, Object... params)
+        {
+            super(new int[] {primaryKey}, id, instance, idString, new Query(query, -1, commitConsistency, serialConsistency, params));
+            this.primaryKey = primaryKey;
         }
     }
 
-    class NonVerifyingOperation extends Operation
+    class NonVerifyingOperation extends PaxosOperation
     {
         public NonVerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
         {
@@ -100,7 +126,7 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
         }
     }
 
-    public class ModifyingOperation extends Operation
+    public class ModifyingOperation extends PaxosOperation
     {
         final HistoryChecker historyChecker;
         public ModifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, int 
primaryKey, HistoryChecker historyChecker)
@@ -111,18 +137,23 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
 
         void verify(Observation outcome)
         {
-            (outcome.result != null ? successfulWrites : 
failedWrites).incrementAndGet();
-            if (outcome.result != null)
+            SimpleQueryResult result = outcome.result;
+            (result != null ? successfulWrites : 
failedWrites).incrementAndGet();
+            if (result != null)
             {
-                if (outcome.result.length != 1)
-                    throw fail(primaryKey, "Paxos Result: 1 != #%s", 
ArrayUtils.toString(outcome.result));
-                if (outcome.result[0][0] != TRUE)
+                if (!result.hasNext())
+                    throw fail(primaryKey, "Paxos Result: 1 != #[]");
+                if (result.next().getBoolean(0) != TRUE)
                     throw fail(primaryKey, "Result != TRUE");
+                if (result.hasNext())
+                    throw fail(primaryKey, "Paxos Result: 1 != #%s", 
ArrayUtils.toString(result.toObjectArrays()));
             }
-            historyChecker.applied(outcome.id, outcome.start, outcome.end, 
outcome.result != null);
+            historyChecker.applied(outcome.id, outcome.start, outcome.end, 
outcome.isSuccess());
         }
     }
 
+    final List<HistoryChecker> historyCheckers = new ArrayList<>();
+
     public PairOfSequencesPaxosSimulation(SimulatedSystems simulated,
                                           Cluster cluster,
                                           ClusterActions.Options 
clusterOptions,
@@ -140,6 +171,84 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
               runForNanos, jitter);
     }
 
+    @Override
+    BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory()
+    {
+        final int nodes = cluster.size();
+        for (int primaryKey : primaryKeys)
+            historyCheckers.add(new HistoryChecker(primaryKey));
+
+        List<Supplier<Action>> primaryKeyActions = new ArrayList<>();
+        for (int pki = 0 ; pki < primaryKeys.length ; ++pki)
+        {
+            int primaryKey = primaryKeys[pki];
+            HistoryChecker historyChecker = historyCheckers.get(pki);
+            Supplier<Action> supplier = new Supplier<Action>()
+            {
+                int i = 0;
+
+                @Override
+                public Action get()
+                {
+                    int node = simulated.random.uniform(1, nodes + 1);
+                    IInvokableInstance instance = cluster.get(node);
+                    switch (serialConsistency)
+                    {
+                        default: throw new AssertionError();
+                        case LOCAL_SERIAL:
+                            if (simulated.snitch.dcOf(node) > 0)
+                            {
+                                // perform some queries against these nodes, but don't expect them to be linearizable
+                                return nonVerifying(i++, instance, primaryKey, historyChecker);
+                            }
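+                            // intentional fall-through: nodes in the first datacenter are still expected
+                            // to be linearizable, so they get the SERIAL treatment below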
+                        case SERIAL:
+                            return simulated.random.decide(readRatio)
+                                   ? verifying(i++, instance, primaryKey, 
historyChecker)
+                                   : modifying(i++, instance, primaryKey, 
historyChecker);
+                    }
+                }
+
+                @Override
+                public String toString()
+                {
+                    return Integer.toString(primaryKey);
+                }
+            };
+
+            final ActionListener listener = debug.debug(PARTITION, 
simulated.time, cluster, KEYSPACE, primaryKey);
+            if (listener != null)
+            {
+                Supplier<Action> wrap = supplier;
+                supplier = new Supplier<Action>()
+                {
+                    @Override
+                    public Action get()
+                    {
+                        Action action = wrap.get();
+                        action.register(listener);
+                        return action;
+                    }
+
+                    @Override
+                    public String toString()
+                    {
+                        return wrap.toString();
+                    }
+                };
+            }
+
+            primaryKeyActions.add(supplier);
+        }
+        return (ignore, primaryKeyIndex) -> 
primaryKeyActions.get(only(primaryKeyIndex));
+    }
+
+    private static int only(int[] array)
+    {
+        if (array.length != 1)
+            throw new AssertionError("Require only 1 element but found array " 
+ Arrays.toString(array));
+        return array[0];
+    }
+
     @Override
     protected String createTableStmt()
     {
@@ -152,24 +261,28 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
         return "INSERT INTO " + KEYSPACE + ".tbl (pk, count, seq1, seq2) 
VALUES (?, 0, '', []) USING TIMESTAMP 0";
     }
 
-    @Override
-    Operation verifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    private Operation verifying(int operationId, IInvokableInstance instance, 
int primaryKey, HistoryChecker historyChecker)
     {
         return new VerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
     }
 
-    @Override
-    Operation nonVerifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    private Operation nonVerifying(int operationId, IInvokableInstance 
instance, int primaryKey, HistoryChecker historyChecker)
     {
         return new NonVerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
     }
 
-    @Override
-    Operation modifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    private Operation modifying(int operationId, IInvokableInstance instance, 
int primaryKey, HistoryChecker historyChecker)
     {
         return new ModifyingOperation(operationId, instance, ANY, 
serialConsistency, primaryKey, historyChecker);
     }
 
+    @Override
+    void log(@Nullable Integer primaryKey)
+    {
+        if (primaryKey == null) historyCheckers.forEach(HistoryChecker::print);
+        else historyCheckers.stream().filter(h -> h.primaryKey == 
primaryKey).forEach(HistoryChecker::print);
+    }
+
     @Override
     boolean joinAll()
     {
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index 227d8348fa..58d977bbd1 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -26,11 +27,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
-
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Throwables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +38,9 @@ import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
-import org.apache.cassandra.distributed.impl.Query;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.paxos.BallotGenerator;
 import org.apache.cassandra.simulator.ActionList;
@@ -53,38 +52,44 @@ import 
org.apache.cassandra.simulator.cluster.ClusterActionListener;
 import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods;
 import org.apache.cassandra.simulator.systems.SimulatedActionCallable;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
+import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.concurrent.Threads;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-import static org.apache.cassandra.simulator.Action.Modifiers.NONE;
 import static org.apache.cassandra.simulator.Action.Modifiers.DISPLAY_ORIGIN;
+import static org.apache.cassandra.simulator.Action.Modifiers.NONE;
 import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM;
 import static org.apache.cassandra.simulator.paxos.HistoryChecker.causedBy;
+import static org.apache.cassandra.utils.AssertionUtils.anyOf;
+import static org.apache.cassandra.utils.AssertionUtils.hasCause;
+import static org.apache.cassandra.utils.AssertionUtils.isThrowableInstanceof;
 
 public abstract class PaxosSimulation implements Simulation, 
ClusterActionListener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PaxosSimulation.class);
 
-    abstract class Operation extends SimulatedActionCallable<Object[][]> implements BiConsumer<Object[][], Throwable>
+    private static String createDescription(int[] primaryKeys, int id, String idString)
     {
-        final int primaryKey;
+        return primaryKeys.length == 1 ? Integer.toString(primaryKeys[0]) : Arrays.toString(primaryKeys) + "/" + id + ": " + idString;
+    }
+
+    protected Class<? extends Throwable>[] expectedExceptions()
+    {
+        return (Class<? extends Throwable>[]) new Class<?>[] { 
RequestExecutionException.class };
+    }
+
+    abstract class Operation extends SimulatedActionCallable<SimpleQueryResult> implements BiConsumer<SimpleQueryResult, Throwable>
+    {
+        final int[] primaryKeys;
         final int id;
         int start;
 
-        public Operation(int primaryKey, int id, IInvokableInstance instance,
-                         String idString, String query, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, Object... params)
-        {
-            super(primaryKey + "/" + id + ": " + idString, DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, new Query(query, -1, commitConsistency, serialConsistency, params));
-            this.primaryKey = primaryKey;
-            this.id = id;
-        }
-
-        public Operation(int primaryKey, int id, IInvokableInstance instance,
-                         String idString, IIsolatedExecutor.SerializableCallable<Object[][]> query)
+        public Operation(int[] primaryKeys, int id, IInvokableInstance instance,
+                         String idString, IIsolatedExecutor.SerializableCallable<SimpleQueryResult> query)
         {
-            super(primaryKey + "/" + id + ": " + idString, DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, query);
-            this.primaryKey = primaryKey;
+            super(createDescription(primaryKeys, id, idString), DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, query);
+            this.primaryKeys = primaryKeys;
             this.id = id;
         }
 
@@ -95,9 +100,9 @@ public abstract class PaxosSimulation implements Simulation, 
ClusterActionListen
         }
 
         @Override
-        public void accept(Object[][] success, Throwable failure)
+        public void accept(SimpleQueryResult success, Throwable failure)
         {
-            if (failure != null && !(failure instanceof 
RequestExecutionException))
+            if (failure != null && !expectedException(failure))
             {
                 if (!simulated.failures.hasFailure() || !(failure instanceof 
UncheckedInterruptedException))
                     logger.error("Unexpected exception", failure);
@@ -108,10 +113,14 @@ public abstract class PaxosSimulation implements 
Simulation, ClusterActionListen
             {
                 logger.trace("{}", failure.getMessage());
             }
-
             verify(new Observation(id, success, start, 
logicalClock.incrementAndGet()));
         }
 
+        protected boolean expectedException(Throwable failure)
+        {
+            // the failure may originate in the instance's classloader, so we can't use instanceof directly
+            return hasCause(anyOf(Stream.of(expectedExceptions()).map(AssertionUtils::isThrowableInstanceof))).matches(failure);
+        }
         abstract void verify(Observation outcome);
     }
 
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java
new file mode 100644
index 0000000000..8469423bb9
--- /dev/null
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import javax.annotation.Nullable;
+
+import accord.verify.StrictSerializabilityVerifier;
+import com.carrotsearch.hppc.IntIntHashMap;
+import com.carrotsearch.hppc.IntIntMap;
+import com.carrotsearch.hppc.cursors.IntIntCursor;
+
+public class StrictSerializabilityValidator implements HistoryValidator
+{
+    private final StrictSerializabilityVerifier verifier;
+    private final IntIntMap pkToIndex;
+    private final int[] indexToPk;
+
+    public StrictSerializabilityValidator(int[] primaryKeys)
+    {
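+        // The accord verifier addresses partitions by dense index [0, primaryKeys.length); keep a
+        // bidirectional mapping so callers can pass real primary keys and violations report them back.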
+        this.verifier = new StrictSerializabilityVerifier(primaryKeys.length);
+        pkToIndex = new IntIntHashMap(primaryKeys.length);
+        indexToPk = new int[primaryKeys.length];
+        for (int i = 0; i < primaryKeys.length; i++)
+        {
+            pkToIndex.put(primaryKeys[i], i);
+            indexToPk[i] = primaryKeys[i];
+        }
+    }
+
+    @Override
+    public Checker witness(int start, int end)
+    {
+        verifier.begin();
+        return new Checker()
+        {
+            @Override
+            public void read(int pk, int id, int count, int[] seq)
+            {
+                verifier.witnessRead(get(pk), seq);
+            }
+
+            @Override
+            public void write(int pk, int id, boolean success)
+            {
+                verifier.witnessWrite(get(pk), id);
+            }
+
+            @Override
+            public void close()
+            {
+                convertHistoryViolation(() -> verifier.apply(start, end));
+            }
+        };
+    }
+
+    @Override
+    public void print(@Nullable Integer pk)
+    {
+        if (pk == null) verifier.print();
+        else verifier.print(get(pk));
+    }
+
+    private int get(int pk)
+    {
+        if (pkToIndex.containsKey(pk))
+            return pkToIndex.get(pk);
+        throw new IllegalArgumentException("Unknown pk=" + pk);
+    }
+
+    private void convertHistoryViolation(Runnable fn)
+    {
+        try
+        {
+            fn.run();
+        }
+        catch (accord.verify.HistoryViolation e)
+        {
+            if (!(e.primaryKey() >= 0 && e.primaryKey() < indexToPk.length))  
throw new IllegalArgumentException("Unable to find primary key by index " + 
e.primaryKey());
+            int pk = indexToPk[e.primaryKey()];
+            HistoryViolation v = new HistoryViolation(pk, e.getMessage());
+            v.setStackTrace(e.getStackTrace());
+            throw v;
+        }
+    }
+
+    public static class Factory implements HistoryValidator.Factory
+    {
+        public static final Factory instance = new Factory();
+
+        @Override
+        public HistoryValidator create(int[] partitions)
+        {
+            return new StrictSerializabilityValidator(partitions);
+        }
+    }
+}
diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
index d190fd7a15..106cd8c027 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.simulator.systems;
 
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.impl.Query;
 
-public class SimulatedQuery extends SimulatedActionCallable<Object[][]>
+public class SimulatedQuery extends SimulatedActionCallable<SimpleQueryResult>
 {
     public SimulatedQuery(Object description, SimulatedSystems simulated, 
IInvokableInstance instance, String query, ConsistencyLevel commitConsistency, 
ConsistencyLevel serialConsistency, Object... params)
     {
@@ -45,7 +46,7 @@ public class SimulatedQuery extends 
SimulatedActionCallable<Object[][]>
     }
 
     @Override
-    public void accept(Object[][] success, Throwable failure)
+    public void accept(SimpleQueryResult success, Throwable failure)
     {
         if (failure != null)
             simulated.failures.accept(failure);
diff --git a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
new file mode 100644
index 0000000000..6c773fcca8
--- /dev/null
+++ b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.hppc.IntHashSet;
+import com.carrotsearch.hppc.IntIntHashMap;
+import com.carrotsearch.hppc.IntIntMap;
+import com.carrotsearch.hppc.IntSet;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.utils.Clock;
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.Assertions;
+
+import static org.apache.commons.lang3.ArrayUtils.add;
+import static org.apache.commons.lang3.ArrayUtils.swap;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Notes:
+ * * anomalyDirtyRead was left out: Accord doesn't reject requests, so without a way to reject or abort
+ *   a request the client has no way to observe a REJECT, and all such issues surface as UNKNOWN.
+ *
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING) // since Random is used, make sure tests run in a deterministic order
+public class HistoryValidatorTest
+{
+    private static final Logger logger = LoggerFactory.getLogger(HistoryValidatorTest.class);
+    private static final Random RANDOM = random();
+    private static final int[] PARTITIONS = IntStream.range(0, 10).toArray();
+    private static final int x = 1;
+    private static final int y = 2;
+
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> data()
+    {
+        List<Object[]> tests = new ArrayList<>();
+        tests.add(test(LinearizabilityValidator.Factory.instance));
+        tests.add(test(StrictSerializabilityValidator.Factory.instance));
+        return tests;
+    }
+
+    private static Object[] test(HistoryValidator.Factory factory)
+    {
+        return new Object[]{ factory };
+    }
+
+    private final HistoryValidator.Factory factory;
+
+    public HistoryValidatorTest(HistoryValidator.Factory factory)
+    {
+        this.factory = factory;
+    }
+
+    @Test
+    public void orderingWithWriteTimeout()
+    {
+        IntSet timeoutEvents = set(4, 17, 83);
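+        // two passes: reject=true assumes a timed-out write was not applied, reject=false assumes it was
+        // applied even though the client never observed a response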
+        for (boolean reject : Arrays.asList(true, false))
+        {
+            HistoryValidator validator = create();
+
+            int logicalClock = 1;
+            int[] seq = seq();
+            for (int eventId = 0; eventId < 100; eventId++)
+            {
+                if (timeoutEvents.contains(eventId))
+                {
+                    if (!reject)
+                        seq = add(seq, eventId); // wasn't observed, but was applied
+                    continue;
+                }
+                single(validator, ob(eventId, ++logicalClock, ++logicalClock), 1, seq, true);
+                seq = add(seq, eventId); // TODO: when this line was accidentally omitted, LinearizabilityValidator still passed... it should have rejected!
+            }
+        }
+    }
+
+    /**
+     * This test differs from {@link #orderingWithWriteTimeout} as it defines event orders assuming
+     * requests were concurrent, so they may happen in different orderings.
+     * <p>
+     * This means that we may see the results out of order, but the sequence/count ordering will remain consistent.
+     */
+    @Test
+    public void orderingWithWriteTimeoutWithConcurrency()
+    {
+        IntSet timeoutEvents = set(4, 17, 83);
+        for (boolean reject : Arrays.asList(true, false))
+        {
+            HistoryValidator validator = create();
+            // Since the requests are "concurrent", the window in which the operations happened is between start=1 and
+            // end=responseOrder.
+            int start = 1;
+            int logicalClock = start;
+
+            // 'ordering' is the order in which the txns are applied
+            // 'indexOrder' is the order in which the events are seen; since requests are "concurrent", the
+            //   order we validate may differ from the order in which they were applied.
+            int[] ordering = IntStream.range(0, 100).toArray();
+            if (reject)
+                ordering = IntStream.of(ordering).filter(i -> 
!timeoutEvents.contains(i)).toArray();
+            shuffle(ordering);
+            int[] indexOrder = IntStream.range(0, ordering.length - 
1).toArray();
+            shuffle(indexOrder);
+            for (int i = 0; i < indexOrder.length; i++)
+            {
+                int idx = indexOrder[i];
+                int eventId = ordering[idx];
+                if (timeoutEvents.contains(eventId))
+                    continue;
+                int[] seq = Arrays.copyOf(ordering, idx);
+                single(validator, ob(eventId, start, ++logicalClock), 1, seq, 
true);
+            }
+        }
+    }
+
+    @Test
+    public void anomalyNonMonotonicRead()
+    {
+        // Session1: w[x=10] -> Session2: r[x=10] -> r[x=0]
+        test(dsl -> {
+            dsl.txn(writeOnly(x));
+            dsl.txn(readOnly(x, seq(0)));
+            dsl.failingTxn(readOnly(x, 
seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    @Test
+    public void anomalyNonMonotonicWrite()
+    {
+        requiresMultiKeySupport();
+        // Session1: w[x=10] -> w[y=10] -> Session2: r[y=10] -> r[x=0]
+        test(dsl -> {
+            dsl.txn(writeOnly(x));
+            dsl.txn(writeOnly(y));
+            dsl.txn(readOnly(y, seq(1)));
+            dsl.failingTxn(readOnly(x, 
seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    @Test
+    public void anomalyNonMonotonicTransaction()
+    {
+        // Session1: r[x=5] -> w[y=10] -> Session2: r[y=10] -> r[x=0]
+        requiresMultiKeySupport();
+        test(dsl -> {
+            dsl.txn(writeOnly(x), writeOnly(y));
+
+            dsl.txn(readOnly(x, seq(0)));
+            dsl.txn(writeOnly(y));
+            dsl.txn(readOnly(y, seq(0, 2)));
+
+            dsl.failingTxn(readOnly(x, 
seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    @Test
+    public void anomalyReadYourOwnWrites()
+    {
+        // This test is kind of a duplicate; it is here just for completeness
+        // w[x=12] -> r[x=8]
+        test(dsl -> {
+            dsl.txn(writeOnly(x));
+            dsl.failingTxn(readOnly(x, 
seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    //TODO write skew
+    @Test
+    public void anomalyReadSkew()
+    {
+        requiresMultiKeySupport();
+        // two different txn are involved to make this happen
+        // x=0, y=0
+        // U1: starts
+        // U2: starts
+        // U1: r[x=0]
+        // U2: w[x=5], w[y=5]
+        // U2: commit
+        // U1: r[y=5]
+        // U1: commit
+        HistoryValidator validator = create();
+
+        // init
+        txn(validator, ob(0, 1, 2), writeOnly(x), writeOnly(y));
+        int u1 = 1, u1_start = 3, u1_end = 6;
+        int u2 = 2, u2_start = 4, u2_end = 5;
+        txn(validator, ob(u2, u2_start, u2_end), readWrite(x, seq(0)), 
readWrite(y, seq(0)));
+        Assertions.assertThatThrownBy(() -> txn(validator, ob(u1, u1_start, 
u1_end), readWrite(x, seq(0)), readWrite(y, seq(0, u2))))
+                  .isInstanceOf(HistoryViolation.class);
+    }
+
+    @Test
+    public void anomalyWriteSkew()
+    {
+        // two different txn are involved to make this happen
+        // x=0, y=0
+        // U1: starts
+        // U2: starts
+        // U1: r[x=0]
+    }
+
+    @Test
+    public void seenBehavior()
+    {
+        fromLog("Witness(start=4, end=7)\n" +
+                "\tread(pk=121901541, id=2, count=0, seq=[])\n" +
+                "\twrite(pk=121901541, id=2, success=true)\n" +
+
+                "Witness(start=3, end=8)\n" +
+                "\tread(pk=122950117, id=0, count=0, seq=[])\n" +
+                "\twrite(pk=122950117, id=0, success=true)\n" +
+                "\twrite(pk=119804389, id=0, success=true)\n" +
+
+                "Witness(start=5, end=9)\n" +
+                "\tread(pk=121901541, id=3, count=1, seq=[2])\n" +
+                "\twrite(pk=121901541, id=3, success=true)\n" +
+
+                "Witness(start=2, end=10)\n" +
+                "\twrite(pk=122950117, id=1, success=true)\n" +
+                "\twrite(pk=119804389, id=1, success=true)\n" +
+
+                "Witness(start=6, end=11)\n" +
+                "\tread(pk=121901541, id=4, count=2, seq=[2, 3])\n" +
+                "\twrite(pk=121901541, id=4, success=true)\n" +
+
+                "Witness(start=12, end=14)\n" +
+                "\twrite(pk=121901541, id=5, success=true)\n" +
+
+                "Witness(start=13, end=16)\n" +
+                "\tread(pk=119804389, id=6, count=2, seq=[0, 1])\n" +
+                "\twrite(pk=119804389, id=6, success=true)\n" +
+                "\twrite(pk=122950117, id=6, success=true)\n" +
+
+                "Witness(start=15, end=18)\n" +
+                "\tread(pk=121901541, id=7, count=4, seq=[2, 3, 4, 5])\n" +
+                "\twrite(pk=121901541, id=7, success=true)\n" +
+
+                "Witness(start=17, end=20)\n" +
+                "\tread(pk=119804389, id=8, count=3, seq=[0, 1, 6])\n" +
+                "\twrite(pk=119804389, id=8, success=true)\n" +
+                "\twrite(pk=122950117, id=8, success=true)\n" // this 
partition is what triggers
+        );
+    }
+
+    private void requiresMultiKeySupport()
+    {
+        Assume.assumeTrue("Validator " + factory.getClass() + " does not 
support multi-key", factory instanceof StrictSerializabilityValidator.Factory);
+    }
+
+    private int[] shuffle(int[] ordering)
+    {
+        // shuffle array
+        for (int i = ordering.length; i > 1; i--)
+            swap(ordering, i - 1, RANDOM.nextInt(i));
+        return ordering;
+    }
+
+    private static void txn(HistoryValidator validator, Observation ob, 
Event... events)
+    {
+        String type = events.length == 1 ? "single" : "multiple";
+        logger.info("[Validator={}, Observation=({}, {}, {})] Validating {} {}", validator.getClass().getSimpleName(), ob.id, ob.start, ob.end, type, events);
+        try (HistoryValidator.Checker check = validator.witness(ob.start, 
ob.end))
+        {
+            for (Event e : events)
+                e.process(ob, check);
+        }
+    }
+
+    private static void single(HistoryValidator validator, Observation ob, int 
pk, int[] seq, boolean hasWrite)
+    {
+        txn(validator, ob, hasWrite ? readWrite(pk, seq) : readOnly(pk, seq));
+    }
+
+    private static Observation ob(int id, int start, int end)
+    {
+        // why empty result?  The users don't actually check the result's 
data, just existence
+        return new Observation(id, QueryResults.empty(), start, end);
+    }
+
+    private static int[] seq(int... seq)
+    {
+        return seq;
+    }
+
+    private HistoryValidator create()
+    {
+        return factory.create(PARTITIONS);
+    }
+
+    private static IntSet set(int... values)
+    {
+        IntSet set = new IntHashSet(values.length);
+        for (int v : values)
+            set.add(v);
+        return set;
+    }
+
+    private static Random random()
+    {
+        long seed = Long.parseLong(System.getProperty("cassandra.test.seed", Long.toString(Clock.Global.nanoTime())));
+        logger.info("Random seed={}; set -Dcassandra.test.seed={} when rerunning the tests to get the same order", seed, seed);
+        return new Random(seed);
+    }
+
+    private static Event readWrite(int pk, int[] seq)
+    {
+        return new Event(EnumSet.of(Event.Type.READ, Event.Type.WRITE), pk, 
seq);
+    }
+
+    private static Event readOnly(int pk, int[] seq)
+    {
+        return new Event(EnumSet.of(Event.Type.READ), pk, seq);
+    }
+
+    private static Event writeOnly(int pk)
+    {
+        return new Event(EnumSet.of(Event.Type.WRITE), pk, null);
+    }
+
+    private void fromLog(String log)
+    {
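+        // Replays a history captured in LoggingHistoryValidator's output format ("Witness(...)" blocks with
+        // tab-indented read/write lines) against a fresh validator built over the primary keys seen in the log.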
+        IntSet pks = new IntHashSet();
+        class Read
+        {
+            final int pk, id, count;
+            final int[] seq;
+
+            Read(int pk, int id, int count, int[] seq)
+            {
+                this.pk = pk;
+                this.id = id;
+                this.count = count;
+                this.seq = seq;
+            }
+        }
+        class Write
+        {
+            final int pk, id;
+            final boolean success;
+
+            Write(int pk, int id, boolean success)
+            {
+                this.pk = pk;
+                this.id = id;
+                this.success = success;
+            }
+        }
+        class Witness
+        {
+            final int start, end;
+            final List<Object> actions = new ArrayList<>();
+
+            Witness(int start, int end)
+            {
+                this.start = start;
+                this.end = end;
+            }
+
+            void read(int pk, int id, int count, int[] seq)
+            {
+                actions.add(new Read(pk, id, count, seq));
+            }
+
+            void write(int pk, int id, boolean success)
+            {
+                actions.add(new Write(pk, id, success));
+            }
+
+            void process(HistoryValidator validator)
+            {
+                try (HistoryValidator.Checker check = validator.witness(start, end))
+                {
+                    for (Object a : actions)
+                    {
+                        if (a instanceof Read)
+                        {
+                            Read read = (Read) a;
+                            check.read(read.pk, read.id, read.count, read.seq);
+                        }
+                        else
+                        {
+                            Write write = (Write) a;
+                            check.write(write.pk, write.id, write.success);
+                        }
+                    }
+                }
+            }
+        }
+        List<Witness> witnesses = new ArrayList<>();
+        Witness current = null;
+        for (String line : log.split("\n"))
+        {
+            if (line.startsWith("Witness"))
+            {
+                if (current != null)
+                    witnesses.add(current);
+                Matcher matcher = Pattern.compile("Witness\\(start=(.+), 
end=(.+)\\)").matcher(line);
+                if (!matcher.find()) throw new AssertionError("Unable to match 
start/end of " + line);
+                current = new Witness(Integer.parseInt(matcher.group(1)), 
Integer.parseInt(matcher.group(2)));
+            }
+            else if (line.startsWith("\tread"))
+            {
+                Matcher matcher = Pattern.compile("\tread\\(pk=(.+), id=(.+), 
count=(.+), seq=\\[(.*)\\]\\)").matcher(line);
+                if (!matcher.find()) throw new AssertionError("Unable to match 
read of " + line);
+                int pk = Integer.parseInt(matcher.group(1));
+                pks.add(pk);
+                int id = Integer.parseInt(matcher.group(2));
+                int count = Integer.parseInt(matcher.group(3));
+                String seqStr = matcher.group(4);
+                int[] seq = seqStr.isEmpty() ? new int[0] : Stream.of(seqStr.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
+                current.read(pk, id, count, seq);
+            }
+            else if (line.startsWith("\twrite"))
+            {
+                Matcher matcher = Pattern.compile("\twrite\\(pk=(.+), id=(.+), 
success=(.+)\\)").matcher(line);
+                if (!matcher.find()) throw new AssertionError("Unable to match 
write of " + line);
+                int pk = Integer.parseInt(matcher.group(1));
+                pks.add(pk);
+                int id = Integer.parseInt(matcher.group(2));
+                boolean success = Boolean.parseBoolean(matcher.group(3));
+                current.write(pk, id, success);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Unknown line: " + line);
+            }
+        }
+        if (current != null)
+            witnesses.add(current);
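+        // replay the parsed history against a validator created over exactly the partition keys seen in the log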
+        int[] keys = pks.toArray();
+        Arrays.sort(keys);
+        HistoryValidator validator = factory.create(keys);
+        for (Witness w : witnesses)
+            w.process(validator);
+    }
+
+    private static class Event
+    {
+        enum Type
+        {
+            READ, WRITE
+        }
+
+        private final EnumSet<Type> types;
+        private final int pk;
+        private final int[] seq;
+
+        private Event(EnumSet<Type> types, int pk, int[] seq)
+        {
+            this.types = types;
+            this.pk = pk;
+            this.seq = seq;
+        }
+
+        private void process(Observation ob, HistoryValidator.Checker check)
+        {
+            if (types.contains(Type.READ))
+                check.read(pk, ob.id, seq.length, seq);
+            if (types.contains(Type.WRITE))
+                check.write(pk, ob.id, ob.isSuccess());
+        }
+    }
+
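+    /**
+     * Small DSL used by the tests: {@code txn} replays the given events through the validator and propagates any
+     * violation, while {@code failingTxn} expects a violation and exposes it for further assertions.
+     */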
+    private interface TestDSL
+    {
+        void txn(Event... events);
+
+        AbstractThrowableAssert<?, ? extends Throwable> failingTxn(Event... events);
+    }
+
+    private static boolean supportMultiKey(HistoryValidator validator)
+    {
+        return validator instanceof StrictSerializabilityValidator;
+    }
+
+    private void test(Consumer<TestDSL> fn)
+    {
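+        // multi-key validators witness all events of a call as one transaction with a single global event id;
+        // single-key validators witness each event as its own transaction, with ids allocated per partition key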
+        HistoryValidator validator = create();
+        boolean global = supportMultiKey(validator);
+        EventIdGen eventIdGen = global ? new AllPks() : new PerPk();
+        TestDSL dsl = new TestDSL()
+        {
+            int logicalClock = 0;
+
+            @Override
+            public void txn(Event... events)
+            {
+                if (global)
+                {
+                    int eventId = eventIdGen.next();
+                    HistoryValidatorTest.txn(validator, ob(eventId, ++logicalClock, ++logicalClock), events);
+                }
+                else
+                {
+                    for (Event e : events)
+                    {
+                        int eventId = eventIdGen.next(e.pk);
+                        HistoryValidatorTest.txn(validator, ob(eventId, ++logicalClock, ++logicalClock), e);
+                    }
+                }
+            }
+
+            @Override
+            public AbstractThrowableAssert<?, ? extends Throwable> failingTxn(Event... events)
+            {
+                return assertThatThrownBy(() -> txn(events));
+            }
+        };
+        fn.accept(dsl);
+    }
+
+    private interface EventIdGen
+    {
+        int next(int pk);
+
+        int next();
+    }
+
+    private static class PerPk implements EventIdGen
+    {
+        private final IntIntMap map = new IntIntHashMap();
+
+        @Override
+        public int next(int pk)
+        {
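+            // ids are allocated independently per partition key, starting at 0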
+            int next = !map.containsKey(pk) ? 0 : map.get(pk) + 1;
+            map.put(pk, next);
+            return next;
+        }
+
+        @Override
+        public int next()
+        {
+            throw new UnsupportedOperationException("next without pk not supported");
+        }
+    }
+
+    private static class AllPks implements EventIdGen
+    {
+        private int value = 0;
+
+        @Override
+        public int next(int pk)
+        {
+            return next();
+        }
+
+        @Override
+        public int next()
+        {
+            return value++;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index f61205d374..d1b5cd9038 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.LongSupplier;
@@ -143,11 +144,16 @@ public class AccordTestUtils
         return createTxn(query, QueryOptions.DEFAULT);
     }
 
-    public static Txn createTxn(String cql, QueryOptions options)
+    public static Txn createTxn(String query, List<Object> binds)
     {
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(cql);
-        Assert.assertNotNull(parsed);
-        TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls());
+        TransactionStatement statement = parse(query);
+        QueryOptions options = QueryProcessor.makeInternalOptions(statement, binds.toArray(new Object[binds.size()]));
+        return statement.createTxn(ClientState.forInternalCalls(), options);
+    }
+
+    public static Txn createTxn(String query, QueryOptions options)
+    {
+        TransactionStatement statement = parse(query);
         return statement.createTxn(ClientState.forInternalCalls(), options);
     }
 
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
index d5b1981fc1..c122a95315 100644
--- a/test/unit/org/apache/cassandra/utils/AssertionUtils.java
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.stream.Stream;
+
 import com.google.common.base.Throwables;
 
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Condition;
 
 public class AssertionUtils
@@ -28,6 +31,16 @@ public class AssertionUtils
     {
     }
 
+    public static <T> Condition<T> anyOf(Stream<Condition<T>> stream)
+    {
+        Iterable<Condition<T>> it = () -> stream.iterator();
+        return Assertions.anyOf(it);
+    }
+
+    public static Condition<Throwable> anyOfThrowable(Class<? extends Throwable>... klasses)
+    {
+        return anyOf(Stream.of(klasses).map(AssertionUtils::isThrowable));
+    }
+
     /**
      * When working with jvm-dtest the thrown error is in a different {@link ClassLoader} causing type checks
      * to fail; this method relies on naming instead.
@@ -100,6 +113,11 @@ public class AssertionUtils
         };
     }
 
+    public static Condition<Throwable> isThrowableInstanceof(Class<?> klass)
+    {
+        return (Condition<Throwable>) (Condition<?>) isInstanceof(klass);
+    }
+
     public static Condition<Throwable> rootCause(Condition<Throwable> other)
     {
         return new Condition<Throwable>() {
@@ -119,6 +137,32 @@ public class AssertionUtils
 
     public static Condition<Throwable> rootCauseIs(Class<? extends Throwable> klass)
     {
-        return rootCause((Condition<Throwable>) (Condition<?>) is(klass));
+        return rootCause(isThrowable(klass));
+    }
+
+    public static Condition<Throwable> hasCause(Class<? extends Throwable> klass)
+    {
+        return hasCause(isThrowable(klass));
+    }
+
+    public static Condition<Throwable> hasCauseAnyOf(Class<? extends Throwable>... matchers)
+    {
+        return hasCause(anyOfThrowable(matchers));
+    }
+
+    public static Condition<Throwable> hasCause(Condition<Throwable> matcher)
+    {
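+        // matches when the throwable itself, or any throwable in its cause chain, satisfies the given condition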
+        return new Condition<Throwable>() {
+            @Override
+            public boolean matches(Throwable value)
+            {
+                for (Throwable cause = value; cause != null; cause = cause.getCause())
+                {
+                    if (matcher.matches(cause))
+                        return true;
+                }
+                return false;
+            }
+        };
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
