This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit cc6f89124c171d4aea6cd3852000c780f16b7e99 Author: David Capwell <[email protected]> AuthorDate: Thu Jan 12 12:07:54 2023 -0800 CASSANDRA-18154: CEP-15: Enhance returning SELECT to allow partition and clustering IN clauses to return multiple partitions/rows --- .build/cassandra-build-deps-template.xml | 5 + .build/include-accord.sh | 2 +- .build/parent-pom-template.xml | 21 + ide/idea-iml-file.xml | 1 + ide/idea/workspace.xml | 35 +- .../metrics/AccordClientRequestMetrics.java | 37 +- .../cassandra/service/accord/AccordCommand.java | 2 + .../cassandra/service/accord/AccordService.java | 27 + .../cassandra/utils/logging/ClassNameFilter.java | 38 +- test/conf/logback-simulator.xml | 14 + .../org/apache/cassandra/distributed/api/Row.java | 160 ++++++ .../apache/cassandra/distributed/impl/Query.java | 12 +- .../distributed/util/QueryResultUtil.java | 2 + .../org/apache/cassandra/simulator/ActionList.java | 4 + .../AbstractPairOfSequencesPaxosSimulation.java | 201 +++---- .../cassandra/simulator/paxos/HistoryChecker.java | 32 +- .../{Observation.java => HistoryValidator.java} | 41 +- .../simulator/paxos/LinearizabilityValidator.java | 83 +++ .../simulator/paxos/LoggingHistoryValidator.java | 73 +++ .../cassandra/simulator/paxos/Observation.java | 16 +- .../paxos/PairOfSequencesAccordSimulation.java | 310 +++++------ .../paxos/PairOfSequencesPaxosSimulation.java | 165 +++++- .../cassandra/simulator/paxos/PaxosSimulation.java | 53 +- .../paxos/StrictSerializabilityValidator.java | 112 ++++ .../simulator/systems/SimulatedQuery.java | 5 +- .../simulator/paxos/HistoryValidatorTest.java | 592 +++++++++++++++++++++ .../cassandra/service/accord/AccordTestUtils.java | 14 +- .../org/apache/cassandra/utils/AssertionUtils.java | 46 +- 28 files changed, 1724 insertions(+), 379 deletions(-) diff --git a/.build/cassandra-build-deps-template.xml b/.build/cassandra-build-deps-template.xml index 93387996ef..cfd2806fe7 100644 --- a/.build/cassandra-build-deps-template.xml +++ 
b/.build/cassandra-build-deps-template.xml @@ -127,5 +127,10 @@ <groupId>com.fasterxml.jackson.dataformat</groupId> <artifactId>jackson-dataformat-yaml</artifactId> </dependency> + <dependency> + <groupId>accord</groupId> + <artifactId>accord</artifactId> + <classifier>tests</classifier> + </dependency> </dependencies> </project> diff --git a/.build/include-accord.sh b/.build/include-accord.sh index 2144e80b17..eabe4d204b 100755 --- a/.build/include-accord.sh +++ b/.build/include-accord.sh @@ -25,7 +25,7 @@ set -o nounset bin="$(cd "$(dirname "$0")" > /dev/null; pwd)" accord_repo='https://github.com/apache/cassandra-accord.git' -accord_branch='ad326d5df8d99d4799fa87de81482e3cb1fb92de' +accord_branch='5626c7c11400d4cf6d01a8e22517b53a83f5c512' accord_src="$bin/cassandra-accord" checkout() { diff --git a/.build/parent-pom-template.xml b/.build/parent-pom-template.xml index c582933234..218065968d 100644 --- a/.build/parent-pom-template.xml +++ b/.build/parent-pom-template.xml @@ -679,6 +679,27 @@ <artifactId>accord</artifactId> <version>1.0-SNAPSHOT</version> </dependency> + <dependency> + <groupId>accord</groupId> + <artifactId>accord</artifactId> + <version>1.0-SNAPSHOT</version> + <classifier>tests</classifier> + <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>org.junit.jupiter</artifactId> + <groupId>junit-jupiter-api</groupId> + </exclusion> + <exclusion> + <artifactId>org.junit.jupiter</artifactId> + <groupId>junit-jupiter-engine</groupId> + </exclusion> + <exclusion> + <artifactId>ch.qos.logback</artifactId> + <groupId>logback-classic</groupId> + </exclusion> + </exclusions> + </dependency> <dependency> <groupId>io.airlift</groupId> <artifactId>airline</artifactId> diff --git a/ide/idea-iml-file.xml b/ide/idea-iml-file.xml index c0b55842fa..4c529892a4 100644 --- a/ide/idea-iml-file.xml +++ b/ide/idea-iml-file.xml @@ -39,6 +39,7 @@ <sourceFolder url="file://$MODULE_DIR$/test/simulator/asm" isTestSource="true" /> <sourceFolder 
url="file://$MODULE_DIR$/test/simulator/bootstrap" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/simulator/main" isTestSource="true" /> + <sourceFolder url="file://$MODULE_DIR$/test/simulator/test" isTestSource="true" /> <sourceFolder url="file://$MODULE_DIR$/test/resources" type="java-test-resource" /> <sourceFolder url="file://$MODULE_DIR$/test/conf" type="java-test-resource" /> <excludeFolder url="file://$MODULE_DIR$/.idea" /> diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml index a89921cc77..9248a8f6c9 100644 --- a/ide/idea/workspace.xml +++ b/ide/idea/workspace.xml @@ -167,7 +167,40 @@ <option name="MAIN_CLASS_NAME" value="" /> <option name="METHOD_NAME" value="" /> <option name="TEST_OBJECT" value="class" /> - <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables -Dcassandra.ring_delay_ms=1000 -Dcassandra.skip_sync=true -ea -XX:MaxMet [...] 
+ <option name="VM_PARAMETERS" value=" + -ea + -Djava.library.path=$PROJECT_DIR$/lib/sigar-bin + -Djava.security.egd=file:/dev/urandom + -XX:-BackgroundCompilation + -XX:-TieredCompilation + -XX:ActiveProcessorCount=4 + -XX:CICompilerCount=1 + -XX:HeapDumpPath=build/test + -XX:MaxMetaspaceSize=384M + -XX:ReservedCodeCacheSize=256M + -XX:SoftRefLRUPolicyMSPerMB=0 + -XX:Tier4CompileThreshold=1000 + -Xbootclasspath/a:$PROJECT_DIR$/build/test/lib/jars/simulator-bootstrap.jar + -Xmx8G + -Xss384k + -javaagent:$PROJECT_DIR$/build/test/lib/jars/simulator-asm.jar + + -Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml + -Dcassandra.debugrefcount=false + -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs + -Dcassandra.memtable_row_overhead_computation_step=100 + -Dcassandra.reads.thresholds.coordinator.defensive_checks_enabled=true + -Dcassandra.ring_delay_ms=10000 + -Dcassandra.skip_sync=true + -Dcassandra.strict.runtime.checks=true + -Dcassandra.test.simulator.determinismcheck=strict + -Dcassandra.test.sstableformatdevelopment=true + -Dcassandra.tolerate_sstable_size=true + -Dinvalid-legacy-sstable-root=$PROJECT_DIR$/test/data/invalid-legacy-sstables + -Dlegacy-sstable-root=$PROJECT_DIR$/test/data/legacy-sstables + -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml + -Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables + " /> <option name="PARAMETERS" value="" /> <fork_mode value="class" /> <option name="WORKING_DIRECTORY" value="" /> diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java similarity index 52% copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java copy to src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java index 546fd3179f..c95c3bd11f 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java +++ 
b/src/java/org/apache/cassandra/metrics/AccordClientRequestMetrics.java @@ -16,30 +16,31 @@ * limitations under the License. */ -package org.apache.cassandra.simulator.paxos; +package org.apache.cassandra.metrics; -class Observation implements Comparable<Observation> +import com.codahale.metrics.Histogram; +import com.codahale.metrics.Meter; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +public class AccordClientRequestMetrics extends ClientRequestMetrics { - final int id; - final Object[][] result; - final int start; - final int end; + public final Meter preempts; + public final Histogram keySize; - Observation(int id, Object[][] result, int start, int end) + public AccordClientRequestMetrics(String scope) { - this.id = id; - this.result = result; - this.start = start; - this.end = end; + super(scope); + + preempts = Metrics.meter(factory.createMetricName("Preempts")); + keySize = Metrics.histogram(factory.createMetricName("KeySizeHistogram"), false); } - // computes a PARTIAL ORDER on when the outcome occurred, i.e. 
for many pair-wise comparisons the answer is 0 - public int compareTo(Observation that) + @Override + public void release() { - if (this.end < that.start) - return -1; - if (that.end < this.start) - return 1; - return 0; + super.release(); + Metrics.remove(factory.createMetricName("Preempts")); + Metrics.remove(factory.createMetricName("KeySizeHistogram")); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java index 5b4a8c2e9b..8020b29ac7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java @@ -706,6 +706,8 @@ public class AccordCommand extends Command implements AccordState<TxnId> if (listener instanceof AccordCommandsForKey) return new ListenerProxy.CommandsForKeyListenerProxy(((AccordCommandsForKey) listener).key()); + //TODO - Support accord.messages.Defer + throw new RuntimeException("Unhandled non-transient listener: " + listener); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 0135c741c3..5fee18e7fe 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException; import com.google.common.annotations.VisibleForTesting; import accord.api.Result; +import accord.coordinate.Preempted; import accord.coordinate.Timeout; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; @@ -40,6 +41,7 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.WriteType; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.metrics.AccordClientRequestMetrics; import 
org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.service.accord.api.AccordAgent; import org.apache.cassandra.service.accord.api.AccordRoutingKey.KeyspaceSplitter; @@ -53,9 +55,13 @@ import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps; import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner; +import static org.apache.cassandra.utils.Clock.Global.nanoTime; public class AccordService implements Shutdownable { + public static final AccordClientRequestMetrics readMetrics = new AccordClientRequestMetrics("AccordRead"); + public static final AccordClientRequestMetrics writeMetrics = new AccordClientRequestMetrics("AccordWrite"); + public final Node node; private final Shutdownable nodeShutdown; private final AccordMessageSink messageSink; @@ -122,8 +128,11 @@ public class AccordService implements Shutdownable */ public TxnData coordinate(Txn txn, ConsistencyLevel consistencyLevel) { + AccordClientRequestMetrics metrics = txn.isWrite() ? writeMetrics : readMetrics; + final long startNanos = nanoTime(); try { + metrics.keySize.update(txn.keys().size()); Future<Result> future = node.coordinate(txn); Result result = future.get(DatabaseDescriptor.getTransactionTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); return (TxnData) result; @@ -132,17 +141,35 @@ public class AccordService implements Shutdownable { Throwable cause = e.getCause(); if (cause instanceof Timeout) + { + metrics.timeouts.mark(); + throw throwTimeout(txn, consistencyLevel); + } + if (cause instanceof Preempted) + { + metrics.preempts.mark(); + //TODO need to improve + // Coordinator "could" query the accord state to see whats going on but that doesn't exist yet. 
+ // Protocol also doesn't have a way to denote "unknown" outcome, so using a timeout as the closest match throw throwTimeout(txn, consistencyLevel); + } + metrics.failures.mark(); throw new RuntimeException(cause); } catch (InterruptedException e) { + metrics.failures.mark(); throw new UncheckedInterruptedException(e); } catch (TimeoutException e) { + metrics.timeouts.mark(); throw throwTimeout(txn, consistencyLevel); } + finally + { + metrics.addNano(nanoTime() - startNanos); + } } private static RuntimeException throwTimeout(Txn txn, ConsistencyLevel consistencyLevel) diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java similarity index 54% copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java copy to src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java index 546fd3179f..ef0ca2c5ca 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java +++ b/src/java/org/apache/cassandra/utils/logging/ClassNameFilter.java @@ -16,30 +16,32 @@ * limitations under the License. 
*/ -package org.apache.cassandra.simulator.paxos; +package org.apache.cassandra.utils.logging; -class Observation implements Comparable<Observation> +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.filter.AbstractMatcherFilter; +import ch.qos.logback.core.spi.FilterReply; + +public class ClassNameFilter extends AbstractMatcherFilter<ILoggingEvent> { - final int id; - final Object[][] result; - final int start; - final int end; + String loggerName; + + public void setLoggerName(String loggerName) + { + this.loggerName = loggerName; + } - Observation(int id, Object[][] result, int start, int end) + @Override + public FilterReply decide(ILoggingEvent event) { - this.id = id; - this.result = result; - this.start = start; - this.end = end; + if (!isStarted()) return FilterReply.NEUTRAL; + if (event.getLoggerName().equals(loggerName)) return onMatch; + return onMismatch; } - // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0 - public int compareTo(Observation that) + @Override + public void start() { - if (this.end < that.start) - return -1; - if (that.end < this.start) - return 1; - return 0; + if (loggerName != null) super.start(); } } diff --git a/test/conf/logback-simulator.xml b/test/conf/logback-simulator.xml index 3566779ea4..69def30afb 100644 --- a/test/conf/logback-simulator.xml +++ b/test/conf/logback-simulator.xml @@ -24,6 +24,19 @@ <!-- Shutdown hook ensures that async appender flushes --> <shutdownHook class="ch.qos.logback.core.hook.DelayingShutdownHook"/> + <appender name="HISTORYLOG" class="ch.qos.logback.core.FileAppender"> + <file>./build/test/logs/simulator/${run_start}-${run_seed}/history.log</file> + <encoder> + <pattern>%msg%n</pattern> + </encoder> + <immediateFlush>true</immediateFlush> + <filter class="org.apache.cassandra.utils.logging.ClassNameFilter"> + <loggerName>org.apache.cassandra.simulator.paxos.LoggingHistoryValidator</loggerName> + 
<onMatch>ACCEPT</onMatch> + <onMismatch>DENY</onMismatch> + </filter> + </appender> + <appender name="INSTANCEFILE" class="ch.qos.logback.core.FileAppender"> <file>./build/test/logs/simulator/${run_start}-${run_seed}/${instance_id}/system.log</file> <encoder> @@ -56,6 +69,7 @@ <root level="INFO"> <appender-ref ref="INSTANCEFILE" /> <appender-ref ref="STDOUT" /> + <appender-ref ref="HISTORYLOG" /> </root> </configuration> diff --git a/test/distributed/org/apache/cassandra/distributed/api/Row.java b/test/distributed/org/apache/cassandra/distributed/api/Row.java index 33272ed3d5..556a5716c0 100644 --- a/test/distributed/org/apache/cassandra/distributed/api/Row.java +++ b/test/distributed/org/apache/cassandra/distributed/api/Row.java @@ -21,6 +21,7 @@ package org.apache.cassandra.distributed.api; import java.util.Arrays; import java.util.Date; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.NoSuchElementException; import java.util.Objects; @@ -85,6 +86,17 @@ public class Row return (T) results[index]; } + public <T> T get(int index, T defaultValue) + { + checkAccess(); + if (index < 0 || index >= results.length) + throw new NoSuchElementException("by index: " + index); + T result = (T) results[index]; + if (result == null) + return defaultValue; + return result; + } + public <T> T get(String name) { checkAccess(); @@ -94,66 +106,158 @@ public class Row return (T) results[idx]; } + public <T> T get(String name, T defaultValue) + { + checkAccess(); + int idx = findIndex(name); + if (idx == NOT_FOUND) + throw new NoSuchElementException("by name: " + name); + T result = (T) results[idx]; + if (result == null) + return defaultValue; + return result; + } + + public Boolean getBoolean(int index) + { + return get(index); + } + + public Boolean getBoolean(int index, Boolean defaultValue) + { + return get(index, defaultValue); + } + + public Boolean getBoolean(String name) + { + return get(name); + } + + public Boolean getBoolean(String 
name, Boolean defaultValue) + { + return get(name, defaultValue); + } + public Short getShort(int index) { return get(index); } + public Short getShort(int index, Short defaultValue) + { + return get(index, defaultValue); + } + public Short getShort(String name) { return get(name); } + public Short getShort(String name, Short defaultValue) + { + return get(name, defaultValue); + } + public Integer getInteger(int index) { return get(index); } + public Integer getInteger(int index, Integer defaultValue) + { + return get(index, defaultValue); + } + public Integer getInteger(String name) { return get(name); } + public Integer getInteger(String name, Integer defaultValue) + { + return get(name, defaultValue); + } + public Long getLong(int index) { return get(index); } + public Long getLong(int index, Long defaultValue) + { + return get(index, defaultValue); + } + public Long getLong(String name) { return get(name); } + public Long getLong(String name, Long defaultValue) + { + return get(name, defaultValue); + } + public Float getFloat(int index) { return get(index); } + public Float getFloat(int index, Float defaultValue) + { + return get(index, defaultValue); + } + public Float getFloat(String name) { return get(name); } + public Float getFloat(String name, Float defaultValue) + { + return get(name, defaultValue); + } + public Double getDouble(int index) { return get(index); } + public Double getDouble(int index, Double defaultValue) + { + return get(index, defaultValue); + } + public Double getDouble(String name) { return get(name); } + public Double getDouble(String name, Double defaultValue) + { + return get(name, defaultValue); + } + public String getString(int index) { return get(index); } + public String getString(int index, String defaultValue) + { + return get(index, defaultValue); + } + public String getString(String name) { return get(name); } + public String getString(String name, String defaultValue) + { + return get(name, defaultValue); + } + public UUID 
getUUID(int index) { Object uuid = get(index); @@ -162,6 +266,14 @@ public class Row return (UUID) uuid; } + public UUID getUUID(int index, UUID defaultValue) + { + Object uuid = get(index, defaultValue); + if (uuid instanceof TimeUUID) + return ((TimeUUID) uuid).asUUID(); + return (UUID) uuid; + } + public UUID getUUID(String name) { Object uuid = get(name); @@ -170,26 +282,74 @@ public class Row return (UUID) uuid; } + public UUID getUUID(String name, UUID defaultValue) + { + Object uuid = get(name, defaultValue); + if (uuid instanceof TimeUUID) + return ((TimeUUID) uuid).asUUID(); + return (UUID) uuid; + } + public Date getTimestamp(int index) { return get(index); } + public Date getTimestamp(int index, Date defaultValue) + { + return get(index, defaultValue); + } + public Date getTimestamp(String name) { return get(name); } + public Date getTimestamp(String name, Date defaultValue) + { + return get(name, defaultValue); + } + public <T> Set<T> getSet(int index) { return get(index); } + public <T> Set<T> getSet(int index, Set<T> defaultValue) + { + return get(index, defaultValue); + } + public <T> Set<T> getSet(String name) { return get(name); } + public <T> Set<T> getSet(String name, Set<T> defaultValue) + { + return get(name, defaultValue); + } + + public <T> List<T> getList(int index) + { + return get(index); + } + + public <T> List<T> getList(int index, List<T> defaultValue) + { + return get(index, defaultValue); + } + + public <T> List<T> getList(String name) + { + return get(name); + } + + public <T> List<T> getList(String name, List<T> defaultValue) + { + return get(name, defaultValue); + } + /** * Get the row as a array. 
*/ diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Query.java b/test/distributed/org/apache/cassandra/distributed/impl/Query.java index 823113f857..f40ebca904 100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/Query.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Query.java @@ -27,6 +27,7 @@ import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; @@ -37,15 +38,15 @@ import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.utils.Clock.Global.nanoTime; -public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]> +public class Query implements IIsolatedExecutor.SerializableCallable<SimpleQueryResult> { private static final long serialVersionUID = 1L; - final String query; + public final String query; final long timestamp; final org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin; final org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin; - final Object[] boundValues; + public final Object[] boundValues; public Query(String query, long timestamp, org.apache.cassandra.distributed.api.ConsistencyLevel commitConsistencyOrigin, org.apache.cassandra.distributed.api.ConsistencyLevel serialConsistencyOrigin, Object[] boundValues) { @@ -56,7 +57,8 @@ public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]> this.boundValues = boundValues; } - public Object[][] call() + @Override + public SimpleQueryResult call() { ConsistencyLevel commitConsistency = toCassandraCL(commitConsistencyOrigin); ConsistencyLevel serialConsistency = serialConsistencyOrigin 
== null ? null : toCassandraCL(serialConsistencyOrigin); @@ -89,7 +91,7 @@ public class Query implements IIsolatedExecutor.SerializableCallable<Object[][]> if (res != null) res.setWarnings(ClientWarn.instance.getWarnings()); - return RowUtil.toQueryResult(res).toObjectArrays(); + return RowUtil.toQueryResult(res); } public String toString() diff --git a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java index 9d08f0a567..a9cfbb16a8 100644 --- a/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java +++ b/test/distributed/org/apache/cassandra/distributed/util/QueryResultUtil.java @@ -117,6 +117,7 @@ public class QueryResultUtil { StringBuilder sb = new StringBuilder(); int rowNum = 1; + qr.mark(); while (qr.hasNext()) { sb.append("@ Row ").append(rowNum).append('\n'); @@ -129,6 +130,7 @@ public class QueryResultUtil } sb.append(table); } + qr.reset(); return sb.toString(); } diff --git a/test/simulator/main/org/apache/cassandra/simulator/ActionList.java b/test/simulator/main/org/apache/cassandra/simulator/ActionList.java index 64474e6184..2357684043 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/ActionList.java +++ b/test/simulator/main/org/apache/cassandra/simulator/ActionList.java @@ -41,6 +41,10 @@ public class ActionList extends AbstractCollection<Action> public static ActionList empty() { return EMPTY; } public static ActionList of(Action action) { return new ActionList(new Action[] { action }); } public static ActionList of(Stream<Action> action) { return new ActionList(action.toArray(Action[]::new)); } + public static ActionList of(Stream<Action> action, Stream<Action>... actions) + { + return new ActionList(Stream.concat(action, Stream.of(actions).flatMap(a -> a)).toArray(Action[]::new)); + } public static ActionList of(Collection<Action> actions) { return actions.isEmpty() ? 
EMPTY : new ActionList(actions.toArray(new Action[0])); } public static ActionList of(Action ... actions) { return new ActionList(actions); } diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java index 54a871bb45..5a528468ea 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/AbstractPairOfSequencesPaxosSimulation.java @@ -22,35 +22,43 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; -import javax.annotation.Nullable; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.distributed.api.LogResult; +import org.apache.cassandra.distributed.impl.FileLogAction; import org.apache.cassandra.distributed.impl.Instance; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.simulator.Action; import org.apache.cassandra.simulator.ActionList; -import org.apache.cassandra.simulator.ActionListener; import org.apache.cassandra.simulator.ActionPlan; import org.apache.cassandra.simulator.Actions; import 
org.apache.cassandra.simulator.Debug; +import org.apache.cassandra.simulator.RandomSource; import org.apache.cassandra.simulator.RunnableActionScheduler; import org.apache.cassandra.simulator.cluster.ClusterActions; import org.apache.cassandra.simulator.cluster.KeyspaceActions; +import org.apache.cassandra.simulator.logging.RunStartDefiner; +import org.apache.cassandra.simulator.logging.SeedDefiner; import org.apache.cassandra.simulator.systems.SimulatedActionTask; import org.apache.cassandra.simulator.systems.SimulatedSystems; import org.apache.cassandra.simulator.utils.IntRange; +import org.apache.cassandra.utils.Pair; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.SECONDS; @@ -59,7 +67,6 @@ import static org.apache.cassandra.simulator.Action.Modifiers.RELIABLE_NO_TIMEOU import static org.apache.cassandra.simulator.ActionSchedule.Mode.STREAM_LIMITED; import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_AND_STREAM_LIMITED; import static org.apache.cassandra.simulator.ActionSchedule.Mode.TIME_LIMITED; -import static org.apache.cassandra.simulator.Debug.EventType.PARTITION; @SuppressWarnings("unused") abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation @@ -77,7 +84,6 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation final IntRange simulateKeyForSeconds; final ConsistencyLevel serialConsistency; final Debug debug; - final List<HistoryChecker> historyCheckers = new ArrayList<>(); final AtomicInteger successfulReads = new AtomicInteger(); final AtomicInteger successfulWrites = new AtomicInteger(); final AtomicInteger failedReads = new AtomicInteger(); @@ -112,10 +118,71 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation protected abstract String preInsertStmt(); - abstract Operation verifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker); - abstract Operation 
nonVerifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker); - abstract Operation modifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker); abstract boolean joinAll(); + boolean allowMultiplePartitions() { return false; } + + abstract BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory(); + + protected Action checkErrorLogs(IInvokableInstance inst) + { + DatabaseDescriptor.clientInitialization(); + return new Action("Error logs for node" + inst.config().num(), Action.Modifiers.NONE) + { + @Override + protected ActionList performSimple() + { + // can't use inst.logs as that runs in the class loader, which uses in-memory file system + String suite = new RunStartDefiner().getPropertyValue() + "-" + new SeedDefiner().getPropertyValue(); + String instanceId = "node" + inst.config().num(); + File logFile = new File(String.format("build/test/logs/simulator/%s/%s/system.log", suite, instanceId)); + FileLogAction logs = new FileLogAction(logFile); + + LogResult<List<String>> errors = logs.grepForErrors(); + if (!errors.getResult().isEmpty()) + { + List<Pair<String, String>> errorsSeen = new ArrayList<>(); + for (String error : errors.getResult()) + { + for (String line : error.split("\\n")) + { + line = line.trim(); + if (line.startsWith("ERROR")) continue; + if (line.startsWith("at ")) continue; + errorsSeen.add(Pair.create(line.split(":")[0], error)); + break; + } + } + Class<? 
extends Throwable>[] expected = expectedExceptions(); + StringBuilder sb = new StringBuilder(); + for (Pair<String, String> pair : errorsSeen) + { + String name = pair.left; + String exception = pair.right; + Class<?> klass; + try + { + klass = Class.forName(name); + } + catch (ClassNotFoundException e) + { + throw new RuntimeException(e); + } + + if (!Stream.of(expected).anyMatch(e -> e.isAssignableFrom(klass))) + sb.append("Unexpected exception:\n").append(exception).append('\n'); + } + if (sb.length() > 0) + { + AssertionError error = new AssertionError("Saw errors in node" + inst.config().num() + ": " + sb); + // this stacktrace isn't helpful, can be more confusing + error.setStackTrace(new StackTraceElement[0]); + throw error; + } + } + return ActionList.empty(); + } + }; + } public ActionPlan plan() { @@ -124,101 +191,27 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation plan = plan.encapsulate(ActionPlan.setUpTearDown( ActionList.of( - cluster.stream().map(i -> simulated.run("Insert Partitions", i, executeForPrimaryKeys(preInsertStmt(), primaryKeys))) - ).andThen( + cluster.stream().map(i -> simulated.run("Insert Partitions", i, executeForPrimaryKeys(preInsertStmt(), primaryKeys))), // TODO (now): this is temporary until we have correct epoch handling - ActionList.of( - cluster.stream().map(i -> simulated.run("Create Accord Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe())) - ) -// ).andThen( -// // TODO (now): this is temporary until we have parameterisation of simulation -// ActionList.of( -// cluster.stream().map(i -> simulated.run("Disable Accord Cache", i, () -> AccordService.instance.setCacheSize(0))) -// ) + cluster.stream().map(i -> simulated.run("Create Accord Epoch", i, () -> AccordService.instance().createEpochFromConfigUnsafe())) ), ActionList.of( + cluster.stream().map(i -> checkErrorLogs(i)), cluster.stream().map(i -> SimulatedActionTask.unsafeTask("Shutdown " + i.broadcastAddress(), 
RELIABLE, RELIABLE_NO_TIMEOUTS, simulated, i, i::shutdown)) ) )); - final int nodes = cluster.size(); - for (int primaryKey : primaryKeys) - historyCheckers.add(new HistoryChecker(primaryKey)); - - List<Supplier<Action>> primaryKeyActions = new ArrayList<>(); - for (int pki = 0 ; pki < primaryKeys.length ; ++pki) - { - int primaryKey = primaryKeys[pki]; - HistoryChecker historyChecker = historyCheckers.get(pki); - Supplier<Action> supplier = new Supplier<Action>() - { - int i = 0; - - @Override - public Action get() - { - int node = simulated.random.uniform(1, nodes + 1); - IInvokableInstance instance = cluster.get(node); - switch (serialConsistency) - { - default: throw new AssertionError(); - case LOCAL_SERIAL: - if (simulated.snitch.dcOf(node) > 0) - { - // perform some queries against these nodes but don't expect them to be linearizable - return nonVerifying(i++, instance, primaryKey, historyChecker); - } - case SERIAL: - return simulated.random.decide(readRatio) - ? verifying(i++, instance, primaryKey, historyChecker) - : modifying(i++, instance, primaryKey, historyChecker); - } - } - - @Override - public String toString() - { - return Integer.toString(primaryKey); - } - }; - - final ActionListener listener = debug.debug(PARTITION, simulated.time, cluster, KEYSPACE, primaryKey); - if (listener != null) - { - Supplier<Action> wrap = supplier; - supplier = new Supplier<Action>() - { - @Override - public Action get() - { - Action action = wrap.get(); - action.register(listener); - return action; - } - - @Override - public String toString() - { - return wrap.toString(); - } - }; - } - - primaryKeyActions.add(supplier); - } + BiFunction<SimulatedSystems, int[], Supplier<Action>> factory = actionFactory(); List<Integer> available = IntStream.range(0, primaryKeys.length).boxed().collect(Collectors.toList()); Action stream = Actions.infiniteStream(concurrency, new Supplier<Action>() { @Override public Action get() { - int i = simulated.random.uniform(0, 
available.size()); - int next = available.get(i); - available.set(i, available.get(available.size() - 1)); - available.remove(available.size() - 1); + int[] primaryKeyIndex = consume(simulated.random, available); long untilNanos = simulated.time.nanoTime() + SECONDS.toNanos(simulateKeyForSeconds.select(simulated.random)); int concurrency = withinKeyConcurrency.select(simulated.random); - Supplier<Action> supplier = primaryKeyActions.get(next); + Supplier<Action> supplier = factory.apply(simulated, primaryKeyIndex); // while this stream is finite, it participates in an infinite stream via its parent, so we want to permit termination while it's running return Actions.infiniteStream(concurrency, new Supplier<Action>() { @@ -227,7 +220,7 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation { if (simulated.time.nanoTime() >= untilNanos) { - available.add(next); + IntStream.of(primaryKeyIndex).boxed().forEach(available::add); return null; } return supplier.get(); @@ -253,7 +246,30 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation .encapsulate(ActionPlan.interleave(singletonList(ActionList.of(stream)))); } - private IIsolatedExecutor.SerializableRunnable executeForPrimaryKeys(String cql, int[] primaryKeys) + private int[] consume(RandomSource random, List<Integer> available) + { + if (available.isEmpty()) + throw new AssertionError("available partitions are empty!"); + int numPartitions = available.size() == 1 || !allowMultiplePartitions() ? 
1 : random.uniform(1, available.size()); + int[] partitions = new int[numPartitions]; + for (int counter = 0; counter < numPartitions; counter++) + { + int idx = random.uniform(0, available.size()); + int next = available.get(idx); + int last = available.get(available.size() - 1); + if (available.set(idx, last) != next) + throw new IllegalStateException("Expected to set " + last + " index " + idx + " but did not return " + next); + int removed = available.remove(available.size() - 1); + if (last != removed) + throw new IllegalStateException("Expected to remove " + last + " but removed " + removed); + + partitions[counter] = next; + } + Arrays.sort(partitions); + return partitions; + } + + IIsolatedExecutor.SerializableRunnable executeForPrimaryKeys(String cql, int[] primaryKeys) { return () -> { for (int primaryKey : primaryKeys) @@ -273,13 +289,6 @@ abstract class AbstractPairOfSequencesPaxosSimulation extends PaxosSimulation return new PaxosRepairValidator(cluster, KEYSPACE, TABLE, id); } - @Override - void log(@Nullable Integer primaryKey) - { - if (primaryKey == null) historyCheckers.forEach(HistoryChecker::print); - else historyCheckers.stream().filter(h -> h.primaryKey == primaryKey).forEach(HistoryChecker::print); - } - @Override public void run() { diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java index d1e0771b1e..2465bf62cf 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryChecker.java @@ -127,13 +127,23 @@ class HistoryChecker return byId[id] = event; } - void witness(Observation witness, int[] witnessSequence, int start, int end) + private static int eventId(int[] witnessSequence, int eventPosition) + { + return eventPosition == 0 ? 
-1 : witnessSequence[eventPosition - 1]; + } + + void witness(Observation witness, int[] witnessSequence) + { + witness(witness.id, witnessSequence, witness.start, witness.end); + } + + void witness(int id, int[] witnessSequence, int start, int end) { int eventPosition = witnessSequence.length; - int eventId = eventPosition == 0 ? -1 : witnessSequence[eventPosition - 1]; - setById(witness.id, new Event(witness.id)).log.add(new VerboseWitness(witness.id, start, end, witnessSequence)); + int eventId = eventId(witnessSequence, eventPosition); + setById(id, new Event(id)).log.add(new VerboseWitness(id, start, end, witnessSequence)); Event event = get(eventPosition, eventId); - recordWitness(event, witness, witnessSequence); + recordWitness(event, id, start, end, witnessSequence); recordVisibleBy(event, end); recordVisibleUntil(event, start); @@ -154,7 +164,7 @@ class HistoryChecker } else if (e.result) { - throw fail(primaryKey, "%d witnessed as absent by %d", e.eventId, witness.id); + throw fail(primaryKey, "%d witnessed as absent by %d", e.eventId, id); } } } @@ -181,16 +191,16 @@ class HistoryChecker } } - void recordWitness(Event event, Observation witness, int[] witnessSequence) + void recordWitness(Event event, int id, int start, int end, int[] witnessSequence) { - recordWitness(event, witness, witnessSequence.length, witnessSequence); + recordWitness(event, id, start, end, witnessSequence.length, witnessSequence); } - void recordWitness(Event event, Observation witness, int eventPosition, int[] witnessSequence) + void recordWitness(Event event, int id, int start, int end, int eventPosition, int[] witnessSequence) { while (true) { - event.log.add(new Witness(READ, witness.id, witness.start, witness.end)); + event.log.add(new Witness(READ, id, start, end)); if (event.witnessSequence != null) { if (!Arrays.equals(event.witnessSequence, witnessSequence)) @@ -238,7 +248,7 @@ class HistoryChecker event.visibleUntil = visibleUntil; Event next = next(event); if (next != 
null && visibleUntil >= next.visibleBy) - throw fail(primaryKey, "%s %d not witnessed >= %d, but also witnessed <= %d", next.witnessSequence, next.eventId, event.visibleUntil, next.visibleBy); + throw fail(primaryKey, "%s+%d not witnessed >= %d, but also witnessed <= %d", next.witnessSequence, next.eventId, event.visibleUntil, next.visibleBy); } } @@ -295,7 +305,7 @@ class HistoryChecker return null; // initialise the event, if necessary importing information from byId - return get(eventPosition, eventPosition == 0 ? -1 : event.witnessSequence[eventPosition - 1]); + return get(eventPosition, eventId(event.witnessSequence, eventPosition)); } Event next(Event event) diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java similarity index 57% copy from test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java copy to test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java index 546fd3179f..282b16d3b1 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/HistoryValidator.java @@ -18,28 +18,35 @@ package org.apache.cassandra.simulator.paxos; -class Observation implements Comparable<Observation> +import javax.annotation.Nullable; + +public interface HistoryValidator { - final int id; - final Object[][] result; - final int start; - final int end; + Checker witness(int start, int end); + + void print(@Nullable Integer pk); - Observation(int id, Object[][] result, int start, int end) + interface Checker extends AutoCloseable { - this.id = id; - this.result = result; - this.start = start; - this.end = end; + void read(int pk, int id, int count, int[] seq); + void write(int pk, int id, boolean success); + + default void writeSuccess(int pk, int id) + { + write(pk, id, true); + } + + default void writeUnknownFailure(int pk, int id) + 
{ + write(pk, id, false); + } + + @Override + default void close() {} } - // computes a PARTIAL ORDER on when the outcome occurred, i.e. for many pair-wise comparisons the answer is 0 - public int compareTo(Observation that) + interface Factory { - if (this.end < that.start) - return -1; - if (that.end < this.start) - return 1; - return 0; + HistoryValidator create(int[] partitions); } } diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java new file mode 100644 index 0000000000..67c95a7378 --- /dev/null +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/LinearizabilityValidator.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.simulator.paxos; + +import java.util.function.Consumer; +import javax.annotation.Nullable; + +import com.carrotsearch.hppc.IntObjectHashMap; +import com.carrotsearch.hppc.IntObjectMap; +import com.carrotsearch.hppc.cursors.ObjectCursor; + +public class LinearizabilityValidator implements HistoryValidator +{ + private final IntObjectMap<HistoryChecker> historyCheckers; + + public LinearizabilityValidator(int[] primaryKeys) + { + historyCheckers = new IntObjectHashMap<>(primaryKeys.length); + for (int primaryKey : primaryKeys) + historyCheckers.put(primaryKey, new HistoryChecker(primaryKey)); + } + + @Override + public Checker witness(int start, int end) + { + return new Checker() + { + @Override + public void read(int pk, int id, int count, int[] seq) + { + get(pk).witness(id, seq, start, end); + } + + @Override + public void write(int pk, int id, boolean success) + { + get(pk).applied(id, start, end, success); + } + }; + } + + @Override + public void print(@Nullable Integer pk) + { + if (pk == null) historyCheckers.values().forEach((Consumer<ObjectCursor<HistoryChecker>>) c -> c.value.print()); + else historyCheckers.get(pk).print(); + } + + private HistoryChecker get(int pk) + { + HistoryChecker checker = historyCheckers.get(pk); + if (checker == null) + throw new NullPointerException("Unable to find checker for pk=" + pk); + return checker; + } + + public static class Factory implements HistoryValidator.Factory + { + public static final Factory instance = new Factory(); + + @Override + public HistoryValidator create(int[] partitions) + { + return new LinearizabilityValidator(partitions); + } + } +} diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java new file mode 100644 index 0000000000..b39c3111ea --- /dev/null +++ 
b/test/simulator/main/org/apache/cassandra/simulator/paxos/LoggingHistoryValidator.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.simulator.paxos; + +import java.util.Arrays; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggingHistoryValidator implements HistoryValidator +{ + private static final Logger logger = LoggerFactory.getLogger(LoggingHistoryValidator.class); + private final HistoryValidator delegate; + + public LoggingHistoryValidator(HistoryValidator delegate) + { + this.delegate = delegate; + } + + @Override + public Checker witness(int start, int end) + { + StringBuilder sb = new StringBuilder(); + sb.append("Witness(start=").append(start).append(", end=").append(end).append(")\n"); + Checker sub = delegate.witness(start, end); + return new Checker() + { + @Override + public void read(int pk, int id, int count, int[] seq) + { + sb.append("\tread(pk=").append(pk).append(", id=").append(id).append(", count=").append(count).append(", seq=").append(Arrays.toString(seq)).append(")\n"); + sub.read(pk, id, count, seq); + } + + @Override + public void write(int pk, int id, boolean 
success) + { + sb.append("\twrite(pk=").append(pk).append(", id=").append(id).append(", success=").append(success).append(")\n"); + sub.write(pk, id, success); + } + + @Override + public void close() + { + logger.info(sb.toString()); + sub.close(); + } + }; + } + + @Override + public void print(@Nullable Integer pk) + { + delegate.print(pk); + } +} diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java index 546fd3179f..41eb2c348d 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/Observation.java @@ -18,14 +18,16 @@ package org.apache.cassandra.simulator.paxos; +import org.apache.cassandra.distributed.api.SimpleQueryResult; + class Observation implements Comparable<Observation> { final int id; - final Object[][] result; + final SimpleQueryResult result; final int start; final int end; - Observation(int id, Object[][] result, int start, int end) + Observation(int id, SimpleQueryResult result, int start, int end) { this.id = id; this.result = result; @@ -33,6 +35,16 @@ class Observation implements Comparable<Observation> this.end = end; } + boolean isSuccess() + { + return result != null; + } + + boolean isUnknownFailure() + { + return result == null; + } + // computes a PARTIAL ORDER on when the outcome occurred, i.e. 
for many pair-wise comparisons the answer is 0 public int compareTo(Observation that) { diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java index a9bbf7ca57..7724690bc5 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesAccordSimulation.java @@ -22,153 +22,51 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; import java.util.function.LongSupplier; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import javax.annotation.Nullable; -import com.google.common.collect.ImmutableList; -import org.apache.commons.lang3.ArrayUtils; -import org.junit.Assert; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import accord.coordinate.Preempted; -import accord.coordinate.Timeout; -import accord.primitives.Txn; +import com.carrotsearch.hppc.IntArrayList; +import com.carrotsearch.hppc.IntHashSet; +import com.carrotsearch.hppc.cursors.IntCursor; import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.cql3.QueryOptions; -import org.apache.cassandra.cql3.statements.SelectStatement; import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.ComplexColumnData; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; -import 
org.apache.cassandra.distributed.api.IIsolatedExecutor; import org.apache.cassandra.distributed.api.QueryResults; -import org.apache.cassandra.exceptions.RequestTimeoutException; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.impl.Query; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; -import org.apache.cassandra.service.accord.AccordService; -import org.apache.cassandra.service.accord.AccordTestUtils; -import org.apache.cassandra.service.accord.txn.TxnData; -import org.apache.cassandra.service.accord.txn.TxnDataName; +import org.apache.cassandra.simulator.Action; import org.apache.cassandra.simulator.Debug; import org.apache.cassandra.simulator.RunnableActionScheduler; import org.apache.cassandra.simulator.cluster.ClusterActions; import org.apache.cassandra.simulator.systems.SimulatedSystems; import org.apache.cassandra.simulator.utils.IntRange; -import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY; import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail; -import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.apache.cassandra.utils.AssertionUtils.hasCause; +import static org.apache.cassandra.utils.AssertionUtils.isThrowableInstanceof; // TODO: the class hierarchy is a bit broken, but hard to untangle. Need to go Paxos->Consensus, probably. 
@SuppressWarnings("unused") public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxosSimulation { private static final Logger logger = LoggerFactory.getLogger(PairOfSequencesAccordSimulation.class); - private static final String SELECT = "SELECT pk, count, seq FROM " + KEYSPACE + ".tbl WHERE pk = ?"; - - class VerifyingOperation extends Operation - { - final HistoryChecker historyChecker; - public VerifyingOperation(int id, IInvokableInstance instance, ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker historyChecker) - { - super(primaryKey, id, instance, "SELECT", read(primaryKey)); - this.historyChecker = historyChecker; - } - - void verify(Observation outcome) - { - (outcome.result != null ? successfulReads : failedReads).incrementAndGet(); - - if (outcome.result == null) - return; - - if (outcome.result.length != 1) - throw fail(primaryKey, "#result (%s) != 1", Arrays.toString(outcome.result)); - - Object[] row = outcome.result[0]; - // first verify internally consistent - int count = row[1] == null ? 0 : (Integer) row[1]; - int[] seq = Arrays.stream((row[2] == null ? 
"" : (String) row[2]).split(",")) - .filter(s -> !s.isEmpty()) - .mapToInt(Integer::parseInt) - .toArray(); - - if (seq.length != count) - throw fail(primaryKey, "%d != #%s", count, seq); - - historyChecker.witness(outcome, seq, outcome.start, outcome.end); - } - } - - private static IIsolatedExecutor.SerializableCallable<Object[][]> read(int primaryKey) - { - return () -> { - String cql = "BEGIN TRANSACTION\n" + SELECT + ";\n" + "COMMIT TRANSACTION"; - List<ByteBuffer> values = ImmutableList.of(bytes(primaryKey)); - Txn txn = AccordTestUtils.createTxn(cql, QueryOptions.forInternalCalls(values)); - // TODO (now): support complex columns - return execute(txn, "pk", "count", "seq"); - }; - } - - private static IIsolatedExecutor.SerializableCallable<Object[][]> write(int id, int primaryKey) - { - return () -> { - String cql = "BEGIN TRANSACTION\n" + - " " + SELECT + ";\n" + - " UPDATE " + KEYSPACE + ".tbl SET seq += '" + id + ",' WHERE pk = ?;\n" + - " UPDATE " + KEYSPACE + ".tbl SET count += 1 WHERE pk = ?;\n" + - "COMMIT TRANSACTION"; - List<ByteBuffer> values = ImmutableList.of(bytes(primaryKey), bytes(primaryKey), bytes(primaryKey)); - Txn txn = AccordTestUtils.createTxn(cql, QueryOptions.forInternalCalls(values)); - return execute(txn, "pk", "count", "seq"); - }; - } - - private static Object[][] execute(Txn txn, String ... 
columns) - { - try - { - TxnData result = (TxnData) AccordService.instance().node.coordinate(txn).get(); - Assert.assertNotNull(result); - QueryResults.Builder builder = QueryResults.builder(); - boolean addedHeader = false; - - FilteredPartition partition = result.get(TxnDataName.returning()); - TableMetadata metadata = partition.metadata(); - builder.columns(columns); - - ByteBuffer[] keyComponents = SelectStatement.getComponents(metadata, partition.partitionKey()); - - Row s = partition.staticRow(); - if (!s.isEmpty()) - append(metadata, keyComponents, s, builder, columns); - - for (Row row : partition) - append(metadata, keyComponents, row, builder, columns); - - return builder.build().toObjectArrays(); - } - catch (InterruptedException e) - { - throw new AssertionError(e); - } - catch (ExecutionException e) - { - if (e.getCause() instanceof Preempted) - return null; - if (e.getCause() instanceof Timeout) - return null; - if (e.getCause() instanceof RequestTimeoutException) - return null; - throw new AssertionError(e); - } - } + private static final String SELECT = "SELECT pk, count, seq FROM " + KEYSPACE + ".tbl WHERE pk IN (%s);"; + private static final String UPDATE = "UPDATE " + KEYSPACE + ".tbl SET count += 1, seq = seq + ? 
WHERE pk = ?;"; private static void append(TableMetadata metadata, ByteBuffer[] keyComponents, Row row, QueryResults.Builder builder, String[] columnNames) { @@ -218,38 +116,14 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo builder.row(buffer); } - class NonVerifyingOperation extends Operation + @Override + void log(@Nullable Integer pk) { - public NonVerifyingOperation(int id, IInvokableInstance instance, ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker historyChecker) - { - super(primaryKey, id, instance, "SELECT", read(primaryKey)); - } - - void verify(Observation outcome) - { - } + validator.print(pk); } - public class ModifyingOperation extends Operation - { - final HistoryChecker historyChecker; - public ModifyingOperation(int id, IInvokableInstance instance, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, int primaryKey, HistoryChecker historyChecker) - { - super(primaryKey, id, instance, "UPDATE", write(id, primaryKey)); - this.historyChecker = historyChecker; - } - - void verify(Observation outcome) - { - (outcome.result != null ? 
successfulWrites : failedWrites).incrementAndGet(); - if (outcome.result != null) - { - if (outcome.result.length != 1) - throw fail(primaryKey, "Accord Result: 1 != #%s", ArrayUtils.toString(outcome.result)); - } - historyChecker.applied(outcome.id, outcome.start, outcome.end, outcome.result != null); - } - } + private final float writeRatio; + private final HistoryValidator validator; public PairOfSequencesAccordSimulation(SimulatedSystems simulated, Cluster cluster, @@ -266,6 +140,8 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo scheduler, debug, seed, primaryKeys, runForNanos, jitter); + this.writeRatio = 1F - readRatio; + validator = new LoggingHistoryValidator(new StrictSerializabilityValidator(primaryKeys)); } @Override @@ -281,21 +157,145 @@ public class PairOfSequencesAccordSimulation extends AbstractPairOfSequencesPaxo } @Override - Operation verifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) + boolean allowMultiplePartitions() { return true; } + + @Override + BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory() { - return new VerifyingOperation(operationId, instance, serialConsistency, primaryKey, historyChecker); + AtomicInteger id = new AtomicInteger(0); + + return (simulated, primaryKeyIndex) -> { + int[] primaryKeys = IntStream.of(primaryKeyIndex).map(i -> this.primaryKeys[i]).toArray(); + return () -> accordAction(id.getAndIncrement(), simulated, primaryKeys); + }; } - @Override - Operation nonVerifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) + public class ReadWriteOperation extends Operation { - return new NonVerifyingOperation(operationId, instance, serialConsistency, primaryKey, historyChecker); + private final IntHashSet reads, writes; + + public ReadWriteOperation(int id, int[] primaryKeys, IntHashSet reads, IntHashSet writes, IInvokableInstance instance) + { + super(primaryKeys, id, 
instance, "Accord", createQuery(id, reads, writes)); + this.reads = reads; + this.writes = writes; + } + + @Override + void verify(Observation outcome) + { + SimpleQueryResult result = outcome.result; + (result != null ? successfulWrites : failedWrites).incrementAndGet(); + if (result != null) + { + IntHashSet seen = new IntHashSet(); + //TODO if there isn't a value then we get empty read, which then doesn't make it into the QueryResult + // given the fact that we always run with the partitions defined this should be fine + try (HistoryValidator.Checker checker = validator.witness(outcome.start, outcome.end)) + { + while (result.hasNext()) + { + org.apache.cassandra.distributed.api.Row row = result.next(); + + int pk = row.getInteger("pk"); + int count = row.getInteger("count", 0); + int[] seq = Arrays.stream(row.getString("seq", "").split(",")) + .filter(s -> !s.isEmpty()) + .mapToInt(Integer::parseInt) + .toArray(); + + if (!seen.add(pk)) + throw new IllegalStateException("Duplicate partition key " + pk); + // every partition was read, but not all were written to... 
need to verify each partition + if (seq.length != count) + throw fail(pk, "%d != #%s", count, seq); + + checker.read(pk, outcome.id, count, seq); + } + if (!seen.equals(reads)) + throw fail(0, "#result had %s partitions, but should have had %s", seen, reads); + // handle writes + for (IntCursor c : writes) + checker.write(c.value, outcome.id, outcome.isSuccess()); + } + } + } } - @Override - Operation modifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) + private Action accordAction(int id, SimulatedSystems simulated, int[] partitions) { - return new ModifyingOperation(operationId, instance, ANY, serialConsistency, primaryKey, historyChecker); + IntArrayList reads = new IntArrayList(); + IntArrayList writes = new IntArrayList(); + for (int partition : partitions) + { + boolean added = false; + if (simulated.random.decide(readRatio)) + { + reads.add(partition); + added = true; + } + if (simulated.random.decide(writeRatio)) + { + writes.add(partition); + added = true; + } + if (!added) + { + // when read ratio fails that implies write + // when write ratio fails that implies read + // so make that case a read/write + // Its possible that both cases were true leading to a read/write; which is fine + // this just makes sure every partition is consumed. 
+ reads.add(partition); + writes.add(partition); + } + } + + int node = simulated.random.uniform(1, cluster.size() + 1); + IInvokableInstance instance = cluster.get(node); + return new ReadWriteOperation(id, partitions, new IntHashSet(reads), new IntHashSet(writes), instance); + } + + private int[] genReadOnly(SimulatedSystems simulated, int[] partitions) + { + IntArrayList readOnly = new IntArrayList(); + for (int partition : partitions) + { + if (simulated.random.decide(readRatio)) + readOnly.add(partition); + } + return readOnly.toArray(); + } + + private static Query createQuery(int id, IntHashSet reads, IntHashSet writes) + { + if (reads.isEmpty() && writes.isEmpty()) + throw new IllegalArgumentException("Partitions are empty"); + List<Object> binds = new ArrayList<>(); + StringBuilder sb = new StringBuilder(); + sb.append("BEGIN TRANSACTION\n"); + if (!reads.isEmpty()) + { + + sb.append("\t") + .append(String.format(SELECT, String.join(", ", IntStream.of(reads.toArray()) + .mapToObj(i -> { + binds.add(i); + return "?"; + }) + .collect(Collectors.joining(", "))))) + .append('\n'); + } + + for (IntCursor c : writes) + { + sb.append('\t').append(UPDATE).append("\n"); + binds.add(id + ","); + binds.add(c.value); + } + + sb.append("COMMIT TRANSACTION"); + return new Query(sb.toString(), 0, ConsistencyLevel.ANY, ConsistencyLevel.ANY, binds.toArray(new Object[0])); } @Override diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java index c1ccb6648e..b07b4a86cd 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PairOfSequencesPaxosSimulation.java @@ -18,9 +18,13 @@ package org.apache.cassandra.simulator.paxos; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import 
java.util.function.BiFunction; import java.util.function.LongSupplier; +import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.commons.lang3.ArrayUtils; import org.slf4j.Logger; @@ -29,9 +33,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.Row; +import org.apache.cassandra.distributed.api.SimpleQueryResult; +import org.apache.cassandra.distributed.impl.Query; +import org.apache.cassandra.simulator.Action; +import org.apache.cassandra.simulator.ActionListener; +import org.apache.cassandra.simulator.Debug; import org.apache.cassandra.simulator.RunnableActionScheduler; import org.apache.cassandra.simulator.cluster.ClusterActions; -import org.apache.cassandra.simulator.Debug; import org.apache.cassandra.simulator.systems.SimulatedSystems; import org.apache.cassandra.simulator.utils.IntRange; import org.apache.cassandra.utils.ByteBufferUtil; @@ -40,6 +49,7 @@ import static java.lang.Boolean.TRUE; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.apache.cassandra.distributed.api.ConsistencyLevel.ANY; +import static org.apache.cassandra.simulator.Debug.EventType.PARTITION; import static org.apache.cassandra.simulator.paxos.HistoryChecker.fail; @SuppressWarnings("unused") @@ -49,7 +59,7 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos private static final String UPDATE = "UPDATE " + KEYSPACE + ".tbl SET count = count + 1, seq1 = seq1 + ?, seq2 = seq2 + ? WHERE pk = ? 
IF EXISTS"; private static final String SELECT = "SELECT pk, count, seq1, seq2 FROM " + KEYSPACE + ".tbl WHERE pk = ?"; - class VerifyingOperation extends Operation + class VerifyingOperation extends PaxosOperation { final HistoryChecker historyChecker; public VerifyingOperation(int id, IInvokableInstance instance, ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker historyChecker) @@ -60,23 +70,26 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos void verify(Observation outcome) { - (outcome.result != null ? successfulReads : failedReads).incrementAndGet(); + SimpleQueryResult result = outcome.result; + (result != null ? successfulReads : failedReads).incrementAndGet(); - if (outcome.result == null) + if (result == null) return; - if (outcome.result.length != 1) - throw fail(primaryKey, "#result (%s) != 1", Arrays.toString(outcome.result)); + if (!result.hasNext()) + throw fail(primaryKey, "#result: ([]) != 1"); + + // pk, count, seq1, seq2 + Row row = result.next(); - Object[] row = outcome.result[0]; // first verify internally consistent - int count = row[1] == null ? 0 : (Integer) row[1]; - int[] seq1 = Arrays.stream((row[2] == null ? "" : (String) row[2]).split(",")) + int count = row.getInteger("count", 0); + int[] seq1 = Arrays.stream(row.getString("seq1", "").split(",")) .filter(s -> !s.isEmpty()) .mapToInt(Integer::parseInt) .toArray(); - int[] seq2 = ((List<Integer>) (row[3] == null ? 
emptyList() : row[3])) - .stream().mapToInt(x -> x).toArray(); + + int[] seq2 = row.<Integer>getList("seq2", emptyList()).stream().mapToInt(x -> x).toArray(); if (!Arrays.equals(seq1, seq2)) throw fail(primaryKey, "%s != %s", seq1, seq2); @@ -84,11 +97,24 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos if (seq1.length != count) throw fail(primaryKey, "%d != #%s", count, seq1); - historyChecker.witness(outcome, seq1, outcome.start, outcome.end); + if (result.hasNext()) + throw fail(primaryKey, "#result (%s) != 1", ArrayUtils.toString(result.toObjectArrays())); + + historyChecker.witness(outcome, seq1); + } + } + + private abstract class PaxosOperation extends Operation + { + final int primaryKey; + PaxosOperation(int primaryKey, int id, IInvokableInstance instance, String idString, String query, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, Object... params) + { + super(new int[] {primaryKey}, id, instance, idString, new Query(query, -1, commitConsistency, serialConsistency, params)); + this.primaryKey = primaryKey; } } - class NonVerifyingOperation extends Operation + class NonVerifyingOperation extends PaxosOperation { public NonVerifyingOperation(int id, IInvokableInstance instance, ConsistencyLevel consistencyLevel, int primaryKey, HistoryChecker historyChecker) { @@ -100,7 +126,7 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos } } - public class ModifyingOperation extends Operation + public class ModifyingOperation extends PaxosOperation { final HistoryChecker historyChecker; public ModifyingOperation(int id, IInvokableInstance instance, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, int primaryKey, HistoryChecker historyChecker) @@ -111,18 +137,23 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos void verify(Observation outcome) { - (outcome.result != null ? 
successfulWrites : failedWrites).incrementAndGet(); - if (outcome.result != null) + SimpleQueryResult result = outcome.result; + (result != null ? successfulWrites : failedWrites).incrementAndGet(); + if (result != null) { - if (outcome.result.length != 1) - throw fail(primaryKey, "Paxos Result: 1 != #%s", ArrayUtils.toString(outcome.result)); - if (outcome.result[0][0] != TRUE) + if (!result.hasNext()) + throw fail(primaryKey, "Paxos Result: 1 != #[]"); + if (result.next().getBoolean(0) != TRUE) throw fail(primaryKey, "Result != TRUE"); + if (result.hasNext()) + throw fail(primaryKey, "Paxos Result: 1 != #%s", ArrayUtils.toString(result.toObjectArrays())); } - historyChecker.applied(outcome.id, outcome.start, outcome.end, outcome.result != null); + historyChecker.applied(outcome.id, outcome.start, outcome.end, outcome.isSuccess()); } } + final List<HistoryChecker> historyCheckers = new ArrayList<>(); + public PairOfSequencesPaxosSimulation(SimulatedSystems simulated, Cluster cluster, ClusterActions.Options clusterOptions, @@ -140,6 +171,84 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos runForNanos, jitter); } + @Override + BiFunction<SimulatedSystems, int[], Supplier<Action>> actionFactory() + { + final int nodes = cluster.size(); + for (int primaryKey : primaryKeys) + historyCheckers.add(new HistoryChecker(primaryKey)); + + List<Supplier<Action>> primaryKeyActions = new ArrayList<>(); + for (int pki = 0 ; pki < primaryKeys.length ; ++pki) + { + int primaryKey = primaryKeys[pki]; + HistoryChecker historyChecker = historyCheckers.get(pki); + Supplier<Action> supplier = new Supplier<Action>() + { + int i = 0; + + @Override + public Action get() + { + int node = simulated.random.uniform(1, nodes + 1); + IInvokableInstance instance = cluster.get(node); + switch (serialConsistency) + { + default: throw new AssertionError(); + case LOCAL_SERIAL: + if (simulated.snitch.dcOf(node) > 0) + { + // perform some queries against these nodes 
but don't expect them to be linearizable + return nonVerifying(i++, instance, primaryKey, historyChecker); + } + case SERIAL: + return simulated.random.decide(readRatio) + ? verifying(i++, instance, primaryKey, historyChecker) + : modifying(i++, instance, primaryKey, historyChecker); + } + } + + @Override + public String toString() + { + return Integer.toString(primaryKey); + } + }; + + final ActionListener listener = debug.debug(PARTITION, simulated.time, cluster, KEYSPACE, primaryKey); + if (listener != null) + { + Supplier<Action> wrap = supplier; + supplier = new Supplier<Action>() + { + @Override + public Action get() + { + Action action = wrap.get(); + action.register(listener); + return action; + } + + @Override + public String toString() + { + return wrap.toString(); + } + }; + } + + primaryKeyActions.add(supplier); + } + return (ignore, primaryKeyIndex) -> primaryKeyActions.get(only(primaryKeyIndex)); + } + + private static int only(int[] array) + { + if (array.length != 1) + throw new AssertionError("Require only 1 element but found array " + Arrays.toString(array)); + return array[0]; + } + @Override protected String createTableStmt() { @@ -152,24 +261,28 @@ public class PairOfSequencesPaxosSimulation extends AbstractPairOfSequencesPaxos return "INSERT INTO " + KEYSPACE + ".tbl (pk, count, seq1, seq2) VALUES (?, 0, '', []) USING TIMESTAMP 0"; } - @Override - Operation verifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) + private Operation verifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) { return new VerifyingOperation(operationId, instance, serialConsistency, primaryKey, historyChecker); } - @Override - Operation nonVerifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) + private Operation nonVerifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) { return 
new NonVerifyingOperation(operationId, instance, serialConsistency, primaryKey, historyChecker); } - @Override - Operation modifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) + private Operation modifying(int operationId, IInvokableInstance instance, int primaryKey, HistoryChecker historyChecker) { return new ModifyingOperation(operationId, instance, ANY, serialConsistency, primaryKey, historyChecker); } + @Override + void log(@Nullable Integer primaryKey) + { + if (primaryKey == null) historyCheckers.forEach(HistoryChecker::print); + else historyCheckers.stream().filter(h -> h.primaryKey == primaryKey).forEach(HistoryChecker::print); + } + @Override boolean joinAll() { diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java index 227d8348fa..58d977bbd1 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/PaxosSimulation.java @@ -18,6 +18,7 @@ package org.apache.cassandra.simulator.paxos; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -26,11 +27,10 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.LongSupplier; - +import java.util.stream.Stream; import javax.annotation.Nullable; import com.google.common.base.Throwables; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,10 +38,9 @@ import org.apache.cassandra.concurrent.ExecutorFactory; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.api.ConsistencyLevel; import 
org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IIsolatedExecutor; -import org.apache.cassandra.distributed.impl.Query; +import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.service.paxos.BallotGenerator; import org.apache.cassandra.simulator.ActionList; @@ -53,38 +52,44 @@ import org.apache.cassandra.simulator.cluster.ClusterActionListener; import org.apache.cassandra.simulator.systems.InterceptorOfGlobalMethods; import org.apache.cassandra.simulator.systems.SimulatedActionCallable; import org.apache.cassandra.simulator.systems.SimulatedSystems; +import org.apache.cassandra.utils.AssertionUtils; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.concurrent.Threads; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -import static org.apache.cassandra.simulator.Action.Modifiers.NONE; import static org.apache.cassandra.simulator.Action.Modifiers.DISPLAY_ORIGIN; +import static org.apache.cassandra.simulator.Action.Modifiers.NONE; import static org.apache.cassandra.simulator.SimulatorUtils.failWithOOM; import static org.apache.cassandra.simulator.paxos.HistoryChecker.causedBy; +import static org.apache.cassandra.utils.AssertionUtils.anyOf; +import static org.apache.cassandra.utils.AssertionUtils.hasCause; +import static org.apache.cassandra.utils.AssertionUtils.isThrowableInstanceof; public abstract class PaxosSimulation implements Simulation, ClusterActionListener { private static final Logger logger = LoggerFactory.getLogger(PaxosSimulation.class); - abstract class Operation extends SimulatedActionCallable<Object[][]> implements BiConsumer<Object[][], Throwable> + private static String createDescription(int[] primaryKeys, int id, String idString) { - final int primaryKey; + return primaryKeys.length == 1 ? 
Integer.toString(primaryKeys[0]) : Arrays.toString(primaryKeys) + "/" + id + ": " + idString; + } + + protected Class<? extends Throwable>[] expectedExceptions() + { + return (Class<? extends Throwable>[]) new Class<?>[] { RequestExecutionException.class }; + } + + abstract class Operation extends SimulatedActionCallable<SimpleQueryResult> implements BiConsumer<SimpleQueryResult, Throwable> + { + final int[] primaryKeys; final int id; int start; - public Operation(int primaryKey, int id, IInvokableInstance instance, - String idString, String query, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, Object... params) - { - super(primaryKey + "/" + id + ": " + idString, DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, new Query(query, -1, commitConsistency, serialConsistency, params)); - this.primaryKey = primaryKey; - this.id = id; - } - - public Operation(int primaryKey, int id, IInvokableInstance instance, - String idString, IIsolatedExecutor.SerializableCallable<Object[][]> query) + public Operation(int[] primaryKeys, int id, IInvokableInstance instance, + String idString, IIsolatedExecutor.SerializableCallable<SimpleQueryResult> query) { - super(primaryKey + "/" + id + ": " + idString, DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, query); - this.primaryKey = primaryKey; + super(createDescription(primaryKeys, id, idString), DISPLAY_ORIGIN, NONE, PaxosSimulation.this.simulated, instance, query); + this.primaryKeys = primaryKeys; this.id = id; } @@ -95,9 +100,9 @@ public abstract class PaxosSimulation implements Simulation, ClusterActionListen } @Override - public void accept(Object[][] success, Throwable failure) + public void accept(SimpleQueryResult success, Throwable failure) { - if (failure != null && !(failure instanceof RequestExecutionException)) + if (failure != null && !expectedException(failure)) { if (!simulated.failures.hasFailure() || !(failure instanceof UncheckedInterruptedException)) 
logger.error("Unexpected exception", failure); @@ -108,10 +113,14 @@ public abstract class PaxosSimulation implements Simulation, ClusterActionListen { logger.trace("{}", failure.getMessage()); } - verify(new Observation(id, success, start, logicalClock.incrementAndGet())); } + protected boolean expectedException(Throwable failure) + { + // due to class loaders can't use instanceOf directly + return hasCause(anyOf(Stream.of(expectedExceptions()).map(AssertionUtils::isThrowableInstanceof))).matches(failure); + } abstract void verify(Observation outcome); } diff --git a/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java b/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java new file mode 100644 index 0000000000..8469423bb9 --- /dev/null +++ b/test/simulator/main/org/apache/cassandra/simulator/paxos/StrictSerializabilityValidator.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.simulator.paxos; + +import javax.annotation.Nullable; + +import accord.verify.StrictSerializabilityVerifier; +import com.carrotsearch.hppc.IntIntHashMap; +import com.carrotsearch.hppc.IntIntMap; +import com.carrotsearch.hppc.cursors.IntIntCursor; + +public class StrictSerializabilityValidator implements HistoryValidator +{ + private final StrictSerializabilityVerifier verifier; + private final IntIntMap pkToIndex; + private final int[] indexToPk; + + public StrictSerializabilityValidator(int[] primaryKeys) + { + this.verifier = new StrictSerializabilityVerifier(primaryKeys.length); + pkToIndex = new IntIntHashMap(primaryKeys.length); + indexToPk = new int[primaryKeys.length]; + for (int i = 0; i < primaryKeys.length; i++) + { + pkToIndex.put(primaryKeys[i], i); + indexToPk[i] = primaryKeys[i]; + } + } + + @Override + public Checker witness(int start, int end) + { + verifier.begin(); + return new Checker() + { + @Override + public void read(int pk, int id, int count, int[] seq) + { + verifier.witnessRead(get(pk), seq); + } + + @Override + public void write(int pk, int id, boolean success) + { + verifier.witnessWrite(get(pk), id); + } + + @Override + public void close() + { + convertHistoryViolation(() -> verifier.apply(start, end)); + } + }; + } + + @Override + public void print(@Nullable Integer pk) + { + if (pk == null) verifier.print(); + else verifier.print(get(pk)); + } + + private int get(int pk) + { + if (pkToIndex.containsKey(pk)) + return pkToIndex.get(pk); + throw new IllegalArgumentException("Unknown pk=" + pk); + } + + private void convertHistoryViolation(Runnable fn) + { + try + { + fn.run(); + } + catch (accord.verify.HistoryViolation e) + { + if (!(e.primaryKey() >= 0 && e.primaryKey() < indexToPk.length)) throw new IllegalArgumentException("Unable to find primary key by index " + e.primaryKey()); + int pk = indexToPk[e.primaryKey()]; + HistoryViolation v = new HistoryViolation(pk, e.getMessage()); + 
v.setStackTrace(e.getStackTrace()); + throw v; + } + } + + public static class Factory implements HistoryValidator.Factory + { + public static final Factory instance = new Factory(); + + @Override + public HistoryValidator create(int[] partitions) + { + return new StrictSerializabilityValidator(partitions); + } + } +} diff --git a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java index d190fd7a15..106cd8c027 100644 --- a/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java +++ b/test/simulator/main/org/apache/cassandra/simulator/systems/SimulatedQuery.java @@ -20,9 +20,10 @@ package org.apache.cassandra.simulator.systems; import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.impl.Query; -public class SimulatedQuery extends SimulatedActionCallable<Object[][]> +public class SimulatedQuery extends SimulatedActionCallable<SimpleQueryResult> { public SimulatedQuery(Object description, SimulatedSystems simulated, IInvokableInstance instance, String query, ConsistencyLevel commitConsistency, ConsistencyLevel serialConsistency, Object... 
params) { @@ -45,7 +46,7 @@ public class SimulatedQuery extends SimulatedActionCallable<Object[][]> } @Override - public void accept(Object[][] success, Throwable failure) + public void accept(SimpleQueryResult success, Throwable failure) { if (failure != null) simulated.failures.accept(failure); diff --git a/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java new file mode 100644 index 0000000000..6c773fcca8 --- /dev/null +++ b/test/simulator/test/org/apache/cassandra/simulator/paxos/HistoryValidatorTest.java @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.simulator.paxos; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; +import java.util.Random; +import java.util.function.Consumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.junit.Assume; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.carrotsearch.hppc.IntHashSet; +import com.carrotsearch.hppc.IntIntHashMap; +import com.carrotsearch.hppc.IntIntMap; +import com.carrotsearch.hppc.IntSet; +import org.apache.cassandra.distributed.api.QueryResults; +import org.apache.cassandra.utils.Clock; +import org.assertj.core.api.AbstractThrowableAssert; +import org.assertj.core.api.Assertions; + +import static org.apache.commons.lang3.ArrayUtils.add; +import static org.apache.commons.lang3.ArrayUtils.swap; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Notes: + * * anomalyDirtyRead was left out as Accord doesn't reject requests, so without a way to reject or abort + * requests the client doesn't have any way to observe a REJECT, so all issues are UNKNOWN.
+ * + */ +@RunWith(Parameterized.class) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) // since Random is used, make sure tests run in a deterministic order +public class HistoryValidatorTest +{ + private static final Logger logger = LoggerFactory.getLogger(HistoryValidatorTest.class); + private static final Random RANDOM = random(); + private static final int[] PARTITIONS = IntStream.range(0, 10).toArray(); + private static final int x = 1; + private static final int y = 2; + + @Parameterized.Parameters(name = "{0}") + public static Collection<Object[]> data() + { + List<Object[]> tests = new ArrayList<>(); + tests.add(test(LinearizabilityValidator.Factory.instance)); + tests.add(test(StrictSerializabilityValidator.Factory.instance)); + return tests; + } + + private static Object[] test(HistoryValidator.Factory factory) + { + return new Object[]{ factory }; + } + + private final HistoryValidator.Factory factory; + + public HistoryValidatorTest(HistoryValidator.Factory factory) + { + this.factory = factory; + } + + @Test + public void orderingWithWriteTimeout() + { + IntSet timeoutEvents = set(4, 17, 83); + for (boolean reject : Arrays.asList(true, false)) + { + HistoryValidator validator = create(); + + int logicalClock = 1; + int[] seq = seq(); + for (int eventId = 0; eventId < 100; eventId++) + { + if (timeoutEvents.contains(eventId)) + { + if (!reject) + seq = add(seq, eventId); // wasn't observed, but was applied + continue; + } + single(validator, ob(eventId, ++logicalClock, ++logicalClock), 1, seq, true); + seq = add(seq, eventId); //TODO forgot to add this and LinearizabilityValidator was success... should reject! + } + } + } + + /** + * This test differs from {@link #orderingWithWriteTimeout} as it defines event orders assuming + * requests were concurrent, so may happen in different orderings.
+ * <p> + * This means that we may see the results out of order, but the sequence/count ordering will remain + */ + @Test + public void orderingWithWriteTimeoutWithConcurrency() + { + IntSet timeoutEvents = set(4, 17, 83); + for (boolean reject : Arrays.asList(true, false)) + { + HistoryValidator validator = create(); + // Since the requests are "concurrent" the window in which the operations happened are between start=1 and + // end=responseOrder. + int start = 1; + int logicalClock = start; + + // 'ordering' is the order in which the txns are applied + // 'indexOrder' is the order in which the events are seen; since requests are "concurrent" the order we + // validate may differ from the ordering they were applied. + int[] ordering = IntStream.range(0, 100).toArray(); + if (reject) + ordering = IntStream.of(ordering).filter(i -> !timeoutEvents.contains(i)).toArray(); + shuffle(ordering); + int[] indexOrder = IntStream.range(0, ordering.length - 1).toArray(); + shuffle(indexOrder); + for (int i = 0; i < indexOrder.length; i++) + { + int idx = indexOrder[i]; + int eventId = ordering[idx]; + if (timeoutEvents.contains(eventId)) + continue; + int[] seq = Arrays.copyOf(ordering, idx); + single(validator, ob(eventId, start, ++logicalClock), 1, seq, true); + } + } + } + + @Test + public void anomalyNonMonotonicRead() + { + // Session1: w[x=10] -> Session2: r[x=10] -> r[x=0] + test(dsl -> { + dsl.txn(writeOnly(x)); + dsl.txn(readOnly(x, seq(0))); + dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class); + }); + } + + @Test + public void anomalyNonMonotonicWrite() + { + requiresMultiKeySupport(); + // Session1: w[x=10] -> w[y=10] -> Session2: r[y=10] -> r[x=0] + test(dsl -> { + dsl.txn(writeOnly(x)); + dsl.txn(writeOnly(y)); + dsl.txn(readOnly(y, seq(1))); + dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class); + }); + } + + @Test + public void anomalyNonMonotonicTransaction() + { + // Session1: r[x=5] -> w[y=10] -> Session2: r[y=10] -> 
r[x=0] + requiresMultiKeySupport(); + test(dsl -> { + dsl.txn(writeOnly(x), writeOnly(y)); + + dsl.txn(readOnly(x, seq(0))); + dsl.txn(writeOnly(y)); + dsl.txn(readOnly(y, seq(0, 2))); + + dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class); + }); + } + + @Test + public void anomalyReadYourOwnWrites() + { + // This test is kind of a duplicate; here just for completeness + // w[x=12] -> r[x=8] + test(dsl -> { + dsl.txn(writeOnly(x)); + dsl.failingTxn(readOnly(x, seq())).isInstanceOf(HistoryViolation.class); + }); + } + + //TODO write skew + @Test + public void anomalyReadSkew() + { + requiresMultiKeySupport(); + // two different txn are involved to make this happen + // x=0, y=0 + // U1: starts + // U2: starts + // U1: r[x=0] + // U2: w[x=5], w[y=5] + // U2: commit + // U1: r[y=5] + // U1: commit + HistoryValidator validator = create(); + + // init + txn(validator, ob(0, 1, 2), writeOnly(x), writeOnly(y)); + int u1 = 1, u1_start = 3, u1_end = 6; + int u2 = 2, u2_start = 4, u2_end = 5; + txn(validator, ob(u2, u2_start, u2_end), readWrite(x, seq(0)), readWrite(y, seq(0))); + Assertions.assertThatThrownBy(() -> txn(validator, ob(u1, u1_start, u1_end), readWrite(x, seq(0)), readWrite(y, seq(0, u2)))) + .isInstanceOf(HistoryViolation.class); + } + + @Test + public void anomalyWriteSkew() + { + // two different txn are involved to make this happen + // x=0, y=0 + // U1: starts + // U2: starts + // U1: r[x=0] + } + + @Test + public void seenBehavior() + { + fromLog("Witness(start=4, end=7)\n" + + "\tread(pk=121901541, id=2, count=0, seq=[])\n" + + "\twrite(pk=121901541, id=2, success=true)\n" + + + "Witness(start=3, end=8)\n" + + "\tread(pk=122950117, id=0, count=0, seq=[])\n" + + "\twrite(pk=122950117, id=0, success=true)\n" + + "\twrite(pk=119804389, id=0, success=true)\n" + + + "Witness(start=5, end=9)\n" + + "\tread(pk=121901541, id=3, count=1, seq=[2])\n" + + "\twrite(pk=121901541, id=3, success=true)\n" + + + "Witness(start=2, end=10)\n" + +
"\twrite(pk=122950117, id=1, success=true)\n" + + "\twrite(pk=119804389, id=1, success=true)\n" + + + "Witness(start=6, end=11)\n" + + "\tread(pk=121901541, id=4, count=2, seq=[2, 3])\n" + + "\twrite(pk=121901541, id=4, success=true)\n" + + + "Witness(start=12, end=14)\n" + + "\twrite(pk=121901541, id=5, success=true)\n" + + + "Witness(start=13, end=16)\n" + + "\tread(pk=119804389, id=6, count=2, seq=[0, 1])\n" + + "\twrite(pk=119804389, id=6, success=true)\n" + + "\twrite(pk=122950117, id=6, success=true)\n" + + + "Witness(start=15, end=18)\n" + + "\tread(pk=121901541, id=7, count=4, seq=[2, 3, 4, 5])\n" + + "\twrite(pk=121901541, id=7, success=true)\n" + + + "Witness(start=17, end=20)\n" + + "\tread(pk=119804389, id=8, count=3, seq=[0, 1, 6])\n" + + "\twrite(pk=119804389, id=8, success=true)\n" + + "\twrite(pk=122950117, id=8, success=true)\n" // this partition is what triggers + ); + } + + private void requiresMultiKeySupport() + { + Assume.assumeTrue("Validator " + factory.getClass() + " does not support multi-key", factory instanceof StrictSerializabilityValidator.Factory); + } + + private int[] shuffle(int[] ordering) + { + // shuffle array + for (int i = ordering.length; i > 1; i--) + swap(ordering, i - 1, RANDOM.nextInt(i)); + return ordering; + } + + private static void txn(HistoryValidator validator, Observation ob, Event... events) + { + String type = events.length == 1 ? "single" : "multiple"; + logger.info("[Validator={}, Observation=({}, {}, {})] Validating {} {}}", validator.getClass().getSimpleName(), ob.id, ob.start, ob.end, type, events); + try (HistoryValidator.Checker check = validator.witness(ob.start, ob.end)) + { + for (Event e : events) + e.process(ob, check); + } + } + + private static void single(HistoryValidator validator, Observation ob, int pk, int[] seq, boolean hasWrite) + { + txn(validator, ob, hasWrite ? 
readWrite(pk, seq) : readOnly(pk, seq)); + } + + private static Observation ob(int id, int start, int end) + { + // why empty result? The users don't actually check the result's data, just existence + return new Observation(id, QueryResults.empty(), start, end); + } + + private static int[] seq(int... seq) + { + return seq; + } + + private HistoryValidator create() + { + return factory.create(PARTITIONS); + } + + private static IntSet set(int... values) + { + IntSet set = new IntHashSet(values.length); + for (int v : values) + set.add(v); + return set; + } + + private static Random random() + { + long seed = Long.parseLong(System.getProperty("cassandra.test.seed", Long.toString(Clock.Global.nanoTime()))); + logger.info("Random seed={}; set -Dcassandra.test.seed={} while reruning the tests to get the same order", seed, seed); + return new Random(seed); + } + + private static Event readWrite(int pk, int[] seq) + { + return new Event(EnumSet.of(Event.Type.READ, Event.Type.WRITE), pk, seq); + } + + private static Event readOnly(int pk, int[] seq) + { + return new Event(EnumSet.of(Event.Type.READ), pk, seq); + } + + private static Event writeOnly(int pk) + { + return new Event(EnumSet.of(Event.Type.WRITE), pk, null); + } + + private void fromLog(String log) + { + IntSet pks = new IntHashSet(); + class Read + { + final int pk, id, count; + final int[] seq; + + Read(int pk, int id, int count, int[] seq) + { + this.pk = pk; + this.id = id; + this.count = count; + this.seq = seq; + } + } + class Write + { + final int pk, id; + final boolean success; + + Write(int pk, int id, boolean success) + { + this.pk = pk; + this.id = id; + this.success = success; + } + } + class Witness + { + final int start, end; + final List<Object> actions = new ArrayList<>(); + + Witness(int start, int end) + { + this.start = start; + this.end = end; + } + + void read(int pk, int id, int count, int[] seq) + { + actions.add(new Read(pk, id, count, seq)); + } + + void write(int pk, int id, boolean 
success) + { + actions.add(new Write(pk, id, success)); + } + + void process(HistoryValidator validator) + { + try (HistoryValidator.Checker check = validator.witness(start, end)) + { + for (Object a : actions) + { + if (a instanceof Read) + { + Read read = (Read) a; + check.read(read.pk, read.id, read.count, read.seq); + } + else + { + Write write = (Write) a; + check.write(write.pk, write.id, write.success); + } + } + } + } + } + List<Witness> witnesses = new ArrayList<>(); + Witness current = null; + for (String line : log.split("\n")) + { + if (line.startsWith("Witness")) + { + if (current != null) + witnesses.add(current); + Matcher matcher = Pattern.compile("Witness\\(start=(.+), end=(.+)\\)").matcher(line); + if (!matcher.find()) throw new AssertionError("Unable to match start/end of " + line); + current = new Witness(Integer.parseInt(matcher.group(1)), Integer.parseInt(matcher.group(2))); + } + else if (line.startsWith("\tread")) + { + Matcher matcher = Pattern.compile("\tread\\(pk=(.+), id=(.+), count=(.+), seq=\\[(.*)\\]\\)").matcher(line); + if (!matcher.find()) throw new AssertionError("Unable to match read of " + line); + int pk = Integer.parseInt(matcher.group(1)); + pks.add(pk); + int id = Integer.parseInt(matcher.group(2)); + int count = Integer.parseInt(matcher.group(3)); + String seqStr = matcher.group(4); + int[] seq = seqStr.isEmpty() ? 
new int[0] : Stream.of(seqStr.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray(); + current.read(pk, id, count, seq); + } + else if (line.startsWith("\twrite")) + { + Matcher matcher = Pattern.compile("\twrite\\(pk=(.+), id=(.+), success=(.+)\\)").matcher(line); + if (!matcher.find()) throw new AssertionError("Unable to match write of " + line); + int pk = Integer.parseInt(matcher.group(1)); + pks.add(pk); + int id = Integer.parseInt(matcher.group(2)); + boolean success = Boolean.parseBoolean(matcher.group(3)); + current.write(pk, id, success); + } + else + { + throw new IllegalArgumentException("Unknow line: " + line); + } + } + if (current != null) + witnesses.add(current); + int[] keys = pks.toArray(); + Arrays.sort(keys); + HistoryValidator validator = factory.create(keys); + for (Witness w : witnesses) + w.process(validator); + } + + private static class Event + { + enum Type + {READ, WRITE} + + ; + private final EnumSet<Type> types; + private final int pk; + private final int[] seq; + + private Event(EnumSet<Type> types, int pk, int[] seq) + { + this.types = types; + this.pk = pk; + this.seq = seq; + } + + private void process(Observation ob, HistoryValidator.Checker check) + { + if (types.contains(Type.READ)) + check.read(pk, ob.id, seq.length, seq); + if (types.contains(Type.WRITE)) + check.write(pk, ob.id, ob.isSuccess()); + } + } + + private interface TestDSL + { + void txn(Event... events); + + AbstractThrowableAssert<?, ? extends Throwable> failingTxn(Event... events); + } + + private static boolean supportMultiKey(HistoryValidator validator) + { + return validator instanceof StrictSerializabilityValidator; + } + + private void test(Consumer<TestDSL> fn) + { + HistoryValidator validator = create(); + boolean global = supportMultiKey(validator); + EventIdGen eventIdGen = global ? new AllPks() : new PerPk(); + TestDSL dsl = new TestDSL() + { + int logicalClock = 0; + + @Override + public void txn(Event... 
events) + { + if (global) + { + int eventId = eventIdGen.next(); + HistoryValidatorTest.txn(validator, ob(eventId, ++logicalClock, ++logicalClock), events); + } + else + { + for (Event e : events) + { + int eventId = eventIdGen.next(e.pk); + HistoryValidatorTest.txn(validator, ob(eventId, ++logicalClock, ++logicalClock), e); + } + } + } + + @Override + public AbstractThrowableAssert<?, ? extends Throwable> failingTxn(Event... events) + { + return assertThatThrownBy(() -> txn(events)); + } + }; + fn.accept(dsl); + } + + private interface EventIdGen + { + int next(int pk); + + int next(); + } + + private static class PerPk implements EventIdGen + { + private final IntIntMap map = new IntIntHashMap(); + + @Override + public int next(int pk) + { + int next = !map.containsKey(pk) ? 0 : map.get(pk) + 1; + map.put(pk, next); + return next; + } + + @Override + public int next() + { + throw new UnsupportedOperationException("next without pk not supported"); + } + } + + private static class AllPks implements EventIdGen + { + private int value = 0; + + @Override + public int next(int pk) + { + return next(); + } + + @Override + public int next() + { + return value++; + } + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index f61205d374..d1b5cd9038 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -19,6 +19,7 @@ package org.apache.cassandra.service.accord; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.function.LongSupplier; @@ -143,11 +144,16 @@ public class AccordTestUtils return createTxn(query, QueryOptions.DEFAULT); } - public static Txn createTxn(String cql, QueryOptions options) + public static Txn createTxn(String query, List<Object> binds) { - 
TransactionStatement.Parsed parsed = (TransactionStatement.Parsed) QueryProcessor.parseStatement(cql); - Assert.assertNotNull(parsed); - TransactionStatement statement = (TransactionStatement) parsed.prepare(ClientState.forInternalCalls()); + TransactionStatement statement = parse(query); + QueryOptions options = QueryProcessor.makeInternalOptions(statement, binds.toArray(new Object[binds.size()])); + return statement.createTxn(ClientState.forInternalCalls(), options); + } + + public static Txn createTxn(String query, QueryOptions options) + { + TransactionStatement statement = parse(query); return statement.createTxn(ClientState.forInternalCalls(), options); } diff --git a/test/unit/org/apache/cassandra/utils/AssertionUtils.java b/test/unit/org/apache/cassandra/utils/AssertionUtils.java index d5b1981fc1..c122a95315 100644 --- a/test/unit/org/apache/cassandra/utils/AssertionUtils.java +++ b/test/unit/org/apache/cassandra/utils/AssertionUtils.java @@ -18,8 +18,11 @@ package org.apache.cassandra.utils; +import java.util.stream.Stream; + import com.google.common.base.Throwables; +import org.assertj.core.api.Assertions; import org.assertj.core.api.Condition; public class AssertionUtils @@ -28,6 +31,16 @@ public class AssertionUtils { } + public static <T> Condition<T> anyOf(Stream<Condition<T>> stream) { + Iterable<Condition<T>> it = () -> stream.iterator(); + return Assertions.anyOf(it); + } + + public static Condition<Throwable> anyOfThrowable(Class<? extends Throwable>... klasses) + { + return anyOf(Stream.of(klasses).map(AssertionUtils::isThrowable)); + } + /** * When working with jvm-dtest the thrown error is in a different {@link ClassLoader} causing type checks * to fail; this method relies on naming instead. 
@@ -100,6 +113,11 @@ public class AssertionUtils }; } + public static Condition<Throwable> isThrowableInstanceof(Class<?> klass) + { + return (Condition<Throwable>) (Condition<?>) isInstanceof(klass); + } + public static Condition<Throwable> rootCause(Condition<Throwable> other) { return new Condition<Throwable>() { @@ -119,6 +137,32 @@ public class AssertionUtils public static Condition<Throwable> rootCauseIs(Class<? extends Throwable> klass) { - return rootCause((Condition<Throwable>) (Condition<?>) is(klass)); + return rootCause(isThrowable(klass)); + } + + public static Condition<Throwable> hasCause(Class<? extends Throwable> klass) + { + return hasCause(isThrowable(klass)); + } + + public static Condition<Throwable> hasCauseAnyOf(Class<? extends Throwable>... matchers) + { + return hasCause(anyOfThrowable(matchers)); + } + + public static Condition<Throwable> hasCause(Condition<Throwable> matcher) + { + return new Condition<Throwable>() { + @Override + public boolean matches(Throwable value) + { + for (Throwable cause = value; cause != null; cause = cause.getCause()) + { + if (matcher.matches(cause)) + return true; + } + return false; + } + }; } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
