This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 0ddfbbdeee CASSANDRA-18154: CEP-15: Enhance returning SELECT to allow partition and clustering IN clauses to return multiple partitions/rows
0ddfbbdeee is described below

commit 0ddfbbdeee813a9d16980ad0f80e183eaf919b50
Author: David Capwell <[email protected]>
AuthorDate: Thu Jan 12 12:07:54 2023 -0800

    CASSANDRA-18154: CEP-15: Enhance returning SELECT to allow partition and clustering IN clauses to return multiple partitions/rows
---
 .build/cassandra-build-deps-template.xml           |   5 +
 .build/include-accord.sh                           |   2 +-
 .build/parent-pom-template.xml                     |  21 +
 build.xml                                          |   6 +-
 ide/idea-iml-file.xml                              |   1 +
 ide/idea/workspace.xml                             |  35 +-
 .../metrics/AccordClientRequestMetrics.java        |  37 +-
 .../cassandra/service/accord/AccordCommand.java    |   2 +
 .../cassandra/service/accord/AccordService.java    |  27 +
 .../cassandra/utils/logging/ClassNameFilter.java   |  38 +-
 test/conf/logback-simulator.xml                    |  14 +
 .../org/apache/cassandra/distributed/api/Row.java  | 160 ++++++
 .../apache/cassandra/distributed/impl/Query.java   |  12 +-
 .../distributed/util/QueryResultUtil.java          |   2 +
 .../org/apache/cassandra/simulator/ActionList.java |   4 +
 .../AbstractPairOfSequencesPaxosSimulation.java    | 201 +++----
 .../cassandra/simulator/paxos/HistoryChecker.java  |  32 +-
 .../{Observation.java => HistoryValidator.java}    |  41 +-
 .../simulator/paxos/LinearizabilityValidator.java  |  83 +++
 .../simulator/paxos/LoggingHistoryValidator.java   |  73 +++
 .../cassandra/simulator/paxos/Observation.java     |  16 +-
 .../paxos/PairOfSequencesAccordSimulation.java     | 310 +++++------
 .../paxos/PairOfSequencesPaxosSimulation.java      | 165 +++++-
 .../cassandra/simulator/paxos/PaxosSimulation.java |  53 +-
 .../paxos/StrictSerializabilityValidator.java      | 112 ++++
 .../simulator/systems/SimulatedQuery.java          |   5 +-
 .../simulator/paxos/HistoryValidatorTest.java      | 592 +++++++++++++++++++++
 .../cassandra/service/accord/AccordTestUtils.java  |  14 +-
 .../org/apache/cassandra/utils/AssertionUtils.java |  46 +-
 29 files changed, 1727 insertions(+), 382 deletions(-)

diff --git a/.build/cassandra-build-deps-template.xml b/.build/cassandra-build-deps-template.xml
index 727da9179a..23e00b1986 100644
--- a/.build/cassandra-build-deps-template.xml
+++ b/.build/cassandra-build-deps-template.xml
@@ -123,5 +123,10 @@
       <groupId>com.fasterxml.jackson.dataformat</groupId>
       <artifactId>jackson-dataformat-yaml</artifactId>
     </dependency>
+    <dependency>
+      <groupId>accord</groupId>
+      <artifactId>accord</artifactId>
+      <classifier>tests</classifier>
+    </dependency>
   </dependencies>
 </project>
diff --git a/.build/include-accord.sh b/.build/include-accord.sh
index 2144e80b17..eabe4d204b 100755
--- a/.build/include-accord.sh
+++ b/.build/include-accord.sh
@@ -25,7 +25,7 @@ set -o nounset
 bin="$(cd "$(dirname "$0")" > /dev/null; pwd)"
 
 accord_repo='https://github.com/apache/cassandra-accord.git'
-accord_branch='ad326d5df8d99d4799fa87de81482e3cb1fb92de'
+accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512'
 accord_src="$bin/cassandra-accord"
 
 checkout() {
diff --git a/.build/parent-pom-template.xml b/.build/parent-pom-template.xml
index b57beb2d93..2ce7d5bb51 100644
--- a/.build/parent-pom-template.xml
+++ b/.build/parent-pom-template.xml
@@ -673,6 +673,27 @@
         <artifactId>accord</artifactId>
         <version>1.0-SNAPSHOT</version>
       </dependency>
+      <dependency>
+        <groupId>accord</groupId>
+        <artifactId>accord</artifactId>
+        <version>1.0-SNAPSHOT</version>
+        <classifier>tests</classifier>
+        <scope>test</scope>
+        <exclusions>
+          <exclusion>
+            <artifactId>org.junit.jupiter</artifactId>
+            <groupId>junit-jupiter-api</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>org.junit.jupiter</artifactId>
+            <groupId>junit-jupiter-engine</groupId>
+          </exclusion>
+          <exclusion>
+            <artifactId>ch.qos.logback</artifactId>
+            <groupId>logback-classic</groupId>
+          </exclusion>
+        </exclusions>
+      </dependency>
       <dependency>
         <groupId>io.airlift</groupId>
         <artifactId>airline</artifactId>
diff --git a/build.xml b/build.xml
index 8628de5e8f..fc42dfa30a 100644
--- a/build.xml
+++ b/build.xml
@@ -1063,7 +1063,7 @@
         <jvmarg value="-ea"/>
         <jvmarg value="-Djava.io.tmpdir=${tmp.dir}"/>
         <jvmarg value="-Dcassandra.debugrefcount=true"/>
-        <jvmarg value="-Xss256k"/>
+        <jvmarg value="-Xss384k"/>
         <!-- When we do classloader manipulation SoftReferences can cause 
memory leaks
              that can OOM our test runs. The next two settings informs our GC
              algorithm to limit the metaspace size and clean up SoftReferences
@@ -1366,7 +1366,7 @@
         <jvmarg value="-Djava.awt.headless=true"/>
         <jvmarg value="-javaagent:${build.lib}/jamm-${jamm.version}.jar" />
         <jvmarg value="-ea"/>
-        <jvmarg value="-Xss256k"/>
+        <jvmarg value="-Xss384k"/>
         <jvmarg 
value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg 
value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
         <jvmarg value="-Dcassandra.skip_sync=true" />
@@ -1412,7 +1412,7 @@
         <jvmarg value="-Djava.awt.headless=true"/>
         <jvmarg value="-javaagent:${build.lib}/jamm-${jamm.version}.jar" />
         <jvmarg value="-ea"/>
-        <jvmarg value="-Xss256k"/>
+        <jvmarg value="-Xss384k"/>
         <jvmarg 
value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
         <jvmarg 
value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg value="-Dcassandra.skip_sync=true" />
diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml
index c0b55842fa..4c529892a4 100644
--- a/ide/idea-iml-file.xml
+++ b/ide/idea-iml-file.xml
@@ -39,6 +39,7 @@
             <sourceFolder url="file://$MODULE_DIR$/test/simulator/asm" 
isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/simulator/bootstrap" 
isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/simulator/main" 
isTestSource="true" />
+            <sourceFolder url="file://$MODULE_DIR$/test/simulator/test" 
isTestSource="true" />
             <sourceFolder url="file://$MODULE_DIR$/test/resources" 
type="java-test-resource" />
             <sourceFolder url="file://$MODULE_DIR$/test/conf" 
type="java-test-resource" />
             <excludeFolder url="file://$MODULE_DIR$/.idea" />
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index 321edd8024..979e1acf77 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -167,7 +167,40 @@
       <option name="MAIN_CLASS_NAME" value="" />
       <option name="METHOD_NAME" value="" />
       <option name="TEST_OBJECT" value="class" />
-      <option name="VM_PARAMETERS" 
value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml 
-Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml 
-Dcassandra.logdir=$PROJECT_DIR$/build/test/logs 
-Djava.library.path=$PROJECT_DIR$/lib/sigar-bin 
-Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables 
-Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables 
-Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...]
+      <option name="VM_PARAMETERS" value="
+        -ea 
+        -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin 
+        -Djava.security.egd=file:/dev/urandom
+        -XX:-BackgroundCompilation
+        -XX:-TieredCompilation
+        -XX:ActiveProcessorCount=4
+        -XX:CICompilerCount=1
+        -XX:HeapDumpPath=build/test 
+        -XX:MaxMetaspaceSize=384M 
+        -XX:ReservedCodeCacheSize=256M
+        -XX:SoftRefLRUPolicyMSPerMB=0 
+        -XX:Tier4CompileThreshold=1000
+        
-Xbootclasspath/a:$PROJECT_DIR$/build/test/lib/jars/simulator-bootstrap.jar
+        -Xmx8G
+        -Xss384k
+        -javaagent:$PROJECT_DIR$/build/test/lib/jars/simulator-asm.jar
+
+        -Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml 
+        -Dcassandra.debugrefcount=false
+        -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs 
+        -Dcassandra.memtable_row_overhead_computation_step=100
+        -Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true
+        -Dcassandra.ring_delay_ms=10000
+        -Dcassandra.skip_sync=true 
+        -Dcassandra.strict.runtime.checks=true 
+        -Dcassandra.test.simulator.determinismcheck=strict
+        -Dcassandra.test.sstableformatdevelopment=true
+        -Dcassandra.tolerate_sstable_size=true 
+        
-Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables 
+        -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables 
+        
-Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml 
+        -Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables 
+        " />
       <option name="PARAMETERS" value="" />
       <fork_mode value="class" />
       <option name="WORKING_DIRECTORY" value="" />
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java
similarity index 52%
copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
copy to src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java
index 546fd3179f..c95c3bd11f 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java
@@ -16,30 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.simulator.paxos;
+package org.apache.cassandra.metrics;
 
-class Observation implements Comparable<Observation>
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.Meter;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class AccordClientRequestMetrics extends ClientRequestMetrics
 {
-    final int id;
-    final Object[][] result;
-    final int start;
-    final int end;
+    public final Meter preempts;
+    public final Histogram keySize;
 
-    Observation(int id, Object[][] result, int start, int end)
+    public AccordClientRequestMetrics(String scope)
     {
-        this.id = id;
-        this.result = result;
-        this.start = start;
-        this.end = end;
+        super(scope);
+
+        preempts = Metrics.meter(factory.createMetricName("Preempts"));
+        keySize = 
Metrics.histogram(factory.createMetricName("KeySizeHistogram"), false);
     }
 
-    // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many 
pair-wise comparisons the answer is 0
-    public int compareTo(Observation that)
+    @Override
+    public void release()
     {
-        if (this.end < that.start)
-            return -1;
-        if (that.end < this.start)
-            return 1;
-        return 0;
+        super.release();
+        Metrics.remove(factory.createMetricName("Preempts"));
+        Metrics.remove(factory.createMetricName("KeySizeHistogram"));
     }
 }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
index 5b4a8c2e9b..8020b29ac7 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java
@@ -706,6 +706,8 @@ public class AccordCommand extends Command implements 
AccordState<TxnId>
         if (listener instanceof AccordCommandsForKey)
             return new 
ListenerProxy.CommandsForKeyListenerProxy(((AccordCommandsForKey) 
listener).key());
 
+        //TODO - Support accord.messages.Defer
+
         throw new RuntimeException("Unhandled non-transient listener: " + 
listener);
     }
 
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 0135c741c3..5fee18e7fe 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
 import com.google.common.annotations.VisibleForTesting;
 
 import accord.api.Result;
+import accord.coordinate.Preempted;
 import accord.coordinate.Timeout;
 import accord.impl.SimpleProgressLog;
 import accord.impl.SizeOfIntersectionSorter;
@@ -40,6 +41,7 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
+import org.apache.cassandra.metrics.AccordClientRequestMetrics;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.service.accord.api.AccordAgent;
 import 
org.apache.cassandra.service.accord.api.AccordRoutingKey.KeyspaceSplitter;
@@ -53,9 +55,13 @@ import 
org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
 import static 
org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps;
 import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 public class AccordService implements Shutdownable
 {
+    public static final AccordClientRequestMetrics readMetrics = new 
AccordClientRequestMetrics("AccordRead");
+    public static final AccordClientRequestMetrics writeMetrics = new 
AccordClientRequestMetrics("AccordWrite");
+
     public final Node node;
     private final Shutdownable nodeShutdown;
     private final AccordMessageSink messageSink;
@@ -122,8 +128,11 @@ public class AccordService implements Shutdownable
      */
     public TxnData coordinate(Txn txn, ConsistencyLevel consistencyLevel)
     {
+        AccordClientRequestMetrics metrics = txn.isWrite() ? writeMetrics : 
readMetrics;
+        final long startNanos = nanoTime();
         try
         {
+            metrics.keySize.update(txn.keys().size());
             Future<Result> future = node.coordinate(txn);
             Result result = 
future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), 
TimeUnit.MILLISECONDS);
             return (TxnData) result;
@@ -132,17 +141,35 @@ public class AccordService implements Shutdownable
         {
             Throwable cause = e.getCause();
             if (cause instanceof Timeout)
+            {
+                metrics.timeouts.mark();
+                throw throwTimeout(txn, consistencyLevel);
+            }
+            if (cause instanceof Preempted)
+            {
+                metrics.preempts.mark();
+                //TODO need to improve
+                // Coordinator "could" query the accord state to see whats 
going on but that doesn't exist yet.
+                // Protocol also doesn't have a way to denote "unknown" 
outcome, so using a timeout as the closest match
                 throw throwTimeout(txn, consistencyLevel);
+            }
+            metrics.failures.mark();
             throw new RuntimeException(cause);
         }
         catch (InterruptedException e)
         {
+            metrics.failures.mark();
             throw new UncheckedInterruptedException(e);
         }
         catch (TimeoutException e)
         {
+            metrics.timeouts.mark();
             throw throwTimeout(txn, consistencyLevel);
         }
+        finally
+        {
+            metrics.addNano(nanoTime() - startNanos);
+        }
     }
 
     private static RuntimeException throwTimeout(Txn txn, ConsistencyLevel 
consistencyLevel)
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
similarity index 54%
copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
copy to src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
index 546fd3179f..ef0ca2c5ca 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java
@@ -16,30 +16,32 @@
  * limitations under the License.
  */
 
-package org.apache.cassandra.simulator.paxos;
+package org.apache.cassandra.utils.logging;
 
