This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 001f70367e Harry model and in-JVM tests for partition-restricted 2i
queries
001f70367e is described below
commit 001f70367e32bd44dc03c30d5533e549bbaea67e
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Jan 11 23:13:56 2024 -0600
Harry model and in-JVM tests for partition-restricted 2i queries
patch by Caleb Rackliffe; reviewed by Alex Petrov for CASSANDRA-18275
Co-authored-by: Caleb Rackliffe <[email protected]>
Co-authored-by: Alex Petrov <[email protected]>
---
CHANGES.txt | 1 +
.../cassandra/fuzz/sai/MultiNodeSAITest.java | 102 +++++++
.../cassandra/fuzz/sai/SingleNodeSAITest.java | 310 +++++++++++++++++++++
.../cassandra/fuzz/sai/StaticsTortureTest.java | 264 ++++++++++++++++++
.../org/apache/cassandra/harry/ddl/SchemaSpec.java | 35 ++-
.../cassandra/harry/model/AgainstSutChecker.java | 32 ++-
.../apache/cassandra/harry/model/SelectHelper.java | 39 +--
.../harry/model/reconciler/PartitionState.java | 1 -
.../harry/operations/CompiledStatement.java | 7 +
.../cassandra/harry/operations/FilteringQuery.java | 6 +
.../cassandra/harry/sut/injvm/InJvmSutBase.java | 11 +-
11 files changed, 770 insertions(+), 38 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index fbbc57216b..f3c3096705 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Harry model and in-JVM tests for partition-restricted 2i queries
(CASSANDRA-18275)
* Refactor cqlshmain global constants (CASSANDRA-19201)
* Remove native_transport_port_ssl (CASSANDRA-19397)
* Make nodetool reconfigurecms sync by default and add --cancel to be able to
cancel ongoing reconfigurations (CASSANDRA-19216)
diff --git
a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
new file mode 100644
index 0000000000..b45b6f782b
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.sai;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.test.sai.SAIUtil;
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+import org.apache.cassandra.harry.sut.injvm.InJvmSut;
+import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public class MultiNodeSAITest extends SingleNodeSAITest
+{
+ /**
+ * Choosing a fetch size has implications for how well this test will
exercise paging, short-read protection, and
+ * other important parts of the distributed query apparatus. This should
be set low enough to ensure a significant
+ * number of queries during validation page, but not so low that more
expensive queries time out and fail the test.
+ */
+ private static final int FETCH_SIZE = 10;
+
+ @BeforeClass
+ public static void before() throws Throwable
+ {
+ cluster = Cluster.build()
+ .withNodes(2)
+ // At lower fetch sizes, queries w/ hundreds or
thousands of matches can take a very long time.
+ .withConfig(InJvmSutBase.defaultConfig().andThen(c ->
c.set("range_request_timeout", "180s").set("read_request_timeout", "180s")
+
.with(GOSSIP).with(NETWORK)))
+ .createWithoutStarting();
+ cluster.setUncaughtExceptionsFilter(t -> {
+ logger.error("Caught exception, reporting during shutdown.
Ignoring.", t);
+ return true;
+ });
+ cluster.startup();
+ cluster = init(cluster);
+ sut = new InJvmSut(cluster) {
+ @Override
+ public Object[][] execute(String cql, ConsistencyLevel cl,
Object[] bindings)
+ {
+ // The goal here is to make replicas as out of date as
possible, modulo the efforts of repair
+ // and read-repair in the test itself.
+ if (cql.contains("SELECT"))
+ return super.execute(cql, ConsistencyLevel.ALL,
FETCH_SIZE, bindings);
+ return super.execute(cql, ConsistencyLevel.NODE_LOCAL,
bindings);
+ }
+ };
+ }
+
+ @Before
+ public void beforeEach()
+ {
+ cluster.schemaChange("DROP KEYSPACE IF EXISTS harry");
+ cluster.schemaChange("CREATE KEYSPACE harry WITH replication =
{'class': 'SimpleStrategy', 'replication_factor': 2};");
+ }
+
+ @Override
+ protected void flush(SchemaSpec schema)
+ {
+ cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
+ cluster.get(2).nodetool("flush", schema.keyspace, schema.table);
+ }
+
+ @Override
+ protected void repair(SchemaSpec schema)
+ {
+ cluster.get(1).nodetool("repair", schema.keyspace);
+ }
+
+ @Override
+ protected void compact(SchemaSpec schema)
+ {
+ cluster.get(1).nodetool("compact", schema.keyspace);
+ cluster.get(2).nodetool("compact", schema.keyspace);
+ }
+
+ @Override
+ protected void waitForIndexesQueryable(SchemaSpec schema)
+ {
+ SAIUtil.waitForIndexQueryable(cluster, schema.keyspace);
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
new file mode 100644
index 0000000000..f0fdf6c819
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.sai;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
+import org.apache.cassandra.harry.ddl.ColumnSpec;
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.gen.DataGenerators;
+import org.apache.cassandra.harry.gen.EntropySource;
+import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
+import org.apache.cassandra.harry.model.QuiescentChecker;
+import org.apache.cassandra.harry.model.SelectHelper;
+import org.apache.cassandra.harry.model.reconciler.PartitionState;
+import org.apache.cassandra.harry.model.reconciler.Reconciler;
+import org.apache.cassandra.harry.operations.FilteringQuery;
+import org.apache.cassandra.harry.operations.Query;
+import org.apache.cassandra.harry.operations.Relation;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.tracker.DataTracker;
+import org.apache.cassandra.harry.tracker.DefaultDataTracker;
+
+public class SingleNodeSAITest extends IntegrationTestBase
+{
+ private static final int RUNS = 1;
+
+ private static final int OPERATIONS_PER_RUN = 30_000;
+ private static final int REPAIR_SKIP = OPERATIONS_PER_RUN / 2;
+ private static final int FLUSH_SKIP = OPERATIONS_PER_RUN / 7;
+ private static final int VALIDATION_SKIP = OPERATIONS_PER_RUN / 100;
+
+ private static final int NUM_PARTITIONS = OPERATIONS_PER_RUN / 1000;
+ protected static final int MAX_PARTITION_SIZE = 10_000;
+ private static final int UNIQUE_CELL_VALUES = 5;
+
+ long seed = 1;
+
+ @Test
+ public void basicSaiTest()
+ {
+ CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6);
+ SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl1",
+ Arrays.asList(ColumnSpec.ck("pk1",
ColumnSpec.int64Type),
+ ColumnSpec.ck("pk2",
ColumnSpec.asciiType(4, 100)),
+ ColumnSpec.ck("pk3",
ColumnSpec.int64Type)),
+ Arrays.asList(ColumnSpec.ck("ck1",
ColumnSpec.asciiType(4, 100)),
+ ColumnSpec.ck("ck2",
ColumnSpec.asciiType, true),
+ ColumnSpec.ck("ck3",
ColumnSpec.int64Type)),
+
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
+
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
+
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
+
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))),
+ false,
+ false,
+ "LeveledCompactionStrategy",
+ false);
+
+ sut.schemaChange(schema.compile().cql());
+ sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table +
"_debug").compile().cql());
+ sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s)
USING 'sai' ",
+ schema.regularColumns.get(0).name,
+ schema.keyspace,
+ schema.table,
+ schema.regularColumns.get(0).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s)
USING 'sai';",
+ schema.regularColumns.get(1).name,
+ schema.keyspace,
+ schema.table,
+ schema.regularColumns.get(1).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s)
USING 'sai';",
+ schema.regularColumns.get(2).name,
+ schema.keyspace,
+ schema.table,
+ schema.regularColumns.get(2).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s)
USING 'sai';",
+ schema.staticColumns.get(0).name,
+ schema.keyspace,
+ schema.table,
+ schema.staticColumns.get(0).name));
+
+ waitForIndexesQueryable(schema);
+
+ DataTracker tracker = new DefaultDataTracker();
+ TokenPlacementModel.ReplicationFactor rf = new
TokenPlacementModel.SimpleReplicationFactor(cluster.size());
+ ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed,
+
MAX_PARTITION_SIZE,
+
MAX_PARTITION_SIZE,
+ tracker,
+ sut,
+ schema,
+ rf,
+
SystemUnderTest.ConsistencyLevel.QUORUM);
+
+ for (int run = 0; run < RUNS; run++)
+ {
+ logger.info("Starting run {}/{}...", run + 1, RUNS);
+ EntropySource random = new JdkRandomEntropySource(run);
+
+ // Populate the array of possible values for all operations in the
run:
+ long[] values = new long[UNIQUE_CELL_VALUES];
+ for (int i = 0; i < values.length; i++)
+ values[i] = random.next();
+
+ for (int i = 0; i < OPERATIONS_PER_RUN; i++)
+ {
+ int partitionIndex = random.nextInt(0, NUM_PARTITIONS);
+
+ history.visitPartition(partitionIndex)
+ .insert(random.nextInt(MAX_PARTITION_SIZE),
+ new long[] { random.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
+ random.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
+ random.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] },
+ new long[] { random.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] });
+
+ if (random.nextFloat() > 0.99f)
+ {
+ int row1 = random.nextInt(MAX_PARTITION_SIZE);
+ int row2 = random.nextInt(MAX_PARTITION_SIZE);
+
history.visitPartition(partitionIndex).deleteRowRange(Math.min(row1, row2),
Math.max(row1, row2),
+
random.nextBoolean(), random.nextBoolean());
+ }
+ else if (random.nextFloat() > 0.999f)
+ {
+ history.visitPartition(partitionIndex).deleteRowSlice();
+ }
+
+ if (random.nextFloat() > 0.995f)
+ {
+ history.visitPartition(partitionIndex).deleteColumns();
+ }
+
+ if (random.nextFloat() > 0.9995f)
+ {
+ history.visitPartition(partitionIndex).deletePartition();
+ }
+
+ if (i % REPAIR_SKIP == 0)
+ {
+ logger.debug("Repairing/flushing after operation {}...",
i);
+ repair(schema);
+ }
+ else if (i % FLUSH_SKIP == 0)
+ {
+ logger.debug("Flushing after operation {}...", i);
+ flush(schema);
+ }
+
+ if (i % VALIDATION_SKIP != 0)
+ continue;
+
+ logger.debug("Validating partition at index {} after operation
{} in run {}...", partitionIndex, i, run + 1);
+
+ for (int j = 0; j < 10; j++)
+ {
+ List<Relation> relations = new ArrayList<>();
+
+ // For one text column and 2 numeric columns, we can use
between 1 and 5 total relations.
+ int num = random.nextInt(1, 5);
+
+ List<List<Relation.RelationKind>> pick = new ArrayList<>();
+ //noinspection ArraysAsListWithZeroOrOneArgument
+ pick.add(new
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ))); // text column supports
only EQ
+ pick.add(new
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT,
Relation.RelationKind.LT)));
+ pick.add(new
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT,
Relation.RelationKind.LT)));
+
+ if (random.nextFloat() > 0.75f)
+ {
+ relations.addAll(Query.clusteringSliceQuery(schema,
+
partitionIndex,
+
random.next(),
+
random.next(),
+
random.nextBoolean(),
+
random.nextBoolean(),
+
false).relations);
+ }
+
+ for (int k = 0; k < num; k++)
+ {
+ int column =
random.nextInt(schema.regularColumns.size());
+ Relation.RelationKind relationKind = pickKind(random,
pick, column);
+
+ if (relationKind != null)
+ relations.add(Relation.relation(relationKind,
+
schema.regularColumns.get(column),
+
values[random.nextInt(values.length)]));
+ }
+
+ if (random.nextFloat() > 0.7f)
+ {
+
relations.add(Relation.relation(Relation.RelationKind.EQ,
+
schema.staticColumns.get(0),
+
values[random.nextInt(values.length)]));
+ }
+
+ long pd =
history.presetSelector.pdAtPosition(partitionIndex);
+ FilteringQuery query = new FilteringQuery(pd, false,
relations, schema);
+ Reconciler reconciler = new
Reconciler(history.presetSelector, schema, history::visitor);
+ Set<ColumnSpec<?>> columns = new
HashSet<>(schema.allColumns);
+
+ PartitionState modelState =
reconciler.inflatePartitionState(pd, tracker, query).filter(query);
+
+ if (modelState.rows().size() > 0)
+ logger.debug("Model contains {} matching rows for
query {}.", modelState.rows().size(), query);
+
+ try
+ {
+ QuiescentChecker.validate(schema,
+ tracker,
+ columns,
+ modelState,
+ SelectHelper.execute(sut,
history.clock(), query),
+ query);
+
+ // Run the query again to see if the first execution
caused an issue via read-repair:
+ QuiescentChecker.validate(schema,
+ tracker,
+ columns,
+ modelState,
+ SelectHelper.execute(sut,
history.clock(), query),
+ query);
+ }
+ catch (Throwable t)
+ {
+ logger.debug("Partition index = {}, run = {}, j = {},
i = {}", partitionIndex, run, j, i);
+
+ Query partitionQuery = Query.selectPartition(schema,
pd, false);
+ QuiescentChecker.validate(schema,
+ tracker,
+ columns,
+
reconciler.inflatePartitionState(pd, tracker, partitionQuery),
+ SelectHelper.execute(sut,
history.clock(), partitionQuery),
+ partitionQuery);
+ logger.debug("Partition state agrees. Throwing
original error...");
+
+ throw t;
+ }
+ }
+ }
+
+ if (run + 1 < RUNS)
+ {
+ logger.debug("Forcing compaction at the end of run {}...", run
+ 1);
+ compact(schema);
+ }
+ }
+ }
+
+ protected void flush(SchemaSpec schema)
+ {
+ cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
+ }
+
+ protected void compact(SchemaSpec schema)
+ {
+ cluster.get(1).nodetool("compact", schema.keyspace);
+ }
+
+ protected void repair(SchemaSpec schema)
+ {
+ // Repair is nonsensical for a single node, but a repair does flush
first, so do that at least.
+ cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
+ }
+
+ protected void waitForIndexesQueryable(SchemaSpec schema) {}
+
+ /**
+  * Draws one relation kind for {@code column} from the kinds still available to it, and prunes that
+  * column's list so later draws cannot combine EQ with LT/GT on the same column.
+  *
+  * @param random  entropy source used to choose among the remaining kinds
+  * @param options per-column lists of remaining kinds; the list for {@code column} is mutated
+  * @param column  index into {@code options}
+  * @return the chosen kind, or {@code null} when nothing is left for this column
+  */
+ private static Relation.RelationKind pickKind(EntropySource random, List<List<Relation.RelationKind>> options, int column)
+ {
+     // Nothing left to pick for this column: report that without consuming any entropy.
+     if (options.get(column).isEmpty())
+         return null;
+
+     List<Relation.RelationKind> remaining = options.get(column);
+     int index = random.nextInt(remaining.size());
+     Relation.RelationKind picked = remaining.remove(index);
+
+     if (picked != Relation.RelationKind.EQ)
+     {
+         // A range bound (LT or GT) rules out a later EQ on the same column.
+         remaining.remove(Relation.RelationKind.EQ);
+     }
+     else
+     {
+         // EQ rules out any further LT or GT on the same column.
+         remaining.clear();
+     }
+
+     return picked;
+ }
+}
\ No newline at end of file
diff --git
a/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java
b/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java
new file mode 100644
index 0000000000..6c24c2fff9
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/StaticsTortureTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.sai;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
+import org.apache.cassandra.harry.data.ResultSetRow;
+import org.apache.cassandra.harry.ddl.ColumnSpec;
+import org.apache.cassandra.harry.ddl.SchemaSpec;
+import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
+import org.apache.cassandra.harry.gen.DataGenerators;
+import org.apache.cassandra.harry.gen.EntropySource;
+import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
+import org.apache.cassandra.harry.model.AgainstSutChecker;
+import org.apache.cassandra.harry.model.Model;
+import org.apache.cassandra.harry.model.SelectHelper;
+import org.apache.cassandra.harry.operations.CompiledStatement;
+import org.apache.cassandra.harry.operations.FilteringQuery;
+import org.apache.cassandra.harry.operations.Query;
+import org.apache.cassandra.harry.operations.Relation;
+import org.apache.cassandra.harry.sut.DoubleWritingSut;
+import org.apache.cassandra.harry.sut.QueryModifyingSut;
+import org.apache.cassandra.harry.sut.SystemUnderTest;
+import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.tracker.DataTracker;
+import org.apache.cassandra.harry.tracker.DefaultDataTracker;
+
+public class StaticsTortureTest extends IntegrationTestBase
+{
+ private static final int MAX_PARTITION_SIZE = 10_000;
+ private static final int NUM_PARTITIONS = 100;
+ private static final int UNIQUE_CELL_VALUES = 5;
+
+ long seed = 1;
+
+ @Test
+ public void staticsTortureTest()
+ {
+ CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6);
+ staticsTortureTest(Arrays.asList(ColumnSpec.ck("ck1",
ColumnSpec.asciiType(4, 100)),
+ ColumnSpec.ck("ck2",
ColumnSpec.asciiType),
+ ColumnSpec.ck("ck3",
ColumnSpec.int64Type)));
+
+ for (boolean b1 : new boolean[]{ true, false })
+ for (boolean b2 : new boolean[]{ true, false })
+ for (boolean b3 : new boolean[]{ true, false })
+ {
+ staticsTortureTest(Arrays.asList(ColumnSpec.ck("ck1",
ColumnSpec.asciiType(4, 100), b1),
+ ColumnSpec.ck("ck2",
ColumnSpec.asciiType, b2),
+ ColumnSpec.ck("ck3",
ColumnSpec.int64Type, b3)));
+ }
+ }
+
+ public void staticsTortureTest(List<ColumnSpec<?>> cks)
+ {
+ SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl" + (seed++),
+ Arrays.asList(ColumnSpec.ck("pk1",
ColumnSpec.int64Type),
+ ColumnSpec.ck("pk2",
ColumnSpec.asciiType(4, 100)),
+ ColumnSpec.ck("pk3",
ColumnSpec.int64Type)),
+ cks,
+
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
+
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
+
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
+
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100)),
+
ColumnSpec.staticColumn("s2", ColumnSpec.int64Type),
+
ColumnSpec.staticColumn("s3", ColumnSpec.asciiType(40, 100))
+ ));
+
+ sut.schemaChange(schema.compile().cql());
+ SchemaSpec debugSchema = schema.cloneWithName(schema.keyspace,
schema.table + "_debug");
+ sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table +
"_debug").compile().cql());
+ sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s
(%s) USING 'sai' " +
+ "WITH OPTIONS = {'case_sensitive':
'false', 'normalize': 'true', 'ascii': 'true'};",
+ schema.table,
+ schema.regularColumns.get(0).name,
+ schema.keyspace,
+ schema.table,
+ schema.regularColumns.get(0).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s
(%s) USING 'sai';",
+ schema.table,
+ schema.regularColumns.get(1).name,
+ schema.keyspace,
+ schema.table,
+ schema.regularColumns.get(1).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s
(%s) USING 'sai';",
+ schema.table,
+ schema.regularColumns.get(2).name,
+ schema.keyspace,
+ schema.table,
+ schema.regularColumns.get(2).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s
(%s) USING 'sai';",
+ schema.table,
+ schema.staticColumns.get(0).name,
+ schema.keyspace,
+ schema.table,
+ schema.staticColumns.get(0).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s
(%s) USING 'sai';",
+ schema.table,
+ schema.staticColumns.get(1).name,
+ schema.keyspace,
+ schema.table,
+ schema.staticColumns.get(1).name));
+ sut.schemaChange(String.format("CREATE INDEX %s_%s_sai_idx ON %s.%s
(%s) USING 'sai';",
+ schema.table,
+ schema.staticColumns.get(2).name,
+ schema.keyspace,
+ schema.table,
+ schema.staticColumns.get(2).name));
+ DataTracker tracker = new DefaultDataTracker();
+ TokenPlacementModel.ReplicationFactor rf = new
TokenPlacementModel.SimpleReplicationFactor(cluster.size());
+ ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed,
+
MAX_PARTITION_SIZE,
+ 100,
+ tracker,
+ new
DoubleWritingSut(sut,
+
new QueryModifyingSut(sut,
+
schema.keyspace + "." + schema.table,
+
debugSchema.keyspace + "." +
debugSchema.table)),
+ schema,
+ rf,
+
SystemUnderTest.ConsistencyLevel.QUORUM);
+
+ EntropySource rng = new JdkRandomEntropySource(1l);
+ long[] values = new long[UNIQUE_CELL_VALUES];
+ for (int i = 0; i < values.length; i++)
+ values[i] = rng.next();
+
+ for (int i = 0; i < NUM_PARTITIONS; i++)
+ {
+ history.visitPartition(i)
+ .insert(1,
+ new long[]{ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)]
+ },
+ new long[]{ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)]
+ });
+ history.visitPartition(i)
+ .insert(5,
+ new long[]{ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)]
+ },
+ new long[]{ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)],
+ rng.nextBoolean() ?
DataGenerators.UNSET_DESCR : values[rng.nextInt(values.length)]
+ });
+
+ if (rng.nextFloat() > 0.9f)
+ {
+ history.visitPartition(i)
+ .deleteRowRange(rng.nextInt(5), rng.nextInt(5),
rng.nextBoolean(), rng.nextBoolean());
+ }
+
+ if (rng.nextFloat() > 0.9f)
+ {
+ history.visitPartition(i)
+ .deleteColumns();
+ }
+
+ if (i % 50 == 0)
+ cluster.get(1).nodetool("flush", schema.keyspace,
schema.table);
+ }
+
+ Model model = new AgainstSutChecker(tracker, history.clock(), sut,
schema, schema.cloneWithName(schema.keyspace, debugSchema.table)) {
+ @Override
+ protected List<ResultSetRow> executeOnDebugSchema(Query query)
+ {
+ CompiledStatement s2 =
query.toSelectStatement(doubleWriteTable.allColumnsSet, true)
+ .withSchema(schema.keyspace,
schema.table, doubleWriteTable.keyspace, doubleWriteTable.table)
+ .withFiltering();
+ return SelectHelper.execute(sut, clock, s2, schema);
+ }
+ };
+
+ for (int pdx = 0; pdx < NUM_PARTITIONS; pdx++)
+ {
+ long pd = history.presetSelector.pdAtPosition(pdx);
+ history.presetSelector.pdAtPosition(1);
+ for (int i1 = 0; i1 < values.length; i1++)
+ for (int i2 = 0; i2 < values.length; i2++)
+ for (int i3 = 0; i3 < values.length; i3++)
+ {
+ long[] descriptors = new long[]{ values[i1],
values[i2], values[i3],
+ values[i1],
values[i2], values[i3] };
+ List<Relation> relations = new ArrayList<>();
+ Stream.concat(schema.regularColumns.stream(),
+ schema.staticColumns.stream())
+ .forEach(new Consumer<>()
+ {
+ int counter = 0;
+
+ /**
+  * Possibly contributes one relation on {@code column} to the enclosing {@code relations} list.
+  * The column is skipped when the first {@code rng.nextBoolean()} returns true. Otherwise an int64
+  * column gets EQ, LT or GT, and any other column gets EQ. The compared value is
+  * {@code descriptors[counter]}.
+  *
+  * Every relation that is built is added to {@code relations}; previously the LT/GT and
+  * non-int64 EQ relations were constructed and then dropped.
+  */
+ @Override
+ public void accept(ColumnSpec<?> column)
+ {
+     if (rng.nextBoolean())
+         return;
+
+     Relation.RelationKind kind;
+     if (column.type.toString().equals(ColumnSpec.int64Type.toString()))
+     {
+         // Same draw order as before: the first draw selects EQ vs. range, the second LT vs. GT.
+         kind = rng.nextBoolean()
+                ? Relation.RelationKind.EQ
+                : (rng.nextBoolean() ? Relation.RelationKind.LT : Relation.RelationKind.GT);
+     }
+     else
+     {
+         kind = Relation.RelationKind.EQ;
+     }
+
+     relations.add(Relation.relation(kind, column, descriptors[counter]));
+
+     // Note: the counter only advances for columns that were not skipped above.
+     counter++;
+ }
+ });
+
+ // Without partition key
+ model.validate(new FilteringQuery(-1, false,
relations, schema)
+ {
+ @Override
+ public CompiledStatement toSelectStatement()
+ {
+ return SelectHelper.select(schemaSpec, null,
schemaSpec.allColumnsSet, relations, reverse, true);
+ }
+ });
+ model.validate(new FilteringQuery(pd, false,
relations, schema));
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
b/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
index 2ee0c50add..4ad7f29369 100644
--- a/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
+++ b/test/harry/main/org/apache/cassandra/harry/ddl/SchemaSpec.java
@@ -31,13 +31,15 @@ public class SchemaSpec
{
public interface SchemaSpecFactory
{
- public SchemaSpec make(long seed, SystemUnderTest sut);
+ SchemaSpec make(long seed, SystemUnderTest sut);
}
public final DataGenerators.KeyGenerator pkGenerator;
public final DataGenerators.KeyGenerator ckGenerator;
private final boolean isCompactStorage;
+ private final boolean disableReadRepair;
+ private final String compactionStrategy;
public final boolean trackLts;
// These fields are immutable, and are safe as public
@@ -65,23 +67,26 @@ public class SchemaSpec
List<ColumnSpec<?>> regularColumns,
List<ColumnSpec<?>> staticColumns)
{
- this(keyspace, table, partitionKeys, clusteringKeys, regularColumns,
staticColumns, false, false);
+ this(keyspace, table, partitionKeys, clusteringKeys, regularColumns,
staticColumns, false, false, null, false);
}
public SchemaSpec cloneWithName(String ks,
String table)
{
- return new SchemaSpec(ks, table, partitionKeys, clusteringKeys,
regularColumns, staticColumns, isCompactStorage, trackLts);
+ return new SchemaSpec(ks, table, partitionKeys, clusteringKeys,
regularColumns, staticColumns,
+ isCompactStorage, disableReadRepair,
compactionStrategy, trackLts);
}
public SchemaSpec trackLts()
{
- return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys,
regularColumns, staticColumns, isCompactStorage, true);
+ // Returns a copy of this schema with LTS tracking switched on. Arguments follow the constructor order
+ // (..., isCompactStorage, disableReadRepair, compactionStrategy, trackLts), so only trackLts changes.
+ return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys, regularColumns, staticColumns,
+ isCompactStorage, disableReadRepair, compactionStrategy, true);
}
public SchemaSpec withCompactStorage()
{
- return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys,
regularColumns, staticColumns, true, trackLts);
+ return new SchemaSpec(keyspace, table, partitionKeys, clusteringKeys,
regularColumns,
+ staticColumns, true, disableReadRepair,
compactionStrategy, trackLts);
}
public SchemaSpec(String keyspace,
@@ -91,6 +96,8 @@ public class SchemaSpec
List<ColumnSpec<?>> regularColumns,
List<ColumnSpec<?>> staticColumns,
boolean isCompactStorage,
+ boolean disableReadRepair,
+ String compactionStrategy,
boolean trackLts)
{
assert !isCompactStorage || clusteringKeys.size() == 0 ||
regularColumns.size() <= 1;
@@ -98,6 +105,8 @@ public class SchemaSpec
this.keyspace = keyspace;
this.table = table;
this.isCompactStorage = isCompactStorage;
+ this.disableReadRepair = disableReadRepair;
+ this.compactionStrategy = compactionStrategy;
this.partitionKeys = Collections.unmodifiableList(new
ArrayList<>(partitionKeys));
for (int i = 0; i < partitionKeys.size(); i++)
@@ -296,12 +305,24 @@ public class SchemaSpec
sb.append(')');
- Runnable appendWith = doOnce(() -> sb.append(" WITH "));
+ Runnable appendWith = doOnce(() -> sb.append(" WITH"));
if (isCompactStorage)
{
appendWith.run();
- sb.append("COMPACT STORAGE AND");
+ sb.append(" COMPACT STORAGE AND");
+ }
+
+ if (disableReadRepair)
+ {
+ appendWith.run();
+ sb.append(" read_repair = 'NONE' AND");
+ }
+
+ if (compactionStrategy != null)
+ {
+ appendWith.run();
+ sb.append(" compaction = {'class':
'").append(compactionStrategy).append("'} AND");
}
if (clusteringKeys.size() > 0)
diff --git
a/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java
b/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java
index a9c35337df..e4bd009c8c 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/AgainstSutChecker.java
@@ -50,11 +50,11 @@ import org.apache.cassandra.harry.tracker.DataTracker;
*/
public class AgainstSutChecker implements Model
{
- private final OpSelectors.Clock clock;
- private final SystemUnderTest sut;
- private final SchemaSpec schema;
- private final SchemaSpec doubleWriteTable;
- private final DataTracker tracker;
+ protected final OpSelectors.Clock clock;
+ protected final SystemUnderTest sut;
+ protected final SchemaSpec schema;
+ protected final SchemaSpec doubleWriteTable;
+ protected final DataTracker tracker;
public AgainstSutChecker(DataTracker tracker,
OpSelectors.Clock clock,
@@ -72,14 +72,12 @@ public class AgainstSutChecker implements Model
public void validate(Query query)
{
tracker.beginValidation(query.pd);
- CompiledStatement s1 = query.toSelectStatement(schema.allColumnsSet,
true);
- CompiledStatement s2 = s1.withSchema(schema.keyspace, schema.table,
- doubleWriteTable.keyspace,
doubleWriteTable.table);
- List<ResultSetRow> rows1 = SelectHelper.execute(sut, clock, s1,
schema);
- List<ResultSetRow> rows2 = SelectHelper.execute(sut, clock, s2,
doubleWriteTable);
+
+ List<ResultSetRow> rows1 = executeOnMainSchema(query);
+ List<ResultSetRow> rows2 = executeOnDebugSchema(query);
if (rows1.size() != rows2.size())
- throw new IllegalStateException(String.format("Sizes do not match
%d %d", rows1.size(), rows2.size()));
+ throw new IllegalStateException(String.format("Sizes do not match
%d %d\n%s\n%s\nQuery:%s\n", rows1.size(), rows2.size(), rows1, rows2,
query.toSelectStatement()));
for (int i = 0; i < rows1.size(); i++)
{
@@ -95,5 +93,17 @@ public class AgainstSutChecker implements Model
tracker.endValidation(query.pd);
}
+ protected final List<ResultSetRow> executeOnMainSchema(Query query)
+ {
+ CompiledStatement s1 = query.toSelectStatement(schema.allColumnsSet,
true);
+ return SelectHelper.execute(sut, clock, s1, schema);
+ }
+
+ protected List<ResultSetRow> executeOnDebugSchema(Query query)
+ {
+ CompiledStatement s2 =
query.toSelectStatement(doubleWriteTable.allColumnsSet, true)
+ .withSchema(schema.keyspace, schema.table,
doubleWriteTable.keyspace, doubleWriteTable.table);
+ return SelectHelper.execute(sut, clock, s2, schema);
+ }
}
diff --git a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
index 36e4499c7d..8f2c6f5a4b 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/SelectHelper.java
@@ -59,7 +59,7 @@ public class SelectHelper
return select(schema, pd, null, relations, reverse, includeWriteTime);
}
- public static CompiledStatement select(SchemaSpec schema, long pd,
Set<ColumnSpec<?>> columns, List<Relation> relations, boolean reverse, boolean
includeWriteTime)
+ public static CompiledStatement select(SchemaSpec schema, Long pd,
Set<ColumnSpec<?>> columns, List<Relation> relations, boolean reverse, boolean
includeWriteTime)
{
boolean isWildcardQuery = columns == null;
if (isWildcardQuery)
@@ -126,21 +126,28 @@ public class SelectHelper
List<Object> bindings = new ArrayList<>();
- schema.inflateRelations(pd,
- relations,
- new SchemaSpec.AddRelationCallback()
- {
- boolean isFirst = true;
- public void accept(ColumnSpec<?> spec,
Relation.RelationKind kind, Object value)
- {
- if (isFirst)
- isFirst = false;
- else
- b.append(" AND ");
- b.append(kind.getClause(spec));
- bindings.add(value);
- }
- });
+ SchemaSpec.AddRelationCallback consumer = new
SchemaSpec.AddRelationCallback()
+ {
+ boolean isFirst = true;
+ public void accept(ColumnSpec<?> spec, Relation.RelationKind kind,
Object value)
+ {
+ if (isFirst)
+ isFirst = false;
+ else
+ b.append(" AND ");
+ b.append(kind.getClause(spec));
+ bindings.add(value);
+ }
+ };
+ if (pd != null)
+ {
+ Object[] pk = schema.inflatePartitionKey(pd);
+ for (int i = 0; i < pk.length; i++)
+ consumer.accept(schema.partitionKeys.get(i),
Relation.RelationKind.EQ, pk[i]);
+
+ }
+ schema.inflateRelations(relations, consumer);
+
addOrderBy(schema, b, reverse);
b.append(";");
Object[] bindingsArr = bindings.toArray(new Object[bindings.size()]);
diff --git a/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java b/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java
index 9d789cdd8c..4792de638d 100644
--- a/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java
+++ b/test/harry/main/org/apache/cassandra/harry/model/reconciler/PartitionState.java
@@ -101,7 +101,6 @@ public class PartitionState implements Iterable<Reconciler.RowState>
rows.put(cd, rowState);
}
PartitionState ps = new PartitionState(pd, debugCd, staticRow, rows,
schema);
- System.out.println("ps.rows.size() = " + ps.rows.size());
return ps;
}
diff --git a/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java b/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java
index dc29bb1cbb..3d1428f782 100644
--- a/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java
+++ b/test/harry/main/org/apache/cassandra/harry/operations/CompiledStatement.java
@@ -41,6 +41,13 @@ public class CompiledStatement
bindings);
}
+ public CompiledStatement withFiltering()
+ {
+ return new CompiledStatement(cql.replace(";",
+ " ALLOW FILTERING;"),
+ bindings);
+ }
+
public Object[] bindings()
{
return bindings;
diff --git a/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java b/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java
index e698025901..1a39e50fa8 100644
--- a/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java
+++ b/test/harry/main/org/apache/cassandra/harry/operations/FilteringQuery.java
@@ -65,4 +65,10 @@ public class FilteringQuery extends Query
{
throw new IllegalStateException("not implemented for filtering query");
}
+
+ @Override
+ public String toString()
+ {
+ return "FilteringQuery{pd=" + pd + ", reverse=" + reverse + ", relations=" + relations + '}';
+ }
}
diff --git a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
index d024688253..3d142e6c7d 100644
--- a/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
+++ b/test/harry/main/org/apache/cassandra/harry/sut/injvm/InJvmSutBase.java
@@ -129,12 +129,17 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER extends ICluster<NODE>
cluster.schemaChange(statement);
}
+ public Object[][] execute(String statement, ConsistencyLevel cl, int
pageSize, Object... bindings)
+ {
+ return execute(statement, cl, loadBalancingStrategy.get(), pageSize,
bindings);
+ }
+
public Object[][] execute(String statement, ConsistencyLevel cl, Object...
bindings)
{
- return execute(statement, cl, loadBalancingStrategy.get(), bindings);
+ return execute(statement, cl, loadBalancingStrategy.get(), 1,
bindings);
}
- public Object[][] execute(String statement, ConsistencyLevel cl, int
coordinator, Object... bindings)
+ public Object[][] execute(String statement, ConsistencyLevel cl, int
coordinator, int pageSize, Object... bindings)
{
if (isShutdown.get())
throw new RuntimeException("Instance is shut down");
@@ -151,7 +156,7 @@ public class InJvmSutBase<NODE extends IInstance, CLUSTER extends ICluster<NODE>
return Iterators.toArray(cluster
// round-robin
.coordinator(coordinator)
- .executeWithPaging(statement,
toApiCl(cl), 1, bindings),
+ .executeWithPaging(statement,
toApiCl(cl), pageSize, bindings),
Object[].class);
}
else
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]