This is an automated email from the ASF dual-hosted git repository. aweisberg pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit e7ef1da1749db6fdfb7af4961ee9da21912c4d38 Author: Ariel Weisberg <[email protected]> AuthorDate: Wed Sep 25 13:58:01 2024 -0400 Accord should not block partition restricted index queries Patch by Ariel Weisberg; Reviewed by David Capwell for CASSANDRA-19955 --- .../cql3/statements/TransactionStatement.java | 21 +- .../statements/schema/AlterTableStatement.java | 15 +- .../cassandra/db/SinglePartitionReadCommand.java | 26 ++ .../org/apache/cassandra/db/filter/RowFilter.java | 7 + .../cassandra/index/sai/plan/QueryController.java | 24 +- .../org/apache/cassandra/schema/TableMetadata.java | 4 + .../cassandra/service/accord/IAccordService.java | 2 +- .../cassandra/service/accord/txn/TxnNamedRead.java | 9 +- .../cassandra/service/accord/txn/TxnRead.java | 2 +- .../cassandra/service/reads/DataResolver.java | 7 +- .../distributed/test/accord/AccordCQLTestBase.java | 187 ++++++++++++- .../distributed/test/accord/AccordTestBase.java | 1 + .../cassandra/fuzz/sai/AccordMultiNodeSAITest.java | 35 +++ .../fuzz/sai/AccordSingleNodeSAITest.java | 27 ++ .../cassandra/fuzz/sai/MultiNodeSAITest.java | 80 +----- ...iNodeSAITest.java => MultiNodeSAITestBase.java} | 24 +- .../cassandra/fuzz/sai/SingleNodeSAITest.java | 288 +-------------------- ...NodeSAITest.java => SingleNodeSAITestBase.java} | 44 +++- .../cql3/statements/TransactionStatementTest.java | 77 +----- .../cql3/validation/entities/TupleTypeTest.java | 2 +- 20 files changed, 425 insertions(+), 457 deletions(-) diff --git a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java index 6d59a3a37d..e948d5dcaa 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java @@ -61,6 +61,7 @@ import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.SinglePartitionReadQuery; import 
org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.partitions.FilteredPartition; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.service.ClientState; @@ -78,8 +79,8 @@ import org.apache.cassandra.service.accord.txn.TxnReference; import org.apache.cassandra.service.accord.txn.TxnResult; import org.apache.cassandra.service.accord.txn.TxnUpdate; import org.apache.cassandra.service.accord.txn.TxnWrite; -import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.FBUtilities; @@ -97,6 +98,7 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement, private static final Logger logger = LoggerFactory.getLogger(TransactionStatement.class); public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' has already been used by a LET assignment."; + public static final String INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE = "SELECT must specify either all partition key elements. Partition key elements must be always specified with equality operators; %s %s"; public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "SELECT must specify either all primary key elements or all partition key elements and LIMIT 1. 
In both cases partition key elements must be always specified with equality operators; %s %s"; public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates within transactions may not specify their own conditions; %s statement %s"; public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates within transactions may not specify custom timestamps; %s statement %s"; @@ -107,6 +109,7 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement, public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT references must specify a column."; public static final String TRANSACTIONS_DISABLED_MESSAGE = "Accord transactions are disabled. (See accord.enabled in cassandra.yaml)"; public static final String ILLEGAL_RANGE_QUERY_MESSAGE = "Range queries are not allowed for reads within a transaction; %s %s"; + public static final String UNSUPPORTED_MIGRATION = "Transaction Statement is unsupported when migrating away from Accord or before migration to Accord is complete for a range"; static class NamedSelect { @@ -383,14 +386,11 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement, for (NamedSelect assignment : assignments) checkFalse(isSelectingMultipleClusterings(assignment.select, options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment", assignment.select.source); - if (returningSelect != null) - checkFalse(isSelectingMultipleClusterings(returningSelect.select, options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "returning SELECT", returningSelect.select.source); - Txn txn = createTxn(state.getClientState(), options); TxnResult txnResult = AccordService.instance().coordinate(txn, options.getConsistency(), requestTime); if (txnResult.kind() == retry_new_protocol) - throw new IllegalStateException("Transaction statement should never be required to switch consensus protocols"); + throw new InvalidRequestException(UNSUPPORTED_MIGRATION); TxnData data = (TxnData)txnResult; if (returningSelect != 
null) @@ -560,7 +560,7 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement, throw invalidRequest(NO_COUNTERS_IN_TXNS_MESSAGE, "SELECT", prepared.source); returningSelect = new NamedSelect(TxnDataName.returning(), prepared); - checkAtMostOneRowSpecified(returningSelect.select, "returning select"); + checkAtMostOnePartitionSpecified(returningSelect.select, "returning select"); } List<RowDataReference> returningReferences = null; @@ -600,6 +600,15 @@ public class TransactionStatement implements CQLStatement.CompositeCQLStatement, return new TransactionStatement(preparedAssignments, returningSelect, returningReferences, preparedUpdates, preparedConditions, bindVariables); } + /** + * Do not use this method in execution!!! It is only allowed during prepare because it outputs a query raw text. + * We don't want it print it for a user who provided an identifier of someone's else prepared statement. + */ + private static void checkAtMostOnePartitionSpecified(SelectStatement select, String name) + { + checkTrue(select.getRestrictions().hasPartitionKeyRestrictions(), INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE, name, select.source); + } + /** * Do not use this method in execution!!! It is only allowed during prepare because it outputs a query raw text. * We don't want it print it for a user who provided an identifier of someone's else prepared statement. 
diff --git a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java index b3536910c8..38a8a5c23f 100644 --- a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java @@ -66,6 +66,7 @@ import org.apache.cassandra.schema.UserFunctions; import org.apache.cassandra.schema.ViewMetadata; import org.apache.cassandra.schema.Views; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode; import org.apache.cassandra.service.reads.repair.ReadRepairStrategy; import org.apache.cassandra.tcm.ClusterMetadata; @@ -87,6 +88,9 @@ public abstract class AlterTableStatement extends AlterSchemaStatement { private static final Logger logger = LoggerFactory.getLogger(AlterTableStatement.class); + public static final String ACCORD_COUNTER_TABLES_UNSUPPORTED = "Counters are not supported with Accord for table %s.%s"; + public static final String ACCORD_COUNTER_COLUMN_UNSUPPORTED = "Cannot add a counter column to Accord table %s.%s with transactional mode %s and transactional migration from %s"; + protected final String tableName; private final boolean ifExists; protected ClientState state; @@ -325,6 +329,9 @@ public abstract class AlterTableStatement extends AlterSchemaStatement return; } + if (type.isCounter() && (table.params.transactionalMode.accordIsEnabled || table.params.transactionalMigrationFrom.migratingFromAccord())) + throw ire(format(ACCORD_COUNTER_COLUMN_UNSUPPORTED, keyspaceName, tableName, table.params.transactionalMode, table.params.transactionalMigrationFrom)); + if (table.isCompactTable()) throw ire("Cannot add new column to a COMPACT STORAGE table"); @@ -578,7 +585,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement validateDefaultTimeToLive(attrs.asNewTableParams()); } - private TableParams validateAndUpdateTransactionalMigration(TableParams prev, TableParams next) + private TableParams validateAndUpdateTransactionalMigration(boolean isCounter, TableParams prev, TableParams next) { if (next.transactionalMode.accordIsEnabled && SchemaConstants.isSystemKeyspace(keyspaceName)) throw ire("Cannot enable accord on system tables (%s.%s)", keyspaceName, tableName); @@ -588,6 +595,10 @@ public abstract class AlterTableStatement extends AlterSchemaStatement boolean explicitlySetMigrationFrom = attrs.hasOption(Option.TRANSACTIONAL_MIGRATION_FROM); // set table to migrating TransactionalMigrationFromMode newMigrateFrom = TransactionalMigrationFromMode.fromMode(prev.transactionalMode, next.transactionalMode); + + if (isCounter && (next.transactionalMode != TransactionalMode.off || newMigrateFrom != TransactionalMigrationFromMode.none || next.transactionalMigrationFrom != TransactionalMigrationFromMode.none)) + throw ire(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, keyspaceName, tableName)); + boolean forceMigrationChange = modeChange && explicitlySetMigrationFrom && next.transactionalMigrationFrom != newMigrateFrom; if (modeChange && next.transactionalMode.accordIsEnabled && !DatabaseDescriptor.getAccordTransactionsEnabled()) @@ -642,7 +653,7 @@ public abstract class AlterTableStatement extends AlterSchemaStatement if (!params.compression.isEnabled()) Guardrails.uncompressedTablesEnabled.ensureEnabled(state); - params = validateAndUpdateTransactionalMigration(table.params, params); + params = validateAndUpdateTransactionalMigration(table.isCounter(), table.params, params); return keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params))); } diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index ea770b812f..06bedf08da 100644 --- 
a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -1272,6 +1272,32 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar return false; } + /* + * The execution method does not need to perform reconciliation so the read command + * should execute in a manner suited to not needing reconciliation. Such as when + * executing transactionally at a single replica and doing an index scan where the index + * scan should not return extra rows and expect post filtering at the coordinator. + */ + public SinglePartitionReadCommand withoutReconciliation() + { + if (rowFilter().isEmpty()) + return this; + return create(serializedAtEpoch(), + isDigestQuery(), + digestVersion(), + acceptsTransient(), + allowsOutOfRangeReads(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter().withoutReconciliation(), + limits(), + partitionKey(), + clusteringIndexFilter(), + indexQueryPlan(), + isTrackingWarnings()); + } + /** * Groups multiple single partition read commands. 
*/ diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index f7e9ad911b..9ea44ead96 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -383,6 +383,13 @@ public class RowFilter implements Iterable<RowFilter.Expression> return withNewExpressions(newExpressions); } + public RowFilter withoutReconciliation() + { + if (needsReconciliation) + return new RowFilter(expressions, false); + return this; + } + public boolean hasNonKeyExpressions() { for (Expression e : expressions) diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java index 4f8efb8248..2db2ba904a 100644 --- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java +++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java @@ -24,8 +24,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; - import javax.annotation.Nullable; import com.google.common.collect.Lists; @@ -167,6 +167,22 @@ public class QueryController return partition.queryMemtableAndDisk(cfs, executionController); } + private static Runnable getIndexReleaser(Set<SSTableIndex> referencedIndexes) + { + return new Runnable() + { + boolean closed; + @Override + public void run() + { + if (closed) + return; + closed = true; + referencedIndexes.forEach(SSTableIndex::releaseQuietly); + } + }; + } + /** * Build a {@link KeyRangeIterator.Builder} from the given list of {@link Expression}s. 
* <p> @@ -195,7 +211,7 @@ public class QueryController expressions = expressions.stream().filter(e -> e.getIndexOperator() != Expression.IndexOperator.ANN).collect(Collectors.toList()); QueryViewBuilder.QueryView queryView = new QueryViewBuilder(expressions, mergeRange).build(); - Runnable onClose = () -> queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly); + Runnable onClose = getIndexReleaser(queryView.referencedIndexes); KeyRangeIterator.Builder builder = command.rowFilter().isStrict() ? KeyRangeIntersectionIterator.builder(expressions.size(), onClose) : KeyRangeUnionIterator.builder(expressions.size(), onClose); @@ -314,7 +330,7 @@ public class QueryController KeyRangeIterator memtableResults = index.memtableIndexManager().searchMemtableIndexes(queryContext, planExpression, mergeRange); QueryViewBuilder.QueryView queryView = new QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build(); - Runnable onClose = () -> queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly); + Runnable onClose = getIndexReleaser(queryView.referencedIndexes); try { @@ -354,7 +370,7 @@ public class QueryController // search memtable before referencing sstable indexes; otherwise we may miss newly flushed memtable index KeyRangeIterator memtableResults = index.memtableIndexManager().limitToTopResults(queryContext, sourceKeys, planExpression); QueryViewBuilder.QueryView queryView = new QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build(); - Runnable onClose = () -> queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly); + Runnable onClose = getIndexReleaser(queryView.referencedIndexes); try { diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java index ac9d77e01c..37e66fcc80 100644 --- a/src/java/org/apache/cassandra/schema/TableMetadata.java +++ b/src/java/org/apache/cassandra/schema/TableMetadata.java @@ -67,6 +67,7 @@ import 
org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.accord.fastpath.FastPathStrategy; import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.serialization.UDTAndFunctionsAwareMetadataSerializer; @@ -76,6 +77,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.github.jamm.Unmetered; +import static accord.utils.Invariants.checkState; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.transform; import static java.lang.String.format; @@ -553,6 +555,8 @@ public class TableMetadata implements SchemaElement except("Missing partition keys for table %s", toString()); indexes.validate(this); + + checkState((params.transactionalMode == TransactionalMode.off && params.transactionalMigrationFrom == TransactionalMigrationFromMode.none) || !isCounter(), "Counters are not supported with Accord for table " + this); } /** diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java b/src/java/org/apache/cassandra/service/accord/IAccordService.java index 97ebae6963..4d3505f626 100644 --- a/src/java/org/apache/cassandra/service/accord/IAccordService.java +++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java @@ -64,7 +64,7 @@ import static com.google.common.base.Preconditions.checkNotNull; public interface IAccordService { EnumSet<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS = EnumSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL, ConsistencyLevel.ALL); - EnumSet<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS = EnumSet.of(ConsistencyLevel.ONE, 
ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL); + EnumSet<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS = EnumSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, ConsistencyLevel.SERIAL, ConsistencyLevel.ALL); IVerbHandler<? extends Request> requestHandler(); IVerbHandler<? extends Reply> responseHandler(); diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java index 4787e2105b..d2c92583af 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java @@ -33,12 +33,12 @@ import accord.utils.async.AsyncChain; import accord.utils.async.AsyncChains; import org.apache.cassandra.concurrent.DebuggableTask; import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -119,7 +119,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> return key; } - public AsyncChain<Data> read(Timestamp executeAt) + public AsyncChain<Data> read(ConsistencyLevel consistencyLevel, Timestamp executeAt) { SinglePartitionReadCommand command = (SinglePartitionReadCommand) get(); // TODO (required, safety): before release, double check reasoning that this is safe @@ -129,6 +129,8 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> // this simply looks like the transaction witnessed TTL'd data and the data then 
expired // immediately after the transaction executed, and this simplifies things a great deal int nowInSeconds = (int) TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc()); + if (consistencyLevel == null || consistencyLevel == ConsistencyLevel.ONE) + command = command.withoutReconciliation(); return performLocalRead(command, nowInSeconds); } @@ -144,8 +146,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> SinglePartitionReadCommand read = command.withNowInSec(nowInSeconds); try (ReadExecutionController controller = read.executionController(); - UnfilteredPartitionIterator partition = read.executeLocally(controller); - PartitionIterator iterator = UnfilteredPartitionIterators.filter(partition, read.nowInSec())) + PartitionIterator iterator = UnfilteredPartitionIterators.filter(read.executeLocally(controller), read.nowInSec())) { TxnData result = new TxnData(); if (iterator.hasNext()) diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java index 391da765bf..f9d09409fb 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java @@ -200,7 +200,7 @@ public class TxnRead extends AbstractKeySorted<TxnNamedRead> implements Read public AsyncChain<Data> read(Seekable key, SafeCommandStore safeStore, Timestamp executeAt, DataStore store) { List<AsyncChain<Data>> results = new ArrayList<>(); - forEachWithKey((PartitionKey) key, read -> results.add(read.read(executeAt))); + forEachWithKey((PartitionKey) key, read -> results.add(read.read(cassandraConsistencyLevel, executeAt))); if (results.isEmpty()) // Result type must match everywhere diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java index 32bad6526d..7716856a28 100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ 
b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@ -131,8 +131,7 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< private boolean usesReplicaFilteringProtection() { - // Key columns are immutable and should never need to participate in replica filtering - if (!command.rowFilter().hasNonKeyExpressions()) + if (command.rowFilter().isEmpty()) return false; if (command.isTopK()) @@ -278,6 +277,10 @@ public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< private UnaryOperator<PartitionIterator> preCountFilterForReplicaFilteringProtection() { + // Key columns are immutable and should never need to participate in replica filtering + if (!command.rowFilter().hasNonKeyExpressions()) + return results -> results; + return results -> { Index.Searcher searcher = command.indexSearcher(); // in case of "ALLOW FILTERING" without index diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java index c58b5d62b8..bbf9f30b6e 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java @@ -55,19 +55,25 @@ import org.apache.cassandra.distributed.api.ICoordinator; import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.distributed.shared.AssertUtils; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.service.accord.AccordTestUtils; import org.apache.cassandra.service.consensus.TransactionalMode; +import org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode; import org.apache.cassandra.utils.ByteBufferUtil; import 
org.apache.cassandra.utils.FailingConsumer; import org.assertj.core.api.Assertions; +import static java.lang.String.format; import static java.util.Collections.singletonList; import static org.apache.cassandra.cql3.CQLTester.row; +import static org.apache.cassandra.cql3.statements.schema.AlterTableStatement.ACCORD_COUNTER_COLUMN_UNSUPPORTED; +import static org.apache.cassandra.cql3.statements.schema.AlterTableStatement.ACCORD_COUNTER_TABLES_UNSUPPORTED; import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.fail; public abstract class AccordCQLTestBase extends AccordTestBase { @@ -92,12 +98,189 @@ public abstract class AccordCQLTestBase extends AccordTestBase SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person (height int, age int)"); } + @Test + public void testCounterCreateTableTransactionalModeFails() throws Exception + { + try + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v counter, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> {}); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(IllegalStateException.class.getName(), t.getClass().getName()); + assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, KEYSPACE, accordTableName), t.getMessage()); + } + } + + @Test + public void testCounterCreateTableTransactionalMigrationFromModeFails() throws Exception + { + try + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v counter, primary key (k, c)) WITH transactional_migration_from = '" + transactionalMode.name() + "'", cluster -> {}); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(IllegalStateException.class.getName(), t.getClass().getName()); + assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, KEYSPACE, accordTableName), 
t.getMessage()); + } + } + + @Test + public void testCounterAlterTableTransactionalModeFails() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v counter, primary key (k, c))", cluster -> { + try + { + cluster.coordinator(1).execute("ALTER TABLE " + qualifiedAccordTableName + " WITH transactional_mode = '" + transactionalMode.name() + "';", ConsistencyLevel.ALL); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(InvalidRequestException.class.getName(), t.getClass().getName()); + assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, KEYSPACE, accordTableName), t.getMessage()); + } + }); + } + + @Test + public void testCounterAlterTableTransactionalMigrationFromModeFails() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v counter, primary key (k, c))", cluster -> { + try + { + cluster.coordinator(1).execute("ALTER TABLE " + qualifiedAccordTableName + " WITH transactional_migration_from = '" + transactionalMode.name() + "';", ConsistencyLevel.ALL); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(InvalidRequestException.class.getName(), t.getClass().getName()); + assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, KEYSPACE, accordTableName), t.getMessage()); + } + }); + } + + @Test + public void testCounterAddColumnFailsWithAccord() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, s int static, v int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> { + try + { + cluster.coordinator(1).execute("ALTER TABLE " + qualifiedAccordTableName + " ADD (v2 counter);", ConsistencyLevel.ALL); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(InvalidRequestException.class.getName(), t.getClass().getName()); + assertEquals(format(ACCORD_COUNTER_COLUMN_UNSUPPORTED, KEYSPACE, accordTableName, transactionalMode, TransactionalMigrationFromMode.none), 
t.getMessage()); + } + }); + } + + @Test + public void testCounterAddColumnFailsWithMigration() throws Exception + { + test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, s int static, v int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster -> { + try + { + cluster.coordinator(1).execute("ALTER TABLE " + qualifiedAccordTableName + " WITH transactional_mode = '" + TransactionalMode.off + "';", ConsistencyLevel.ALL); + cluster.coordinator(1).execute("ALTER TABLE " + qualifiedAccordTableName + " ADD (v2 counter);", ConsistencyLevel.ALL); + fail("Expected exception"); + } + catch (Throwable t) + { + assertEquals(InvalidRequestException.class.getName(), t.getClass().getName()); + assertEquals(format(ACCORD_COUNTER_COLUMN_UNSUPPORTED, KEYSPACE, accordTableName, TransactionalMode.off, transactionalMode), t.getMessage()); + } + }); + } + @Override protected void test(FailingConsumer<Cluster> fn) throws Exception { test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), fn); } + @Test + public void testPartitionMultiRowReturn() throws Exception + { + test(cluster -> { + for (int i = 0; i < 3; i++) + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), ConsistencyLevel.ALL, 42, 43 + i, 44 + i); + + String txn = "BEGIN TRANSACTION " + + "SELECT * " + + "FROM " + qualifiedAccordTableName + " " + + "WHERE k = 42;" + + "COMMIT TRANSACTION;"; + SimpleQueryResult result = cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + assertThat(result).hasSize(3) + .contains(42, 43, 44) + .contains(42, 44, 45) + .contains(42, 45, 46); + }); + } + + @Test + public void testSaiMultiRowReturn() throws Exception + { + test(cluster -> { + cluster.schemaChange("CREATE INDEX ON " + qualifiedAccordTableName + "(v) USING 'sai';"); + for (int i = 0; i < 3; i++) + 
cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), ConsistencyLevel.ALL, 42, 43 + i, 44 + i); + + String txn = "BEGIN TRANSACTION " + + "SELECT * " + + "FROM " + qualifiedAccordTableName + " " + + "WHERE k = 42 AND v = 45;" + + "COMMIT TRANSACTION;"; + SimpleQueryResult result = cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + assertThat(result).hasSize(1) + .contains(42, 44, 45); + }); + } + + // This fails and it is expected, mostly just here as documentation until it is fixed + @Test + public void testSasiMultiRowReturn() throws Exception + { + test(cluster -> { + cluster.schemaChange("CREATE INDEX ON " + qualifiedAccordTableName + "(v) USING 'org.apache.cassandra.index.sasi.SASIIndex';"); + for (int i = 0; i < 3; i++) + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), ConsistencyLevel.ALL, 42, 43 + i, 44 + i); + + String txn = "BEGIN TRANSACTION " + + "SELECT * " + + "FROM " + qualifiedAccordTableName + " " + + "WHERE k = 42 AND v = 45;" + + "COMMIT TRANSACTION;"; + SimpleQueryResult result = cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + assertThat(result).hasSize(1) + .contains(42, 44, 45); + }); + } + + @Test + public void testLegacy2iMultiRowReturn() throws Exception + { + test(cluster -> { + cluster.schemaChange("CREATE INDEX ON " + qualifiedAccordTableName + "(v);"); + for (int i = 0; i < 3; i++) + cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), ConsistencyLevel.ALL, 42, 43 + i, 44 + i); + + String txn = "BEGIN TRANSACTION " + + "SELECT * " + + "FROM " + qualifiedAccordTableName + " " + + "WHERE k = 42 AND v = 45;" + + "COMMIT TRANSACTION;"; + SimpleQueryResult result = cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL); + assertThat(result).hasSize(1) + .contains(42, 44, 45); + }); + 
} + @Test public void testNonExistingKeyWithStaticUpdate() throws Exception { @@ -204,8 +387,8 @@ public abstract class AccordCQLTestBase extends AccordTestBase cluster.get(1).runOnInstance(() -> { StringBuilder sb = new StringBuilder("BEGIN TRANSACTION\n"); for (int i = 0; i < keyStrings.size() - 1; i++) - sb.append(String.format("LET row%d = (SELECT * FROM %s WHERE k=%s AND c=0);\n", i, currentTable, keyStrings.get(i))); - sb.append(String.format("SELECT * FROM %s WHERE k=%s AND c=0;\n", currentTable, keyStrings.get(keyStrings.size() - 1))); + sb.append(format("LET row%d = (SELECT * FROM %s WHERE k=%s AND c=0);\n", i, currentTable, keyStrings.get(i))); + sb.append(format("SELECT * FROM %s WHERE k=%s AND c=0;\n", currentTable, keyStrings.get(keyStrings.size() - 1))); sb.append("COMMIT TRANSACTION"); Unseekables<?> routables = AccordTestUtils.createTxn(sb.toString()).keys().toParticipants(); diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java index 92e81c73f3..b3e2407af2 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java @@ -335,6 +335,7 @@ public abstract class AccordTestBase extends TestBaseImpl Cluster.Builder builder = Cluster.build(nodes) .withoutVNodes() .withConfig(c -> c.with(Feature.GOSSIP) + .set("sasi_indexes_enabled", "true") .set("write_request_timeout", "10s") .set("transaction_timeout", "15s") .set("native_transport_timeout", "30s") diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/AccordMultiNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/AccordMultiNodeSAITest.java new file mode 100644 index 0000000000..a69f83b530 --- /dev/null +++ b/test/distributed/org/apache/cassandra/fuzz/sai/AccordMultiNodeSAITest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fuzz.sai; + +import org.junit.BeforeClass; + +public class AccordMultiNodeSAITest extends MultiNodeSAITestBase +{ + @BeforeClass + public static void before() throws Throwable + { + MultiNodeSAITestBase.before(true); + } + + public AccordMultiNodeSAITest() + { + super(true); + } +} diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/AccordSingleNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/AccordSingleNodeSAITest.java new file mode 100644 index 0000000000..40164cdcf7 --- /dev/null +++ b/test/distributed/org/apache/cassandra/fuzz/sai/AccordSingleNodeSAITest.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.fuzz.sai; + +public class AccordSingleNodeSAITest extends SingleNodeSAITestBase +{ + public AccordSingleNodeSAITest() + { + super(true); + } +} diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java index a86472b99a..5153fb15ed 100644 --- a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java +++ b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java @@ -18,88 +18,18 @@ package org.apache.cassandra.fuzz.sai; -import org.junit.Before; import org.junit.BeforeClass; -import org.apache.cassandra.distributed.Cluster; -import org.apache.cassandra.distributed.test.sai.SAIUtil; -import org.apache.cassandra.harry.ddl.SchemaSpec; -import org.apache.cassandra.harry.sut.injvm.InJvmSut; -import org.apache.cassandra.harry.sut.injvm.InJvmSutBase; - -import static org.apache.cassandra.distributed.api.Feature.GOSSIP; -import static org.apache.cassandra.distributed.api.Feature.NETWORK; - -public class MultiNodeSAITest extends SingleNodeSAITest +public class MultiNodeSAITest extends MultiNodeSAITestBase { - /** - * Chosing a fetch size has implications for how well this test will excercise paging, short-read protection, and - * other important parts of the distributed query apparatus. This should be set low enough to ensure a significant - * number of queries during validation page, but not too low that more expesive queries time out and fail the test. 
- */ - private static final int FETCH_SIZE = 10; - @BeforeClass public static void before() throws Throwable { - cluster = Cluster.build() - .withNodes(2) - // At lower fetch sizes, queries w/ hundreds or thousands of matches can take a very long time. - .withConfig(InJvmSutBase.defaultConfig().andThen(c -> c.set("range_request_timeout", "180s") - .set("read_request_timeout", "180s") - .set("native_transport_timeout", "180s") - .set("slow_query_log_timeout", "180s") - .with(GOSSIP).with(NETWORK))) - .createWithoutStarting(); - cluster.setUncaughtExceptionsFilter(t -> { - logger.error("Caught exception, reporting during shutdown. Ignoring.", t); - return true; - }); - cluster.startup(); - cluster = init(cluster); - sut = new InJvmSut(cluster) { - @Override - public Object[][] execute(String cql, ConsistencyLevel cl, Object[] bindings) - { - // The goal here is to make replicas as out of date as possible, modulo the efforts of repair - // and read-repair in the test itself. - if (cql.contains("SELECT")) - return super.execute(cql, ConsistencyLevel.ALL, FETCH_SIZE, bindings); - return super.execute(cql, ConsistencyLevel.NODE_LOCAL, bindings); - } - }; - } - - @Before - public void beforeEach() - { - cluster.schemaChange("DROP KEYSPACE IF EXISTS harry"); - cluster.schemaChange("CREATE KEYSPACE harry WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2};"); - } - - @Override - protected void flush(SchemaSpec schema) - { - cluster.get(1).nodetool("flush", schema.keyspace, schema.table); - cluster.get(2).nodetool("flush", schema.keyspace, schema.table); - } - - @Override - protected void repair(SchemaSpec schema) - { - cluster.get(1).nodetool("repair", schema.keyspace); - } - - @Override - protected void compact(SchemaSpec schema) - { - cluster.get(1).nodetool("compact", schema.keyspace); - cluster.get(2).nodetool("compact", schema.keyspace); + MultiNodeSAITestBase.before(false); } - @Override - protected void waitForIndexesQueryable(SchemaSpec schema) + 
public MultiNodeSAITest() { - SAIUtil.waitForIndexQueryable(cluster, schema.keyspace); + super(false); } -} \ No newline at end of file +} diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITestBase.java similarity index 81% copy from test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java copy to test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITestBase.java index a86472b99a..ee98e5c1fb 100644 --- a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java +++ b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITestBase.java @@ -30,7 +30,7 @@ import org.apache.cassandra.harry.sut.injvm.InJvmSutBase; import static org.apache.cassandra.distributed.api.Feature.GOSSIP; import static org.apache.cassandra.distributed.api.Feature.NETWORK; -public class MultiNodeSAITest extends SingleNodeSAITest +public abstract class MultiNodeSAITestBase extends SingleNodeSAITestBase { /** * Chosing a fetch size has implications for how well this test will excercise paging, short-read protection, and @@ -39,14 +39,26 @@ public class MultiNodeSAITest extends SingleNodeSAITest */ private static final int FETCH_SIZE = 10; + public MultiNodeSAITestBase(boolean withAccord) + { + super(withAccord); + } + @BeforeClass public static void before() throws Throwable + { + before(false); + } + + public static void before(boolean withAccord) throws Throwable { cluster = Cluster.build() .withNodes(2) // At lower fetch sizes, queries w/ hundreds or thousands of matches can take a very long time. 
.withConfig(InJvmSutBase.defaultConfig().andThen(c -> c.set("range_request_timeout", "180s") .set("read_request_timeout", "180s") + .set("transaction_timeout", "180s") + .set("write_request_timeout", "180s") .set("native_transport_timeout", "180s") .set("slow_query_log_timeout", "180s") .with(GOSSIP).with(NETWORK))) @@ -61,11 +73,15 @@ public class MultiNodeSAITest extends SingleNodeSAITest @Override public Object[][] execute(String cql, ConsistencyLevel cl, Object[] bindings) { - // The goal here is to make replicas as out of date as possible, modulo the efforts of repair - // and read-repair in the test itself. if (cql.contains("SELECT")) return super.execute(cql, ConsistencyLevel.ALL, FETCH_SIZE, bindings); - return super.execute(cql, ConsistencyLevel.NODE_LOCAL, bindings); + + // The goal here is to make replicas as out of date as possible, modulo the efforts of repair + // and read-repair in the test itself. node_local bypasses Accord which breaks any attempt at testing Accord + // so if we are running with Accord use QUORUM (which Accord will ignore since it runs with transactional + // mode full). + ConsistencyLevel consistencyLevel = withAccord ? 
ConsistencyLevel.QUORUM : ConsistencyLevel.NODE_LOCAL; + return super.execute(cql, consistencyLevel, bindings); } }; } diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java index bbe4ce8398..84ad2f013b 100644 --- a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java +++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java @@ -18,290 +18,10 @@ package org.apache.cassandra.fuzz.sai; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.junit.Test; - -import org.apache.cassandra.config.CassandraRelevantProperties; -import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase; -import org.apache.cassandra.harry.ddl.ColumnSpec; -import org.apache.cassandra.harry.ddl.SchemaSpec; -import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder; -import org.apache.cassandra.harry.gen.DataGenerators; -import org.apache.cassandra.harry.gen.EntropySource; -import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource; -import org.apache.cassandra.harry.model.QuiescentChecker; -import org.apache.cassandra.harry.model.SelectHelper; -import org.apache.cassandra.harry.model.reconciler.PartitionState; -import org.apache.cassandra.harry.model.reconciler.Reconciler; -import org.apache.cassandra.harry.operations.FilteringQuery; -import org.apache.cassandra.harry.operations.Query; -import org.apache.cassandra.harry.operations.Relation; -import org.apache.cassandra.harry.sut.SystemUnderTest; -import org.apache.cassandra.harry.sut.TokenPlacementModel; -import org.apache.cassandra.harry.tracker.DataTracker; -import org.apache.cassandra.harry.tracker.DefaultDataTracker; - -public class SingleNodeSAITest extends IntegrationTestBase +public class SingleNodeSAITest extends SingleNodeSAITestBase { - private static final int RUNS = 1; - - private static 
final int OPERATIONS_PER_RUN = 30_000; - private static final int REPAIR_SKIP = OPERATIONS_PER_RUN / 2; - private static final int FLUSH_SKIP = OPERATIONS_PER_RUN / 7; - private static final int VALIDATION_SKIP = OPERATIONS_PER_RUN / 100; - - private static final int NUM_PARTITIONS = OPERATIONS_PER_RUN / 1000; - protected static final int MAX_PARTITION_SIZE = 10_000; - private static final int UNIQUE_CELL_VALUES = 5; - - long seed = 1; - - @Test - public void basicSaiTest() - { - CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6); - SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl1", - Arrays.asList(ColumnSpec.ck("pk1", ColumnSpec.int64Type), - ColumnSpec.ck("pk2", ColumnSpec.asciiType(4, 100)), - ColumnSpec.ck("pk3", ColumnSpec.int64Type)), - Arrays.asList(ColumnSpec.ck("ck1", ColumnSpec.asciiType(4, 100)), - ColumnSpec.ck("ck2", ColumnSpec.asciiType, true), - ColumnSpec.ck("ck3", ColumnSpec.int64Type)), - Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)), - ColumnSpec.regularColumn("v2", ColumnSpec.int64Type), - ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)), - List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100)))) - .withCompactionStrategy("LeveledCompactionStrategy"); - - sut.schemaChange(schema.compile().cql()); - sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + "_debug").compile().cql()); - sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai' ", - schema.regularColumns.get(0).name, - schema.keyspace, - schema.table, - schema.regularColumns.get(0).name)); - sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai';", - schema.regularColumns.get(1).name, - schema.keyspace, - schema.table, - schema.regularColumns.get(1).name)); - sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai';", - schema.regularColumns.get(2).name, - schema.keyspace, - schema.table, - schema.regularColumns.get(2).name)); - 
sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) USING 'sai';", - schema.staticColumns.get(0).name, - schema.keyspace, - schema.table, - schema.staticColumns.get(0).name)); - - waitForIndexesQueryable(schema); - - DataTracker tracker = new DefaultDataTracker(); - TokenPlacementModel.ReplicationFactor rf = new TokenPlacementModel.SimpleReplicationFactor(cluster.size()); - ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed, - MAX_PARTITION_SIZE, - MAX_PARTITION_SIZE, - tracker, - sut, - schema, - rf, - SystemUnderTest.ConsistencyLevel.QUORUM); - - for (int run = 0; run < RUNS; run++) - { - logger.info("Starting run {}/{}...", run + 1, RUNS); - EntropySource random = new JdkRandomEntropySource(run); - - // Populate the array of possible values for all operations in the run: - long[] values = new long[UNIQUE_CELL_VALUES]; - for (int i = 0; i < values.length; i++) - values[i] = random.next(); - - for (int i = 0; i < OPERATIONS_PER_RUN; i++) - { - int partitionIndex = random.nextInt(0, NUM_PARTITIONS); - - history.visitPartition(partitionIndex) - .insert(random.nextInt(MAX_PARTITION_SIZE), - new long[] { random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)], - random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)], - random.nextBoolean() ? DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] }, - new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] }); - - if (random.nextFloat() > 0.99f) - { - int row1 = random.nextInt(MAX_PARTITION_SIZE); - int row2 = random.nextInt(MAX_PARTITION_SIZE); - history.visitPartition(partitionIndex).deleteRowRange(Math.min(row1, row2), Math.max(row1, row2), - random.nextBoolean(), random.nextBoolean()); - } - else if (random.nextFloat() > 0.999f) - { - history.visitPartition(partitionIndex).deleteRowSlice(); - } - - if (random.nextFloat() > 0.995f) - { - history.visitPartition(partitionIndex).deleteColumns(); - } - - if (random.nextFloat() > 0.9995f) - { - history.visitPartition(partitionIndex).deletePartition(); - } - - if (i % REPAIR_SKIP == 0) - { - logger.debug("Repairing/flushing after operation {}...", i); - repair(schema); - } - else if (i % FLUSH_SKIP == 0) - { - logger.debug("Flushing after operation {}...", i); - flush(schema); - } - - if (i % VALIDATION_SKIP != 0) - continue; - - logger.debug("Validating partition at index {} after operation {} in run {}...", partitionIndex, i, run + 1); - - for (int j = 0; j < 10; j++) - { - List<Relation> relations = new ArrayList<>(); - - // For one text column and 2 numeric columns, we can use between 1 and 5 total relations. 
- int num = random.nextInt(1, 5); - - List<List<Relation.RelationKind>> pick = new ArrayList<>(); - //noinspection ArraysAsListWithZeroOrOneArgument - pick.add(new ArrayList<>(Arrays.asList(Relation.RelationKind.EQ))); // text column supports only EQ - pick.add(new ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, Relation.RelationKind.LT))); - pick.add(new ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, Relation.RelationKind.LT))); - - if (random.nextFloat() > 0.75f) - { - relations.addAll(Query.clusteringSliceQuery(schema, - partitionIndex, - random.next(), - random.next(), - random.nextBoolean(), - random.nextBoolean(), - false).relations); - } - - for (int k = 0; k < num; k++) - { - int column = random.nextInt(schema.regularColumns.size()); - Relation.RelationKind relationKind = pickKind(random, pick, column); - - if (relationKind != null) - relations.add(Relation.relation(relationKind, - schema.regularColumns.get(column), - values[random.nextInt(values.length)])); - } - - if (random.nextFloat() > 0.7f) - { - relations.add(Relation.relation(Relation.RelationKind.EQ, - schema.staticColumns.get(0), - values[random.nextInt(values.length)])); - } - - long pd = history.pdSelector().pdAtPosition(partitionIndex); - FilteringQuery query = new FilteringQuery(pd, false, relations, schema); - Reconciler reconciler = new Reconciler(history.pdSelector(), schema, history::visitor); - Set<ColumnSpec<?>> columns = new HashSet<>(schema.allColumns); - - PartitionState modelState = reconciler.inflatePartitionState(pd, tracker, query).filter(query); - - if (modelState.rows().size() > 0) - logger.debug("Model contains {} matching rows for query {}.", modelState.rows().size(), query); - - try - { - QuiescentChecker.validate(schema, - tracker, - columns, - modelState, - SelectHelper.execute(sut, history.clock(), query), - query); - - // Run the query again to see if the first execution caused an issue via read-repair: - 
QuiescentChecker.validate(schema, - tracker, - columns, - modelState, - SelectHelper.execute(sut, history.clock(), query), - query); - } - catch (Throwable t) - { - logger.debug("Partition index = {}, run = {}, j = {}, i = {}", partitionIndex, run, j, i); - - Query partitionQuery = Query.selectAllColumns(schema, pd, false); - QuiescentChecker.validate(schema, - tracker, - columns, - reconciler.inflatePartitionState(pd, tracker, partitionQuery), - SelectHelper.execute(sut, history.clock(), partitionQuery), - partitionQuery); - logger.debug("Partition state agrees. Throwing original error..."); - - throw t; - } - } - } - - if (run + 1 < RUNS) - { - logger.debug("Forcing compaction at the end of run {}...", run + 1); - compact(schema); - } - } - } - - protected void flush(SchemaSpec schema) + public SingleNodeSAITest() { - cluster.get(1).nodetool("flush", schema.keyspace, schema.table); - } - - protected void compact(SchemaSpec schema) - { - cluster.get(1).nodetool("compact", schema.keyspace); - } - - protected void repair(SchemaSpec schema) - { - // Repair is nonsensical for a single node, but a repair does flush first, so do that at least. 
- cluster.get(1).nodetool("flush", schema.keyspace, schema.table); - } - - protected void waitForIndexesQueryable(SchemaSpec schema) {} - - private static Relation.RelationKind pickKind(EntropySource random, List<List<Relation.RelationKind>> options, int column) - { - Relation.RelationKind kind = null; - - if (!options.get(column).isEmpty()) - { - List<Relation.RelationKind> possible = options.get(column); - int chosen = random.nextInt(possible.size()); - kind = possible.remove(chosen); - - if (kind == Relation.RelationKind.EQ) - possible.clear(); // EQ precludes LT and GT - else - possible.remove(Relation.RelationKind.EQ); // LT GT preclude EQ - } - - return kind; + super(false); } -} \ No newline at end of file +} diff --git a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java similarity index 87% copy from test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java copy to test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java index bbe4ce8398..bb64848ccc 100644 --- a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java +++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java @@ -22,11 +22,14 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; +import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.CassandraRelevantProperties; +import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase; import org.apache.cassandra.harry.ddl.ColumnSpec; import org.apache.cassandra.harry.ddl.SchemaSpec; @@ -43,10 +46,16 @@ import org.apache.cassandra.harry.operations.Query; import org.apache.cassandra.harry.operations.Relation; import org.apache.cassandra.harry.sut.SystemUnderTest; import 
org.apache.cassandra.harry.sut.TokenPlacementModel; +import org.apache.cassandra.harry.sut.injvm.InJvmSut; +import org.apache.cassandra.harry.sut.injvm.InJvmSutBase; import org.apache.cassandra.harry.tracker.DataTracker; import org.apache.cassandra.harry.tracker.DefaultDataTracker; +import org.apache.cassandra.service.consensus.TransactionalMode; -public class SingleNodeSAITest extends IntegrationTestBase +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + +public abstract class SingleNodeSAITestBase extends IntegrationTestBase { private static final int RUNS = 1; @@ -61,6 +70,36 @@ public class SingleNodeSAITest extends IntegrationTestBase long seed = 1; + protected boolean withAccord; + + @BeforeClass + public static void before() throws Throwable + { + cluster = Cluster.build() + .withNodes(1) + // At lower fetch sizes, queries w/ hundreds or thousands of matches can take a very long time. + .withConfig(InJvmSutBase.defaultConfig().andThen(c -> c.set("range_request_timeout", "180s") + .set("read_request_timeout", "180s") + .set("transaction_timeout", "180s") + .set("write_request_timeout", "180s") + .set("native_transport_timeout", "180s") + .set("slow_query_log_timeout", "180s") + .with(GOSSIP).with(NETWORK))) + .createWithoutStarting(); + cluster.setUncaughtExceptionsFilter(t -> { + logger.error("Caught exception, reporting during shutdown. 
Ignoring.", t); + return true; + }); + cluster.startup(); + cluster = init(cluster); + sut = new InJvmSut(cluster); + } + + public SingleNodeSAITestBase(boolean withAccord) + { + this.withAccord = withAccord; + } + @Test public void basicSaiTest() { @@ -75,7 +114,8 @@ public class SingleNodeSAITest extends IntegrationTestBase Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)), ColumnSpec.regularColumn("v2", ColumnSpec.int64Type), ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)), - List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100)))) + List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))), + withAccord ? Optional.of(TransactionalMode.full) : Optional.empty()) .withCompactionStrategy("LeveledCompactionStrategy"); sut.schemaChange(schema.compile().cql()); diff --git a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java index 6da221862f..da7d9213e8 100644 --- a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java +++ b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java @@ -18,8 +18,6 @@ package org.apache.cassandra.cql3.statements; -import org.apache.cassandra.transport.Dispatcher; -import org.assertj.core.api.Assertions; import org.junit.BeforeClass; import org.junit.Test; @@ -32,14 +30,16 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.transport.messages.ResultMessage; +import org.assertj.core.api.Assertions; import static org.apache.cassandra.cql3.statements.TransactionStatement.DUPLICATE_TUPLE_NAME_MESSAGE; import static org.apache.cassandra.cql3.statements.TransactionStatement.EMPTY_TRANSACTION_MESSAGE; import static 
org.apache.cassandra.cql3.statements.TransactionStatement.ILLEGAL_RANGE_QUERY_MESSAGE; +import static org.apache.cassandra.cql3.statements.TransactionStatement.INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE; import static org.apache.cassandra.cql3.statements.TransactionStatement.INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE; import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_CONDITIONS_IN_UPDATES_MESSAGE; -import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_COUNTERS_IN_TXNS_MESSAGE; import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_TIMESTAMPS_IN_UPDATES_MESSAGE; import static org.apache.cassandra.cql3.statements.TransactionStatement.SELECT_REFS_NEED_COLUMN_MESSAGE; import static org.apache.cassandra.cql3.statements.TransactionStatement.TRANSACTIONS_DISABLED_ON_TABLE_MESSAGE; @@ -58,7 +58,6 @@ public class TransactionStatementTest private static final TableId TABLE4_ID = TableId.fromString("00000000-0000-0000-0000-000000000004"); private static final TableId TABLE5_ID = TableId.fromString("00000000-0000-0000-0000-000000000005"); private static final TableId TABLE6_ID = TableId.fromString("00000000-0000-0000-0000-000000000006"); - private static final TableId TABLE7_ID = TableId.fromString("00000000-0000-0000-0000-000000000007"); @BeforeClass public static void beforeClass() throws Exception @@ -70,45 +69,7 @@ public class TransactionStatementTest parse("CREATE TABLE tbl3 (k int PRIMARY KEY, \"with spaces\" int, \"with\"\"quote\" int, \"MiXeD_CaSe\" int) WITH transactional_mode = 'full'", "ks").id(TABLE3_ID), parse("CREATE TABLE tbl4 (k int PRIMARY KEY, int_list list<int>) WITH transactional_mode = 'full'", "ks").id(TABLE4_ID), parse("CREATE TABLE tbl5 (k int PRIMARY KEY, v int) WITH transactional_mode = 'full'", "ks").id(TABLE5_ID), - parse("CREATE TABLE tbl6 (k int PRIMARY KEY, c counter) WITH transactional_mode = 'full'", "ks").id(TABLE6_ID), - parse("CREATE TABLE tbl7 (k int PRIMARY KEY, v int) WITH 
transactional_mode = 'off'", "ks").id(TABLE7_ID)); - } - - @Test - public void shouldRejectCounterMutation() - { - String query = "BEGIN TRANSACTION\n" + - " UPDATE ks.tbl6 SET c += 100 WHERE k = 0;\n" + - "COMMIT TRANSACTION"; - - Assertions.assertThatThrownBy(() -> prepare(query)) - .isInstanceOf(InvalidRequestException.class) - .hasMessageContaining(String.format(NO_COUNTERS_IN_TXNS_MESSAGE, "UPDATE", "at [2:5]")); - } - - @Test - public void shouldRejectCounterReadInLet() - { - String query = "BEGIN TRANSACTION\n" + - " LET row1 = (SELECT * FROM ks.tbl6 WHERE k=0);\n" + - " SELECT row1.c;\n" + - "COMMIT TRANSACTION"; - - Assertions.assertThatThrownBy(() -> prepare(query)) - .isInstanceOf(InvalidRequestException.class) - .hasMessageContaining(String.format(NO_COUNTERS_IN_TXNS_MESSAGE, "SELECT", "at [2:15]")); - } - - @Test - public void shouldRejectCounterReadInSelect() - { - String query = "BEGIN TRANSACTION\n" + - " SELECT * FROM ks.tbl6 WHERE k=0;\n" + - "COMMIT TRANSACTION"; - - Assertions.assertThatThrownBy(() -> prepare(query)) - .isInstanceOf(InvalidRequestException.class) - .hasMessageContaining(String.format(NO_COUNTERS_IN_TXNS_MESSAGE, "SELECT", "at [2:3]")); + parse("CREATE TABLE tbl6 (k int PRIMARY KEY, v int) WITH transactional_mode = 'off'", "ks").id(TABLE6_ID)); } @Test @@ -236,28 +197,6 @@ public class TransactionStatementTest .hasMessageContaining(String.format(INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment row1", "at [2:15]")); } - @Test - public void shouldRejectIllegalLimitInSelect() - { - String select = "SELECT * FROM ks.tbl1 WHERE k = 1 LIMIT 2"; - String query = "BEGIN TRANSACTION\n" + select + ";\nCOMMIT TRANSACTION"; - - Assertions.assertThatThrownBy(() -> prepare(query)) - .isInstanceOf(InvalidRequestException.class) - .hasMessageContaining(String.format(INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "returning select", "at [2:1]")); - } - - @Test - public void shouldRejectIncompletePrimaryKeyInSelect() - { - String select = "SELECT * 
FROM ks.tbl1 WHERE k = 1"; - String query = "BEGIN TRANSACTION\n" + select + ";\nCOMMIT TRANSACTION"; - - Assertions.assertThatThrownBy(() -> prepare(query)) - .isInstanceOf(InvalidRequestException.class) - .hasMessageContaining(String.format(INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "returning select", "at [2:1]")); - } - @Test public void shouldRejectUpdateWithCondition() { @@ -386,7 +325,7 @@ public class TransactionStatementTest Assertions.assertThatThrownBy(() -> prepare(query)) .isInstanceOf(InvalidRequestException.class) - .hasMessageContaining(String.format(ILLEGAL_RANGE_QUERY_MESSAGE, "returning select", "at [2:1]")); + .hasMessageContaining(String.format(INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE, "returning select", "at [2:1]")); } @Test @@ -407,7 +346,7 @@ public class TransactionStatementTest public void shouldRejectLetSelectOnNonTransactionalTable() { String query = "BEGIN TRANSACTION\n" + - " LET row1 = (SELECT * FROM ks.tbl7 WHERE k = 0);\n" + + " LET row1 = (SELECT * FROM ks.tbl6 WHERE k = 0);\n" + " INSERT INTO ks.tbl5 (k, v) VALUES (1, 2);\n" + "COMMIT TRANSACTION;"; @@ -420,7 +359,7 @@ public class TransactionStatementTest public void shouldRejectSelectOnNonTransactionalTable() { String query = "BEGIN TRANSACTION\n" + - " SELECT * FROM ks.tbl7 WHERE k = 0;\n" + + " SELECT * FROM ks.tbl6 WHERE k = 0;\n" + "COMMIT TRANSACTION;"; Assertions.assertThatThrownBy(() -> prepare(query)) @@ -432,7 +371,7 @@ public class TransactionStatementTest public void shouldRejectUpdateOnNonTransactionalTable() { String query = "BEGIN TRANSACTION\n" + - " INSERT INTO ks.tbl7 (k, v) VALUES (1, 2);\n" + + " INSERT INTO ks.tbl6 (k, v) VALUES (1, 2);\n" + "COMMIT TRANSACTION;"; Assertions.assertThatThrownBy(() -> prepare(query)) diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java index 452bb0bc36..b8ea6445d4 100644 --- 
a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java @@ -171,7 +171,7 @@ public class TupleTypeTest extends CQLTester createIndex("CREATE INDEX tuple_index ON %s (t)"); // select using unset - assertInvalidMessage("Invalid unset value for tuple field number 0", "SELECT * FROM %s WHERE k = ? and t = (?,?,?)", unset(), unset(), unset(), unset()); + assertInvalidMessage("Invalid unset value for tuple field number 0", "SELECT * FROM %s WHERE k = ? and t = (?,?,?)", 42, unset(), unset(), unset()); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