-class Observation implements Comparable<Observation>
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.filter.AbstractMatcherFilter;
+import ch.qos.logback.core.spi.FilterReply;
+
+public class ClassNameFilter extends AbstractMatcherFilter<ILoggingEvent>
 {
-    final int id;
-    final Object[][] result;
-    final int start;
-    final int end;
+    String loggerName;
+
+    public void setLoggerName(String loggerName)
+    {
+        this.loggerName = loggerName;
+    }
 
-    Observation(int id, Object[][] result, int start, int end)
+    @Override
+    public FilterReply decide(ILoggingEvent event)
     {
-        this.id = id;
-        this.result = result;
-        this.start = start;
-        this.end = end;
+        if (!isStarted()) return FilterReply.NEUTRAL;
+        if (event.getLoggerName().equals(loggerName)) return onMatch;
+        return onMismatch;
     }
 
-    // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many 
pair-wise comparisons the answer is 0
-    public int compareTo(Observation that)
+    @Override
+    public void start()
     {
-        if (this.end < that.start)
-            return -1;
-        if (that.end < this.start)
-            return 1;
-        return 0;
+        if (loggerName != null) super.start();
     }
 }
diff --git a/test/conf/logback-simulator.xml b/test/conf/logback-simulator.xml
index 3566779ea4..69def30afb 100644
--- a/test/conf/logback-simulator.xml
+++ b/test/conf/logback-simulator.xml
@@ -24,6 +24,19 @@
   <!-- Shutdown hook ensures that async appender flushes -->
   <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/>
 
+  <appender name="HISTORYLOG" class="ch.qos.logback.core.FileAppender">
+    
<file>./build/test/logs/simulator/${run_start}-${run_seed}/history.log</file>
+    <encoder>
+      <pattern>%msg%n</pattern>
+    </encoder>
+    <immediateFlush>true</immediateFlush>
+    <filter class="org.apache.cassandra.utils.logging.ClassNameFilter">
+      
<loggerName>org.apache.cassandra.simulator.paxos.LoggingHistoryValidator</loggerName>
+      <onMatch>ACCEPT</onMatch>
+      <onMismatch>DENY</onMismatch>
+    </filter>
+  </appender>
+
   <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender">
     
<file>./build/test/logs/simulator/${run_start}-${run_seed}/${instance_id}/system.log</file>
     <encoder>
@@ -56,6 +69,7 @@
   <root level="INFO">
     <appender-ref ref="INSTANCEFILE" />
     <appender-ref ref="STDOUT" />
+    <appender-ref ref="HISTORYLOG" />
   </root>
 </configuration>
 
diff --git a/test/distributed/org/apache/cassandra/distributed/api/Row.java b/test/distributed/org/apache/cassandra/distributed/api/Row.java
index 33272ed3d5..556a5716c0 100644
--- a/test/distributed/org/apache/cassandra/distributed/api/Row.java
+++ b/test/distributed/org/apache/cassandra/distributed/api/Row.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.api;
 import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.Objects;
@@ -85,6 +86,17 @@ public class Row
         return (T) results[index];
     }
 
+    public <T> T get(int index, T defaultValue)
+    {
+        checkAccess();
+        if (index < 0 || index >= results.length)
+            throw new NoSuchElementException("by index: " + index);
+        T result = (T) results[index];
+        if (result == null)
+            return defaultValue;
+        return result;
+    }
+
     public <T> T get(String name)
     {
         checkAccess();
@@ -94,66 +106,158 @@ public class Row
         return (T) results[idx];
     }
 
