This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-harry.git
commit a8b9869972ddd7adf68e12568d70b2c382ae1ad9 Author: Alex Petrov <[email protected]> AuthorDate: Thu Mar 9 17:28:44 2023 +0100 Allow selecting subsets of columns and wildcard queries. Open API for hand-crafting both mutation and read queries. Improve errors. Fix a problem with corruptor sorting. Patch by Alex Petrov for CASSANDRA-17603. --- .../src/harry/corruptor/ChangeValueCorruptor.java | 2 +- .../harry/corruptor/QueryResponseCorruptor.java | 16 +- harry-core/src/harry/data/ResultSetRow.java | 7 + harry-core/src/harry/ddl/SchemaSpec.java | 8 +- .../src/harry/generators/DataGenerators.java | 2 + harry-core/src/harry/model/OpSelectors.java | 2 +- harry-core/src/harry/model/QuiescentChecker.java | 96 ++++-- harry-core/src/harry/model/SelectHelper.java | 142 +++++++-- harry-core/src/harry/operations/Query.java | 233 ++++++++++++++- .../src/harry/operations/QueryGenerator.java | 241 +++------------ .../src/harry/reconciler/PartitionState.java | 266 +++++++++++++++++ harry-core/src/harry/reconciler/Reconciler.java | 234 +-------------- .../generators/DataGeneratorsIntegrationTest.java | 2 +- .../ConcurrentQuiescentCheckerIntegrationTest.java | 2 +- .../harry/model/HistoryBuilderIntegrationTest.java | 2 +- .../test/harry/model/ModelTestBase.java | 2 +- .../harry/model/QuerySelectorNegativeTest.java | 2 - .../model/QuiescentCheckerIntegrationTest.java | 5 +- .../QuiescentLocalStateCheckerIntegrationTest.java | 2 +- .../harry/reconciler/SimpleReconcilerTest.java | 332 +++++++++++++++++++++ 20 files changed, 1107 insertions(+), 491 deletions(-) diff --git a/harry-core/src/harry/corruptor/ChangeValueCorruptor.java b/harry-core/src/harry/corruptor/ChangeValueCorruptor.java index 5f23a06..3074679 100644 --- a/harry-core/src/harry/corruptor/ChangeValueCorruptor.java +++ b/harry-core/src/harry/corruptor/ChangeValueCorruptor.java @@ -75,7 +75,7 @@ public class ChangeValueCorruptor implements RowCorruptor final long oldV = row.vds[idx]; do { - corruptedVds[idx] =+ rng.next(); + 
corruptedVds[idx] = schema.regularColumns.get(idx).type.generator().adjustEntropyDomain(rng.next()); } // we need to find a value that sorts strictly greater than the current one while (schema.regularColumns.get(idx).type.compareLexicographically(corruptedVds[idx], oldV) <= 0); diff --git a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java index 62bf589..bf29754 100644 --- a/harry-core/src/harry/corruptor/QueryResponseCorruptor.java +++ b/harry-core/src/harry/corruptor/QueryResponseCorruptor.java @@ -84,11 +84,25 @@ public interface QueryResponseCorruptor mismatch = true; } } - assert mismatch || before.length != after.length; + assert mismatch || before.length != after.length : String.format("Could not corrupt.\n" + + "Before\n%s\n" + + "After\n%s\nkma", + toString(before), + toString(after)); return true; } } return false; } + + private static String toString(Object[][] obj) + { + StringBuilder sb = new StringBuilder(); + for (Object[] objects : obj) + { + sb.append(Arrays.toString(objects)).append("\n"); + } + return sb.toString(); + } } } diff --git a/harry-core/src/harry/data/ResultSetRow.java b/harry-core/src/harry/data/ResultSetRow.java index d6ff77b..8f37c82 100644 --- a/harry-core/src/harry/data/ResultSetRow.java +++ b/harry-core/src/harry/data/ResultSetRow.java @@ -48,6 +48,13 @@ public class ResultSetRow this.slts = slts; } + public ResultSetRow clone() + { + return new ResultSetRow(pd, cd, + Arrays.copyOf(sds, sds.length), Arrays.copyOf(slts, slts.length), + Arrays.copyOf(vds, vds.length), Arrays.copyOf(lts, lts.length)); + } + public String toString() { return "resultSetRow(" diff --git a/harry-core/src/harry/ddl/SchemaSpec.java b/harry-core/src/harry/ddl/SchemaSpec.java index 8d8a62f..4a8a2de 100644 --- a/harry-core/src/harry/ddl/SchemaSpec.java +++ b/harry-core/src/harry/ddl/SchemaSpec.java @@ -18,11 +18,7 @@ package harry.ddl; -import java.util.ArrayList; -import 
java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Objects; +import java.util.*; import java.util.function.Consumer; import harry.generators.DataGenerators; @@ -54,6 +50,7 @@ public class SchemaSpec public final List<ColumnSpec<?>> regularColumns; public final List<ColumnSpec<?>> staticColumns; public final List<ColumnSpec<?>> allColumns; + public final Set<ColumnSpec<?>> allColumnsSet; public final BitSet ALL_COLUMNS_BITSET; public final int regularColumnsOffset; @@ -109,6 +106,7 @@ public class SchemaSpec all.add(columnSpec); } this.allColumns = Collections.unmodifiableList(all); + this.allColumnsSet = Collections.unmodifiableSet(new LinkedHashSet<>(all)); this.pkGenerator = DataGenerators.createKeyGenerator(partitionKeys); this.ckGenerator = DataGenerators.createKeyGenerator(clusteringKeys); diff --git a/harry-core/src/harry/generators/DataGenerators.java b/harry-core/src/harry/generators/DataGenerators.java index e87e7dc..434ea7d 100644 --- a/harry-core/src/harry/generators/DataGenerators.java +++ b/harry-core/src/harry/generators/DataGenerators.java @@ -67,6 +67,8 @@ public class DataGenerators ColumnSpec columnSpec = columns.get(i); if (data[i] == null) descriptors[i] = NIL_DESCR; + else if (data[i] == UNSET_VALUE) + descriptors[i] = UNSET_DESCR; else descriptors[i] = columnSpec.deflate(data[i]); } diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java index 69d0704..cbbdeca 100644 --- a/harry-core/src/harry/model/OpSelectors.java +++ b/harry-core/src/harry/model/OpSelectors.java @@ -189,7 +189,7 @@ public interface OpSelectors return descriptors(pd, cd, lts, opId, schema.staticColumns, schema.staticColumnsMask(), setColumns, schema.staticColumnsOffset); } - private long[] descriptors(long pd, long cd, long lts, long opId, List<ColumnSpec<?>> columns, BitSet mask, BitSet setColumns, int offset) + public long[] descriptors(long pd, long cd, long lts, long opId, 
List<ColumnSpec<?>> columns, BitSet mask, BitSet setColumns, int offset) { assert opId < opsPerModification(lts) * numberOfModifications(lts) : String.format("Operation id %d exceeds the maximum expected number of operations %d (%d * %d)", opId, opsPerModification(lts) * numberOfModifications(lts), opsPerModification(lts), numberOfModifications(lts)); diff --git a/harry-core/src/harry/model/QuiescentChecker.java b/harry-core/src/harry/model/QuiescentChecker.java index 0638915..4dde895 100644 --- a/harry-core/src/harry/model/QuiescentChecker.java +++ b/harry-core/src/harry/model/QuiescentChecker.java @@ -18,21 +18,21 @@ package harry.model; -import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; +import java.util.*; import java.util.function.Supplier; import harry.core.Run; import harry.data.ResultSetRow; +import harry.ddl.ColumnSpec; import harry.ddl.SchemaSpec; import harry.model.sut.SystemUnderTest; +import harry.reconciler.PartitionState; import harry.operations.Query; import harry.reconciler.Reconciler; import harry.runner.DataTracker; import static harry.generators.DataGenerators.NIL_DESCR; +import static harry.generators.DataGenerators.UNSET_DESCR; public class QuiescentChecker implements Model { @@ -67,9 +67,43 @@ public class QuiescentChecker implements Model protected void validate(Supplier<List<ResultSetRow>> rowsSupplier, Query query) { List<ResultSetRow> actualRows = rowsSupplier.get(); - Iterator<ResultSetRow> actual = actualRows.iterator(); + PartitionState partitionState = reconciler.inflatePartitionState(query.pd, tracker, query); + validate(schema, partitionState, actualRows, query); + } + + public static void validate(SchemaSpec schema, PartitionState partitionState, List<ResultSetRow> actualRows, Query query) + { + Set<ColumnSpec<?>> columns = new HashSet<>(); + columns.addAll(schema.allColumns); + validate(schema, columns, partitionState, actualRows, query); + } + + public static 
Reconciler.RowState adjustForSelection(Reconciler.RowState row, SchemaSpec schema, Set<ColumnSpec<?>> selection, boolean isStatic) + { + if (selection.size() == schema.allColumns.size()) + return row; + + List<ColumnSpec<?>> columns = isStatic ? schema.staticColumns : schema.regularColumns; + Reconciler.RowState newRowState = row.clone(); + assert newRowState.vds.length == columns.size(); + for (int i = 0; i < columns.size(); i++) + { + if (!selection.contains(columns.get(i))) + { + newRowState.vds[i] = UNSET_DESCR; + newRowState.lts[i] = NO_TIMESTAMP; + } + } + return newRowState; + } + + public static void validate(SchemaSpec schema, Set<ColumnSpec<?>> selection, PartitionState partitionState, List<ResultSetRow> actualRows, Query query) + { + boolean isWildcardQuery = selection == null; + if (isWildcardQuery) + selection = new HashSet<>(schema.allColumns); - Reconciler.PartitionState partitionState = reconciler.inflatePartitionState(query.pd, tracker, query); + Iterator<ResultSetRow> actual = actualRows.iterator(); Collection<Reconciler.RowState> expectedRows = partitionState.rows(query.reverse); Iterator<Reconciler.RowState> expected = expectedRows.iterator(); @@ -78,7 +112,7 @@ public class QuiescentChecker implements Model if (partitionState.isEmpty() && partitionState.staticRow() != null && actual.hasNext()) { ResultSetRow actualRowState = actual.next(); - if (actualRowState.cd != partitionState.staticRow().cd) + if (actualRowState.cd != UNSET_DESCR && actualRowState.cd != partitionState.staticRow().cd) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Found a row while model predicts statics only:" + @@ -99,15 +133,18 @@ public class QuiescentChecker implements Model actualRowState, query.toSelectStatement()); } - assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema); + assertStaticRow(partitionState, actualRows, + adjustForSelection(partitionState.staticRow(), schema, 
selection, true), + actualRowState, query, schema, isWildcardQuery); } while (actual.hasNext() && expected.hasNext()) { ResultSetRow actualRowState = actual.next(); - Reconciler.RowState expectedRowState = expected.next(); + Reconciler.RowState originalExpectedRowState = expected.next(); + Reconciler.RowState expectedRowState = adjustForSelection(originalExpectedRowState, schema, selection, false); // TODO: this is not necessarily true. It can also be that ordering is incorrect. - if (actualRowState.cd != expectedRowState.cd) + if (actualRowState.cd != UNSET_DESCR && actualRowState.cd != expectedRowState.cd) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Found a row in the model that is not present in the resultset:" + @@ -124,11 +161,12 @@ public class QuiescentChecker implements Model "\nExpected: %s (%s)" + "\nActual: %s (%s)." + "\nQuery: %s", - Arrays.toString(expectedRowState.vds), expectedRowState.toString(schema), - Arrays.toString(actualRowState.vds), actualRowState, + descriptorsToString(expectedRowState.vds), expectedRowState.toString(schema), + descriptorsToString(actualRowState.vds), actualRowState, query.toSelectStatement()); - if (!Arrays.equals(actualRowState.lts, expectedRowState.lts)) + // Wildcard queries do not include timestamps + if (!isWildcardQuery && !Arrays.equals(actualRowState.lts, expectedRowState.lts)) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Timestamps in the row state don't match ones predicted by the model:" + @@ -141,7 +179,10 @@ public class QuiescentChecker implements Model query.toSelectStatement()); if (partitionState.staticRow() != null || actualRowState.sds != null || actualRowState.slts != null) - assertStaticRow(partitionState, actualRows, partitionState.staticRow(), actualRowState, query, schema); + { + Reconciler.RowState expectedStaticRowState = adjustForSelection(partitionState.staticRow(), schema, selection, true); + 
assertStaticRow(partitionState, actualRows, expectedStaticRowState, actualRowState, query, schema, isWildcardQuery); + } } if (actual.hasNext() || expected.hasNext()) @@ -159,12 +200,13 @@ public class QuiescentChecker implements Model } } - public static void assertStaticRow(Reconciler.PartitionState partitionState, + public static void assertStaticRow(PartitionState partitionState, List<ResultSetRow> actualRows, Reconciler.RowState staticRow, ResultSetRow actualRowState, Query query, - SchemaSpec schemaSpec) + SchemaSpec schemaSpec, + boolean isWildcardQuery) { if (!Arrays.equals(staticRow.vds, actualRowState.sds)) throw new ValidationException(partitionState.toString(schemaSpec), @@ -173,11 +215,11 @@ public class QuiescentChecker implements Model "\nExpected: %s (%s)" + "\nActual: %s (%s)." + "\nQuery: %s", - Arrays.toString(staticRow.vds), staticRow.toString(schemaSpec), - Arrays.toString(actualRowState.sds), actualRowState, + descriptorsToString(staticRow.vds), staticRow.toString(schemaSpec), + descriptorsToString(actualRowState.sds), actualRowState, query.toSelectStatement()); - if (!Arrays.equals(staticRow.lts, actualRowState.slts)) + if (!isWildcardQuery && !Arrays.equals(staticRow.lts, actualRowState.slts)) throw new ValidationException(partitionState.toString(schemaSpec), toString(actualRows), "Timestamps in the static row state don't match ones predicted by the model:" + @@ -189,6 +231,22 @@ public class QuiescentChecker implements Model query.toSelectStatement()); } + public static String descriptorsToString(long[] descriptors) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < descriptors.length; i++) + { + if (descriptors[i] == NIL_DESCR) + sb.append("NIL"); + if (descriptors[i] == UNSET_DESCR) + sb.append("UNSET"); + else + sb.append(descriptors[i]); + if (i > 0) + sb.append(", "); + } + return sb.toString(); + } public static String toString(Collection<Reconciler.RowState> collection, SchemaSpec schema) { StringBuilder builder = 
new StringBuilder(); diff --git a/harry-core/src/harry/model/SelectHelper.java b/harry-core/src/harry/model/SelectHelper.java index 675052c..8debbbe 100644 --- a/harry-core/src/harry/model/SelectHelper.java +++ b/harry-core/src/harry/model/SelectHelper.java @@ -18,23 +18,29 @@ package harry.model; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; +import java.util.*; import harry.data.ResultSetRow; import harry.ddl.ColumnSpec; import harry.ddl.SchemaSpec; +import harry.generators.DataGenerators; import harry.model.sut.SystemUnderTest; import harry.operations.CompiledStatement; import harry.operations.Relation; import harry.operations.Query; +import static harry.generators.DataGenerators.UNSET_DESCR; + public class SelectHelper { + public static CompiledStatement selectWildcard(SchemaSpec schema, long pd) + { + return select(schema, pd, null, Collections.emptyList(), false, true); + } + public static CompiledStatement select(SchemaSpec schema, long pd) { - return select(schema, pd, Collections.emptyList(), false, true); + return select(schema, pd, schema.allColumnsSet, Collections.emptyList(), false, true); } /** @@ -44,30 +50,68 @@ public class SelectHelper */ public static CompiledStatement select(SchemaSpec schema, long pd, List<Relation> relations, boolean reverse, boolean includeWriteTime) { + return select(schema, pd, schema.allColumnsSet, relations, reverse, includeWriteTime); + } + + public static CompiledStatement selectWildcard(SchemaSpec schema, long pd, List<Relation> relations, boolean reverse, boolean includeWriteTime) + { + return select(schema, pd, null, relations, reverse, includeWriteTime); + } + + public static CompiledStatement select(SchemaSpec schema, long pd, Set<ColumnSpec<?>> columns, List<Relation> relations, boolean reverse, boolean includeWriteTime) + { + boolean isWildcardQuery = columns == null; + if (isWildcardQuery) + { + columns = schema.allColumnsSet; + includeWriteTime = false; + } + StringBuilder 
b = new StringBuilder(); b.append("SELECT "); - for (int i = 0; i < schema.allColumns.size(); i++) + boolean isFirst = true; + if (isWildcardQuery) + { + b.append("*"); + } + else { - ColumnSpec<?> spec = schema.allColumns.get(i); - if (i > 0) - b.append(", "); - b.append(spec.name); + for (int i = 0; i < schema.allColumns.size(); i++) + { + ColumnSpec<?> spec = schema.allColumns.get(i); + if (columns != null && !columns.contains(spec)) + continue; + + if (isFirst) + isFirst = false; + else + b.append(", "); + b.append(spec.name); + } } if (includeWriteTime) { - for (ColumnSpec<?> column : schema.staticColumns) + for (ColumnSpec<?> spec : schema.staticColumns) + { + if (columns != null && !columns.contains(spec)) + continue; b.append(", ") .append("writetime(") - .append(column.name) + .append(spec.name) .append(")"); + } - for (ColumnSpec<?> column : schema.regularColumns) + for (ColumnSpec<?> spec : schema.regularColumns) + { + if (columns != null && !columns.contains(spec)) + continue; b.append(", ") .append("writetime(") - .append(column.name) + .append(spec.name) .append(")"); + } } b.append(" FROM ") @@ -157,6 +201,64 @@ public class SelectHelper return name + " DESC"; } + + public static Object[] broadenResult(SchemaSpec schemaSpec, Set<ColumnSpec<?>> columns, Object[] result) + { + boolean isWildcardQuery = columns == null; + + if (isWildcardQuery) + columns = schemaSpec.allColumnsSet; + else if (schemaSpec.allColumns.size() == columns.size()) + return result; + + Object[] newRes = new Object[schemaSpec.allColumns.size() + schemaSpec.staticColumns.size() + schemaSpec.regularColumns.size()]; + + int origPointer = 0; + int newPointer = 0; + for (int i = 0; i < schemaSpec.allColumns.size(); i++) + { + ColumnSpec<?> column = schemaSpec.allColumns.get(i); + if (columns.contains(column)) + newRes[newPointer] = result[origPointer++]; + else + newRes[newPointer] = DataGenerators.UNSET_VALUE; + newPointer++; + } + + // Make sure to include writetime, but only in 
case query actually includes writetime (for example, it's not a wildcard query) + for (int i = 0; i < schemaSpec.staticColumns.size() && origPointer < result.length; i++) + { + ColumnSpec<?> column = schemaSpec.staticColumns.get(i); + if (columns.contains(column)) + newRes[newPointer] = result[origPointer++]; + else + newRes[newPointer] = null; + newPointer++; + } + + for (int i = 0; i < schemaSpec.regularColumns.size() && origPointer < result.length; i++) + { + ColumnSpec<?> column = schemaSpec.regularColumns.get(i); + if (columns.contains(column)) + newRes[newPointer] = result[origPointer++]; + else + newRes[newPointer] = null; + newPointer++; + } + + return newRes; + } + + static boolean isDeflatable(Object[] columns) + { + for (Object column : columns) + { + if (column == DataGenerators.UNSET_VALUE) + return false; + } + return true; + } + public static ResultSetRow resultSetToRow(SchemaSpec schema, OpSelectors.MonotonicClock clock, Object[] result) { Object[] partitionKey = new Object[schema.partitionKeys.size()]; @@ -183,8 +285,8 @@ public class SelectHelper lts[i] = v == null ? Model.NO_TIMESTAMP : clock.lts((long) v); } - return new ResultSetRow(schema.deflatePartitionKey(partitionKey), - schema.deflateClusteringKey(clusteringKey), + return new ResultSetRow(isDeflatable(partitionKey) ? schema.deflatePartitionKey(partitionKey) : UNSET_DESCR, + isDeflatable(clusteringKey) ? schema.deflateClusteringKey(clusteringKey) : UNSET_DESCR, schema.staticColumns.isEmpty() ? null : schema.deflateStaticColumns(staticColumns), schema.staticColumns.isEmpty() ? 
null : slts, schema.deflateRegularColumns(regularColumns), @@ -193,12 +295,16 @@ public class SelectHelper public static List<ResultSetRow> execute(SystemUnderTest sut, OpSelectors.MonotonicClock clock, Query query) { - CompiledStatement compiled = query.toSelectStatement(); + return execute(sut, clock, query, query.schemaSpec.allColumnsSet); + } + + public static List<ResultSetRow> execute(SystemUnderTest sut, OpSelectors.MonotonicClock clock, Query query, Set<ColumnSpec<?>> columns) + { + CompiledStatement compiled = query.toSelectStatement(columns, true); Object[][] objects = sut.executeIdempotent(compiled.cql(), SystemUnderTest.ConsistencyLevel.QUORUM, compiled.bindings()); List<ResultSetRow> result = new ArrayList<>(); for (Object[] obj : objects) - result.add(resultSetToRow(query.schemaSpec, clock, obj)); - + result.add(resultSetToRow(query.schemaSpec, clock, broadenResult(query.schemaSpec, columns, obj))); return result; } } diff --git a/harry-core/src/harry/operations/Query.java b/harry-core/src/harry/operations/Query.java index d7e75c3..3ed1a83 100644 --- a/harry-core/src/harry/operations/Query.java +++ b/harry-core/src/harry/operations/Query.java @@ -18,20 +18,20 @@ package harry.operations; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.function.LongSupplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import harry.ddl.ColumnSpec; import harry.ddl.SchemaSpec; +import harry.generators.DataGenerators; +import harry.generators.RngUtils; import harry.model.SelectHelper; import harry.util.Ranges; +import static harry.operations.QueryGenerator.relationKind; import static harry.operations.Relation.FORWARD_COMPARATOR; public abstract class Query @@ -243,14 +243,24 @@ public abstract class Query } + public CompiledStatement toWildcardSelectStatement() + { + return SelectHelper.select(schemaSpec, pd, 
null, reverse, false); + } + public CompiledStatement toSelectStatement() { - return toSelectStatement(true); + return SelectHelper.select(schemaSpec, pd, schemaSpec.allColumnsSet, relations, reverse, true); } public CompiledStatement toSelectStatement(boolean includeWriteTime) { - return SelectHelper.select(schemaSpec, pd, relations, reverse, includeWriteTime); + return SelectHelper.select(schemaSpec, pd, schemaSpec.allColumnsSet, relations, reverse, includeWriteTime); + } + + public CompiledStatement toSelectStatement(Set<ColumnSpec<?>> columns, boolean includeWriteTime) + { + return SelectHelper.select(schemaSpec, pd, columns, relations, reverse, includeWriteTime); } public CompiledStatement toDeleteStatement(long rts) @@ -262,7 +272,212 @@ public abstract class Query public static Query selectPartition(SchemaSpec schemaSpec, long pd, boolean reverse) { - return new SinglePartitionQuery(QueryKind.SINGLE_PARTITION, pd, reverse, Collections.emptyList(), schemaSpec); + return new Query.SinglePartitionQuery(Query.QueryKind.SINGLE_PARTITION, + pd, + reverse, + Collections.emptyList(), + schemaSpec); + } + + public static Query singleClustering(SchemaSpec schema, long pd, long cd, boolean reverse) + { + return new Query.SingleClusteringQuery(Query.QueryKind.SINGLE_CLUSTERING, + pd, + cd, + reverse, + Relation.eqRelations(schema.ckGenerator.slice(cd), schema.clusteringKeys), + schema); + } + + public static Query clusteringSliceQuery(SchemaSpec schema, long pd, long cd, long queryDescriptor, boolean isGt, boolean isEquals, boolean reverse) + { + List<Relation> relations = new ArrayList<>(); + + long[] sliced = schema.ckGenerator.slice(cd); + long min; + long max; + int nonEqFrom = RngUtils.asInt(queryDescriptor, 0, sliced.length - 1); + + long[] minBound = new long[sliced.length]; + long[] maxBound = new long[sliced.length]; + + // Algorithm that determines boundaries for a clustering slice. + // + // Basic principles are not hard but there are a few edge cases. 
I haven't figured out how to simplify + // those, so there might be some room for improvement. In short, what we want to achieve is: + // + // 1. Every part that is restricted with an EQ relation goes into the bound verbatim. + // 2. Every part that is restricted with a non-EQ relation (LT, GT, LTE, GTE) is taken into the bound + // if it is required to satisfy the relationship. For example, in `ck1 = 0 AND ck2 < 5`, ck2 will go + // to the _max_ boundary, and minimum value will go to the _min_ boundary, since we can select every + // descriptor that is prefixed with ck1. + // 3. Every other part (e.g., ones that are not explicitly mentioned in the query) has to be restricted + // according to equality. For example, in `ck1 = 0 AND ck2 < 5`, ck3 that is present in schema but not + // mentioned in query, makes sure that any value between [0, min_value, min_value] and [0, 5, min_value] + // is matched. + // + // One edge case is a query on the first clustering key: `ck1 < 5`. In this case, we have to fixup the lower + // value to the minimum possible value. We could really just do Long.MIN_VALUE, but in case we forget to + // adjust entropy elsewhere, it'll be caught correctly here. + for (int i = 0; i < sliced.length; i++) + { + long v = sliced[i]; + DataGenerators.KeyGenerator gen = schema.ckGenerator; + ColumnSpec column = schema.clusteringKeys.get(i); + int idx = i; + LongSupplier maxSupplier = () -> gen.maxValue(idx); + LongSupplier minSupplier = () -> gen.minValue(idx); + + if (i < nonEqFrom) + { + relations.add(Relation.eqRelation(schema.clusteringKeys.get(i), v)); + minBound[i] = v; + maxBound[i] = v; + } + else if (i == nonEqFrom) + { + relations.add(Relation.relation(relationKind(isGt, isEquals), schema.clusteringKeys.get(i), v)); + + if (column.isReversed()) + { + minBound[i] = isGt ? minSupplier.getAsLong() : v; + maxBound[i] = isGt ? v : maxSupplier.getAsLong(); + } + else + { + minBound[i] = isGt ? v : minSupplier.getAsLong(); + maxBound[i] = isGt ? 
maxSupplier.getAsLong() : v; + } + } + else + { + if (isEquals) + { + minBound[i] = minSupplier.getAsLong(); + maxBound[i] = maxSupplier.getAsLong(); + } + // If we have a non-eq case, all subsequent bounds have to correspond to the maximum in normal case, + // or minimum in case the last bound locked with a relation was reversed. + // + // For example, if we have (ck1, ck2, ck3) as (ASC, DESC, ASC), and query ck1 > X, we'll have: + // [xxxxx | max_value | max_value] + // ck1 ck2 ck3 + // which will exclude xxxx, but take every possible (ck1 > xxxxx) prefixed value. + // + // Similarly, if we have (ck1, ck2, ck3) as (ASC, DESC, ASC), and query ck1 <= X, we'll have: + // [xxxxx | max_value | max_value] + // which will include every (ck1 < xxxxx), and any clustering prefixed with xxxxx. + else if (schema.clusteringKeys.get(nonEqFrom).isReversed()) + maxBound[i] = minBound[i] = isGt ? minSupplier.getAsLong() : maxSupplier.getAsLong(); + else + maxBound[i] = minBound[i] = isGt ? maxSupplier.getAsLong() : minSupplier.getAsLong(); + } + } + + if (schema.clusteringKeys.get(nonEqFrom).isReversed()) + isGt = !isGt; + + min = schema.ckGenerator.stitch(minBound); + max = schema.ckGenerator.stitch(maxBound); + + if (nonEqFrom == 0) + { + min = isGt ? min : schema.ckGenerator.minValue(); + max = !isGt ? max : schema.ckGenerator.maxValue(); + } + + // if we're about to create an "impossible" query, just bump the modifier and re-generate + if (min == max && !isEquals) + throw new IllegalArgumentException("Impossible Query"); + + return new Query.ClusteringSliceQuery(Query.QueryKind.CLUSTERING_SLICE, + pd, + min, + max, + relationKind(true, isGt ? isEquals : true), + relationKind(false, !isGt ? 
isEquals : true), + reverse, + relations, + schema); + } + + public static Query clusteringRangeQuery(SchemaSpec schema, long pd, long cd1, long cd2, long queryDescriptor, boolean isMinEq, boolean isMaxEq, boolean reverse) + { + List<Relation> relations = new ArrayList<>(); + + long[] minBound = schema.ckGenerator.slice(cd1); + long[] maxBound = schema.ckGenerator.slice(cd2); + + int nonEqFrom = RngUtils.asInt(queryDescriptor, 0, schema.clusteringKeys.size() - 1); + + // Logic here is similar to how clustering slices are implemented, except for both lower and upper bound + // get their values from sliced value in (1) and (2) cases: + // + // 1. Every part that is restricted with an EQ relation, takes its value from the min bound. + // TODO: this can actually be improved, since in case of hierarchical clustering generation we can + // pick out of the keys that are already locked. That said, we'll exercise more cases the way + // it is implemented right now. + // 2. Every part that is restricted with a non-EQ relation is taken into the bound, if it is used in + // the query. For example in, `ck1 = 0 AND ck2 > 2 AND ck2 < 5`, ck2 values 2 and 5 will be placed, + // correspondingly, to the min and max bound. + // 3. Every other part has to be restricted according to equality. Similar to clustering slice, we have + // to decide whether we use a min or the max value for the bound. Foe example `ck1 = 0 AND ck2 > 2 AND ck2 <= 5`, + // assuming we have ck3 that is present in schema but not mentioned in the query, we'll have bounds + // created as follows: [0, 2, max_value] and [0, 5, max_value]. Idea here is that since ck2 = 2 is excluded, + // we also disallow all ck3 values for [0, 2] prefix. Similarly, since ck2 = 5 is included, we allow every + // ck3 value with a prefix of [0, 5]. 
+ for (int i = 0; i < schema.clusteringKeys.size(); i++) + { + ColumnSpec<?> col = schema.clusteringKeys.get(i); + if (i < nonEqFrom) + { + relations.add(Relation.eqRelation(col, minBound[i])); + maxBound[i] = minBound[i]; + } + else if (i == nonEqFrom) + { + long minLocked = Math.min(minBound[nonEqFrom], maxBound[nonEqFrom]); + long maxLocked = Math.max(minBound[nonEqFrom], maxBound[nonEqFrom]); + relations.add(Relation.relation(relationKind(true, col.isReversed() ? isMaxEq : isMinEq), col, + col.isReversed() ? maxLocked : minLocked)); + relations.add(Relation.relation(relationKind(false, col.isReversed() ? isMinEq : isMaxEq), col, + col.isReversed() ? minLocked : maxLocked)); + minBound[i] = minLocked; + maxBound[i] = maxLocked; + + // Impossible query + if (i == 0 && minLocked == maxLocked) + throw new IllegalArgumentException("impossible query"); + } + else + { + minBound[i] = isMinEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i); + maxBound[i] = isMaxEq ? schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i); + } + } + + long stitchedMin = schema.ckGenerator.stitch(minBound); + long stitchedMax = schema.ckGenerator.stitch(maxBound); + + // if we're about to create an "impossible" query, just bump the modifier and re-generate + // TODO: this isn't considered "normal" that we do it this way, but I'd rather fix it with + // a refactoring that's mentioned below + if (stitchedMin == stitchedMax) + throw new IllegalArgumentException("impossible query"); + + // TODO: one of the ways to get rid of garbage here, and potentially even simplify the code is to + // simply return bounds here. After bounds are created, we slice them and generate query right + // from the bounds. In this case, we can even say that things like -inf/+inf are special values, + // and use them as placeholders. Also, it'll be easier to manipulate relations. 
+ return new Query.ClusteringRangeQuery(Query.QueryKind.CLUSTERING_RANGE, + pd, + stitchedMin, + stitchedMax, + relationKind(true, isMinEq), + relationKind(false, isMaxEq), + reverse, + relations, + schema); } public enum QueryKind diff --git a/harry-core/src/harry/operations/QueryGenerator.java b/harry-core/src/harry/operations/QueryGenerator.java index 6c815b5..70974f0 100644 --- a/harry-core/src/harry/operations/QueryGenerator.java +++ b/harry-core/src/harry/operations/QueryGenerator.java @@ -18,18 +18,11 @@ package harry.operations; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.function.LongSupplier; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import harry.core.Run; -import harry.ddl.ColumnSpec; import harry.ddl.SchemaSpec; -import harry.generators.DataGenerators; import harry.generators.RngUtils; import harry.generators.Surjections; import harry.model.OpSelectors; @@ -110,227 +103,69 @@ public class QueryGenerator switch (queryKind) { case SINGLE_PARTITION: - return new Query.SinglePartitionQuery(queryKind, - pd, - reverse, - Collections.emptyList(), - schema); + return singlePartition(pd, reverse); case SINGLE_CLUSTERING: { long cd = descriptorSelector.randomCd(pd, descriptor, schema); - return new Query.SingleClusteringQuery(queryKind, - pd, - cd, - reverse, - Relation.eqRelations(schema.ckGenerator.slice(cd), schema.clusteringKeys), - schema); + return singleClustering(pd, cd, reverse); } case CLUSTERING_SLICE: { - List<Relation> relations = new ArrayList<>(); long cd = descriptorSelector.randomCd(pd, descriptor, schema); - boolean isGt = RngUtils.asBoolean(rng.next(descriptor, GT_STREAM)); - // TODO: make generation of EQ configurable; turn it off and on - boolean isEquals = RngUtils.asBoolean(rng.next(descriptor, E_STREAM)); - - long[] sliced = schema.ckGenerator.slice(cd); - long min; - long max; - int nonEqFrom = RngUtils.asInt(descriptor, 0, sliced.length - 1); - - long[] minBound = 
new long[sliced.length]; - long[] maxBound = new long[sliced.length]; - - // Algorithm that determines boundaries for a clustering slice. - // - // Basic principles are not hard but there are a few edge cases. I haven't figured out how to simplify - // those, so there might be some room for improvement. In short, what we want to achieve is: - // - // 1. Every part that is restricted with an EQ relation goes into the bound verbatim. - // 2. Every part that is restricted with a non-EQ relation (LT, GT, LTE, GTE) is taken into the bound - // if it is required to satisfy the relationship. For example, in `ck1 = 0 AND ck2 < 5`, ck2 will go - // to the _max_ boundary, and minimum value will go to the _min_ boundary, since we can select every - // descriptor that is prefixed with ck1. - // 3. Every other part (e.g., ones that are not explicitly mentioned in the query) has to be restricted - // according to equality. For example, in `ck1 = 0 AND ck2 < 5`, ck3 that is present in schema but not - // mentioned in query, makes sure that any value between [0, min_value, min_value] and [0, 5, min_value] - // is matched. - // - // One edge case is a query on the first clustering key: `ck1 < 5`. In this case, we have to fixup the lower - // value to the minimum possible value. We could really just do Long.MIN_VALUE, but in case we forget to - // adjust entropy elsewhere, it'll be caught correctly here. 
- for (int i = 0; i < sliced.length; i++) + try { - long v = sliced[i]; - DataGenerators.KeyGenerator gen = schema.ckGenerator; - ColumnSpec column = schema.clusteringKeys.get(i); - int idx = i; - LongSupplier maxSupplier = () -> gen.maxValue(idx); - LongSupplier minSupplier = () -> gen.minValue(idx); - - if (i < nonEqFrom) - { - relations.add(Relation.eqRelation(schema.clusteringKeys.get(i), v)); - minBound[i] = v; - maxBound[i] = v; - } - else if (i == nonEqFrom) - { - relations.add(Relation.relation(relationKind(isGt, isEquals), schema.clusteringKeys.get(i), v)); - - if (column.isReversed()) - { - minBound[i] = isGt ? minSupplier.getAsLong() : v; - maxBound[i] = isGt ? v : maxSupplier.getAsLong(); - } - else - { - minBound[i] = isGt ? v : minSupplier.getAsLong(); - maxBound[i] = isGt ? maxSupplier.getAsLong() : v; - } - } - else - { - if (isEquals) - { - minBound[i] = minSupplier.getAsLong(); - maxBound[i] = maxSupplier.getAsLong(); - } - // If we have a non-eq case, all subsequent bounds have to correspond to the maximum in normal case, - // or minimum in case the last bound locked with a relation was reversed. - // - // For example, if we have (ck1, ck2, ck3) as (ASC, DESC, ASC), and query ck1 > X, we'll have: - // [xxxxx | max_value | max_value] - // ck1 ck2 ck3 - // which will exclude xxxx, but take every possible (ck1 > xxxxx) prefixed value. - // - // Similarly, if we have (ck1, ck2, ck3) as (ASC, DESC, ASC), and query ck1 <= X, we'll have: - // [xxxxx | max_value | max_value] - // which will include every (ck1 < xxxxx), and any clustering prefixed with xxxxx. - else if (schema.clusteringKeys.get(nonEqFrom).isReversed()) - maxBound[i] = minBound[i] = isGt ? minSupplier.getAsLong() : maxSupplier.getAsLong(); - else - maxBound[i] = minBound[i] = isGt ? 
maxSupplier.getAsLong() : minSupplier.getAsLong(); - } + return clusteringSliceQuery(pd, cd, descriptor, reverse); } - - if (schema.clusteringKeys.get(nonEqFrom).isReversed()) - isGt = !isGt; - - min = schema.ckGenerator.stitch(minBound); - max = schema.ckGenerator.stitch(maxBound); - - if (nonEqFrom == 0) + catch (IllegalArgumentException retry) { - min = isGt ? min : schema.ckGenerator.minValue(); - max = !isGt ? max : schema.ckGenerator.maxValue(); - } - - // if we're about to create an "impossible" query, just bump the modifier and re-generate - if (min == max && !isEquals) return inflate(lts, modifier + 1, queryKind); - - return new Query.ClusteringSliceQuery(Query.QueryKind.CLUSTERING_SLICE, - pd, - min, - max, - relationKind(true, isGt ? isEquals : true), - relationKind(false, !isGt ? isEquals : true), - reverse, - relations, - schema); + } } case CLUSTERING_RANGE: { - List<Relation> relations = new ArrayList<>(); - long cd1 = descriptorSelector.randomCd(pd, descriptor, schema); - boolean isMinEq = RngUtils.asBoolean(descriptor); - long cd2 = descriptorSelector.randomCd(pd, rng.next(descriptor, lts), schema); - boolean isMaxEq = RngUtils.asBoolean(rng.next(descriptor, lts)); - - long[] minBound = schema.ckGenerator.slice(cd1); - long[] maxBound = schema.ckGenerator.slice(cd2); - - int nonEqFrom = RngUtils.asInt(descriptor, 0, schema.clusteringKeys.size() - 1); - - // Logic here is similar to how clustering slices are implemented, except for both lower and upper bound - // get their values from sliced value in (1) and (2) cases: - // - // 1. Every part that is restricted with an EQ relation, takes its value from the min bound. - // TODO: this can actually be improved, since in case of hierarchical clustering generation we can - // pick out of the keys that are already locked. That said, we'll exercise more cases the way - // it is implemented right now. - // 2. 
Every part that is restricted with a non-EQ relation is taken into the bound, if it is used in - // the query. For example in, `ck1 = 0 AND ck2 > 2 AND ck2 < 5`, ck2 values 2 and 5 will be placed, - // correspondingly, to the min and max bound. - // 3. Every other part has to be restricted according to equality. Similar to clustering slice, we have - // to decide whether we use a min or the max value for the bound. Foe example `ck1 = 0 AND ck2 > 2 AND ck2 <= 5`, - // assuming we have ck3 that is present in schema but not mentioned in the query, we'll have bounds - // created as follows: [0, 2, max_value] and [0, 5, max_value]. Idea here is that since ck2 = 2 is excluded, - // we also disallow all ck3 values for [0, 2] prefix. Similarly, since ck2 = 5 is included, we allow every - // ck3 value with a prefix of [0, 5]. - for (int i = 0; i < schema.clusteringKeys.size(); i++) + try { - ColumnSpec<?> col = schema.clusteringKeys.get(i); - if (i < nonEqFrom) - { - relations.add(Relation.eqRelation(col, minBound[i])); - maxBound[i] = minBound[i]; - } - else if (i == nonEqFrom) - { - long minLocked = Math.min(minBound[nonEqFrom], maxBound[nonEqFrom]); - long maxLocked = Math.max(minBound[nonEqFrom], maxBound[nonEqFrom]); - relations.add(Relation.relation(relationKind(true, col.isReversed() ? isMaxEq : isMinEq), col, - col.isReversed() ? maxLocked : minLocked)); - relations.add(Relation.relation(relationKind(false, col.isReversed() ? isMinEq : isMaxEq), col, - col.isReversed() ? minLocked : maxLocked)); - minBound[i] = minLocked; - maxBound[i] = maxLocked; - - // Impossible query - if (i == 0 && minLocked == maxLocked) - { - return inflate(lts, modifier + 1, queryKind); - } - } - else - { - minBound[i] = isMinEq ? schema.ckGenerator.minValue(i) : schema.ckGenerator.maxValue(i); - maxBound[i] = isMaxEq ? 
schema.ckGenerator.maxValue(i) : schema.ckGenerator.minValue(i); - } + long cd1 = descriptorSelector.randomCd(pd, descriptor, schema); + long cd2 = descriptorSelector.randomCd(pd, rng.next(descriptor, lts), schema); + return clusteringRangeQuery(pd, cd1, cd2, descriptor, reverse); } - - long stitchedMin = schema.ckGenerator.stitch(minBound); - long stitchedMax = schema.ckGenerator.stitch(maxBound); - - // if we're about to create an "impossible" query, just bump the modifier and re-generate - // TODO: this isn't considered "normal" that we do it this way, but I'd rather fix it with - // a refactoring that's mentioned below - if (stitchedMin == stitchedMax) + catch (IllegalArgumentException retry) { return inflate(lts, modifier + 1, queryKind); } - - // TODO: one of the ways to get rid of garbage here, and potentially even simplify the code is to - // simply return bounds here. After bounds are created, we slice them and generate query right - // from the bounds. In this case, we can even say that things like -inf/+inf are special values, - // and use them as placeholders. Also, it'll be easier to manipulate relations. 
- return new Query.ClusteringRangeQuery(Query.QueryKind.CLUSTERING_RANGE, - pd, - stitchedMin, - stitchedMax, - relationKind(true, isMinEq), - relationKind(false, isMaxEq), - reverse, - relations, - schema); } default: throw new IllegalArgumentException("Shouldn't happen"); } } + public Query singlePartition(long pd, boolean reverse) + { + return Query.selectPartition(schema, pd, reverse); + } + + public Query singleClustering(long pd, long cd, boolean reverse) + { + return Query.singleClustering(schema, pd, cd, reverse); + } + + public Query clusteringSliceQuery(long pd, long cd, long queryDescriptor, boolean reverse) + { + boolean isGt = RngUtils.asBoolean(rng.next(queryDescriptor, GT_STREAM)); + // TODO: make generation of EQ configurable; turn it off and on + boolean isEquals = RngUtils.asBoolean(rng.next(queryDescriptor, E_STREAM)); + + return Query.clusteringSliceQuery(schema, pd, cd, queryDescriptor, isGt, isEquals, reverse); + } + + public Query clusteringRangeQuery(long pd, long cd1, long cd2, long queryDescriptor, boolean reverse) + { + boolean isMinEq = RngUtils.asBoolean(queryDescriptor); + boolean isMaxEq = RngUtils.asBoolean(rng.next(queryDescriptor, pd)); + + return Query.clusteringRangeQuery(schema, pd, cd1, cd2, queryDescriptor, isMinEq, isMaxEq, reverse); + } + public static Relation.RelationKind relationKind(boolean isGt, boolean isEquals) { if (isGt) diff --git a/harry-core/src/harry/reconciler/PartitionState.java b/harry-core/src/harry/reconciler/PartitionState.java new file mode 100644 index 0000000..f284ec0 --- /dev/null +++ b/harry-core/src/harry/reconciler/PartitionState.java @@ -0,0 +1,266 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package harry.reconciler; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import harry.ddl.ColumnSpec; +import harry.ddl.SchemaSpec; +import harry.operations.Query; +import harry.util.BitSet; +import harry.util.Ranges; + +import static harry.generators.DataGenerators.NIL_DESCR; +import static harry.generators.DataGenerators.UNSET_DESCR; +import static harry.model.Model.NO_TIMESTAMP; + +public class PartitionState implements Iterable<Reconciler.RowState> +{ + private static final Logger logger = LoggerFactory.getLogger(Reconciler.class); + + final long pd; + final long debugCd; + final SchemaSpec schema; + + // Collected state + Reconciler.RowState staticRow; + final NavigableMap<Long, Reconciler.RowState> rows; + + public PartitionState(long pd, long debugCd, SchemaSpec schema) + { + this.pd = pd; + this.rows = new TreeMap<>(); + if (!schema.staticColumns.isEmpty()) + { + staticRow = new Reconciler.RowState(this, + Reconciler.STATIC_CLUSTERING, + Reconciler.arr(schema.staticColumns.size(), NIL_DESCR), + Reconciler.arr(schema.staticColumns.size(), NO_TIMESTAMP)); + } + this.debugCd = debugCd; + this.schema = schema; + } + + public NavigableMap<Long, Reconciler.RowState> rows() + { + return rows; + } + + public void writeStaticRow(long[] staticVds, long lts) + { + if (staticRow != null) + staticRow = updateRowState(staticRow, 
schema.staticColumns, Reconciler.STATIC_CLUSTERING, staticVds, lts, false); + } + + public void write(long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness) + { + rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writePrimaryKeyLiveness)); + } + + public void delete(Ranges.Range range, long lts) + { + if (range.minBound > range.maxBound) + return; + + Iterator<Map.Entry<Long, Reconciler.RowState>> iter = rows.subMap(range.minBound, range.minInclusive, + range.maxBound, range.maxInclusive) + .entrySet() + .iterator(); + while (iter.hasNext()) + { + Map.Entry<Long, Reconciler.RowState> e = iter.next(); + if (debugCd != -1 && e.getKey() == debugCd) + logger.info("Hiding {} at {} because of range tombstone {}", debugCd, lts, range); + + // assert row state doesn't have fresher lts + iter.remove(); + } + } + + public void delete(long cd, long lts) + { + Reconciler.RowState state = rows.remove(cd); + if (state != null) + { + for (long v : state.lts) + assert lts >= v : String.format("Attempted to remove a row with a tombstone that has older timestamp (%d): %s", lts, state); + } + } + + public boolean isEmpty() + { + return rows.isEmpty(); + } + + private Reconciler.RowState updateRowState(Reconciler.RowState currentState, List<ColumnSpec<?>> columns, long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness) + { + if (currentState == null) + { + long[] ltss = new long[vds.length]; + long[] vdsCopy = new long[vds.length]; + for (int i = 0; i < vds.length; i++) + { + if (vds[i] != UNSET_DESCR) + { + ltss[i] = lts; + vdsCopy[i] = vds[i]; + } + else + { + ltss[i] = NO_TIMESTAMP; + vdsCopy[i] = NIL_DESCR; + } + } + + currentState = new Reconciler.RowState(this, cd, vdsCopy, ltss); + } + else + { + assert currentState.vds.length == vds.length; + for (int i = 0; i < vds.length; i++) + { + if (vds[i] == UNSET_DESCR) + continue; + + assert lts >= currentState.lts[i] : String.format("Out-of-order LTS: %d. 
Max seen: %s", lts, currentState.lts[i]); // sanity check; we're iterating in lts order + + if (currentState.lts[i] == lts) + { + // Timestamp collision case + ColumnSpec<?> column = columns.get(i); + if (column.type.compareLexicographically(vds[i], currentState.vds[i]) > 0) + currentState.vds[i] = vds[i]; + } + else + { + currentState.vds[i] = vds[i]; + assert lts > currentState.lts[i]; + currentState.lts[i] = lts; + } + } + } + + if (writePrimaryKeyLiveness) + currentState.hasPrimaryKeyLivenessInfo = true; + + return currentState; + } + + public void deleteRegularColumns(long lts, long cd, int columnOffset, harry.util.BitSet columns, harry.util.BitSet mask) + { + deleteColumns(lts, rows.get(cd), columnOffset, columns, mask); + } + + public void deleteStaticColumns(long lts, int columnOffset, harry.util.BitSet columns, harry.util.BitSet mask) + { + deleteColumns(lts, staticRow, columnOffset, columns, mask); + } + + public void deleteColumns(long lts, Reconciler.RowState state, int columnOffset, harry.util.BitSet columns, BitSet mask) + { + if (state == null) + return; + + //TODO: optimise by iterating over the columns that were removed by this deletion + //TODO: optimise final decision to fully remove the column by counting a number of set/unset columns + boolean allNil = true; + for (int i = 0; i < state.vds.length; i++) + { + if (columns.isSet(columnOffset + i, mask)) + { + state.vds[i] = NIL_DESCR; + state.lts[i] = NO_TIMESTAMP; + } + else if (state.vds[i] != NIL_DESCR) + { + allNil = false; + } + } + + if (state.cd != Reconciler.STATIC_CLUSTERING && allNil & !state.hasPrimaryKeyLivenessInfo) + delete(state.cd, lts); + } + + public void deletePartition(long lts) + { + if (debugCd != -1) + logger.info("Hiding {} at {} because partition deletion", debugCd, lts); + + rows.clear(); + if (!schema.staticColumns.isEmpty()) + { + Arrays.fill(staticRow.vds, NIL_DESCR); + Arrays.fill(staticRow.lts, NO_TIMESTAMP); + } + } + + public Iterator<Reconciler.RowState> 
iterator() + { + return iterator(false); + } + + public Iterator<Reconciler.RowState> iterator(boolean reverse) + { + if (reverse) + return rows.descendingMap().values().iterator(); + + return rows.values().iterator(); + } + + public Collection<Reconciler.RowState> rows(boolean reverse) + { + if (reverse) + return rows.descendingMap().values(); + + return rows.values(); + } + + public Reconciler.RowState staticRow() + { + return staticRow; + } + + public PartitionState apply(Query query) + { + PartitionState partitionState = new PartitionState(pd, debugCd, schema); + partitionState.staticRow = staticRow; + // TODO: we could improve this if we could get original descriptors + for (Reconciler.RowState rowState : rows.values()) + if (query.match(rowState.cd)) + partitionState.rows.put(rowState.cd, rowState); + + return partitionState; + } + + public String toString(SchemaSpec schema) + { + StringBuilder sb = new StringBuilder(); + + if (staticRow != null) + sb.append("Static row: " + staticRow.toString(schema)).append("\n"); + + for (Reconciler.RowState row : rows.values()) + sb.append(row.toString(schema)).append("\n"); + + return sb.toString(); + } +} diff --git a/harry-core/src/harry/reconciler/Reconciler.java b/harry-core/src/harry/reconciler/Reconciler.java index e4970dd..cc50a6e 100644 --- a/harry-core/src/harry/reconciler/Reconciler.java +++ b/harry-core/src/harry/reconciler/Reconciler.java @@ -20,19 +20,13 @@ package harry.reconciler; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; import java.util.function.Function; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import harry.core.Run; -import harry.ddl.ColumnSpec; import harry.ddl.SchemaSpec; import harry.model.OpSelectors; import harry.operations.Query; @@ -47,8 +41,6 @@ import harry.visitors.ReplayingVisitor; import 
harry.visitors.VisitExecutor; import static harry.generators.DataGenerators.NIL_DESCR; -import static harry.generators.DataGenerators.UNSET_DESCR; -import static harry.model.Model.NO_TIMESTAMP; /** * A simple Cassandra-style reconciler for operations against model state. @@ -62,7 +54,7 @@ public class Reconciler { private static final Logger logger = LoggerFactory.getLogger(Reconciler.class); - private static long STATIC_CLUSTERING = NIL_DESCR; + public static long STATIC_CLUSTERING = NIL_DESCR; private final OpSelectors.DescriptorSelector descriptorSelector; private final OpSelectors.PdSelector pdSelector; @@ -86,11 +78,11 @@ public class Reconciler this.visitorFactory = ltsVisitorFactory; } - private final long debugCd = Long.getLong("debug_cd", -1L); + private final long debugCd = Long.getLong("harry.reconciler.debug_cd", -1L); public PartitionState inflatePartitionState(final long pd, DataTracker tracker, Query query) { - PartitionState partitionState = new PartitionState(pd); + PartitionState partitionState = new PartitionState(pd, debugCd, schema); class Processor extends VisitExecutor { @@ -139,7 +131,6 @@ public class Reconciler case UPDATE_WITH_STATICS: if (debugCd != -1 && cd == debugCd) logger.info("Writing {} ({}) at {}/{}", cd, opType, lts, opId); - // TODO: switch to Operation as an entity that can just be passed here writes.add(new ReplayingVisitor.Operation(cd, opId, opType)); break; case DELETE_COLUMN_WITH_STATICS: @@ -272,219 +263,7 @@ public class Reconciler return partitionState; } - - public class PartitionState implements Iterable<RowState> - { - private final long pd; - private final NavigableMap<Long, RowState> rows; - private RowState staticRow; - - private PartitionState(long pd) - { - this.pd = pd; - rows = new TreeMap<>(); - if (!schema.staticColumns.isEmpty()) - { - staticRow = new RowState(this, - STATIC_CLUSTERING, - arr(schema.staticColumns.size(), NIL_DESCR), - arr(schema.staticColumns.size(), NO_TIMESTAMP)); - } - } - - private 
void writeStaticRow(long[] staticVds, - long lts) - { - if (staticRow != null) - staticRow = updateRowState(staticRow, schema.staticColumns, STATIC_CLUSTERING, staticVds, lts, false); - } - - private void write(long cd, - long[] vds, - long lts, - boolean writePrimaryKeyLiveness) - { - rows.compute(cd, (cd_, current) -> updateRowState(current, schema.regularColumns, cd, vds, lts, writePrimaryKeyLiveness)); - } - - private void delete(Ranges.Range range, - long lts) - { - if (range.minBound > range.maxBound) - return; - - Iterator<Map.Entry<Long, RowState>> iter = rows.subMap(range.minBound, range.minInclusive, - range.maxBound, range.maxInclusive) - .entrySet() - .iterator(); - while (iter.hasNext()) - { - Map.Entry<Long, RowState> e = iter.next(); - if (debugCd != -1 && e.getKey() == debugCd) - logger.info("Hiding {} at {} because of range tombstone {}", debugCd, lts, range); - - // assert row state doesn't have fresher lts - iter.remove(); - } - } - - private void delete(long cd, - long lts) - { - RowState state = rows.remove(cd); - if (state != null) - { - for (long v : state.lts) - assert lts >= v : String.format("Attempted to remove a row with a tombstone that has older timestamp (%d): %s", lts, state); - } - } - public boolean isEmpty() - { - return rows.isEmpty(); - } - - private RowState updateRowState(RowState currentState, List<ColumnSpec<?>> columns, long cd, long[] vds, long lts, boolean writePrimaryKeyLiveness) - { - if (currentState == null) - { - long[] ltss = new long[vds.length]; - long[] vdsCopy = new long[vds.length]; - for (int i = 0; i < vds.length; i++) - { - if (vds[i] != UNSET_DESCR) - { - ltss[i] = lts; - vdsCopy[i] = vds[i]; - } - else - { - ltss[i] = NO_TIMESTAMP; - vdsCopy[i] = NIL_DESCR; - } - } - - currentState = new RowState(this, cd, vdsCopy, ltss); - } - else - { - assert currentState.vds.length == vds.length; - for (int i = 0; i < vds.length; i++) - { - if (vds[i] == UNSET_DESCR) - continue; - - assert lts >= currentState.lts[i] : 
String.format("Out-of-order LTS: %d. Max seen: %s", lts, currentState.lts[i]); // sanity check; we're iterating in lts order - - if (currentState.lts[i] == lts) - { - // Timestamp collision case - ColumnSpec<?> column = columns.get(i); - if (column.type.compareLexicographically(vds[i], currentState.vds[i]) > 0) - currentState.vds[i] = vds[i]; - } - else - { - currentState.vds[i] = vds[i]; - assert lts > currentState.lts[i]; - currentState.lts[i] = lts; - } - } - } - - if (writePrimaryKeyLiveness) - currentState.hasPrimaryKeyLivenessInfo = true; - - return currentState; - } - - private void deleteRegularColumns(long lts, long cd, int columnOffset, BitSet columns, BitSet mask) - { - deleteColumns(lts, rows.get(cd), columnOffset, columns, mask); - } - - private void deleteStaticColumns(long lts, int columnOffset, BitSet columns, BitSet mask) - { - deleteColumns(lts, staticRow, columnOffset, columns, mask); - } - - private void deleteColumns(long lts, RowState state, int columnOffset, BitSet columns, BitSet mask) - { - if (state == null) - return; - - //TODO: optimise by iterating over the columns that were removed by this deletion - //TODO: optimise final decision to fully remove the column by counting a number of set/unset columns - boolean allNil = true; - for (int i = 0; i < state.vds.length; i++) - { - if (columns.isSet(columnOffset + i, mask)) - { - state.vds[i] = NIL_DESCR; - state.lts[i] = NO_TIMESTAMP; - } - else if (state.vds[i] != NIL_DESCR) - { - allNil = false; - } - } - - if (state.cd != STATIC_CLUSTERING && allNil & !state.hasPrimaryKeyLivenessInfo) - delete(state.cd, lts); - } - - private void deletePartition(long lts) - { - if (debugCd != -1) - logger.info("Hiding {} at {} because partition deletion", debugCd, lts); - - rows.clear(); - if (!schema.staticColumns.isEmpty()) - { - Arrays.fill(staticRow.vds, NIL_DESCR); - Arrays.fill(staticRow.lts, NO_TIMESTAMP); - } - } - - public Iterator<RowState> iterator() - { - return iterator(false); - } - - public 
Iterator<RowState> iterator(boolean reverse) - { - if (reverse) - return rows.descendingMap().values().iterator(); - - return rows.values().iterator(); - } - - public Collection<RowState> rows(boolean reverse) - { - if (reverse) - return rows.descendingMap().values(); - - return rows.values(); - } - - public RowState staticRow() - { - return staticRow; - } - - public String toString(SchemaSpec schema) - { - StringBuilder sb = new StringBuilder(); - - if (staticRow != null) - sb.append("Static row: " + staticRow.toString(schema)).append("\n"); - - for (RowState row : rows.values()) - sb.append(row.toString(schema)).append("\n"); - - return sb.toString(); - } - } - + public static long[] arr(int length, long fill) { long[] arr = new long[length]; @@ -511,6 +290,11 @@ public class Reconciler this.lts = lts; } + public RowState clone() + { + return new RowState(partitionState, cd, Arrays.copyOf(vds, vds.length), Arrays.copyOf(lts, lts.length)); + } + public String toString() { return toString((SchemaSpec) null); diff --git a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java index 6c4d1d0..245b1e4 100644 --- a/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java +++ b/harry-integration/test/harry/generators/DataGeneratorsIntegrationTest.java @@ -46,7 +46,7 @@ public class DataGeneratorsIntegrationTest extends CQLTester String tbl = "table_" + (counter++); createTable(String.format("CREATE TABLE %s.%s (pk int PRIMARY KEY, v %s)", ks, tbl, - dataType.toString())); + dataType)); for (int i = 0; i < 10_000; i++) { long d1 = dataType.generator().adjustEntropyDomain(rng.nextLong()); diff --git a/harry-integration/test/harry/model/ConcurrentQuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/ConcurrentQuiescentCheckerIntegrationTest.java index e0fc67d..024045b 100644 --- 
a/harry-integration/test/harry/model/ConcurrentQuiescentCheckerIntegrationTest.java +++ b/harry-integration/test/harry/model/ConcurrentQuiescentCheckerIntegrationTest.java @@ -63,7 +63,7 @@ public class ConcurrentQuiescentCheckerIntegrationTest extends ModelTestBase } @Override - Configuration.ModelConfiguration modelConfiguration() + protected Configuration.ModelConfiguration modelConfiguration() { return new Configuration.QuiescentCheckerConfig(); } diff --git a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java index 31cd9f1..9f9c259 100644 --- a/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java +++ b/harry-integration/test/harry/model/HistoryBuilderIntegrationTest.java @@ -185,7 +185,7 @@ public class HistoryBuilderIntegrationTest extends ModelTestBase } } - Configuration.ModelConfiguration modelConfiguration() + protected Configuration.ModelConfiguration modelConfiguration() { return new Configuration.QuiescentCheckerConfig(); } diff --git a/harry-integration/test/harry/model/ModelTestBase.java b/harry-integration/test/harry/model/ModelTestBase.java index 0baee11..47fa152 100644 --- a/harry-integration/test/harry/model/ModelTestBase.java +++ b/harry-integration/test/harry/model/ModelTestBase.java @@ -73,7 +73,7 @@ public abstract class ModelTestBase extends IntegrationTestBase } } - abstract Configuration.ModelConfiguration modelConfiguration(); + protected abstract Configuration.ModelConfiguration modelConfiguration(); protected SingleValidator validator(Run run) { diff --git a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java index 95bcbc8..356882f 100644 --- a/harry-integration/test/harry/model/QuerySelectorNegativeTest.java +++ b/harry-integration/test/harry/model/QuerySelectorNegativeTest.java @@ -110,7 +110,6 @@ public class QuerySelectorNegativeTest extends 
IntegrationTestBase Run run = config.createRun(); run.sut.schemaChange(run.schemaSpec.compile().cql()); System.out.println(run.schemaSpec.compile().cql()); - OpSelectors.MonotonicClock clock = run.clock; Visitor visitor = new MutatingVisitor(run, MutatingRowVisitor::new); Model model = new QuiescentChecker(run); @@ -129,7 +128,6 @@ public class QuerySelectorNegativeTest extends IntegrationTestBase run.rng); QueryGenerator.TypedQueryGenerator querySelector = new QueryGenerator.TypedQueryGenerator(run.rng, queryGen); - Query query = querySelector.inflate(verificationLts, counter); model.validate(query); diff --git a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java index dfe5ffa..4a234fe 100644 --- a/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java +++ b/harry-integration/test/harry/model/QuiescentCheckerIntegrationTest.java @@ -196,7 +196,8 @@ public class QuiescentCheckerIntegrationTest extends ModelTestBase String expected = "Returned row state doesn't match the one predicted by the model"; String expected2 = "Timestamps in the row state don't match ones predicted by the model"; - if (t.getMessage().contains(expected) || t.getMessage().contains(expected2)) + if (t.getMessage() != null && + (t.getMessage().contains(expected) || t.getMessage().contains(expected2))) return; throw new AssertionError(String.format("Exception string mismatch.\nExpected error: %s.\nActual error: %s", expected, t.getMessage()), @@ -205,7 +206,7 @@ public class QuiescentCheckerIntegrationTest extends ModelTestBase } @Override - Configuration.ModelConfiguration modelConfiguration() + protected Configuration.ModelConfiguration modelConfiguration() { return new Configuration.QuiescentCheckerConfig(); } diff --git a/harry-integration/test/harry/model/QuiescentLocalStateCheckerIntegrationTest.java 
b/harry-integration/test/harry/model/QuiescentLocalStateCheckerIntegrationTest.java index ae537db..6b0169f 100644 --- a/harry-integration/test/harry/model/QuiescentLocalStateCheckerIntegrationTest.java +++ b/harry-integration/test/harry/model/QuiescentLocalStateCheckerIntegrationTest.java @@ -72,7 +72,7 @@ public class QuiescentLocalStateCheckerIntegrationTest extends ModelTestBase } @Override - Configuration.ModelConfiguration modelConfiguration() + protected Configuration.ModelConfiguration modelConfiguration() { return new Configuration.QuiescentCheckerConfig(); } diff --git a/harry-integration/test/harry/reconciler/SimpleReconcilerTest.java b/harry-integration/test/harry/reconciler/SimpleReconcilerTest.java new file mode 100644 index 0000000..8c07c92 --- /dev/null +++ b/harry-integration/test/harry/reconciler/SimpleReconcilerTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package harry.reconciler;
+
+
+import java.util.*;
+
+import org.junit.Test;
+
+import harry.core.Configuration;
+import harry.core.Run;
+import harry.ddl.ColumnSpec;
+import harry.ddl.SchemaGenerators;
+import harry.ddl.SchemaSpec;
+import harry.generators.Surjections;
+import harry.model.sut.injvm.InJvmSut;
+import harry.model.*;
+import harry.operations.*;
+import harry.util.BitSet;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+/**
+ * Integration test that issues UPDATE statements touching random column subsets against the
+ * in-JVM cluster, mirrors every write into a {@link PartitionState} model, and then validates
+ * partition, single-clustering, slice and range reads (both wildcard and column-subset selects,
+ * in both directions) against that model via {@link QuiescentChecker}.
+ */
+public class SimpleReconcilerTest extends IntegrationTestBase
+{
+    /**
+     * Creates a schema generator for the given keyspace and table mixing int64 and ascii
+     * columns, including reversed clustering types.
+     *
+     * NOTE(review): the two leading integers of each {@code *Spec} call are presumably the
+     * minimum and maximum column count (1-3 keys, exactly 50 regular and 50 static columns);
+     * confirm against {@link SchemaGenerators.Builder}.
+     */
+    public static Surjections.Surjection<SchemaSpec> defaultSchemaSpecGen(String ks, String table)
+    {
+        return new SchemaGenerators.Builder(ks, () -> table)
+               .partitionKeySpec(1, 3,
+                                 ColumnSpec.int64Type,
+                                 ColumnSpec.asciiType(5, 256))
+               .clusteringKeySpec(1, 3,
+                                  ColumnSpec.int64Type,
+                                  ColumnSpec.asciiType(2, 3),
+                                  ColumnSpec.ReversedType.getInstance(ColumnSpec.int64Type),
+                                  ColumnSpec.ReversedType.getInstance(ColumnSpec.asciiType(2, 3)))
+               .regularColumnSpec(50, 50,
+                                  ColumnSpec.int64Type,
+                                  ColumnSpec.asciiType(5, 256))
+               .staticColumnSpec(50, 50,
+                                 ColumnSpec.int64Type,
+                                 ColumnSpec.asciiType(4, 256))
+               .surjection();
+    }
+
+    /**
+     * Writes up to 100 partitions (10 updates each, every update restricted to a random but
+     * valid column subset), recording each write in the model, and then validates reads of every
+     * written partition against the model.
+     *
+     * All randomness comes from a single {@code Random} seeded with 1, so the order of draws
+     * below determines which subsets and queries are exercised; keep it stable when editing.
+     */
+    @Test
+    public void testStatics() throws Throwable
+    {
+        int rowsPerPartition = 50;
+        SchemaSpec schema = defaultSchemaSpecGen("harry", "tbl").inflate(1);
+        Configuration config = sharedConfiguration(1, schema).build();
+        Run run = config.createRun();
+        SyntheticTest test = new SyntheticTest(run.rng, schema);
+        beforeEach();
+        cluster.schemaChange(schema.compile().cql());
+
+        ModelState state = new ModelState(new HashMap<>());
+        InJvmSut sut = (InJvmSut) run.sut;
+        Random rng = new Random(1);
+
+        int partitionIdx = 0;
+
+        for (int i = 0; i < 100; i++)
+        {
+            BitSet subset = randomBitSet(rng, schema.allColumns.size());
+            // Subsets that miss a key column or have no regular column can not be written; skip them
+            if (!isValidSubset(schema.allColumns, subset))
+                continue;
+            int pdIdx = partitionIdx++;
+            long pd = test.pd(pdIdx);
+
+            for (int j = 0; j < 10; j++)
+            {
+                int cdIdx = rng.nextInt(rowsPerPartition);
+                long cd = test.cd(pdIdx, cdIdx);
+
+                long[] vds = run.descriptorSelector.descriptors(pd, cd, state.lts, 0, schema.regularColumns,
+                                                                schema.regularColumnsMask(),
+                                                                subset,
+                                                                schema.regularColumnsOffset);
+                long[] sds = run.descriptorSelector.descriptors(pd, cd, state.lts, 0, schema.staticColumns,
+                                                                schema.staticColumnsMask,
+                                                                subset,
+                                                                schema.staticColumnsOffset);
+
+                CompiledStatement statement = WriteHelper.inflateUpdate(schema, pd, cd, vds, sds, run.clock.rts(state.lts));
+                sut.cluster.coordinator(1).execute(statement.cql(), ConsistencyLevel.QUORUM, statement.bindings());
+
+                // Mirror the write into the model, creating the partition state on first use
+                PartitionState partitionState = state.state.get(pd);
+                if (partitionState == null)
+                {
+                    partitionState = new PartitionState(pd, -1, schema);
+                    state.state.put(pd, partitionState);
+                }
+
+                partitionState.writeStaticRow(sds, state.lts);
+                partitionState.write(cd, vds, state.lts, true);
+
+                state.lts++;
+            }
+        }
+
+        // Validate that all partitions correspond to our expectations
+        for (Long pd : state.state.keySet())
+        {
+            PartitionState modelState = state.state.get(pd);
+            List<Long> clusteringDescriptors = new ArrayList<>(modelState.rows().keySet());
+
+            // Range queries below need two distinct clusterings. Without this guard, picking the
+            // second one would spin forever on a single-row partition, and Random#nextInt would
+            // throw on an empty one.
+            if (clusteringDescriptors.size() < 2)
+                continue;
+
+            // TODO: allow sub-selection
+            // Try different column subsets
+            for (int i = 0; i < 10; i++)
+            {
+                BitSet bitset = randomBitSet(rng, schema.allColumns.size());
+                // The first iteration always runs as a wildcard query (null subset)
+                Set<ColumnSpec<?>> subset = i == 0 ? null : subset(schema.allColumns, bitset);
+                if (subset != null && !isValidSubset(schema.allColumns, bitset))
+                    continue;
+
+                // Pick two distinct clusterings and order them to form the range bounds
+                long first = clusteringDescriptors.get(rng.nextInt(clusteringDescriptors.size()));
+                long second;
+                do
+                {
+                    second = clusteringDescriptors.get(rng.nextInt(clusteringDescriptors.size()));
+                }
+                while (second == first);
+
+                long cd1 = Math.min(first, second);
+                long cd2 = Math.max(first, second);
+
+                for (boolean reverse : new boolean[]{ true, false })
+                {
+                    Query query;
+
+                    query = Query.selectPartition(schema, pd, reverse);
+
+                    QuiescentChecker.validate(schema,
+                                              subset,
+                                              modelState,
+                                              SelectHelper.execute(sut, run.clock, query, subset),
+                                              query);
+
+                    // NOTE(review): this query is always built with reverse == false, so both
+                    // iterations of the enclosing loop validate the same thing; confirm whether
+                    // `reverse` was intended here.
+                    query = Query.singleClustering(schema, pd, cd1, false);
+                    QuiescentChecker.validate(schema,
+                                              subset,
+                                              modelState.apply(query),
+                                              SelectHelper.execute(sut, run.clock, query, subset),
+                                              query);
+
+                    for (boolean isGt : new boolean[]{ true, false })
+                    {
+                        for (boolean isEquals : new boolean[]{ true, false })
+                        {
+                            try
+                            {
+                                query = Query.clusteringSliceQuery(schema, pd, cd1, rng.nextLong(), isGt, isEquals, reverse);
+                            }
+                            catch (IllegalArgumentException impossibleQuery)
+                            {
+                                // Deliberately ignored: this combination can not be expressed as a query
+                                continue;
+                            }
+
+                            QuiescentChecker.validate(schema,
+                                                      subset,
+                                                      modelState.apply(query),
+                                                      SelectHelper.execute(sut, run.clock, query, subset),
+                                                      query);
+                        }
+                    }
+
+                    for (boolean isMinEq : new boolean[]{ true, false })
+                    {
+                        for (boolean isMaxEq : new boolean[]{ true, false })
+                        {
+                            try
+                            {
+                                query = Query.clusteringRangeQuery(schema, pd, cd1, cd2, rng.nextLong(), isMinEq, isMaxEq, reverse);
+                            }
+                            catch (IllegalArgumentException impossibleQuery)
+                            {
+                                // Deliberately ignored: this combination can not be expressed as a query
+                                continue;
+                            }
+                            QuiescentChecker.validate(schema,
+                                                      subset,
+                                                      modelState.apply(query),
+                                                      SelectHelper.execute(sut, run.clock, query, subset),
+                                                      query);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Creates a bitset for {@code size} columns and sets each of its bits with probability 1/2,
+     * drawing one boolean from {@code rng} per bit in index order.
+     */
+    private static BitSet randomBitSet(Random rng, int size)
+    {
+        BitSet bits = BitSet.allUnset(size);
+        for (int j = 0; j < bits.size(); j++)
+        {
+            if (rng.nextBoolean())
+                bits.set(j);
+        }
+        return bits;
+    }
+
+    /**
+     * Checks whether the columns selected by {@code bitSet} (indexed like {@code superset})
+     * include every partition key and clustering column and at least one regular column.
+     *
+     * @return true if all key columns are selected and at least one regular column is selected
+     */
+    public static boolean isValidSubset(List<ColumnSpec<?>> superset, BitSet bitSet)
+    {
+        boolean hasRegular = false;
+        for (int i = 0; i < superset.size(); i++)
+        {
+            ColumnSpec<?> column = superset.get(i);
+            boolean isKey = column.kind == ColumnSpec.Kind.PARTITION_KEY || column.kind == ColumnSpec.Kind.CLUSTERING;
+            if (isKey && !bitSet.isSet(i))
+                return false;
+            if (column.kind == ColumnSpec.Kind.REGULAR && bitSet.isSet(i))
+                hasRegular = true;
+        }
+
+        return hasRegular;
+    }
+
+    /**
+     * Returns the columns of {@code superset} whose index is set in {@code bitSet}.
+     */
+    public static Set<ColumnSpec<?>> subset(List<ColumnSpec<?>> superset, BitSet bitSet)
+    {
+        Set<ColumnSpec<?>> subset = new HashSet<>();
+        for (int i = 0; i < superset.size(); i++)
+        {
+            if (bitSet.isSet(i))
+                subset.add(superset.get(i));
+        }
+
+        return subset;
+    }
+
+    /**
+     * Picks a random subset of {@code superset} that always contains every partition key and
+     * clustering column and at least one regular column; every other column is included with
+     * probability 1/2. Candidates without a regular column are discarded and redrawn.
+     *
+     * @throws IllegalArgumentException if {@code superset} has no regular column, in which case
+     *                                  no draw could ever succeed
+     */
+    public static Set<ColumnSpec<?>> randomSubset(List<ColumnSpec<?>> superset, Random e)
+    {
+        boolean anyRegular = false;
+        for (ColumnSpec<?> column : superset)
+        {
+            if (column.kind == ColumnSpec.Kind.REGULAR)
+            {
+                anyRegular = true;
+                break;
+            }
+        }
+        if (!anyRegular)
+            throw new IllegalArgumentException("Can not pick a subset containing a regular column: superset has no regular columns");
+
+        // Retry iteratively rather than recursively, so repeated rejections can not exhaust the stack
+        while (true)
+        {
+            Set<ColumnSpec<?>> set = new HashSet<>();
+            boolean hadRegular = false;
+            for (ColumnSpec<?> v : superset)
+            {
+                // TODO: allow selecting without partition and clustering key, too
+                if (e.nextBoolean() || v.kind == ColumnSpec.Kind.CLUSTERING || v.kind == ColumnSpec.Kind.PARTITION_KEY)
+                {
+                    set.add(v);
+                    hadRegular |= v.kind == ColumnSpec.Kind.REGULAR;
+                }
+            }
+
+            // TODO: this is an oversimplification and a workaround for "Invalid restrictions on clustering columns since the UPDATE statement modifies only static columns"
+            if (hadRegular)
+                return set;
+        }
+    }
+
+    /**
+     * Converts {@code subset} into a bitset indexed like {@code superset}: bit {@code i} is set
+     * when {@code superset.get(i)} is contained in {@code subset}.
+     */
+    public static <T> BitSet subsetToBitset(List<T> superset, Set<T> subset)
+    {
+        BitSet bitSet = new BitSet.BitSet64Bit(superset.size());
+        for (int i = 0; i < superset.size(); i++)
+        {
+            if (subset.contains(superset.get(i)))
+                bitSet.set(i);
+        }
+        return bitSet;
+    }
+
+    /**
+     * Mutable model of everything written so far: the next logical timestamp and the expected
+     * state of each partition, keyed by partition descriptor.
+     */
+    public static class ModelState
+    {
+        public long lts = 0;
+        public final Map<Long, PartitionState> state;
+
+        public ModelState(Map<Long, PartitionState> state)
+        {
+            this.state = state;
+        }
+    }
+
+    /**
+     * Derives partition and clustering descriptors from small integer indexes using the run's
+     * RNG: partitions are drawn from the {@code PD_STREAM} stream, and the clusterings of a
+     * partition from the stream identified by that partition's descriptor.
+     */
+    public static class SyntheticTest // TODO: horrible name
+    {
+        // Chosen once per JVM and never reassigned, so descriptors differ between JVM runs
+        private static final long PD_STREAM = System.nanoTime();
+        private final OpSelectors.Rng rng;
+        private final SchemaSpec schema;
+
+        public SyntheticTest(OpSelectors.Rng rng, SchemaSpec schema)
+        {
+            this.schema = schema;
+            this.rng = rng;
+        }
+
+        /**
+         * Returns the partition descriptor for the given partition index.
+         */
+        public long pd(int pdIdx)
+        {
+            long pd = this.rng.randomNumber(pdIdx + 1, PD_STREAM);
+            long adjusted = schema.adjustPdEntropy(pd);
+            assert adjusted == pd : "Partition descriptors not utilising all entropy bits are not supported.";
+            return pd;
+        }
+
+        /**
+         * Inverse of {@link #pd(int)}: returns the index of the given partition descriptor.
+         */
+        public long pdIdx(long pd)
+        {
+            return this.rng.sequenceNumber(pd, PD_STREAM) - 1;
+        }
+
+        /**
+         * Returns the clustering descriptor for the given clustering index within the partition
+         * with the given index.
+         */
+        public long cd(int pdIdx, int cdIdx)
+        {
+            long cd = this.rng.randomNumber(cdIdx + 1, pd(pdIdx));
+            long adjusted = schema.adjustCdEntropy(cd);
+            assert adjusted == cd : "Clustering descriptors not utilising all entropy bits are not supported.";
+            return cd;
+        }
+
+        /**
+         * Inverse of {@link #cd(int, int)}: returns the index of clustering descriptor {@code cd}
+         * within the partition whose descriptor is {@code pd}. Relies on {@code sequenceNumber}
+         * inverting {@code randomNumber} for the same stream, as {@link #pdIdx(long)} does.
+         */
+        public long cdIdx(long pd, long cd)
+        {
+            return this.rng.sequenceNumber(cd, pd) - 1;
+        }
+
+        /**
+         * Kept for source compatibility only. Despite its name it inverts against
+         * {@code PD_STREAM} and is therefore identical to {@link #pdIdx(long)}.
+         *
+         * @deprecated use {@link #cdIdx(long, long)} to look up a clustering index
+         */
+        @Deprecated
+        public long cdIdx(long pd)
+        {
+            return this.rng.sequenceNumber(pd, PD_STREAM) - 1;
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