+    public <T> T get(String name, T defaultValue)
+    {
+        checkAccess();
+        int idx = findIndex(name);
+        if (idx == NOT_FOUND)
+            throw new NoSuchElementException("by name: " + name);
+        T result = (T) results[idx];
+        if (result == null)
+            return defaultValue;
+        return result;
+    }
+
+    public Boolean getBoolean(int index)
+    {
+        return get(index);
+    }
+
+    public Boolean getBoolean(int index, Boolean defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
+    public Boolean getBoolean(String name)
+    {
+        return get(name);
+    }
+
+    public Boolean getBoolean(String name, Boolean defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Short getShort(int index)
     {
         return get(index);
     }
 
+    public Short getShort(int index, Short defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Short getShort(String name)
     {
         return get(name);
     }
 
+    public Short getShort(String name, Short defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Integer getInteger(int index)
     {
         return get(index);
     }
 
+    public Integer getInteger(int index, Integer defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Integer getInteger(String name)
     {
         return get(name);
     }
 
+    public Integer getInteger(String name, Integer defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Long getLong(int index)
     {
         return get(index);
     }
 
+    public Long getLong(int index, Long defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Long getLong(String name)
     {
         return get(name);
     }
 
+    public Long getLong(String name, Long defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Float getFloat(int index)
     {
         return get(index);
     }
 
+    public Float getFloat(int index, Float defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Float getFloat(String name)
     {
         return get(name);
     }
 
+    public Float getFloat(String name, Float defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public Double getDouble(int index)
     {
         return get(index);
     }
 
+    public Double getDouble(int index, Double defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Double getDouble(String name)
     {
         return get(name);
     }
 
+    public Double getDouble(String name, Double defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public String getString(int index)
     {
         return get(index);
     }
 
+    public String getString(int index, String defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public String getString(String name)
     {
         return get(name);
     }
 
+    public String getString(String name, String defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public UUID getUUID(int index)
     {
         Object uuid = get(index);
@@ -162,6 +266,14 @@ public class Row
         return (UUID) uuid;
     }
 
+    public UUID getUUID(int index, UUID defaultValue)
+    {
+        Object uuid = get(index, defaultValue);
+        if (uuid instanceof TimeUUID)
+            return ((TimeUUID) uuid).asUUID();
+        return (UUID) uuid;
+    }
+
     public UUID getUUID(String name)
     {
         Object uuid = get(name);
@@ -170,26 +282,74 @@ public class Row
         return (UUID) uuid;
     }
 
+    public UUID getUUID(String name, UUID defaultValue)
+    {
+        Object uuid = get(name, defaultValue);
+        if (uuid instanceof TimeUUID)
+            return ((TimeUUID) uuid).asUUID();
+        return (UUID) uuid;
+    }
+
     public Date getTimestamp(int index)
     {
         return get(index);
     }
 
+    public Date getTimestamp(int index, Date defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public Date getTimestamp(String name)
     {
         return get(name);
     }
 
+    public Date getTimestamp(String name, Date defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     public <T> Set<T> getSet(int index)
     {
         return get(index);
     }
 
+    public <T> Set<T> getSet(int index, Set<T> defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
     public <T> Set<T> getSet(String name)
     {
         return get(name);
     }
 
+    public <T> Set<T> getSet(String name, Set<T> defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
+    public <T> List<T> getList(int index)
+    {
+        return get(index);
+    }
+
+    public <T> List<T> getList(int index, List<T> defaultValue)
+    {
+        return get(index, defaultValue);
+    }
+
+    public <T> List<T> getList(String name)
+    {
+        return get(name);
+    }
+
+    public <T> List<T> getList(String name, List<T> defaultValue)
+    {
+        return get(name, defaultValue);
+    }
+
     /**
      * Get the row as a array.
      */
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Query.java b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
index 823113f857..f40ebca904 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Query.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Query.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
@@ -37,15 +38,15 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
-public class Query implements 
IIsolatedExecutor.SerializableCallable<Object[][]>
+public class Query implements 
IIsolatedExecutor.SerializableCallable<SimpleQueryResult>
 {
     private static final long serialVersionUID = 1L;
 
-    final String query;
+    public final String query;
     final long timestamp;
     final org.apache.cassandra.distributed.api.ConsistencyLevel 
commitConsistencyOrigin;
     final org.apache.cassandra.distributed.api.ConsistencyLevel 
serialConsistencyOrigin;
-    final Object[] boundValues;
+    public final Object[] boundValues;
 
     public Query(String query, long timestamp, 
org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin, 
org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin, 
Object[] boundValues)
     {
@@ -56,7 +57,8 @@ public class Query implements 
IIsolatedExecutor.SerializableCallable<Object[][]>
         this.boundValues = boundValues;
     }
 
-    public Object[][] call()
+    @Override
+    public SimpleQueryResult call()
     {
         ConsistencyLevel commitConsistency = 
toCassandraCL(commitConsistencyOrigin);
         ConsistencyLevel serialConsistency = serialConsistencyOrigin == null ? 
null : toCassandraCL(serialConsistencyOrigin);
@@ -89,7 +91,7 @@ public class Query implements 
IIsolatedExecutor.SerializableCallable<Object[][]>
         if (res != null)
             res.setWarnings(ClientWarn.instance.getWarnings());
 
-        return RowUtil.toQueryResult(res).toObjectArrays();
+        return RowUtil.toQueryResult(res);
     }
 
     public String toString()
diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
index b5dde953a3..0bef6a45c4 100644
--- a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java
@@ -116,6 +116,7 @@ public class QueryResultUtil
     {
         StringBuilder sb = new StringBuilder();
         int rowNum = 1;
+        qr.mark();
         while (qr.hasNext())
         {
             sb.append("@ Row ").append(rowNum).append('\n');
@@ -128,6 +129,7 @@ public class QueryResultUtil
             }
             sb.append(table);
         }
+        qr.reset();
         return sb.toString();
     }
 
diff --git a/test/simulator/main/org/apache/cassandra/simulator/ActionList.java b/test/simulator/main/org/apache/cassandra/simulator/ActionList.java
index 64474e6184..2357684043 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/ActionList.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/ActionList.java
@@ -41,6 +41,10 @@ public class ActionList extends AbstractCollection<Action>
     public static ActionList empty() { return EMPTY; }
     public static ActionList of(Action action) { return new ActionList(new 
Action[] { action }); }
     public static ActionList of(Stream<Action> action) { return new 
ActionList(action.toArray(Action[]::new)); }
+    public static ActionList of(Stream<Action> action, Stream<Action>... 
actions)
+    {
+        return new ActionList(Stream.concat(action, 
Stream.of(actions).flatMap(a -> a)).toArray(Action[]::new));
+    }
     public static ActionList of(Collection<Action> actions) { return 
actions.isEmpty() ? EMPTY : new ActionList(actions.toArray(new Action[0])); }
     public static ActionList of(Action ... actions) { return new 
ActionList(actions); }
 
diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
index 54a871bb45..5a528468ea 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java
@@ -22,35 +22,43 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import javax.annotation.Nullable;
+import java.util.stream.Stream;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.marshal.ListType;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.LogResult;
+import org.apache.cassandra.distributed.impl.FileLogAction;
 import org.apache.cassandra.distributed.impl.Instance;
+import org.apache.cassandra.io.util.File;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.simulator.Action;
 import org.apache.cassandra.simulator.ActionList;
-import org.apache.cassandra.simulator.ActionListener;
 import org.apache.cassandra.simulator.ActionPlan;
 import org.apache.cassandra.simulator.Actions;
 import org.apache.cassandra.simulator.Debug;
+import org.apache.cassandra.simulator.RandomSource;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
 import org.apache.cassandra.simulator.cluster.KeyspaceActions;
+import org.apache.cassandra.simulator.logging.RunStartDefiner;
+import org.apache.cassandra.simulator.logging.SeedDefiner;
 import org.apache.cassandra.simulator.systems.SimulatedActionTask;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.utils.IntRange;
+import org.apache.cassandra.utils.Pair;
 
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -59,7 +67,6 @@ import static 
org.apache.cassandra.simulator.Action.Modifiers.RELIABLE_NO_TIMEOU
 import static 
org.apache.cassandra.simulator.ActionSchedule.Mode.STREAM_LIMITED;
 import static 
org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_AND_STREAM_LIMITED;
 import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_LIMITED;
-import static org.apache.cassandra.simulator.Debug.EventType.PARTITION;
 
 @SuppressWarnings("unused")
 abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation
@@ -77,7 +84,6 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends 
PaxosSimulation
     final IntRange simulateKeyForSeconds;
     final ConsistencyLevel serialConsistency;
     final Debug debug;
-    final List<HistoryChecker> historyCheckers = new ArrayList<>();
     final AtomicInteger successfulReads = new AtomicInteger();
     final AtomicInteger successfulWrites = new AtomicInteger();
     final AtomicInteger failedReads = new AtomicInteger();
@@ -112,10 +118,71 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
 
     protected abstract String preInsertStmt();
 
-    abstract Operation verifying(int operationId, IInvokableInstance instance, 
int primaryKey, HistoryChecker historyChecker);
-    abstract Operation nonVerifying(int operationId, IInvokableInstance 
instance, int primaryKey, HistoryChecker historyChecker);
-    abstract Operation modifying(int operationId, IInvokableInstance instance, 
int primaryKey, HistoryChecker historyChecker);
     abstract boolean joinAll();
+    /**
+     * Whether a single key stream may claim more than one partition at a time. While this returns
+     * {@code false}, {@code consume} always hands out exactly one entry per call.
+     *
+     * @return {@code false} by default
+     */
+    boolean allowMultiplePartitions()
+    {
+        return false;
+    }
+
+    /**
+     * Supplies the factory that {@code plan()} uses to build the action supplier of each key stream.
+     * <p>
+     * {@code plan()} applies the factory to its {@code SimulatedSystems} and to the sorted array of
+     * values (drawn from {@code 0..primaryKeys.length-1}) claimed for one stream, then polls the
+     * returned supplier for actions until that stream's time limit has passed.
+     */
+    abstract BiFunction<SimulatedSystems, int[], Supplier<Action>> 
actionFactory();
+
+    /**
+     * Builds an action that fails the simulation when a node's log contains unexpected errors.
+     * <p>
+     * When performed, the action reads {@code build/test/logs/simulator/<run-start>-<seed>/node<N>/system.log},
+     * takes the text before the first {@code ':'} of the first line of each error entry that is neither an
+     * {@code ERROR} line nor a stack frame, and treats it as the exception class name. Entries whose class
+     * is not assignable to one of {@code expectedExceptions()}, or whose class name cannot be resolved,
+     * are collected into a single {@link AssertionError}. Entries made up only of {@code ERROR} lines
+     * and stack frames yield no class name and are not checked.
+     *
+     * @param inst the instance whose log file is inspected
+     * @return an action whose execution throws {@link AssertionError} if unexpected errors were logged
+     */
+    protected Action checkErrorLogs(IInvokableInstance inst)
+    {
+        DatabaseDescriptor.clientInitialization();
+        return new Action("Error logs for node" + inst.config().num(), Action.Modifiers.NONE)
+        {
+            @Override
+            protected ActionList performSimple()
+            {
+                // can't use inst.logs as that runs in the instance's class loader, which uses an in-memory file system
+                String suite = new RunStartDefiner().getPropertyValue() + "-" + new SeedDefiner().getPropertyValue();
+                String instanceId = "node" + inst.config().num();
+                File logFile = new File(String.format("build/test/logs/simulator/%s/%s/system.log", suite, instanceId));
+                FileLogAction logs = new FileLogAction(logFile);
+
+                LogResult<List<String>> errors = logs.grepForErrors();
+                if (errors.getResult().isEmpty())
+                    return ActionList.empty();
+
+                // pair each error entry with the class name parsed from its first informative line
+                List<Pair<String, String>> errorsSeen = new ArrayList<>();
+                for (String error : errors.getResult())
+                {
+                    for (String line : error.split("\\n"))
+                    {
+                        line = line.trim();
+                        if (line.startsWith("ERROR")) continue;
+                        if (line.startsWith("at ")) continue;
+                        errorsSeen.add(Pair.create(line.split(":")[0], error));
+                        break;
+                    }
+                }
+
+                Class<? extends Throwable>[] expected = expectedExceptions();
+                StringBuilder sb = new StringBuilder();
+                for (Pair<String, String> pair : errorsSeen)
+                {
+                    String name = pair.left;
+                    String exception = pair.right;
+                    Class<?> klass;
+                    try
+                    {
+                        klass = Class.forName(name);
+                    }
+                    catch (ClassNotFoundException e)
+                    {
+                        // the parsed token is not a loadable class: report the logged entry itself rather
+                        // than aborting with an exception that hides what the node actually logged
+                        sb.append("Unexpected exception (unable to resolve class '").append(name).append("'):\n").append(exception).append('\n');
+                        continue;
+                    }
+
+                    if (Stream.of(expected).noneMatch(e -> e.isAssignableFrom(klass)))
+                        sb.append("Unexpected exception:\n").append(exception).append('\n');
+                }
+                if (sb.length() > 0)
+                {
+                    AssertionError error = new AssertionError("Saw errors in node" + inst.config().num() + ": " + sb);
+                    // the stack trace of this check adds nothing and can be confused with the logged ones
+                    error.setStackTrace(new StackTraceElement[0]);
+                    throw error;
+                }
+                return ActionList.empty();
+            }
+        };
+    }
 
     public ActionPlan plan()
     {
@@ -124,101 +191,27 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
 
         plan = plan.encapsulate(ActionPlan.setUpTearDown(
             ActionList.of(
-                cluster.stream().map(i -> simulated.run("Insert Partitions", 
i, executeForPrimaryKeys(preInsertStmt(), primaryKeys)))
-            ).andThen(
+                cluster.stream().map(i -> simulated.run("Insert Partitions", 
i, executeForPrimaryKeys(preInsertStmt(), primaryKeys))),
                 // TODO (now): this is temporary until we have correct epoch 
handling
-                ActionList.of(
-                    cluster.stream().map(i -> simulated.run("Create Accord 
Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
-                )
-//            ).andThen(
-//                // TODO (now): this is temporary until we have 
parameterisation of simulation
-//                ActionList.of(
-//                    cluster.stream().map(i -> simulated.run("Disable Accord 
Cache", i, () -> AccordService.instance.setCacheSize(0)))
-//                )
+                cluster.stream().map(i -> simulated.run("Create Accord Epoch", 
i, () -> AccordService.instance().createEpochFromConfigUnsafe()))
             ),
             ActionList.of(
+                cluster.stream().map(i -> checkErrorLogs(i)),
                 cluster.stream().map(i -> 
SimulatedActionTask.unsafeTask("Shutdown " + i.broadcastAddress(), RELIABLE, 
RELIABLE_NO_TIMEOUTS, simulated, i, i::shutdown))
             )
         ));
 
-        final int nodes = cluster.size();
-        for (int primaryKey : primaryKeys)
-            historyCheckers.add(new HistoryChecker(primaryKey));
-
-        List<Supplier<Action>> primaryKeyActions = new ArrayList<>();
-        for (int pki = 0 ; pki < primaryKeys.length ; ++pki)
-        {
-            int primaryKey = primaryKeys[pki];
-            HistoryChecker historyChecker = historyCheckers.get(pki);
-            Supplier<Action> supplier = new Supplier<Action>()
-            {
-                int i = 0;
-
-                @Override
-                public Action get()
-                {
-                    int node = simulated.random.uniform(1, nodes + 1);
-                    IInvokableInstance instance = cluster.get(node);
-                    switch (serialConsistency)
-                    {
-                        default: throw new AssertionError();
-                        case LOCAL_SERIAL:
-                            if (simulated.snitch.dcOf(node) > 0)
-                            {
-                                // perform some queries against these nodes 
but don't expect them to be linearizable
-                                return nonVerifying(i++, instance, primaryKey, 
historyChecker);
-                            }
-                        case SERIAL:
-                            return simulated.random.decide(readRatio)
-                                   ? verifying(i++, instance, primaryKey, 
historyChecker)
-                                   : modifying(i++, instance, primaryKey, 
historyChecker);
-                    }
-                }
-
-                @Override
-                public String toString()
-                {
-                    return Integer.toString(primaryKey);
-                }
-            };
-
-            final ActionListener listener = debug.debug(PARTITION, 
simulated.time, cluster, KEYSPACE, primaryKey);
-            if (listener != null)
-            {
-                Supplier<Action> wrap = supplier;
-                supplier = new Supplier<Action>()
-                {
-                    @Override
-                    public Action get()
-                    {
-                        Action action = wrap.get();
-                        action.register(listener);
-                        return action;
-                    }
-
-                    @Override
-                    public String toString()
-                    {
-                        return wrap.toString();
-                    }
-                };
-            }
-
-            primaryKeyActions.add(supplier);
-        }
+        BiFunction<SimulatedSystems, int[], Supplier<Action>> factory = 
actionFactory();
 
         List<Integer> available = IntStream.range(0, 
primaryKeys.length).boxed().collect(Collectors.toList());
         Action stream = Actions.infiniteStream(concurrency, new 
Supplier<Action>() {
             @Override
             public Action get()
             {
-                int i = simulated.random.uniform(0, available.size());
-                int next = available.get(i);
-                available.set(i, available.get(available.size() - 1));
-                available.remove(available.size() - 1);
+                int[] primaryKeyIndex = consume(simulated.random, available);
                 long untilNanos = simulated.time.nanoTime() + 
SECONDS.toNanos(simulateKeyForSeconds.select(simulated.random));
                 int concurrency = 
withinKeyConcurrency.select(simulated.random);
-                Supplier<Action> supplier = primaryKeyActions.get(next);
+                Supplier<Action> supplier = factory.apply(simulated, 
primaryKeyIndex);
                 // while this stream is finite, it participates in an infinite 
stream via its parent, so we want to permit termination while it's running
                 return Actions.infiniteStream(concurrency, new 
Supplier<Action>()
                 {
@@ -227,7 +220,7 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
                     {
                         if (simulated.time.nanoTime() >= untilNanos)
                         {
-                            available.add(next);
+                            
IntStream.of(primaryKeyIndex).boxed().forEach(available::add);
                             return null;
                         }
                         return supplier.get();
@@ -253,7 +246,30 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
                                   
.encapsulate(ActionPlan.interleave(singletonList(ActionList.of(stream))));
     }
 
-    private IIsolatedExecutor.SerializableRunnable 
executeForPrimaryKeys(String cql, int[] primaryKeys)
+    /**
+     * Randomly claims entries from {@code available}, removing each claimed entry from the list.
+     * <p>
+     * Exactly one entry is claimed when only one is left or when {@link #allowMultiplePartitions()} is
+     * {@code false}; otherwise the number claimed is {@code random.uniform(1, available.size())}.
+     *
+     * @param random    source of the random choices
+     * @param available mutable pool of unclaimed entries; shrinks by the number of entries returned
+     * @return the claimed entries, sorted ascending
+     * @throws AssertionError        if {@code available} is empty
+     * @throws IllegalStateException if the list does not behave as expected while removing an entry
+     */
+    private int[] consume(RandomSource random, List<Integer> available)
+    {
+        if (available.isEmpty())
+            throw new AssertionError("available partitions are empty!");
+
+        final int numPartitions;
+        if (available.size() == 1 || !allowMultiplePartitions())
+            numPartitions = 1;
+        else
+            numPartitions = random.uniform(1, available.size());
+
+        int[] claimed = new int[numPartitions];
+        int filled = 0;
+        while (filled < numPartitions)
+        {
+            int idx = random.uniform(0, available.size());
+            int tailIdx = available.size() - 1;
+            int next = available.get(idx);
+            int last = available.get(tailIdx);
+
+            // swap-remove: overwrite the chosen slot with the tail element, then drop the tail
+            int replaced = available.set(idx, last);
+            if (replaced != next)
+                throw new IllegalStateException("Expected to set " + last + " index " + idx + " but did not return " + next);
+            int removed = available.remove(tailIdx);
+            if (last != removed)
+                throw new IllegalStateException("Expected to remove " + last + " but removed " + removed);
+
+            claimed[filled++] = next;
+        }
+        Arrays.sort(claimed);
+        return claimed;
+    }
+
+    IIsolatedExecutor.SerializableRunnable executeForPrimaryKeys(String cql, 
int[] primaryKeys)
     {
         return () -> {
             for (int primaryKey : primaryKeys)
@@ -273,13 +289,6 @@ abstract class AbstractPairOfSequencesPaxosSimulation 
extends PaxosSimulation
         return new PaxosRepairValidator(cluster, KEYSPACE, TABLE, id);
     }
 
-    @Override
-    void log(@Nullable Integer primaryKey)
-    {
-        if (primaryKey == null) historyCheckers.forEach(HistoryChecker::print);
-        else historyCheckers.stream().filter(h -> h.primaryKey == 
primaryKey).forEach(HistoryChecker::print);
-    }
-
     @Override
     public void run()
     {
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java
index d1e0771b1e..2465bf62cf 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java
@@ -127,13 +127,23 @@ class HistoryChecker
         return byId[id] = event;
     }
 
-    void witness(Observation witness, int[] witnessSequence, int start, int 
end)
+    /**
+     * Returns the entry of {@code witnessSequence} immediately before {@code eventPosition},
+     * or {@code -1} when {@code eventPosition} is {@code 0}.
+     */
+    private static int eventId(int[] witnessSequence, int eventPosition)
+    {
+        if (eventPosition == 0)
+            return -1;
+        return witnessSequence[eventPosition - 1];
+    }
+
+    /**
+     * Convenience overload: unpacks the observation's {@code id}, {@code start} and {@code end} and
+     * forwards them, together with the witnessed sequence, to {@code witness(int, int[], int, int)}.
+     */
+    void witness(Observation witness, int[] witnessSequence)
+    {
+        witness(witness.id, witnessSequence, witness.start, witness.end);
+    }
+
+    void witness(int id, int[] witnessSequence, int start, int end)
     {
         int eventPosition = witnessSequence.length;
-        int eventId = eventPosition == 0 ? -1 : witnessSequence[eventPosition 
- 1];
-        setById(witness.id, new Event(witness.id)).log.add(new 
VerboseWitness(witness.id, start, end, witnessSequence));
+        int eventId = eventId(witnessSequence, eventPosition);
+        setById(id, new Event(id)).log.add(new VerboseWitness(id, start, end, 
witnessSequence));
         Event event = get(eventPosition, eventId);
-        recordWitness(event, witness, witnessSequence);
+        recordWitness(event, id, start, end, witnessSequence);
         recordVisibleBy(event, end);
         recordVisibleUntil(event, start);
 
@@ -154,7 +164,7 @@ class HistoryChecker
                     }
                     else if (e.result)
                     {
-                        throw fail(primaryKey, "%d witnessed as absent by %d", 
e.eventId, witness.id);
+                        throw fail(primaryKey, "%d witnessed as absent by %d", 
e.eventId, id);
                     }
                 }
             }
@@ -181,16 +191,16 @@ class HistoryChecker
         }
     }
 
-    void recordWitness(Event event, Observation witness, int[] witnessSequence)
+    void recordWitness(Event event, int id, int start, int end, int[] 
witnessSequence)
     {
-        recordWitness(event, witness, witnessSequence.length, witnessSequence);
+        recordWitness(event, id, start, end, witnessSequence.length, 
witnessSequence);
     }
 
-    void recordWitness(Event event, Observation witness, int eventPosition, 
int[] witnessSequence)
+    void recordWitness(Event event, int id, int start, int end, int 
eventPosition, int[] witnessSequence)
     {
         while (true)
         {
-            event.log.add(new Witness(READ, witness.id, witness.start, 
witness.end));
+            event.log.add(new Witness(READ, id, start, end));
             if (event.witnessSequence != null)
             {
                 if (!Arrays.equals(event.witnessSequence, witnessSequence))
@@ -238,7 +248,7 @@ class HistoryChecker
             event.visibleUntil = visibleUntil;
             Event next = next(event);
             if (next != null && visibleUntil >= next.visibleBy)
-                throw fail(primaryKey, "%s %d not witnessed >= %d, but also 
witnessed <= %d", next.witnessSequence, next.eventId, event.visibleUntil, 
next.visibleBy);
+                throw fail(primaryKey, "%s+%d not witnessed >= %d, but also 
witnessed <= %d", next.witnessSequence, next.eventId, event.visibleUntil, 
next.visibleBy);
         }
     }
 
@@ -295,7 +305,7 @@ class HistoryChecker
             return null;
 
         // initialise the event, if necessary importing information from byId
-        return get(eventPosition, eventPosition == 0 ? -1 : 
event.witnessSequence[eventPosition - 1]);
+        return get(eventPosition, eventId(event.witnessSequence, 
eventPosition));
     }
 
     Event next(Event event)
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java
similarity index 57%
copy from 
test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
copy to 
test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java
index 546fd3179f..282b16d3b1 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java
@@ -18,28 +18,35 @@
 
 package org.apache.cassandra.simulator.paxos;
 
-class Observation implements Comparable<Observation>
+import javax.annotation.Nullable;
+
+public interface HistoryValidator
 {
-    final int id;
-    final Object[][] result;
-    final int start;
-    final int end;
+    Checker witness(int start, int end);
+
+    void print(@Nullable Integer pk);
 
-    Observation(int id, Object[][] result, int start, int end)
+    interface Checker extends AutoCloseable
     {
-        this.id = id;
-        this.result = result;
-        this.start = start;
-        this.end = end;
+        void read(int pk, int id, int count, int[] seq);
+        void write(int pk, int id, boolean success);
+
+        default void writeSuccess(int pk, int id)
+        {
+            write(pk, id, true);
+        }
+
+        default void writeUnknownFailure(int pk, int id)
+        {
+            write(pk, id, false);
+        }
+
+        @Override
+        default void close() {}
     }
 
-    // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many 
pair-wise comparisons the answer is 0
-    public int compareTo(Observation that)
+    interface Factory
     {
-        if (this.end < that.start)
-            return -1;
-        if (that.end < this.start)
-            return 1;
-        return 0;
+        HistoryValidator create(int[] partitions);
     }
 }
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java
 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java
new file mode 100644
index 0000000000..67c95a7378
--- /dev/null
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+
+import com.carrotsearch.hppc.IntObjectHashMap;
+import com.carrotsearch.hppc.IntObjectMap;
+import com.carrotsearch.hppc.cursors.ObjectCursor;
+
+/**
+ * {@link HistoryValidator} that keeps one {@link HistoryChecker} per primary key and forwards every
+ * witnessed read and write to the checker registered for the key involved.
+ */
+public class LinearizabilityValidator implements HistoryValidator
+{
+    // one checker per primary key passed to the constructor; nothing is added afterwards
+    private final IntObjectMap<HistoryChecker> historyCheckers;
+
+    /**
+     * @param primaryKeys the primary keys to track; a {@link HistoryChecker} is created for each of them
+     */
+    public LinearizabilityValidator(int[] primaryKeys)
+    {
+        historyCheckers = new IntObjectHashMap<>(primaryKeys.length);
+        for (int primaryKey : primaryKeys)
+            historyCheckers.put(primaryKey, new HistoryChecker(primaryKey));
+    }
+
+    /**
+     * Creates a checker that reports reads and writes observed in the interval {@code [start, end]}.
+     *
+     * @param start start of the observation, passed unchanged to the per-key checker
+     * @param end   end of the observation, passed unchanged to the per-key checker
+     * @return a checker whose calls fail with {@link NullPointerException} for an untracked key
+     */
+    @Override
+    public Checker witness(int start, int end)
+    {
+        return new Checker()
+        {
+            @Override
+            public void read(int pk, int id, int count, int[] seq)
+            {
+                // count is not used by this validator: only the witnessed sequence reaches the checker
+                get(pk).witness(id, seq, start, end);
+            }
+
+            @Override
+            public void write(int pk, int id, boolean success)
+            {
+                get(pk).applied(id, start, end, success);
+            }
+        };
+    }
+
+    /**
+     * Prints the recorded history.
+     *
+     * @param pk the key to print, or {@code null} to print every tracked key
+     * @throws NullPointerException if {@code pk} is non-null but no checker is registered for it
+     */
+    @Override
+    public void print(@Nullable Integer pk)
+    {
+        if (pk == null)
+            historyCheckers.values().forEach((Consumer<ObjectCursor<HistoryChecker>>) c -> c.value.print());
+        else
+            get(pk).print(); // via get(int) so an unknown key fails with a descriptive message
+    }
+
+    /**
+     * Looks up the checker for {@code pk}.
+     *
+     * @throws NullPointerException naming the key if no checker is registered for it
+     */
+    private HistoryChecker get(int pk)
+    {
+        HistoryChecker checker = historyCheckers.get(pk);
+        if (checker == null)
+            throw new NullPointerException("Unable to find checker for pk=" + pk);
+        return checker;
+    }
+
+    /** Factory producing a {@link LinearizabilityValidator} for a given set of partitions. */
+    public static class Factory implements HistoryValidator.Factory
+    {
+        public static final Factory instance = new Factory();
+
+        @Override
+        public HistoryValidator create(int[] partitions)
+        {
+            return new LinearizabilityValidator(partitions);
+        }
+    }
+}
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java
 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java
new file mode 100644
index 0000000000..b39c3111ea
--- /dev/null
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import java.util.Arrays;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link HistoryValidator} decorator that records every read and write reported for a witness and
+ * writes the accumulated transcript to the log at INFO level when the checker is closed, while
+ * forwarding each call to the wrapped validator.
+ */
+public class LoggingHistoryValidator implements HistoryValidator
+{
+    private static final Logger logger = LoggerFactory.getLogger(LoggingHistoryValidator.class);
+    private final HistoryValidator delegate;
+
+    /**
+     * @param delegate the validator that performs the actual checks
+     */
+    public LoggingHistoryValidator(HistoryValidator delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    @Override
+    public Checker witness(int start, int end)
+    {
+        StringBuilder transcript = new StringBuilder();
+        transcript.append("Witness(start=").append(start)
+                  .append(", end=").append(end)
+                  .append(")\n");
+        return new TranscribingChecker(transcript, delegate.witness(start, end));
+    }
+
+    @Override
+    public void print(@Nullable Integer pk)
+    {
+        delegate.print(pk);
+    }
+
+    /** Checker that appends each call to a transcript before handing it to the wrapped checker. */
+    private static final class TranscribingChecker implements Checker
+    {
+        private final StringBuilder transcript;
+        private final Checker inner;
+
+        private TranscribingChecker(StringBuilder transcript, Checker inner)
+        {
+            this.transcript = transcript;
+            this.inner = inner;
+        }
+
+        @Override
+        public void read(int pk, int id, int count, int[] seq)
+        {
+            transcript.append("\tread(pk=").append(pk);
+            transcript.append(", id=").append(id);
+            transcript.append(", count=").append(count);
+            transcript.append(", seq=").append(Arrays.toString(seq));
+            transcript.append(")\n");
+            inner.read(pk, id, count, seq);
+        }
+
+        @Override
+        public void write(int pk, int id, boolean success)
+        {
+            transcript.append("\twrite(pk=").append(pk);
+            transcript.append(", id=").append(id);
+            transcript.append(", success=").append(success);
+            transcript.append(")\n");
+            inner.write(pk, id, success);
+        }
+
+        @Override
+        public void close()
+        {
+            // emit the whole transcript as one log entry, then release the wrapped checker
+            logger.info(transcript.toString());
+            inner.close();
+        }
+    }
+}
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
index 546fd3179f..41eb2c348d 100644
--- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
+++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java
@@ -18,14 +18,16 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+
 class Observation implements Comparable<Observation>
 {
     final int id;
-    final Object[][] result;
+    final SimpleQueryResult result;
     final int start;
     final int end;
 
-    Observation(int id, Object[][] result, int start, int end)
+    Observation(int id, SimpleQueryResult result, int start, int end)
     {
         this.id = id;
         this.result = result;
@@ -33,6 +35,16 @@ class Observation implements Comparable<Observation>
         this.end = end;
     }
 
+    /** An observation counts as successful when it carries a non-null {@code result}. */
+    boolean isSuccess()
+    {
+        return result != null;
+    }
+
+    /** The complement of {@link #isSuccess()}: true when the observation carries no {@code result}. */
+    boolean isUnknownFailure()
+    {
+        return !isSuccess();
+    }
+
     // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many 
pair-wise comparisons the answer is 0
     public int compareTo(Observation that)
     {
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
index a9bbf7ca57..7724690bc5 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java
@@ -22,153 +22,51 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.annotation.Nullable;
 
-import com.google.common.collect.ImmutableList;
-import org.apache.commons.lang3.ArrayUtils;
-import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import accord.coordinate.Preempted;
-import accord.coordinate.Timeout;
-import accord.primitives.Txn;
+import com.carrotsearch.hppc.IntArrayList;
+import com.carrotsearch.hppc.IntHashSet;
+import com.carrotsearch.hppc.cursors.IntCursor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.cql3.QueryOptions;
-import org.apache.cassandra.cql3.statements.SelectStatement;
 import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
-import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.QueryResults;
-import org.apache.cassandra.exceptions.RequestTimeoutException;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.impl.Query;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.service.accord.AccordService;
-import org.apache.cassandra.service.accord.AccordTestUtils;
-import org.apache.cassandra.service.accord.txn.TxnData;
-import org.apache.cassandra.service.accord.txn.TxnDataName;
+import org.apache.cassandra.simulator.Action;
 import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.utils.IntRange;
 
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
 import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail;
-import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+import static org.apache.cassandra.utils.AssertionUtils.hasCause;
+import static org.apache.cassandra.utils.AssertionUtils.isThrowableInstanceof;
 
 // TODO: the class hierarchy is a bit broken, but hard to untangle. Need to go 
Paxos->Consensus, probably.
 @SuppressWarnings("unused")
 public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxosSimulation
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PairOfSequencesAccordSimulation.class);
-    private static final String SELECT = "SELECT pk, count, seq FROM  " + 
KEYSPACE + ".tbl WHERE pk = ?";
-
-    class VerifyingOperation extends Operation
-    {
-        final HistoryChecker historyChecker;
-        public VerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
-        {
-            super(primaryKey, id, instance, "SELECT", read(primaryKey));
-            this.historyChecker = historyChecker;
-        }
-
-        void verify(Observation outcome)
-        {
-            (outcome.result != null ? successfulReads : 
failedReads).incrementAndGet();
-
-            if (outcome.result == null)
-                return;
-
-            if (outcome.result.length != 1)
-                throw fail(primaryKey, "#result (%s) != 1", 
Arrays.toString(outcome.result));
-
-            Object[] row = outcome.result[0];
-            // first verify internally consistent
-            int count = row[1] == null ? 0 : (Integer) row[1];
-            int[] seq = Arrays.stream((row[2] == null ? "" : (String) 
row[2]).split(","))
-                               .filter(s -> !s.isEmpty())
-                               .mapToInt(Integer::parseInt)
-                               .toArray();
-
-            if (seq.length != count)
-                throw fail(primaryKey, "%d != #%s", count, seq);
-
-            historyChecker.witness(outcome, seq, outcome.start, outcome.end);
-        }
-    }
-
-    private static IIsolatedExecutor.SerializableCallable<Object[][]> read(int 
primaryKey)
-    {
-        return () -> {
-            String cql = "BEGIN TRANSACTION\n" + SELECT + ";\n" + "COMMIT 
TRANSACTION";
-            List<ByteBuffer> values = ImmutableList.of(bytes(primaryKey));
-            Txn txn = AccordTestUtils.createTxn(cql, 
QueryOptions.forInternalCalls(values));
-            // TODO (now): support complex columns
-            return execute(txn, "pk", "count", "seq");
-        };
-    }
-
-    private static IIsolatedExecutor.SerializableCallable<Object[][]> 
write(int id, int primaryKey)
-    {
-        return () -> {
-            String cql = "BEGIN TRANSACTION\n" + 
-                         "    " + SELECT + ";\n" +
-                         "    UPDATE " + KEYSPACE + ".tbl SET seq += '" + id + 
",' WHERE pk = ?;\n" +
-                         "    UPDATE " + KEYSPACE + ".tbl SET count += 1 WHERE 
pk = ?;\n" +
-                         "COMMIT TRANSACTION";
-            List<ByteBuffer> values = ImmutableList.of(bytes(primaryKey), 
bytes(primaryKey), bytes(primaryKey));
-            Txn txn = AccordTestUtils.createTxn(cql, 
QueryOptions.forInternalCalls(values));
-            return execute(txn, "pk", "count", "seq");
-        };
-    }
-
-    private static Object[][] execute(Txn txn, String ... columns)
-    {
-        try
-        {
-            TxnData result = (TxnData) 
AccordService.instance().node.coordinate(txn).get();
-            Assert.assertNotNull(result);
-            QueryResults.Builder builder = QueryResults.builder();
-            boolean addedHeader = false;
-
-            FilteredPartition partition = result.get(TxnDataName.returning());
-            TableMetadata metadata = partition.metadata();
-            builder.columns(columns);
-
-            ByteBuffer[] keyComponents = 
SelectStatement.getComponents(metadata, partition.partitionKey());
-
-            Row s = partition.staticRow();
-            if (!s.isEmpty())
-                append(metadata, keyComponents, s, builder, columns);
-
-            for (Row row : partition)
-                append(metadata, keyComponents, row, builder, columns);
-
-            return builder.build().toObjectArrays();
-        }
-        catch (InterruptedException e)
-        {
-            throw new AssertionError(e);
-        }
-        catch (ExecutionException e)
-        {
-            if (e.getCause() instanceof Preempted)
-                return null;
-            if (e.getCause() instanceof Timeout)
-                return null;
-            if (e.getCause() instanceof RequestTimeoutException)
-                return null;
-            throw new AssertionError(e);
-        }
-    }
+    private static final String SELECT = "SELECT pk, count, seq FROM  " + 
KEYSPACE + ".tbl WHERE pk IN (%s);";
+    private static final String UPDATE = "UPDATE " + KEYSPACE + ".tbl SET 
count += 1, seq = seq + ? WHERE pk = ?;";
 
     private static void append(TableMetadata metadata, ByteBuffer[] 
keyComponents, Row row, QueryResults.Builder builder, String[] columnNames)
     {
@@ -218,38 +116,14 @@ public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxo
         builder.row(buffer);
     }
 
-    class NonVerifyingOperation extends Operation
+    @Override
+    void log(@Nullable Integer pk)
     {
-        public NonVerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
-        {
-            super(primaryKey, id, instance, "SELECT", read(primaryKey));
-        }
-
-        void verify(Observation outcome)
-        {
-        }
+        validator.print(pk);
     }
 
-    public class ModifyingOperation extends Operation
-    {
-        final HistoryChecker historyChecker;
-        public ModifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, int 
primaryKey, HistoryChecker historyChecker)
-        {
-            super(primaryKey, id, instance, "UPDATE", write(id, primaryKey));
-            this.historyChecker = historyChecker;
-        }
-
-        void verify(Observation outcome)
-        {
-            (outcome.result != null ? successfulWrites : 
failedWrites).incrementAndGet();
-            if (outcome.result != null)
-            {
-                if (outcome.result.length != 1)
-                    throw fail(primaryKey, "Accord Result: 1 != #%s", 
ArrayUtils.toString(outcome.result));
-            }
-            historyChecker.applied(outcome.id, outcome.start, outcome.end, 
outcome.result != null);
-        }
-    }
+    private final float writeRatio;
+    private final HistoryValidator validator;
 
     public PairOfSequencesAccordSimulation(SimulatedSystems simulated,
                                            Cluster cluster,
@@ -266,6 +140,8 @@ public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxo
               scheduler, debug,
               seed, primaryKeys,
               runForNanos, jitter);
+        this.writeRatio = 1F - readRatio;
+        validator = new LoggingHistoryValidator(new 
StrictSerializabilityValidator(primaryKeys));
     }
 
     @Override
@@ -281,21 +157,145 @@ public class PairOfSequencesAccordSimulation extends 
AbstractPairOfSequencesPaxo
     }
 
     @Override
-    Operation verifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    boolean allowMultiplePartitions() { return true; }
+
+    /**
+     * Builds the factory that produces Accord operations.
+     * The returned function receives indexes into {@code this.primaryKeys}, resolves them to the
+     * partition keys stored there, and yields a supplier that calls {@code accordAction} on every
+     * invocation. All suppliers draw their operation id from the one counter created here.
+     */
+    @Override
+    BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory()
     {
-        return new VerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
+        AtomicInteger id = new AtomicInteger(0);
+
+        return (simulated, primaryKeyIndex) -> {
+            int[] primaryKeys = IntStream.of(primaryKeyIndex).map(i -> 
this.primaryKeys[i]).toArray();
+            return () -> accordAction(id.getAndIncrement(), simulated, 
primaryKeys);
+        };
     }
 
-    @Override
-    Operation nonVerifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    /**
+     * One Accord transaction that reads the partitions in {@code reads} and writes the partitions
+     * in {@code writes}. When a result is returned, {@link #verify} checks every returned row for
+     * internal consistency and reports the reads and the writes to the history validator.
+     */
+    public class ReadWriteOperation extends Operation
     {
-        return new NonVerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
+        private final IntHashSet reads, writes;
+
+        public ReadWriteOperation(int id, int[] primaryKeys, IntHashSet reads, 
IntHashSet writes, IInvokableInstance instance)
+        {
+            super(primaryKeys, id, instance, "Accord", createQuery(id, reads, 
writes));
+            this.reads = reads;
+            this.writes = writes;
+        }
+
+        /**
+         * Validates the outcome of this transaction. A null result only updates the failure
+         * counter; nothing is reported to the validator in that case.
+         */
+        @Override
+        void verify(Observation outcome)
+        {
+            SimpleQueryResult result = outcome.result;
+            // NOTE(review): every operation is tallied under the write counters, even when
+            // 'writes' is empty - confirm this is intended
+            (result != null ? successfulWrites : 
failedWrites).incrementAndGet();
+            if (result != null)
+            {
+                IntHashSet seen = new IntHashSet();
+                // TODO: if a partition has no value the read is empty and produces no row in
+                // the QueryResult; this should be fine because the simulation always runs with
+                // the partitions defined
+                // closing the checker (end of the try block) is what applies the witnessed events
+                try (HistoryValidator.Checker checker = 
validator.witness(outcome.start, outcome.end))
+                {
+                    while (result.hasNext())
+                    {
+                        org.apache.cassandra.distributed.api.Row row = 
result.next();
+
+                        int pk = row.getInteger("pk");
+                        int count = row.getInteger("count", 0);
+                        // 'seq' is a comma separated list of operation ids
+                        int[] seq = Arrays.stream(row.getString("seq", 
"").split(","))
+                                          .filter(s -> !s.isEmpty())
+                                          .mapToInt(Integer::parseInt)
+                                          .toArray();
+
+                        if (!seen.add(pk))
+                            throw new IllegalStateException("Duplicate 
partition key " + pk);
+                        // every partition was read, but not all were written to, so each
+                        // partition must be verified on its own
+                        if (seq.length != count)
+                            throw fail(pk, "%d != #%s", count, seq);
+
+                        checker.read(pk, outcome.id, count, seq);
+                    }
+                    // NOTE(review): 0 is passed as the primary key here, presumably because no
+                    // single partition is at fault - confirm
+                    if (!seen.equals(reads))
+                        throw fail(0, "#result had %s partitions, but should 
have had %s", seen, reads);
+                    // handle writes
+                    for (IntCursor c : writes)
+                        checker.write(c.value, outcome.id, 
outcome.isSuccess());
+                }
+            }
+        }
     }
 
-    @Override
-    Operation modifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    /**
+     * Creates one {@link ReadWriteOperation} over {@code partitions}.
+     * For each partition two independent random draws ({@code readRatio}, then {@code writeRatio})
+     * decide whether it is read and/or written; a partition selected by neither draw is both read
+     * and written. The operation is bound to a node chosen with
+     * {@code uniform(1, cluster.size() + 1)}.
+     *
+     * @param id         identifier given to the operation
+     * @param simulated  source of the random draws
+     * @param partitions partition keys the operation covers
+     */
+    private Action accordAction(int id, SimulatedSystems simulated, int[] 
partitions)
     {
-        return new ModifyingOperation(operationId, instance, ANY, 
serialConsistency, primaryKey, historyChecker);
+        IntArrayList reads = new IntArrayList();
+        IntArrayList writes = new IntArrayList();
+        for (int partition : partitions)
+        {
+            boolean added = false;
+            if (simulated.random.decide(readRatio))
+            {
+                reads.add(partition);
+                added = true;
+            }
+            if (simulated.random.decide(writeRatio))
+            {
+                writes.add(partition);
+                added = true;
+            }
+            if (!added)
+            {
+                // Neither draw selected this partition. A failed read draw implies a write and
+                // a failed write draw implies a read, so treat the partition as read/write.
+                // (Both draws succeeding also gives a read/write, which is fine.)
+                // This guarantees that every partition is used by the operation.
+                reads.add(partition);
+                writes.add(partition);
+            }
+        }
+
+        int node = simulated.random.uniform(1, cluster.size() + 1);
+        IInvokableInstance instance = cluster.get(node);
+        return new ReadWriteOperation(id, partitions, new IntHashSet(reads), 
new IntHashSet(writes), instance);
+    }
+
+    /**
+     * Selects the partitions for which a {@code readRatio} draw succeeds.
+     * Exactly one draw is made per partition, in input order, and the selected keys keep that order.
+     */
+    private int[] genReadOnly(SimulatedSystems simulated, int[] partitions)
+    {
+        return IntStream.of(partitions)
+                        .filter(ignore -> simulated.random.decide(readRatio))
+                        .toArray();
+    }
+
+    /**
+     * Builds the CQL transaction executed by one operation: an optional {@code SELECT ... IN (...)}
+     * over {@code reads}, followed by one {@code UPDATE} per key in {@code writes}.
+     * Bind values are added in the order their markers appear in the statement: first the read
+     * keys, then for each write the sequence element ({@code id + ","}) and the key.
+     *
+     * @param id     operation id, appended to the {@code seq} column of every written partition
+     * @param reads  partition keys to select
+     * @param writes partition keys to update
+     * @throws IllegalArgumentException if both {@code reads} and {@code writes} are empty
+     */
+    private static Query createQuery(int id, IntHashSet reads, IntHashSet writes)
+    {
+        if (reads.isEmpty() && writes.isEmpty())
+            throw new IllegalArgumentException("Partitions are empty");
+        List<Object> binds = new ArrayList<>();
+        StringBuilder sb = new StringBuilder();
+        sb.append("BEGIN TRANSACTION\n");
+        if (!reads.isEmpty())
+        {
+            // snapshot the keys once; the bind values are filled by a plain loop rather than
+            // as a side effect of the stream that renders the markers
+            int[] readKeys = reads.toArray();
+            String markers = IntStream.of(readKeys)
+                                      .mapToObj(ignore -> "?")
+                                      .collect(Collectors.joining(", "));
+            for (int pk : readKeys)
+                binds.add(pk);
+
+            sb.append('\t')
+              .append(String.format(SELECT, markers))
+              .append('\n');
+        }
+
+        for (IntCursor c : writes)
+        {
+            sb.append('\t').append(UPDATE).append("\n");
+            binds.add(id + ",");
+            binds.add(c.value);
+        }
+
+        sb.append("COMMIT TRANSACTION");
+        return new Query(sb.toString(), 0, ConsistencyLevel.ANY, ConsistencyLevel.ANY, binds.toArray(new Object[0]));
     }
 
     @Override
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
index c1ccb6648e..b07b4a86cd 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java
@@ -18,9 +18,13 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.BiFunction;
 import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
 
 import org.apache.commons.lang3.ArrayUtils;
 import org.slf4j.Logger;
@@ -29,9 +33,14 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.distributed.impl.Query;
+import org.apache.cassandra.simulator.Action;
+import org.apache.cassandra.simulator.ActionListener;
+import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.RunnableActionScheduler;
 import org.apache.cassandra.simulator.cluster.ClusterActions;
-import org.apache.cassandra.simulator.Debug;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
 import org.apache.cassandra.simulator.utils.IntRange;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -40,6 +49,7 @@ import static java.lang.Boolean.TRUE;
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
 import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY;
+import static org.apache.cassandra.simulator.Debug.EventType.PARTITION;
 import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail;
 
 @SuppressWarnings("unused")
@@ -49,7 +59,7 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
     private static final String UPDATE = "UPDATE " + KEYSPACE + ".tbl SET 
count = count + 1, seq1 = seq1 + ?, seq2 = seq2 + ? WHERE pk = ? IF EXISTS";
     private static final String SELECT = "SELECT pk, count, seq1, seq2 FROM  " 
+ KEYSPACE + ".tbl WHERE pk = ?";
 
-    class VerifyingOperation extends Operation
+    class VerifyingOperation extends PaxosOperation
     {
         final HistoryChecker historyChecker;
         public VerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
@@ -60,23 +70,26 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
 
         void verify(Observation outcome)
         {
-            (outcome.result != null ? successfulReads : 
failedReads).incrementAndGet();
+            SimpleQueryResult result = outcome.result;
+            (result != null ? successfulReads : failedReads).incrementAndGet();
 
-            if (outcome.result == null)
+            if (result == null)
                 return;
 
-            if (outcome.result.length != 1)
-                throw fail(primaryKey, "#result (%s) != 1", 
Arrays.toString(outcome.result));
+            if (!result.hasNext())
+                throw fail(primaryKey, "#result: ([]) != 1");
+
+            // pk, count, seq1, seq2
+            Row row = result.next();
 
-            Object[] row = outcome.result[0];
             // first verify internally consistent
-            int count = row[1] == null ? 0 : (Integer) row[1];
-            int[] seq1 = Arrays.stream((row[2] == null ? "" : (String) 
row[2]).split(","))
+            int count = row.getInteger("count", 0);
+            int[] seq1 = Arrays.stream(row.getString("seq1", "").split(","))
                                .filter(s -> !s.isEmpty())
                                .mapToInt(Integer::parseInt)
                                .toArray();
-            int[] seq2 = ((List<Integer>) (row[3] == null ? emptyList() : 
row[3]))
-                         .stream().mapToInt(x -> x).toArray();
+
+            int[] seq2 = row.<Integer>getList("seq2", 
emptyList()).stream().mapToInt(x -> x).toArray();
 
             if (!Arrays.equals(seq1, seq2))
                 throw fail(primaryKey, "%s != %s", seq1, seq2);
@@ -84,11 +97,24 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
             if (seq1.length != count)
                 throw fail(primaryKey, "%d != #%s", count, seq1);
 
-            historyChecker.witness(outcome, seq1, outcome.start, outcome.end);
+            if (result.hasNext())
+                throw fail(primaryKey, "#result (%s) != 1", 
ArrayUtils.toString(result.toObjectArrays()));
+
+            historyChecker.witness(outcome, seq1);
+        }
+    }
+
+    /**
+     * Base class of the Paxos operations in this simulation, each of which targets one partition.
+     * It wraps the single key into the {@code int[]} that {@link Operation} takes, builds the
+     * {@link Query} from the statement, consistency levels and parameters, and keeps the key
+     * available to subclasses as {@code primaryKey}.
+     */
+    private abstract class PaxosOperation extends Operation
+    {
+        final int primaryKey;
+        PaxosOperation(int primaryKey, int id, IInvokableInstance instance, 
String idString, String query, ConsistencyLevel commitConsistency, 
ConsistencyLevel serialConsistency, Object... params)
+        {
+            // NOTE(review): -1 is the Query's second argument, presumably a "no explicit
+            // timestamp" sentinel - confirm against Query
+            super(new int[] {primaryKey}, id, instance, idString, new 
Query(query, -1, commitConsistency, serialConsistency, params));
+            this.primaryKey = primaryKey;
         }
     }
 
-    class NonVerifyingOperation extends Operation
+    class NonVerifyingOperation extends PaxosOperation
     {
         public NonVerifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker 
historyChecker)
         {
@@ -100,7 +126,7 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
         }
     }
 
-    public class ModifyingOperation extends Operation
+    public class ModifyingOperation extends PaxosOperation
     {
         final HistoryChecker historyChecker;
         public ModifyingOperation(int id, IInvokableInstance instance, 
ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, int 
primaryKey, HistoryChecker historyChecker)
@@ -111,18 +137,23 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
 
         void verify(Observation outcome)
         {
-            (outcome.result != null ? successfulWrites : 
failedWrites).incrementAndGet();
-            if (outcome.result != null)
+            SimpleQueryResult result = outcome.result;
+            (result != null ? successfulWrites : 
failedWrites).incrementAndGet();
+            if (result != null)
             {
-                if (outcome.result.length != 1)
-                    throw fail(primaryKey, "Paxos Result: 1 != #%s", 
ArrayUtils.toString(outcome.result));
-                if (outcome.result[0][0] != TRUE)
+                if (!result.hasNext())
+                    throw fail(primaryKey, "Paxos Result: 1 != #[]");
+                if (result.next().getBoolean(0) != TRUE)
                     throw fail(primaryKey, "Result != TRUE");
+                if (result.hasNext())
+                    throw fail(primaryKey, "Paxos Result: 1 != #%s", 
ArrayUtils.toString(result.toObjectArrays()));
             }
-            historyChecker.applied(outcome.id, outcome.start, outcome.end, 
outcome.result != null);
+            historyChecker.applied(outcome.id, outcome.start, outcome.end, 
outcome.isSuccess());
         }
     }
 
+    final List<HistoryChecker> historyCheckers = new ArrayList<>();
+
     public PairOfSequencesPaxosSimulation(SimulatedSystems simulated,
                                           Cluster cluster,
                                           ClusterActions.Options 
clusterOptions,
@@ -140,6 +171,84 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
               runForNanos, jitter);
     }
 
+    /**
+     * Creates one {@link HistoryChecker} and one action supplier per primary key and returns a
+     * factory that maps a single primary-key index to its supplier.
+     * Each supplier picks a node per call and, depending on {@code serialConsistency}, produces a
+     * non-verifying, verifying or modifying operation. If {@code debug.debug(PARTITION, ...)}
+     * returns a listener, the supplier is wrapped so that every action it produces registers it.
+     *
+     * NOTE(review): checkers are appended to {@code historyCheckers} on every call while being
+     * looked up by index, so this appears to assume it is invoked only once - confirm.
+     */
+    @Override
+    BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory()
+    {
+        final int nodes = cluster.size();
+        for (int primaryKey : primaryKeys)
+            historyCheckers.add(new HistoryChecker(primaryKey));
+
+        List<Supplier<Action>> primaryKeyActions = new ArrayList<>();
+        for (int pki = 0 ; pki < primaryKeys.length ; ++pki)
+        {
+            int primaryKey = primaryKeys[pki];
+            HistoryChecker historyChecker = historyCheckers.get(pki);
+            Supplier<Action> supplier = new Supplier<Action>()
+            {
+                // operation id counter, local to this primary key
+                int i = 0;
+
+                @Override
+                public Action get()
+                {
+                    int node = simulated.random.uniform(1, nodes + 1);
+                    IInvokableInstance instance = cluster.get(node);
+                    switch (serialConsistency)
+                    {
+                        default: throw new AssertionError();
+                        case LOCAL_SERIAL:
+                            if (simulated.snitch.dcOf(node) > 0)
+                            {
+                                // perform some queries against these nodes 
but don't expect them to be linearizable
+                                return nonVerifying(i++, instance, primaryKey, 
historyChecker);
+                            }
+                            // intentional fall-through: for LOCAL_SERIAL, nodes whose dcOf(node)
+                            // is not > 0 are handled exactly like SERIAL
+                        case SERIAL:
+                            return simulated.random.decide(readRatio)
+                                   ? verifying(i++, instance, primaryKey, 
historyChecker)
+                                   : modifying(i++, instance, primaryKey, 
historyChecker);
+                    }
+                }
+
+                @Override
+                public String toString()
+                {
+                    return Integer.toString(primaryKey);
+                }
+            };
+
+            final ActionListener listener = debug.debug(PARTITION, 
simulated.time, cluster, KEYSPACE, primaryKey);
+            if (listener != null)
+            {
+                // decorate the supplier so every produced action registers the debug listener
+                Supplier<Action> wrap = supplier;
+                supplier = new Supplier<Action>()
+                {
+                    @Override
+                    public Action get()
+                    {
+                        Action action = wrap.get();
+                        action.register(listener);
+                        return action;
+                    }
+
+                    @Override
+                    public String toString()
+                    {
+                        return wrap.toString();
+                    }
+                };
+            }
+
+            primaryKeyActions.add(supplier);
+        }
+        // Paxos operations are single-partition: exactly one index is accepted
+        return (ignore, primaryKeyIndex) -> 
primaryKeyActions.get(only(primaryKeyIndex));
+    }
+
+    /**
+     * Returns the sole element of {@code array}.
+     *
+     * @throws AssertionError if the array does not hold exactly one element
+     */
+    private static int only(int[] array)
+    {
+        if (array.length == 1)
+            return array[0];
+        throw new AssertionError("Require only 1 element but found array " + Arrays.toString(array));
+    }
+
     @Override
     protected String createTableStmt()
     {
@@ -152,24 +261,28 @@ public class PairOfSequencesPaxosSimulation extends 
AbstractPairOfSequencesPaxos
         return "INSERT INTO " + KEYSPACE + ".tbl (pk, count, seq1, seq2) 
VALUES (?, 0, '', []) USING TIMESTAMP 0";
     }
 
-    @Override
-    Operation verifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    private Operation verifying(int operationId, IInvokableInstance instance, 
int primaryKey, HistoryChecker historyChecker)
     {
         return new VerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
     }
 
-    @Override
-    Operation nonVerifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    private Operation nonVerifying(int operationId, IInvokableInstance 
instance, int primaryKey, HistoryChecker historyChecker)
     {
         return new NonVerifyingOperation(operationId, instance, 
serialConsistency, primaryKey, historyChecker);
     }
 
-    @Override
-    Operation modifying(int operationId, IInvokableInstance instance, int 
primaryKey, HistoryChecker historyChecker)
+    private Operation modifying(int operationId, IInvokableInstance instance, 
int primaryKey, HistoryChecker historyChecker)
     {
         return new ModifyingOperation(operationId, instance, ANY, 
serialConsistency, primaryKey, historyChecker);
     }
 
+    /**
+     * Invokes {@code print} on the history checkers: on all of them when {@code primaryKey} is
+     * null, otherwise only on those whose key equals {@code primaryKey}.
+     */
+    @Override
+    void log(@Nullable Integer primaryKey)
+    {
+        if (primaryKey == null)
+        {
+            historyCheckers.forEach(HistoryChecker::print);
+            return;
+        }
+        historyCheckers.stream()
+                       .filter(checker -> checker.primaryKey == primaryKey)
+                       .forEach(HistoryChecker::print);
+    }
+
     @Override
     boolean joinAll()
     {
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
index 0568d1e70d..1791b31c9d 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.simulator.paxos;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -26,11 +27,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.LongSupplier;
-
+import java.util.stream.Stream;
 import javax.annotation.Nullable;
 
 import com.google.common.base.Throwables;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -38,10 +38,9 @@ import org.apache.cassandra.concurrent.ExecutorFactory;
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.apache.cassandra.config.CassandraRelevantProperties;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
-import org.apache.cassandra.distributed.impl.Query;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.exceptions.RequestExecutionException;
 import org.apache.cassandra.service.paxos.BallotGenerator;
 import org.apache.cassandra.simulator.ActionList;
@@ -53,38 +52,44 @@ import 
org.apache.cassandra.simulator.cluster.ClusterActionListener;
 import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods;
 import org.apache.cassandra.simulator.systems.SimulatedActionCallable;
 import org.apache.cassandra.simulator.systems.SimulatedSystems;
+import org.apache.cassandra.utils.AssertionUtils;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.concurrent.Threads;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-import static org.apache.cassandra.simulator.Action.Modifiers.NONE;
 import static org.apache.cassandra.simulator.Action.Modifiers.DISPLAY_ORIGIN;
+import static org.apache.cassandra.simulator.Action.Modifiers.NONE;
 import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM;
 import static org.apache.cassandra.simulator.paxos.HistoryChecker.causedBy;
+import static org.apache.cassandra.utils.AssertionUtils.anyOf;
+import static org.apache.cassandra.utils.AssertionUtils.hasCause;
+import static org.apache.cassandra.utils.AssertionUtils.isThrowableInstanceof;
 
 public abstract class PaxosSimulation implements Simulation, 
ClusterActionListener
 {
     private static final Logger logger = 
LoggerFactory.getLogger(PaxosSimulation.class);
 
-    abstract class Operation extends SimulatedActionCallable<Object[][]> 
implements BiConsumer<Object[][], Throwable>
+    /**
+     * Builds the label of an operation in the form {@code <keys>/<id>: <idString>}, where
+     * {@code <keys>} is the bare key when there is exactly one and
+     * {@code Arrays.toString(primaryKeys)} otherwise.
+     *
+     * @param primaryKeys partition keys the operation touches
+     * @param id          operation id
+     * @param idString    short name of the operation kind
+     */
+    private static String createDescription(int[] primaryKeys, int id, String idString)
     {
-        final int primaryKey;
+        // '+' binds tighter than '?:', so the key part is computed on its own; written as a single
+        // expression, a lone key produced just "<key>" and lost the "/<id>: <idString>" suffix
+        String keys = primaryKeys.length == 1 ? Integer.toString(primaryKeys[0]) : Arrays.toString(primaryKeys);
+        return keys + "/" + id + ": " + idString;
+    }
+
+    /**
+     * Exception types an operation may fail with without the failure being reported as an
+     * "Unexpected exception". Protected so that subclasses can supply a different set.
+     */
+    // generic arrays cannot be created directly, hence the cast from Class<?>[]; it is safe
+    // because the array only ever holds Throwable subclasses
+    @SuppressWarnings("unchecked")
+    protected Class<? extends Throwable>[] expectedExceptions()
+    {
+        return (Class<? extends Throwable>[]) new Class<?>[] { RequestExecutionException.class };
+    }
+
+    abstract class Operation extends 
SimulatedActionCallable<SimpleQueryResult> implements 
BiConsumer<SimpleQueryResult, Throwable>
+    {
+        final int[] primaryKeys;
         final int id;
         int start;
 
-        public Operation(int primaryKey, int id, IInvokableInstance instance,
-                         String idString, String query, ConsistencyLevel 
commitConsistency, ConsistencyLevel serialConsistency, Object... params)
-        {
-            super(primaryKey + "/" + id + ": " + idString, DISPLAY_ORIGIN, 
NONE, PaxosSimulation.this.simulated, instance, new Query(query, -1, 
commitConsistency, serialConsistency, params));
-            this.primaryKey = primaryKey;
-            this.id = id;
-        }
-
-        public Operation(int primaryKey, int id, IInvokableInstance instance,
-                         String idString, 
IIsolatedExecutor.SerializableCallable<Object[][]> query)
+        public Operation(int[] primaryKeys, int id, IInvokableInstance 
instance,
+                         String idString, 
IIsolatedExecutor.SerializableCallable<SimpleQueryResult> query)
         {
-            super(primaryKey + "/" + id + ": " + idString, DISPLAY_ORIGIN, 
NONE, PaxosSimulation.this.simulated, instance, query);
-            this.primaryKey = primaryKey;
+            super(createDescription(primaryKeys, id, idString), 
DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, query);
+            this.primaryKeys = primaryKeys;
             this.id = id;
         }
 
@@ -95,9 +100,9 @@ public abstract class PaxosSimulation implements Simulation, 
ClusterActionListen
         }
 
         @Override
-        public void accept(Object[][] success, Throwable failure)
+        public void accept(SimpleQueryResult success, Throwable failure)
         {
-            if (failure != null && !(failure instanceof 
RequestExecutionException))
+            if (failure != null && !expectedException(failure))
             {
                 if (!simulated.failures.hasFailure() || !(failure instanceof 
UncheckedInterruptedException))
                     logger.error("Unexpected exception", failure);
@@ -108,10 +113,14 @@ public abstract class PaxosSimulation implements 
Simulation, ClusterActionListen
             {
                 logger.trace("{}", failure.getMessage());
             }
-
             verify(new Observation(id, success, start, 
logicalClock.incrementAndGet()));
         }
 
+        /**
+         * Tells whether {@code failure} is one of the failures listed by
+         * {@code expectedExceptions()}. The check is delegated to the {@code hasCause} /
+         * {@code anyOf} / {@code isThrowableInstanceof} matchers
+         * (NOTE(review): presumably this also inspects the cause chain - confirm in AssertionUtils).
+         */
+        protected boolean expectedException(Throwable failure)
+        {
+            // a plain instanceof cannot be used here because of class loaders
+            // (NOTE(review): presumably the failure's class may come from another class loader)
+            return 
hasCause(anyOf(Stream.of(expectedExceptions()).map(AssertionUtils::isThrowableInstanceof))).matches(failure);
+        }
         abstract void verify(Observation outcome);
     }
 
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java
 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java
new file mode 100644
index 0000000000..8469423bb9
--- /dev/null
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import javax.annotation.Nullable;
+
+import accord.verify.StrictSerializabilityVerifier;
+import com.carrotsearch.hppc.IntIntHashMap;
+import com.carrotsearch.hppc.IntIntMap;
+import com.carrotsearch.hppc.cursors.IntIntCursor;
+
+/**
+ * {@link HistoryValidator} that delegates to Accord's {@link StrictSerializabilityVerifier}.
+ * The verifier is addressed by index, so every primary key is translated to its position in the
+ * array given at construction, and violations reported by the verifier are translated back to
+ * the primary key before being rethrown.
+ */
+public class StrictSerializabilityValidator implements HistoryValidator
+{
+    private final StrictSerializabilityVerifier verifier;
+    // primary key -> index used with the verifier
+    private final IntIntMap pkToIndex;
+    // index used with the verifier -> primary key
+    private final int[] indexToPk;
+
+    /**
+     * @param primaryKeys all primary keys that will be validated; the position of a key in this
+     *                    array is the index it is given when talking to the verifier
+     */
+    public StrictSerializabilityValidator(int[] primaryKeys)
+    {
+        this.verifier = new StrictSerializabilityVerifier(primaryKeys.length);
+        pkToIndex = new IntIntHashMap(primaryKeys.length);
+        indexToPk = new int[primaryKeys.length];
+        for (int i = 0; i < primaryKeys.length; i++)
+        {
+            pkToIndex.put(primaryKeys[i], i);
+            indexToPk[i] = primaryKeys[i];
+        }
+    }
+
+    /**
+     * Begins recording one operation. Reads and writes are forwarded to the verifier as they are
+     * witnessed; the operation is applied with {@code start} and {@code end} when the returned
+     * checker is closed.
+     */
+    @Override
+    public Checker witness(int start, int end)
+    {
+        verifier.begin();
+        return new Checker()
+        {
+            @Override
+            public void read(int pk, int id, int count, int[] seq)
+            {
+                // id and count are not forwarded; only the observed sequence is
+                verifier.witnessRead(get(pk), seq);
+            }
+
+            @Override
+            public void write(int pk, int id, boolean success)
+            {
+                // the success flag is not forwarded
+                verifier.witnessWrite(get(pk), id);
+            }
+
+            @Override
+            public void close()
+            {
+                convertHistoryViolation(() -> verifier.apply(start, end));
+            }
+        };
+    }
+
+    /**
+     * Prints the verifier's state: for every key when {@code pk} is null, otherwise for the given
+     * primary key only.
+     */
+    @Override
+    public void print(@Nullable Integer pk)
+    {
+        if (pk == null) verifier.print();
+        else verifier.print(get(pk));
+    }
+
+    /**
+     * Translates a primary key to the index used with the verifier.
+     *
+     * @throws IllegalArgumentException if the key was not given at construction
+     */
+    private int get(int pk)
+    {
+        if (pkToIndex.containsKey(pk))
+            return pkToIndex.get(pk);
+        throw new IllegalArgumentException("Unknown pk=" + pk);
+    }
+
+    /**
+     * Runs {@code fn} and rethrows any {@link accord.verify.HistoryViolation} as this package's
+     * {@link HistoryViolation}, carrying the real primary key, the original message and the
+     * original stack trace.
+     *
+     * @throws IllegalArgumentException if the violation refers to an index outside the known
+     *                                  keys; the violation itself is attached as the cause
+     */
+    private void convertHistoryViolation(Runnable fn)
+    {
+        try
+        {
+            fn.run();
+        }
+        catch (accord.verify.HistoryViolation e)
+        {
+            int index = e.primaryKey();
+            if (index < 0 || index >= indexToPk.length)
+            {
+                // keep the verifier's exception as the cause so the actual violation is not lost
+                throw new IllegalArgumentException("Unable to find primary key by index " + index, e);
+            }
+            HistoryViolation v = new HistoryViolation(indexToPk[index], e.getMessage());
+            v.setStackTrace(e.getStackTrace());
+            throw v;
+        }
+    }
+
+    /** Creates {@link StrictSerializabilityValidator} instances; use {@link #instance}. */
+    public static class Factory implements HistoryValidator.Factory
+    {
+        public static final Factory instance = new Factory();
+
+        @Override
+        public HistoryValidator create(int[] partitions)
+        {
+            return new StrictSerializabilityValidator(partitions);
+        }
+    }
+}
diff --git 
a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
 
b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
index d190fd7a15..106cd8c027 100644
--- 
a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
+++ 
b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.simulator.systems;
 
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.impl.Query;
 
-public class SimulatedQuery extends SimulatedActionCallable<Object[][]>
+public class SimulatedQuery extends SimulatedActionCallable<SimpleQueryResult>
 {
     public SimulatedQuery(Object description, SimulatedSystems simulated, 
IInvokableInstance instance, String query, ConsistencyLevel commitConsistency, 
ConsistencyLevel serialConsistency, Object... params)
     {
@@ -45,7 +46,7 @@ public class SimulatedQuery extends 
SimulatedActionCallable<Object[][]>
     }
 
     @Override
-    public void accept(Object[][] success, Throwable failure)
+    public void accept(SimpleQueryResult success, Throwable failure)
     {
         if (failure != null)
             simulated.failures.accept(failure);
diff --git 
a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
 
b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
new file mode 100644
index 0000000000..6c773fcca8
--- /dev/null
+++ 
b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.simulator.paxos;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Consumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.junit.Assume;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.carrotsearch.hppc.IntHashSet;
+import com.carrotsearch.hppc.IntIntHashMap;
+import com.carrotsearch.hppc.IntIntMap;
+import com.carrotsearch.hppc.IntSet;
+import org.apache.cassandra.distributed.api.QueryResults;
+import org.apache.cassandra.utils.Clock;
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.Assertions;
+
+import static org.apache.commons.lang3.ArrayUtils.add;
+import static org.apache.commons.lang3.ArrayUtils.swap;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Notes:
+ * * anomalyDirtyRead was left out as Accord doesn't reject requests; without a way to reject or abort
+ *   requests the client has no way to observe a REJECT, so all issues are UNKNOWN.
+ *
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING) // since Random is used, make 
sure tests run in a deterministic order
+public class HistoryValidatorTest
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(HistoryValidatorTest.class);
+    private static final Random RANDOM = random();
+    private static final int[] PARTITIONS = IntStream.range(0, 10).toArray();
+    private static final int x = 1;
+    private static final int y = 2;
+
+    /**
+     * Supplies the parameter sets for the suite: one per validator factory, in a fixed order.
+     */
+    @Parameterized.Parameters(name = "{0}")
+    public static Collection<Object[]> data()
+    {
+        List<HistoryValidator.Factory> factories = Arrays.asList(LinearizabilityValidator.Factory.instance,
+                                                                 StrictSerializabilityValidator.Factory.instance);
+        List<Object[]> params = new ArrayList<>();
+        for (HistoryValidator.Factory candidate : factories)
+            params.add(test(candidate));
+        return params;
+    }
+
+    /** Wraps a factory in the single-element {@code Object[]} used as one parameter set by {@link #data()}. */
+    private static Object[] test(HistoryValidator.Factory factory)
+    {
+        return new Object[]{ factory };
+    }
+
+    private final HistoryValidator.Factory factory;
+
+    /**
+     * @param factory the validator factory under test; stored and used by {@link #create()}
+     */
+    public HistoryValidatorTest(HistoryValidator.Factory factory)
+    {
+        this.factory = factory;
+    }
+
+    /**
+     * Witnesses events 0..99 one after another against partition 1, skipping the events listed as timed out.
+     * Runs twice: once where the timed out writes never took effect ({@code reject == true}) and once where they
+     * were applied without being observed, and so still show up in later sequences.
+     */
+    @Test
+    public void orderingWithWriteTimeout()
+    {
+        IntSet timeoutEvents = set(4, 17, 83);
+        for (boolean reject : Arrays.asList(true, false))
+        {
+            HistoryValidator validator = create();
+
+            int logicalClock = 1;
+            int[] applied = seq();
+            for (int eventId = 0; eventId < 100; eventId++)
+            {
+                boolean timedOut = timeoutEvents.contains(eventId);
+                if (!timedOut)
+                {
+                    int start = ++logicalClock;
+                    int end = ++logicalClock;
+                    single(validator, ob(eventId, start, end), 1, applied, true);
+                }
+                // a timed out write wasn't observed, but joins the sequence when it was applied anyway
+                //TODO forgot to add this and LinearizabilityValidator was success... should reject!
+                if (!timedOut || !reject)
+                    applied = add(applied, eventId);
+            }
+        }
+    }
+
+    /**
+     * This test differs from {@link #orderingWithWriteTimeout} as it defines event orders assuming
+     * requests were concurrent, so may happen in different orderings.
+     * <p>
+     * This means that we may see the results out of order, but the sequence/count ordering will remain
+     */
+    @Test
+    public void orderingWithWriteTimeoutWithConcurrency()
+    {
+        IntSet timeoutEvents = set(4, 17, 83);
+        for (boolean reject : Arrays.asList(true, false))
+        {
+            HistoryValidator validator = create();
+            // Since the requests are "concurrent" the window in which the operations happened are between start=1 and
+            // end=responseOrder.
+            int start = 1;
+            int logicalClock = start;
+
+            // 'ordering' is the order in which the txns are applied
+            // 'indexOrder' is the order in which the events are seen; since requests are "concurrent" the order we
+            //   validate may differ from the ordering they were applied.
+            int[] ordering = IntStream.range(0, 100).toArray();
+            if (reject)
+                ordering = IntStream.of(ordering).filter(i -> !timeoutEvents.contains(i)).toArray();
+            shuffle(ordering);
+            // IntStream.range excludes its upper bound; use the full length so the last applied txn is witnessed too
+            int[] indexOrder = IntStream.range(0, ordering.length).toArray();
+            shuffle(indexOrder);
+            for (int idx : indexOrder)
+            {
+                int eventId = ordering[idx];
+                if (timeoutEvents.contains(eventId))
+                    continue;
+                // everything applied before this txn is what it should have read
+                int[] seq = Arrays.copyOf(ordering, idx);
+                single(validator, ob(eventId, start, ++logicalClock), 1, seq, true);
+            }
+        }
+    }
+
+    /**
+     * After a write to x and a read of x that observed it (sequence [0]), a later read of x with an empty
+     * sequence must be rejected with a {@link HistoryViolation}.
+     */
+    @Test
+    public void anomalyNonMonotonicRead()
+    {
+        // Session1: w[x=10] -> Session2: r[x=10] -> r[x=0]
+        test(dsl -> {
+            dsl.txn(writeOnly(x));
+            dsl.txn(readOnly(x, seq(0)));
+            dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    /**
+     * Writes x then y, reads y observing its write, and expects a later read of x with an empty sequence to be
+     * rejected with a {@link HistoryViolation}. Skipped for factories without multi-key support.
+     */
+    @Test
+    public void anomalyNonMonotonicWrite()
+    {
+        requiresMultiKeySupport();
+        // Session1: w[x=10] -> w[y=10] -> Session2: r[y=10] -> r[x=0]
+        test(dsl -> {
+            dsl.txn(writeOnly(x));
+            dsl.txn(writeOnly(y));
+            dsl.txn(readOnly(y, seq(1)));
+            dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    /**
+     * Initializes x and y in one txn, then reads x, writes y and reads y in separate txns; a final read of x
+     * with an empty sequence must be rejected with a {@link HistoryViolation}.
+     * Skipped for factories without multi-key support.
+     */
+    @Test
+    public void anomalyNonMonotonicTransaction()
+    {
+        // Session1: r[x=5] -> w[y=10] -> Session2: r[y=10] -> r[x=0]
+        requiresMultiKeySupport();
+        test(dsl -> {
+            // init: event 0 writes both keys
+            dsl.txn(writeOnly(x), writeOnly(y));
+
+            dsl.txn(readOnly(x, seq(0)));
+            dsl.txn(writeOnly(y));
+            dsl.txn(readOnly(y, seq(0, 2)));
+
+            dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    /**
+     * A read of x with an empty sequence directly after a write to x must be rejected with a
+     * {@link HistoryViolation}.
+     */
+    @Test
+    public void anomalyReadYourOwnWrites()
+    {
+        // This test is kind of a duplicate; here just for completeness
+        // w[x=12] -> r[x=8]
+        test(dsl -> {
+            dsl.txn(writeOnly(x));
+            dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class);
+        });
+    }
+
+    //TODO write skew
+    /**
+     * Two overlapping txns: U2 (window 4..5) sits entirely inside U1 (window 3..6). U1 reporting x without U2's
+     * write but y with it must be rejected with a {@link HistoryViolation}.
+     * Skipped for factories without multi-key support.
+     */
+    @Test
+    public void anomalyReadSkew()
+    {
+        requiresMultiKeySupport();
+        // two different txn are involved to make this happen
+        // x=0, y=0
+        // U1: starts
+        // U2: starts
+        // U1: r[x=0]
+        // U2: w[x=5], w[y=5]
+        // U2: commit
+        // U1: r[y=5]
+        // U1: commit
+        HistoryValidator validator = create();
+
+        // init
+        txn(validator, ob(0, 1, 2), writeOnly(x), writeOnly(y));
+        int u1 = 1, u1_start = 3, u1_end = 6;
+        int u2 = 2, u2_start = 4, u2_end = 5;
+        // U2 is witnessed first even though U1 started earlier
+        txn(validator, ob(u2, u2_start, u2_end), readWrite(x, seq(0)), readWrite(y, seq(0)));
+        Assertions.assertThatThrownBy(() -> txn(validator, ob(u1, u1_start, u1_end), readWrite(x, seq(0)), readWrite(y, seq(0, u2))))
+                  .isInstanceOf(HistoryViolation.class);
+    }
+
+    // NOTE(review): the body contains only comments, with no validator calls or assertions, so this test passes
+    // without checking anything. It looks unfinished (the file also carries a "TODO write skew" note) - confirm
+    // whether it should be implemented or removed.
+    @Test
+    public void anomalyWriteSkew()
+    {
+        // two different txn are involved to make this happen
+        // x=0, y=0
+        // U1: starts
+        // U2: starts
+        // U1: r[x=0]
+    }
+
+    /**
+     * Replays a recorded history through {@link #fromLog(String)}; the test passes when the replay completes
+     * without throwing.
+     */
+    // NOTE(review): presumably captured from a real run that tripped a validator - confirm the origin
+    @Test
+    public void seenBehavior()
+    {
+        fromLog("Witness(start=4, end=7)\n" +
+                "\tread(pk=121901541, id=2, count=0, seq=[])\n" +
+                "\twrite(pk=121901541, id=2, success=true)\n" +
+
+                "Witness(start=3, end=8)\n" +
+                "\tread(pk=122950117, id=0, count=0, seq=[])\n" +
+                "\twrite(pk=122950117, id=0, success=true)\n" +
+                "\twrite(pk=119804389, id=0, success=true)\n" +
+
+                "Witness(start=5, end=9)\n" +
+                "\tread(pk=121901541, id=3, count=1, seq=[2])\n" +
+                "\twrite(pk=121901541, id=3, success=true)\n" +
+
+                "Witness(start=2, end=10)\n" +
+                "\twrite(pk=122950117, id=1, success=true)\n" +
+                "\twrite(pk=119804389, id=1, success=true)\n" +
+
+                "Witness(start=6, end=11)\n" +
+                "\tread(pk=121901541, id=4, count=2, seq=[2, 3])\n" +
+                "\twrite(pk=121901541, id=4, success=true)\n" +
+
+                "Witness(start=12, end=14)\n" +
+                "\twrite(pk=121901541, id=5, success=true)\n" +
+
+                "Witness(start=13, end=16)\n" +
+                "\tread(pk=119804389, id=6, count=2, seq=[0, 1])\n" +
+                "\twrite(pk=119804389, id=6, success=true)\n" +
+                "\twrite(pk=122950117, id=6, success=true)\n" +
+
+                "Witness(start=15, end=18)\n" +
+                "\tread(pk=121901541, id=7, count=4, seq=[2, 3, 4, 5])\n" +
+                "\twrite(pk=121901541, id=7, success=true)\n" +
+
+                "Witness(start=17, end=20)\n" +
+                "\tread(pk=119804389, id=8, count=3, seq=[0, 1, 6])\n" +
+                "\twrite(pk=119804389, id=8, success=true)\n" +
+                "\twrite(pk=122950117, id=8, success=true)\n" // this partition is what triggers
+        );
+    }
+
+    /**
+     * JUnit assumption that the factory under test is a {@link StrictSerializabilityValidator.Factory}; the
+     * calling test is skipped otherwise.
+     */
+    private void requiresMultiKeySupport()
+    {
+        boolean multiKey = factory instanceof StrictSerializabilityValidator.Factory;
+        String reason = "Validator " + factory.getClass() + " does not support multi-key";
+        Assume.assumeTrue(reason, multiKey);
+    }
+
+    /**
+     * Shuffles {@code ordering} in place using the shared {@code RANDOM}: walking from the last slot down to
+     * index 1, each slot is swapped with a randomly chosen slot at or before it.
+     *
+     * @return the same array instance that was passed in
+     */
+    private int[] shuffle(int[] ordering)
+    {
+        int remaining = ordering.length;
+        while (remaining > 1)
+        {
+            swap(ordering, remaining - 1, RANDOM.nextInt(remaining));
+            remaining--;
+        }
+        return ordering;
+    }
+
+    /**
+     * Witnesses one transaction: opens a checker via {@code validator.witness(ob.start, ob.end)}, lets every
+     * event report its reads/writes to it, and closes the checker when leaving the try block.
+     *
+     * @param validator validator that receives the transaction
+     * @param ob        supplies the event id and the start/end of the window
+     * @param events    the per-partition operations making up the transaction
+     */
+    private static void txn(HistoryValidator validator, Observation ob, Event... events)
+    {
+        String type = events.length == 1 ? "single" : "multiple";
+        // the format string used to end in "{}}", which left a stray '}' at the end of every log line
+        logger.info("[Validator={}, Observation=({}, {}, {})] Validating {} {}", validator.getClass().getSimpleName(), ob.id, ob.start, ob.end, type, events);
+        try (HistoryValidator.Checker check = validator.witness(ob.start, ob.end))
+        {
+            for (Event e : events)
+                e.process(ob, check);
+        }
+    }
+
+    /**
+     * Witnesses a one-event transaction on {@code pk}: a read of {@code seq} that is also a write when
+     * {@code hasWrite} is set.
+     */
+    private static void single(HistoryValidator validator, Observation ob, int pk, int[] seq, boolean hasWrite)
+    {
+        Event event;
+        if (hasWrite)
+            event = readWrite(pk, seq);
+        else
+            event = readOnly(pk, seq);
+        txn(validator, ob, event);
+    }
+
+    /** Builds an {@link Observation} for event {@code id} spanning {@code start}..{@code end} with an empty result. */
+    private static Observation ob(int id, int start, int end)
+    {
+        // why empty result?  The users don't actually check the result's data, just existence
+        return new Observation(id, QueryResults.empty(), start, end);
+    }
+
+    /** Varargs convenience: returns its arguments as an {@code int[]} (empty when called with none). */
+    private static int[] seq(int... seq)
+    {
+        return seq;
+    }
+
+    /** Creates a validator from the factory under test over the fixed {@code PARTITIONS}. */
+    private HistoryValidator create()
+    {
+        return factory.create(PARTITIONS);
+    }
+
+    /** Collects the given values into a new {@link IntHashSet}. */
+    private static IntSet set(int... values)
+    {
+        IntSet result = new IntHashSet(values.length);
+        for (int i = 0; i < values.length; i++)
+            result.add(values[i]);
+        return result;
+    }
+
+    /**
+     * Creates the {@link Random} used by the tests. The seed is read from the {@code cassandra.test.seed}
+     * system property, falling back to {@code Clock.Global.nanoTime()}, and is logged so a run can be repeated.
+     */
+    private static Random random()
+    {
+        String fallbackSeed = Long.toString(Clock.Global.nanoTime());
+        long seed = Long.parseLong(System.getProperty("cassandra.test.seed", fallbackSeed));
+        logger.info("Random seed={}; set -Dcassandra.test.seed={} while reruning the tests to get the same order", seed, seed);
+        return new Random(seed);
+    }
+
+    /** Event on {@code pk} that reports a read of {@code seq} and then a write. */
+    private static Event readWrite(int pk, int[] seq)
+    {
+        EnumSet<Event.Type> both = EnumSet.of(Event.Type.READ, Event.Type.WRITE);
+        return new Event(both, pk, seq);
+    }
+
+    /** Event on {@code pk} that only reports a read of {@code seq}. */
+    private static Event readOnly(int pk, int[] seq)
+    {
+        EnumSet<Event.Type> readOnly = EnumSet.of(Event.Type.READ);
+        return new Event(readOnly, pk, seq);
+    }
+
+    /** Event on {@code pk} that only reports a write; it carries a null sequence because nothing is read. */
+    private static Event writeOnly(int pk)
+    {
+        EnumSet<Event.Type> writeOnly = EnumSet.of(Event.Type.WRITE);
+        return new Event(writeOnly, pk, null);
+    }
+
+    /**
+     * Replays a textual history against a fresh validator created from the factory under test.
+     * <p>
+     * Expected input, one entry per line (action lines start with a tab character):
+     * <pre>
+     * Witness(start=S, end=E)
+     *     read(pk=P, id=I, count=C, seq=[a, b, ...])
+     *     write(pk=P, id=I, success=true|false)
+     * </pre>
+     * Each {@code Witness} line opens a new transaction and the action lines after it belong to that
+     * transaction. The validator is created over the sorted set of every pk mentioned, then the transactions
+     * are processed in the order they appear.
+     *
+     * @throws AssertionError           if a recognized line does not match its pattern
+     * @throws IllegalArgumentException if a line is not recognized, or an action appears before any Witness line
+     */
+    private void fromLog(String log)
+    {
+        // compiled once up front rather than once per input line
+        Pattern witnessPattern = Pattern.compile("Witness\\(start=(.+), end=(.+)\\)");
+        Pattern readPattern = Pattern.compile("\tread\\(pk=(.+), id=(.+), count=(.+), seq=\\[(.*)\\]\\)");
+        Pattern writePattern = Pattern.compile("\twrite\\(pk=(.+), id=(.+), success=(.+)\\)");
+
+        IntSet pks = new IntHashSet();
+        class Read
+        {
+            final int pk, id, count;
+            final int[] seq;
+
+            Read(int pk, int id, int count, int[] seq)
+            {
+                this.pk = pk;
+                this.id = id;
+                this.count = count;
+                this.seq = seq;
+            }
+        }
+        class Write
+        {
+            final int pk, id;
+            final boolean success;
+
+            Write(int pk, int id, boolean success)
+            {
+                this.pk = pk;
+                this.id = id;
+                this.success = success;
+            }
+        }
+        class Witness
+        {
+            final int start, end;
+            final List<Object> actions = new ArrayList<>();
+
+            Witness(int start, int end)
+            {
+                this.start = start;
+                this.end = end;
+            }
+
+            void read(int pk, int id, int count, int[] seq)
+            {
+                actions.add(new Read(pk, id, count, seq));
+            }
+
+            void write(int pk, int id, boolean success)
+            {
+                actions.add(new Write(pk, id, success));
+            }
+
+            // replays the recorded actions, in order, inside a single checker
+            void process(HistoryValidator validator)
+            {
+                try (HistoryValidator.Checker check = validator.witness(start, end))
+                {
+                    for (Object a : actions)
+                    {
+                        if (a instanceof Read)
+                        {
+                            Read read = (Read) a;
+                            check.read(read.pk, read.id, read.count, read.seq);
+                        }
+                        else
+                        {
+                            Write write = (Write) a;
+                            check.write(write.pk, write.id, write.success);
+                        }
+                    }
+                }
+            }
+        }
+        List<Witness> witnesses = new ArrayList<>();
+        Witness current = null;
+        for (String line : log.split("\n"))
+        {
+            if (line.startsWith("Witness"))
+            {
+                if (current != null)
+                    witnesses.add(current);
+                Matcher matcher = witnessPattern.matcher(line);
+                if (!matcher.find()) throw new AssertionError("Unable to match start/end of " + line);
+                current = new Witness(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2)));
+            }
+            else if (line.startsWith("\tread"))
+            {
+                // fail with a clear message instead of a NullPointerException on malformed input
+                if (current == null) throw new IllegalArgumentException("Action without a preceding Witness: " + line);
+                Matcher matcher = readPattern.matcher(line);
+                if (!matcher.find()) throw new AssertionError("Unable to match read of " + line);
+                int pk = Integer.parseInt(matcher.group(1));
+                pks.add(pk);
+                int id = Integer.parseInt(matcher.group(2));
+                int count = Integer.parseInt(matcher.group(3));
+                String seqStr = matcher.group(4);
+                int[] seq = seqStr.isEmpty() ? new int[0] : Stream.of(seqStr.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
+                current.read(pk, id, count, seq);
+            }
+            else if (line.startsWith("\twrite"))
+            {
+                if (current == null) throw new IllegalArgumentException("Action without a preceding Witness: " + line);
+                Matcher matcher = writePattern.matcher(line);
+                if (!matcher.find()) throw new AssertionError("Unable to match write of " + line);
+                int pk = Integer.parseInt(matcher.group(1));
+                pks.add(pk);
+                int id = Integer.parseInt(matcher.group(2));
+                boolean success = Boolean.parseBoolean(matcher.group(3));
+                current.write(pk, id, success);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Unknown line: " + line);
+            }
+        }
+        if (current != null)
+            witnesses.add(current);
+        int[] keys = pks.toArray();
+        Arrays.sort(keys);
+        HistoryValidator validator = factory.create(keys);
+        for (Witness w : witnesses)
+            w.process(validator);
+    }
+
+    /**
+     * One partition's share of a transaction: which operations it performs, on which pk, and the sequence the
+     * read reports ({@code seq} is only dereferenced when the event includes a read).
+     */
+    private static class Event
+    {
+        enum Type
+        {
+            READ, WRITE
+        }
+
+        private final EnumSet<Type> types;
+        private final int pk;
+        private final int[] seq;
+
+        private Event(EnumSet<Type> types, int pk, int[] seq)
+        {
+            this.types = types;
+            this.pk = pk;
+            this.seq = seq;
+        }
+
+        /** Reports this event to {@code check}: the read first (when present), then the write (when present). */
+        private void process(Observation ob, HistoryValidator.Checker check)
+        {
+            boolean reads = types.contains(Type.READ);
+            boolean writes = types.contains(Type.WRITE);
+            if (reads)
+                check.read(pk, ob.id, seq.length, seq);
+            if (writes)
+                check.write(pk, ob.id, ob.isSuccess());
+        }
+    }
+
+    /** Small DSL handed to test bodies by {@code test(Consumer)} for describing a history step by step. */
+    private interface TestDSL
+    {
+        /** Witnesses the given events. */
+        void txn(Event... events);
+
+        /** Witnesses the given events expecting a failure; returns an assertion on whatever was thrown. */
+        AbstractThrowableAssert<?, ? extends Throwable> failingTxn(Event... events);
+    }
+
+    /** Returns true only when {@code validator} is a {@link StrictSerializabilityValidator}. */
+    private static boolean supportMultiKey(HistoryValidator validator)
+    {
+        return validator instanceof StrictSerializabilityValidator;
+    }
+
+    /**
+     * Runs {@code fn} against a fresh validator through a {@link TestDSL}.
+     * <p>
+     * When the validator supports multiple keys, every {@code txn} call is witnessed as a single transaction
+     * with one id from a shared counter. Otherwise each event is witnessed as its own transaction with an id
+     * counted per pk. Either way each witnessed transaction consumes two ticks of the logical clock
+     * (start, then end).
+     */
+    private void test(Consumer<TestDSL> fn)
+    {
+        final HistoryValidator validator = create();
+        final boolean multiKey = supportMultiKey(validator);
+        final EventIdGen ids = multiKey ? new AllPks() : new PerPk();
+        fn.accept(new TestDSL()
+        {
+            private int logicalClock = 0;
+
+            @Override
+            public void txn(Event... events)
+            {
+                if (!multiKey)
+                {
+                    for (Event event : events)
+                        witness(ids.next(event.pk), event);
+                    return;
+                }
+                witness(ids.next(), events);
+            }
+
+            @Override
+            public AbstractThrowableAssert<?, ? extends Throwable> failingTxn(Event... events)
+            {
+                return assertThatThrownBy(() -> txn(events));
+            }
+
+            private void witness(int eventId, Event... events)
+            {
+                int start = ++logicalClock;
+                int end = ++logicalClock;
+                HistoryValidatorTest.txn(validator, ob(eventId, start, end), events);
+            }
+        });
+    }
+
+    /** Source of event ids, either per partition key or from a single shared counter. */
+    private interface EventIdGen
+    {
+        /** Next id for the given pk. */
+        int next(int pk);
+
+        /** Next id when no pk is given; implementations may not support this. */
+        int next();
+    }
+
+    /** Keeps an independent counter per pk; the first id handed out for any pk is 0. */
+    private static class PerPk implements EventIdGen
+    {
+        private final IntIntMap map = new IntIntHashMap();
+
+        @Override
+        public int next(int pk)
+        {
+            int id = map.containsKey(pk) ? map.get(pk) + 1 : 0;
+            map.put(pk, id);
+            return id;
+        }
+
+        @Override
+        public int next()
+        {
+            throw new UnsupportedOperationException("next without pk not supported");
+        }
+    }
+
+    /** Hands out ids from one counter starting at 0, regardless of pk. */
+    private static class AllPks implements EventIdGen
+    {
+        private int value = 0;
+
+        @Override
+        public int next(int pk)
+        {
+            // the pk is ignored; all keys share the counter
+            return next();
+        }
+
+        @Override
+        public int next()
+        {
+            int id = value;
+            value = id + 1;
+            return id;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
index f61205d374..d1b5cd9038 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.service.accord;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.function.LongSupplier;
@@ -143,11 +144,16 @@ public class AccordTestUtils
         return createTxn(query, QueryOptions.DEFAULT);
     }
 
-    public static Txn createTxn(String cql, QueryOptions options)
+    public static Txn createTxn(String query, List<Object> binds)
     {
-        TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) 
QueryProcessor.parseStatement(cql);
-        Assert.assertNotNull(parsed);
-        TransactionStatement statement = (TransactionStatement) 
parsed.prepare(ClientState.forInternalCalls());
+        TransactionStatement statement = parse(query);
+        QueryOptions options = QueryProcessor.makeInternalOptions(statement, 
binds.toArray(new Object[binds.size()]));
+        return statement.createTxn(ClientState.forInternalCalls(), options);
+    }
+
+    public static Txn createTxn(String query, QueryOptions options)
+    {
+        TransactionStatement statement = parse(query);
         return statement.createTxn(ClientState.forInternalCalls(), options);
     }
 
diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java 
b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
index d5b1981fc1..c122a95315 100644
--- a/test/unit/org/apache/cassandra/utils/AssertionUtils.java
+++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java
@@ -18,8 +18,11 @@
 
 package org.apache.cassandra.utils;
 
+import java.util.stream.Stream;
+
 import com.google.common.base.Throwables;
 
+import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Condition;
 
 public class AssertionUtils
@@ -28,6 +31,16 @@ public class AssertionUtils
     {
     }
 
+    /**
+     * Adapts a stream of conditions to {@code Assertions.anyOf}, which takes an {@link Iterable}.
+     * The stream is consumed through its iterator, so the returned condition relies on a stream that has not
+     * been used yet.
+     */
+    public static <T> Condition<T> anyOf(Stream<Condition<T>> stream)
+    {
+        Iterable<Condition<T>> conditions = stream::iterator;
+        return Assertions.anyOf(conditions);
+    }
+
+    /**
+     * Builds a condition from the given throwable classes by mapping each through {@code isThrowable} and
+     * combining the results with {@link #anyOf(Stream)}.
+     */
+    @SafeVarargs // the array is only read, so callers need not see an unchecked generic-array warning
+    public static Condition<Throwable> anyOfThrowable(Class<? extends Throwable>... klasses)
+    {
+        return anyOf(Stream.of(klasses).map(AssertionUtils::isThrowable));
+    }
+
     /**
      * When working with jvm-dtest the thrown error is in a different {@link 
ClassLoader} causing type checks
      * to fail; this method relies on naming instead.
@@ -100,6 +113,11 @@ public class AssertionUtils
         };
     }
 
+    /**
+     * {@code isInstanceof(klass)} re-typed as a {@code Condition<Throwable>} so it composes with the other
+     * throwable conditions in this class.
+     */
+    public static Condition<Throwable> isThrowableInstanceof(Class<?> klass)
+    {
+        // double cast through a wildcard to change the type argument; unchecked by the compiler
+        return (Condition<Throwable>) (Condition<?>) isInstanceof(klass);
+    }
+
     public static Condition<Throwable> rootCause(Condition<Throwable> other)
     {
         return new Condition<Throwable>() {
@@ -119,6 +137,32 @@ public class AssertionUtils
 
     public static Condition<Throwable> rootCauseIs(Class<? extends Throwable> 
klass)
     {
-        return rootCause((Condition<Throwable>) (Condition<?>) is(klass));
+        return rootCause(isThrowable(klass));
+    }
+
+    /** Shorthand for {@code hasCause(isThrowable(klass))}. */
+    public static Condition<Throwable> hasCause(Class<? extends Throwable> klass)
+    {
+        return hasCause(isThrowable(klass));
+    }
+
+    /** Shorthand for {@code hasCause(anyOfThrowable(matchers))}. */
+    @SafeVarargs // the array is only passed on and read, so callers need not see an unchecked generic-array warning
+    public static Condition<Throwable> hasCauseAnyOf(Class<? extends Throwable>... matchers)
+    {
+        return hasCause(anyOfThrowable(matchers));
+    }
+
+    /**
+     * Condition that matches when {@code matcher} accepts the throwable itself or any throwable reachable by
+     * repeatedly following {@code getCause()}.
+     */
+    public static Condition<Throwable> hasCause(Condition<Throwable> matcher)
+    {
+        return new Condition<Throwable>() {
+            @Override
+            public boolean matches(Throwable value)
+            {
+                Throwable current = value;
+                while (current != null)
+                {
+                    if (matcher.matches(current))
+                        return true;
+                    current = current.getCause();
+                }
+                return false;
+            }
+        };
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to