This is an automated email from the ASF dual-hosted git repository.

aweisberg pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e7ef1da1749db6fdfb7af4961ee9da21912c4d38
Author: Ariel Weisberg <[email protected]>
AuthorDate: Wed Sep 25 13:58:01 2024 -0400

    Accord should not block partition restricted index queries
    
    Patch by Ariel Weisberg; Reviewed by David Capwell for CASSANDRA-19955
---
 .../cql3/statements/TransactionStatement.java      |  21 +-
 .../statements/schema/AlterTableStatement.java     |  15 +-
 .../cassandra/db/SinglePartitionReadCommand.java   |  26 ++
 .../org/apache/cassandra/db/filter/RowFilter.java  |   7 +
 .../cassandra/index/sai/plan/QueryController.java  |  24 +-
 .../org/apache/cassandra/schema/TableMetadata.java |   4 +
 .../cassandra/service/accord/IAccordService.java   |   2 +-
 .../cassandra/service/accord/txn/TxnNamedRead.java |   9 +-
 .../cassandra/service/accord/txn/TxnRead.java      |   2 +-
 .../cassandra/service/reads/DataResolver.java      |   7 +-
 .../distributed/test/accord/AccordCQLTestBase.java | 187 ++++++++++++-
 .../distributed/test/accord/AccordTestBase.java    |   1 +
 .../cassandra/fuzz/sai/AccordMultiNodeSAITest.java |  35 +++
 .../fuzz/sai/AccordSingleNodeSAITest.java          |  27 ++
 .../cassandra/fuzz/sai/MultiNodeSAITest.java       |  80 +-----
 ...iNodeSAITest.java => MultiNodeSAITestBase.java} |  24 +-
 .../cassandra/fuzz/sai/SingleNodeSAITest.java      | 288 +--------------------
 ...NodeSAITest.java => SingleNodeSAITestBase.java} |  44 +++-
 .../cql3/statements/TransactionStatementTest.java  |  77 +-----
 .../cql3/validation/entities/TupleTypeTest.java    |   2 +-
 20 files changed, 425 insertions(+), 457 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
index 6d59a3a37d..e948d5dcaa 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TransactionStatement.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.partitions.FilteredPartition;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.service.ClientState;
@@ -78,8 +79,8 @@ import org.apache.cassandra.service.accord.txn.TxnReference;
 import org.apache.cassandra.service.accord.txn.TxnResult;
 import org.apache.cassandra.service.accord.txn.TxnUpdate;
 import org.apache.cassandra.service.accord.txn.TxnWrite;
-import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.transport.messages.ResultMessage;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -97,6 +98,7 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
     private static final Logger logger = 
LoggerFactory.getLogger(TransactionStatement.class);
 
     public static final String DUPLICATE_TUPLE_NAME_MESSAGE = "The name '%s' 
has already been used by a LET assignment.";
+    public static final String INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE = 
"SELECT must specify either all partition key elements. Partition key elements 
must be always specified with equality operators; %s %s";
     public static final String INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE = "SELECT 
must specify either all primary key elements or all partition key elements and 
LIMIT 1. In both cases partition key elements must be always specified with 
equality operators; %s %s";
     public static final String NO_CONDITIONS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify their own conditions; %s statement %s";
     public static final String NO_TIMESTAMPS_IN_UPDATES_MESSAGE = "Updates 
within transactions may not specify custom timestamps; %s statement %s";
@@ -107,6 +109,7 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
     public static final String SELECT_REFS_NEED_COLUMN_MESSAGE = "SELECT 
references must specify a column.";
     public static final String TRANSACTIONS_DISABLED_MESSAGE = "Accord 
transactions are disabled. (See accord.enabled in cassandra.yaml)";
     public static final String ILLEGAL_RANGE_QUERY_MESSAGE = "Range queries 
are not allowed for reads within a transaction; %s %s";
+    public static final String UNSUPPORTED_MIGRATION = "Transaction Statement 
is unsupported when migrating away from Accord or before migration to Accord is 
complete for a range";
 
     static class NamedSelect
     {
@@ -383,14 +386,11 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
             for (NamedSelect assignment : assignments)
                 checkFalse(isSelectingMultipleClusterings(assignment.select, 
options), INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET assignment", 
assignment.select.source);
 
-            if (returningSelect != null)
-                
checkFalse(isSelectingMultipleClusterings(returningSelect.select, options), 
INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "returning SELECT", 
returningSelect.select.source);
-
             Txn txn = createTxn(state.getClientState(), options);
 
             TxnResult txnResult = AccordService.instance().coordinate(txn, 
options.getConsistency(), requestTime);
             if (txnResult.kind() == retry_new_protocol)
-                throw new IllegalStateException("Transaction statement should 
never be required to switch consensus protocols");
+                throw new InvalidRequestException(UNSUPPORTED_MIGRATION);
             TxnData data = (TxnData)txnResult;
 
             if (returningSelect != null)
@@ -560,7 +560,7 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
                     throw invalidRequest(NO_COUNTERS_IN_TXNS_MESSAGE, 
"SELECT", prepared.source);
 
                 returningSelect = new NamedSelect(TxnDataName.returning(), 
prepared);
-                checkAtMostOneRowSpecified(returningSelect.select, "returning 
select");
+                checkAtMostOnePartitionSpecified(returningSelect.select, 
"returning select");
             }
 
             List<RowDataReference> returningReferences = null;
@@ -600,6 +600,15 @@ public class TransactionStatement implements 
CQLStatement.CompositeCQLStatement,
             return new TransactionStatement(preparedAssignments, 
returningSelect, returningReferences, preparedUpdates, preparedConditions, 
bindVariables);
         }
 
+        /**
+         * Do not use this method in execution!!! It is only allowed during 
prepare because it outputs a query raw text.
+         * We don't want to print it for a user who provided an identifier of 
someone else's prepared statement.
+         */
+        private static void checkAtMostOnePartitionSpecified(SelectStatement 
select, String name)
+        {
+            checkTrue(select.getRestrictions().hasPartitionKeyRestrictions(), 
INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE, name, select.source);
+        }
+
         /**
          * Do not use this method in execution!!! It is only allowed during 
prepare because it outputs a query raw text.
          * We don't want it print it for a user who provided an identifier of 
someone's else prepared statement.
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
index b3536910c8..38a8a5c23f 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/schema/AlterTableStatement.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.schema.UserFunctions;
 import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.schema.Views;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
 import org.apache.cassandra.service.reads.repair.ReadRepairStrategy;
 import org.apache.cassandra.tcm.ClusterMetadata;
@@ -87,6 +88,9 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
 {
     private static final Logger logger = 
LoggerFactory.getLogger(AlterTableStatement.class);
 
+    public static final String ACCORD_COUNTER_TABLES_UNSUPPORTED = "Counters 
are not supported with Accord for table %s.%s";
+    public static final String ACCORD_COUNTER_COLUMN_UNSUPPORTED = "Cannot add 
a counter column to Accord table %s.%s with transactional mode %s and 
transactional migration from %s";
+
     protected final String tableName;
     private final boolean ifExists;
     protected ClientState state;
@@ -325,6 +329,9 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
                 return;
             }
 
+            if (type.isCounter() && 
(table.params.transactionalMode.accordIsEnabled || 
table.params.transactionalMigrationFrom.migratingFromAccord()))
+                throw ire(format(ACCORD_COUNTER_COLUMN_UNSUPPORTED, 
keyspaceName, tableName, table.params.transactionalMode, 
table.params.transactionalMigrationFrom));
+
             if (table.isCompactTable())
                 throw ire("Cannot add new column to a COMPACT STORAGE table");
 
@@ -578,7 +585,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             validateDefaultTimeToLive(attrs.asNewTableParams());
         }
 
-        private TableParams 
validateAndUpdateTransactionalMigration(TableParams prev, TableParams next)
+        private TableParams validateAndUpdateTransactionalMigration(boolean 
isCounter, TableParams prev, TableParams next)
         {
             if (next.transactionalMode.accordIsEnabled && 
SchemaConstants.isSystemKeyspace(keyspaceName))
                 throw ire("Cannot enable accord on system tables (%s.%s)", 
keyspaceName, tableName);
@@ -588,6 +595,10 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             boolean explicitlySetMigrationFrom = 
attrs.hasOption(Option.TRANSACTIONAL_MIGRATION_FROM);
             // set table to migrating
             TransactionalMigrationFromMode newMigrateFrom = 
TransactionalMigrationFromMode.fromMode(prev.transactionalMode, 
next.transactionalMode);
+
+            if (isCounter && (next.transactionalMode != TransactionalMode.off 
|| newMigrateFrom != TransactionalMigrationFromMode.none || 
next.transactionalMigrationFrom != TransactionalMigrationFromMode.none))
+                throw ire(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, 
keyspaceName, tableName));
+
             boolean forceMigrationChange = modeChange && 
explicitlySetMigrationFrom && next.transactionalMigrationFrom != newMigrateFrom;
 
             if (modeChange && next.transactionalMode.accordIsEnabled && 
!DatabaseDescriptor.getAccordTransactionsEnabled())
@@ -642,7 +653,7 @@ public abstract class AlterTableStatement extends 
AlterSchemaStatement
             if (!params.compression.isEnabled())
                 Guardrails.uncompressedTablesEnabled.ensureEnabled(state);
 
-            params = validateAndUpdateTransactionalMigration(table.params, 
params);
+            params = 
validateAndUpdateTransactionalMigration(table.isCounter(), table.params, 
params);
 
             return 
keyspace.withSwapped(keyspace.tables.withSwapped(table.withSwapped(params)));
         }
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index ea770b812f..06bedf08da 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -1272,6 +1272,32 @@ public class SinglePartitionReadCommand extends 
ReadCommand implements SinglePar
         return false;
     }
 
+    /*
+     * The execution method does not need to perform reconciliation so the 
read command
+     * should execute in a manner suited to not needing reconciliation. Such 
as when
+     * executing transactionally at a single replica and doing an index scan 
where the index
+     * scan should not return extra rows and expect post filtering at the 
coordinator.
+     */
+    public SinglePartitionReadCommand withoutReconciliation()
+    {
+        if (rowFilter().isEmpty())
+            return this;
+        return create(serializedAtEpoch(),
+                      isDigestQuery(),
+                      digestVersion(),
+                      acceptsTransient(),
+                      allowsOutOfRangeReads(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter().withoutReconciliation(),
+                      limits(),
+                      partitionKey(),
+                      clusteringIndexFilter(),
+                      indexQueryPlan(),
+                      isTrackingWarnings());
+    }
+
     /**
      * Groups multiple single partition read commands.
      */
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index f7e9ad911b..9ea44ead96 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -383,6 +383,13 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
         return withNewExpressions(newExpressions);
     }
 
+    public RowFilter withoutReconciliation()
+    {
+        if (needsReconciliation)
+            return new RowFilter(expressions, false);
+        return this;
+    }
+
     public boolean hasNonKeyExpressions()
     {
         for (Expression e : expressions)
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java 
b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index 4f8efb8248..2db2ba904a 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -24,8 +24,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
-
 import javax.annotation.Nullable;
 
 import com.google.common.collect.Lists;
@@ -167,6 +167,22 @@ public class QueryController
         return partition.queryMemtableAndDisk(cfs, executionController);
     }
 
+    private static Runnable getIndexReleaser(Set<SSTableIndex> 
referencedIndexes)
+    {
+        return new Runnable()
+        {
+            boolean closed;
+            @Override
+            public void run()
+            {
+                if (closed)
+                    return;
+                closed = true;
+                referencedIndexes.forEach(SSTableIndex::releaseQuietly);
+            }
+        };
+    }
+
     /**
      * Build a {@link KeyRangeIterator.Builder} from the given list of {@link 
Expression}s.
      * <p>
@@ -195,7 +211,7 @@ public class QueryController
         expressions = expressions.stream().filter(e -> e.getIndexOperator() != 
Expression.IndexOperator.ANN).collect(Collectors.toList());
 
         QueryViewBuilder.QueryView queryView = new 
QueryViewBuilder(expressions, mergeRange).build();
-        Runnable onClose = () -> 
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
+        Runnable onClose = getIndexReleaser(queryView.referencedIndexes);
         KeyRangeIterator.Builder builder = command.rowFilter().isStrict()
                                            ? 
KeyRangeIntersectionIterator.builder(expressions.size(), onClose)
                                            : 
KeyRangeUnionIterator.builder(expressions.size(), onClose);
@@ -314,7 +330,7 @@ public class QueryController
         KeyRangeIterator memtableResults = 
index.memtableIndexManager().searchMemtableIndexes(queryContext, 
planExpression, mergeRange);
 
         QueryViewBuilder.QueryView queryView = new 
QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
-        Runnable onClose = () -> 
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
+        Runnable onClose = getIndexReleaser(queryView.referencedIndexes);
 
         try
         {
@@ -354,7 +370,7 @@ public class QueryController
         // search memtable before referencing sstable indexes; otherwise we 
may miss newly flushed memtable index
         KeyRangeIterator memtableResults = 
index.memtableIndexManager().limitToTopResults(queryContext, sourceKeys, 
planExpression);
         QueryViewBuilder.QueryView queryView = new 
QueryViewBuilder(Collections.singleton(planExpression), mergeRange).build();
-        Runnable onClose = () -> 
queryView.referencedIndexes.forEach(SSTableIndex::releaseQuietly);
+        Runnable onClose = getIndexReleaser(queryView.referencedIndexes);
 
         try
         {
diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java 
b/src/java/org/apache/cassandra/schema/TableMetadata.java
index ac9d77e01c..37e66fcc80 100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@ -67,6 +67,7 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.service.accord.fastpath.FastPathStrategy;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
 import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.tcm.Epoch;
 import 
org.apache.cassandra.tcm.serialization.UDTAndFunctionsAwareMetadataSerializer;
@@ -76,6 +77,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.github.jamm.Unmetered;
 
+import static accord.utils.Invariants.checkState;
 import static com.google.common.collect.Iterables.any;
 import static com.google.common.collect.Iterables.transform;
 import static java.lang.String.format;
@@ -553,6 +555,8 @@ public class TableMetadata implements SchemaElement
             except("Missing partition keys for table %s", toString());
 
         indexes.validate(this);
+
+        checkState((params.transactionalMode == TransactionalMode.off && 
params.transactionalMigrationFrom == TransactionalMigrationFromMode.none) || 
!isCounter(), "Counters are not supported with Accord for table " + this);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/service/accord/IAccordService.java 
b/src/java/org/apache/cassandra/service/accord/IAccordService.java
index 97ebae6963..4d3505f626 100644
--- a/src/java/org/apache/cassandra/service/accord/IAccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/IAccordService.java
@@ -64,7 +64,7 @@ import static 
com.google.common.base.Preconditions.checkNotNull;
 public interface IAccordService
 {
     EnumSet<ConsistencyLevel> SUPPORTED_COMMIT_CONSISTENCY_LEVELS = 
EnumSet.of(ConsistencyLevel.ANY, ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, 
ConsistencyLevel.SERIAL, ConsistencyLevel.ALL);
-    EnumSet<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS = 
EnumSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, 
ConsistencyLevel.SERIAL);
+    EnumSet<ConsistencyLevel> SUPPORTED_READ_CONSISTENCY_LEVELS = 
EnumSet.of(ConsistencyLevel.ONE, ConsistencyLevel.QUORUM, 
ConsistencyLevel.SERIAL, ConsistencyLevel.ALL);
 
     IVerbHandler<? extends Request> requestHandler();
     IVerbHandler<? extends Reply> responseHandler();
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
index 4787e2105b..d2c92583af 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java
@@ -33,12 +33,12 @@ import accord.utils.async.AsyncChain;
 import accord.utils.async.AsyncChains;
 import org.apache.cassandra.concurrent.DebuggableTask;
 import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.FilteredPartition;
 import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -119,7 +119,7 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
         return key;
     }
 
-    public AsyncChain<Data> read(Timestamp executeAt)
+    public AsyncChain<Data> read(ConsistencyLevel consistencyLevel, Timestamp 
executeAt)
     {
         SinglePartitionReadCommand command = (SinglePartitionReadCommand) 
get();
         // TODO (required, safety): before release, double check reasoning 
that this is safe
@@ -129,6 +129,8 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
         // this simply looks like the transaction witnessed TTL'd data and the 
data then expired
         // immediately after the transaction executed, and this simplifies 
things a great deal
         int nowInSeconds = (int) 
TimeUnit.MICROSECONDS.toSeconds(executeAt.hlc());
+        if (consistencyLevel == null || consistencyLevel == 
ConsistencyLevel.ONE)
+            command = command.withoutReconciliation();
         return performLocalRead(command, nowInSeconds);
     }
 
@@ -144,8 +146,7 @@ public class TxnNamedRead extends 
AbstractSerialized<ReadCommand>
             SinglePartitionReadCommand read = 
command.withNowInSec(nowInSeconds);
 
             try (ReadExecutionController controller = 
read.executionController();
-                 UnfilteredPartitionIterator partition = 
read.executeLocally(controller);
-                 PartitionIterator iterator = 
UnfilteredPartitionIterators.filter(partition, read.nowInSec()))
+                 PartitionIterator iterator = 
UnfilteredPartitionIterators.filter(read.executeLocally(controller), 
read.nowInSec()))
             {
                 TxnData result = new TxnData();
                 if (iterator.hasNext())
diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java 
b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
index 391da765bf..f9d09409fb 100644
--- a/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
+++ b/src/java/org/apache/cassandra/service/accord/txn/TxnRead.java
@@ -200,7 +200,7 @@ public class TxnRead extends 
AbstractKeySorted<TxnNamedRead> implements Read
     public AsyncChain<Data> read(Seekable key, SafeCommandStore safeStore, 
Timestamp executeAt, DataStore store)
     {
         List<AsyncChain<Data>> results = new ArrayList<>();
-        forEachWithKey((PartitionKey) key, read -> 
results.add(read.read(executeAt)));
+        forEachWithKey((PartitionKey) key, read -> 
results.add(read.read(cassandraConsistencyLevel, executeAt)));
 
         if (results.isEmpty())
             // Result type must match everywhere
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java 
b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 32bad6526d..7716856a28 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -131,8 +131,7 @@ public class DataResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<
 
     private boolean usesReplicaFilteringProtection()
     {
-        // Key columns are immutable and should never need to participate in 
replica filtering
-        if (!command.rowFilter().hasNonKeyExpressions())
+        if (command.rowFilter().isEmpty())
             return false;
 
         if (command.isTopK())
@@ -278,6 +277,10 @@ public class DataResolver<E extends Endpoints<E>, P 
extends ReplicaPlan.ForRead<
 
     private  UnaryOperator<PartitionIterator> 
preCountFilterForReplicaFilteringProtection()
     {
+        // Key columns are immutable and should never need to participate in 
replica filtering
+        if (!command.rowFilter().hasNonKeyExpressions())
+            return results -> results;
+
         return results -> {
             Index.Searcher searcher = command.indexSearcher();
             // in case of "ALLOW FILTERING" without index
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
index c58b5d62b8..bbf9f30b6e 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordCQLTestBase.java
@@ -55,19 +55,25 @@ import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.QueryResults;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.distributed.shared.AssertUtils;
+import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.AccordTestUtils;
 import org.apache.cassandra.service.consensus.TransactionalMode;
+import 
org.apache.cassandra.service.consensus.migration.TransactionalMigrationFromMode;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FailingConsumer;
 import org.assertj.core.api.Assertions;
 
+import static java.lang.String.format;
 import static java.util.Collections.singletonList;
 import static org.apache.cassandra.cql3.CQLTester.row;
+import static 
org.apache.cassandra.cql3.statements.schema.AlterTableStatement.ACCORD_COUNTER_COLUMN_UNSUPPORTED;
+import static 
org.apache.cassandra.cql3.statements.schema.AlterTableStatement.ACCORD_COUNTER_TABLES_UNSUPPORTED;
 import static org.apache.cassandra.distributed.util.QueryResultUtil.assertThat;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 
 public abstract class AccordCQLTestBase extends AccordTestBase
 {
@@ -92,12 +98,189 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
         SHARED_CLUSTER.schemaChange("CREATE TYPE " + KEYSPACE + ".person 
(height int, age int)");
     }
 
+    @Test
+    public void testCounterCreateTableTransactionalModeFails() throws Exception
+    {
+        try
+        {
+            test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, 
v counter, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), cluster 
-> {});
+            fail("Expected exception");
+        }
+        catch (Throwable t)
+        {
+            assertEquals(IllegalStateException.class.getName(), 
t.getClass().getName());
+            assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, KEYSPACE, 
accordTableName), t.getMessage());
+        }
+    }
+
+    @Test
+    public void testCounterCreateTableTransactionalMigrationFromModeFails() 
throws Exception
+    {
+        try
+        {
+            test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, 
v counter, primary key (k, c)) WITH transactional_migration_from = '" + 
transactionalMode.name() + "'", cluster -> {});
+            fail("Expected exception");
+        }
+        catch (Throwable t)
+        {
+            assertEquals(IllegalStateException.class.getName(), 
t.getClass().getName());
+            assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, KEYSPACE, 
accordTableName), t.getMessage());
+        }
+    }
+
+    @Test
+    public void testCounterAlterTableTransactionalModeFails() throws Exception
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
counter, primary key (k, c))", cluster -> {
+            try
+            {
+                cluster.coordinator(1).execute("ALTER TABLE " + 
qualifiedAccordTableName + " WITH transactional_mode = '" + 
transactionalMode.name() + "';", ConsistencyLevel.ALL);
+                fail("Expected exception");
+            }
+            catch (Throwable t)
+            {
+                assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, 
KEYSPACE, accordTableName), t.getMessage());
+            }
+        });
+    }
+
+    @Test
+    public void testCounterAlterTableTransactionalMigrationFromModeFails() 
throws Exception
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
counter, primary key (k, c))", cluster -> {
+            try
+            {
+                cluster.coordinator(1).execute("ALTER TABLE " + 
qualifiedAccordTableName + " WITH transactional_migration_from = '" + 
transactionalMode.name() + "';", ConsistencyLevel.ALL);
+                fail("Expected exception");
+            }
+            catch (Throwable t)
+            {
+                assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                assertEquals(format(ACCORD_COUNTER_TABLES_UNSUPPORTED, 
KEYSPACE, accordTableName), t.getMessage());
+            }
+        });
+    }
+
+    @Test
+    public void testCounterAddColumnFailsWithAccord() throws Exception
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, s 
int static, v int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), 
cluster -> {
+            try
+            {
+                cluster.coordinator(1).execute("ALTER TABLE " + 
qualifiedAccordTableName + " ADD (v2 counter);", ConsistencyLevel.ALL);
+                fail("Expected exception");
+            }
+            catch (Throwable t)
+            {
+                assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                assertEquals(format(ACCORD_COUNTER_COLUMN_UNSUPPORTED, 
KEYSPACE, accordTableName, transactionalMode, 
TransactionalMigrationFromMode.none), t.getMessage());
+            }
+        });
+    }
+
+    @Test
+    public void testCounterAddColumnFailsWithMigration() throws Exception
+    {
+        test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, s 
int static, v int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), 
cluster -> {
+            try
+            {
+                cluster.coordinator(1).execute("ALTER TABLE " + 
qualifiedAccordTableName + " WITH transactional_mode = '" + 
TransactionalMode.off + "';", ConsistencyLevel.ALL);
+                cluster.coordinator(1).execute("ALTER TABLE " + 
qualifiedAccordTableName + " ADD (v2 counter);", ConsistencyLevel.ALL);
+                fail("Expected exception");
+            }
+            catch (Throwable t)
+            {
+                assertEquals(InvalidRequestException.class.getName(), 
t.getClass().getName());
+                assertEquals(format(ACCORD_COUNTER_COLUMN_UNSUPPORTED, 
KEYSPACE, accordTableName, TransactionalMode.off, transactionalMode), 
t.getMessage());
+            }
+        });
+    }
+
     @Override
     protected void test(FailingConsumer<Cluster> fn) throws Exception
     {
         test("CREATE TABLE " + qualifiedAccordTableName + " (k int, c int, v 
int, primary key (k, c)) WITH " + transactionalMode.asCqlParam(), fn);
     }
 
+    @Test
+    public void testPartitionMultiRowReturn() throws Exception
+    {
+        test(cluster -> {
+            for (int i = 0; i < 3; i++)
+                cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), 
ConsistencyLevel.ALL, 42, 43 + i, 44 + i);
+
+            String txn = "BEGIN TRANSACTION " +
+                             "SELECT * " +
+                             "FROM " + qualifiedAccordTableName + " " +
+                             "WHERE k = 42;" +
+                         "COMMIT TRANSACTION;";
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
+            assertThat(result).hasSize(3)
+                              .contains(42, 43, 44)
+                              .contains(42, 44, 45)
+                              .contains(42, 45, 46);
+        });
+    }
+
+    @Test
+    public void testSaiMultiRowReturn() throws Exception
+    {
+        test(cluster -> {
+            cluster.schemaChange("CREATE INDEX ON " + qualifiedAccordTableName 
+ "(v) USING 'sai';");
+            for (int i = 0; i < 3; i++)
+                cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), 
ConsistencyLevel.ALL, 42, 43 + i, 44 + i);
+
+            String txn = "BEGIN TRANSACTION " +
+                         "SELECT * " +
+                         "FROM " + qualifiedAccordTableName + " " +
+                         "WHERE k = 42 AND v = 45;" +
+                         "COMMIT TRANSACTION;";
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
+            assertThat(result).hasSize(1)
+                              .contains(42, 44, 45);
+        });
+    }
+
+    // This test currently fails, and that is expected; it is kept mainly as documentation until the issue is fixed.
+    @Test
+    public void testSasiMultiRowReturn() throws Exception
+    {
+        test(cluster -> {
+            cluster.schemaChange("CREATE INDEX ON " + qualifiedAccordTableName 
+ "(v) USING 'org.apache.cassandra.index.sasi.SASIIndex';");
+            for (int i = 0; i < 3; i++)
+                cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), 
ConsistencyLevel.ALL, 42, 43 + i, 44 + i);
+
+            String txn = "BEGIN TRANSACTION " +
+                         "SELECT * " +
+                         "FROM " + qualifiedAccordTableName + " " +
+                         "WHERE k = 42 AND v = 45;" +
+                         "COMMIT TRANSACTION;";
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
+            assertThat(result).hasSize(1)
+                              .contains(42, 44, 45);
+        });
+    }
+
+    @Test
+    public void testLegacy2iMultiRowReturn() throws Exception
+    {
+        test(cluster -> {
+            cluster.schemaChange("CREATE INDEX ON " + qualifiedAccordTableName 
+ "(v);");
+            for (int i = 0; i < 3; i++)
+                cluster.coordinator(1).execute(wrapInTxn("INSERT INTO " + 
qualifiedAccordTableName + " (k, c, v) VALUES (?, ?, ?)"), 
ConsistencyLevel.ALL, 42, 43 + i, 44 + i);
+
+            String txn = "BEGIN TRANSACTION " +
+                         "SELECT * " +
+                         "FROM " + qualifiedAccordTableName + " " +
+                         "WHERE k = 42 AND v = 45;" +
+                         "COMMIT TRANSACTION;";
+            SimpleQueryResult result = 
cluster.coordinator(1).executeWithResult(txn, ConsistencyLevel.SERIAL);
+            assertThat(result).hasSize(1)
+                              .contains(42, 44, 45);
+        });
+    }
+
     @Test
     public void testNonExistingKeyWithStaticUpdate() throws Exception
     {
@@ -204,8 +387,8 @@ public abstract class AccordCQLTestBase extends 
AccordTestBase
             cluster.get(1).runOnInstance(() -> {
                 StringBuilder sb = new StringBuilder("BEGIN TRANSACTION\n");
                 for (int i = 0; i < keyStrings.size() - 1; i++)
-                    sb.append(String.format("LET row%d = (SELECT * FROM %s 
WHERE k=%s AND c=0);\n", i, currentTable, keyStrings.get(i)));
-                sb.append(String.format("SELECT * FROM %s WHERE k=%s AND 
c=0;\n", currentTable, keyStrings.get(keyStrings.size() - 1)));
+                    sb.append(format("LET row%d = (SELECT * FROM %s WHERE k=%s 
AND c=0);\n", i, currentTable, keyStrings.get(i)));
+                sb.append(format("SELECT * FROM %s WHERE k=%s AND c=0;\n", 
currentTable, keyStrings.get(keyStrings.size() - 1)));
                 sb.append("COMMIT TRANSACTION");
 
                 Unseekables<?> routables = 
AccordTestUtils.createTxn(sb.toString()).keys().toParticipants();
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
index 92e81c73f3..b3e2407af2 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordTestBase.java
@@ -335,6 +335,7 @@ public abstract class AccordTestBase extends TestBaseImpl
         Cluster.Builder builder = Cluster.build(nodes)
                                          .withoutVNodes()
                                          .withConfig(c -> 
c.with(Feature.GOSSIP)
+                                                           
.set("sasi_indexes_enabled", "true")
                                                            
.set("write_request_timeout", "10s")
                                                            
.set("transaction_timeout", "15s")
                                                            
.set("native_transport_timeout", "30s")
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/AccordMultiNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/AccordMultiNodeSAITest.java
new file mode 100644
index 0000000000..a69f83b530
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/AccordMultiNodeSAITest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.sai;
+
+import org.junit.BeforeClass;
+
+public class AccordMultiNodeSAITest extends MultiNodeSAITestBase
+{
+    @BeforeClass
+    public static void before() throws Throwable
+    {
+        MultiNodeSAITestBase.before(true);
+    }
+
+    public AccordMultiNodeSAITest()
+    {
+        super(true);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/AccordSingleNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/AccordSingleNodeSAITest.java
new file mode 100644
index 0000000000..40164cdcf7
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/fuzz/sai/AccordSingleNodeSAITest.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.fuzz.sai;
+
+public class AccordSingleNodeSAITest extends SingleNodeSAITestBase
+{
+    public AccordSingleNodeSAITest()
+    {
+        super(true);
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
index a86472b99a..5153fb15ed 100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
@@ -18,88 +18,18 @@
 
 package org.apache.cassandra.fuzz.sai;
 
-import org.junit.Before;
 import org.junit.BeforeClass;
 
-import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.test.sai.SAIUtil;
-import org.apache.cassandra.harry.ddl.SchemaSpec;
-import org.apache.cassandra.harry.sut.injvm.InJvmSut;
-import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
-
-import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
-import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-
-public class MultiNodeSAITest extends SingleNodeSAITest
+public class MultiNodeSAITest extends MultiNodeSAITestBase
 {
-    /**
-     * Chosing a fetch size has implications for how well this test will 
excercise paging, short-read protection, and
-     * other important parts of the distributed query apparatus. This should 
be set low enough to ensure a significant
-     * number of queries during validation page, but not too low that more 
expesive queries time out and fail the test.
-     */
-    private static final int FETCH_SIZE = 10;
-
     @BeforeClass
     public static void before() throws Throwable
     {
-        cluster = Cluster.build()
-                         .withNodes(2)
-                         // At lower fetch sizes, queries w/ hundreds or 
thousands of matches can take a very long time. 
-                         .withConfig(InJvmSutBase.defaultConfig().andThen(c -> 
c.set("range_request_timeout", "180s")
-                                                                               
 .set("read_request_timeout", "180s")
-                                                                               
 .set("native_transport_timeout", "180s")
-                                                                               
 .set("slow_query_log_timeout", "180s")
-                                                                               
 .with(GOSSIP).with(NETWORK)))
-                         .createWithoutStarting();
-        cluster.setUncaughtExceptionsFilter(t -> {
-            logger.error("Caught exception, reporting during shutdown. 
Ignoring.", t);
-            return true;
-        });
-        cluster.startup();
-        cluster = init(cluster);
-        sut = new InJvmSut(cluster) {
-            @Override
-            public Object[][] execute(String cql, ConsistencyLevel cl, 
Object[] bindings)
-            {
-                // The goal here is to make replicas as out of date as 
possible, modulo the efforts of repair
-                // and read-repair in the test itself.
-                if (cql.contains("SELECT"))
-                    return super.execute(cql, ConsistencyLevel.ALL, 
FETCH_SIZE, bindings);
-                return super.execute(cql, ConsistencyLevel.NODE_LOCAL, 
bindings);
-            }
-        };
-    }
-
-    @Before
-    public void beforeEach()
-    {
-        cluster.schemaChange("DROP KEYSPACE IF EXISTS harry");
-        cluster.schemaChange("CREATE KEYSPACE harry WITH replication = 
{'class': 'SimpleStrategy', 'replication_factor': 2};");
-    }
-
-    @Override
-    protected void flush(SchemaSpec schema)
-    {
-        cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
-        cluster.get(2).nodetool("flush", schema.keyspace, schema.table);
-    }
-
-    @Override
-    protected void repair(SchemaSpec schema)
-    {
-        cluster.get(1).nodetool("repair", schema.keyspace);
-    }
-
-    @Override
-    protected void compact(SchemaSpec schema)
-    {
-        cluster.get(1).nodetool("compact", schema.keyspace);
-        cluster.get(2).nodetool("compact", schema.keyspace);
+        MultiNodeSAITestBase.before(false);
     }
 
-    @Override
-    protected void waitForIndexesQueryable(SchemaSpec schema)
+    public MultiNodeSAITest()
     {
-        SAIUtil.waitForIndexQueryable(cluster, schema.keyspace);
+        super(false);
     }
-}
\ No newline at end of file
+}
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITestBase.java
similarity index 81%
copy from test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
copy to test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITestBase.java
index a86472b99a..ee98e5c1fb 100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/MultiNodeSAITestBase.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 
-public class MultiNodeSAITest extends SingleNodeSAITest
+public abstract class MultiNodeSAITestBase extends SingleNodeSAITestBase
 {
     /**
      * Chosing a fetch size has implications for how well this test will 
excercise paging, short-read protection, and
@@ -39,14 +39,26 @@ public class MultiNodeSAITest extends SingleNodeSAITest
      */
     private static final int FETCH_SIZE = 10;
 
+    public MultiNodeSAITestBase(boolean withAccord)
+    {
+        super(withAccord);
+    }
+
     @BeforeClass
     public static void before() throws Throwable
+    {
+        before(false);
+    }
+
+    public static void before(boolean withAccord) throws Throwable
     {
         cluster = Cluster.build()
                          .withNodes(2)
                          // At lower fetch sizes, queries w/ hundreds or 
thousands of matches can take a very long time. 
                          .withConfig(InJvmSutBase.defaultConfig().andThen(c -> 
c.set("range_request_timeout", "180s")
                                                                                
 .set("read_request_timeout", "180s")
+                                                                               
 .set("transaction_timeout", "180s")
+                                                                               
 .set("write_request_timeout", "180s")
                                                                                
 .set("native_transport_timeout", "180s")
                                                                                
 .set("slow_query_log_timeout", "180s")
                                                                                
 .with(GOSSIP).with(NETWORK)))
@@ -61,11 +73,15 @@ public class MultiNodeSAITest extends SingleNodeSAITest
             @Override
             public Object[][] execute(String cql, ConsistencyLevel cl, 
Object[] bindings)
             {
-                // The goal here is to make replicas as out of date as 
possible, modulo the efforts of repair
-                // and read-repair in the test itself.
                 if (cql.contains("SELECT"))
                     return super.execute(cql, ConsistencyLevel.ALL, 
FETCH_SIZE, bindings);
-                return super.execute(cql, ConsistencyLevel.NODE_LOCAL, 
bindings);
+
+                // The goal here is to make replicas as out of date as possible, modulo the efforts of repair
+                // and read-repair in the test itself. NODE_LOCAL bypasses Accord, which defeats any attempt at
+                // testing Accord, so when running with Accord use QUORUM instead (Accord ignores the consistency
+                // level because it runs with transactional mode full).
+                ConsistencyLevel consistencyLevel = withAccord ? 
ConsistencyLevel.QUORUM : ConsistencyLevel.NODE_LOCAL;
+                return super.execute(cql, consistencyLevel, bindings);
             }
         };
     }
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
index bbe4ce8398..84ad2f013b 100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
@@ -18,290 +18,10 @@
 
 package org.apache.cassandra.fuzz.sai;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.Test;
-
-import org.apache.cassandra.config.CassandraRelevantProperties;
-import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
-import org.apache.cassandra.harry.ddl.ColumnSpec;
-import org.apache.cassandra.harry.ddl.SchemaSpec;
-import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
-import org.apache.cassandra.harry.gen.DataGenerators;
-import org.apache.cassandra.harry.gen.EntropySource;
-import org.apache.cassandra.harry.gen.rng.JdkRandomEntropySource;
-import org.apache.cassandra.harry.model.QuiescentChecker;
-import org.apache.cassandra.harry.model.SelectHelper;
-import org.apache.cassandra.harry.model.reconciler.PartitionState;
-import org.apache.cassandra.harry.model.reconciler.Reconciler;
-import org.apache.cassandra.harry.operations.FilteringQuery;
-import org.apache.cassandra.harry.operations.Query;
-import org.apache.cassandra.harry.operations.Relation;
-import org.apache.cassandra.harry.sut.SystemUnderTest;
-import org.apache.cassandra.harry.sut.TokenPlacementModel;
-import org.apache.cassandra.harry.tracker.DataTracker;
-import org.apache.cassandra.harry.tracker.DefaultDataTracker;
-
-public class SingleNodeSAITest extends IntegrationTestBase
+public class SingleNodeSAITest extends SingleNodeSAITestBase
 {
-    private static final int RUNS = 1;
-
-    private static final int OPERATIONS_PER_RUN = 30_000;
-    private static final int REPAIR_SKIP = OPERATIONS_PER_RUN / 2;
-    private static final int FLUSH_SKIP = OPERATIONS_PER_RUN / 7;
-    private static final int VALIDATION_SKIP = OPERATIONS_PER_RUN / 100;
-
-    private static final int NUM_PARTITIONS = OPERATIONS_PER_RUN / 1000;
-    protected static final int MAX_PARTITION_SIZE = 10_000;
-    private static final int UNIQUE_CELL_VALUES = 5;
-
-    long seed = 1;
-
-    @Test
-    public void basicSaiTest()
-    {
-        CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(6);
-        SchemaSpec schema = new SchemaSpec(KEYSPACE, "tbl1",
-                                           Arrays.asList(ColumnSpec.ck("pk1", 
ColumnSpec.int64Type),
-                                                         ColumnSpec.ck("pk2", 
ColumnSpec.asciiType(4, 100)),
-                                                         ColumnSpec.ck("pk3", 
ColumnSpec.int64Type)),
-                                           Arrays.asList(ColumnSpec.ck("ck1", 
ColumnSpec.asciiType(4, 100)),
-                                                         ColumnSpec.ck("ck2", 
ColumnSpec.asciiType, true),
-                                                         ColumnSpec.ck("ck3", 
ColumnSpec.int64Type)),
-                                           
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
-                                                         
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
-                                                         
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
-                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))))
-                            
.withCompactionStrategy("LeveledCompactionStrategy");
-
-        sut.schemaChange(schema.compile().cql());
-        sut.schemaChange(schema.cloneWithName(schema.keyspace, schema.table + 
"_debug").compile().cql());
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai' ",
-                                       schema.regularColumns.get(0).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(0).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.regularColumns.get(1).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(1).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.regularColumns.get(2).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.regularColumns.get(2).name));
-        sut.schemaChange(String.format("CREATE INDEX %s_sai_idx ON %s.%s (%s) 
USING 'sai';",
-                                       schema.staticColumns.get(0).name,
-                                       schema.keyspace,
-                                       schema.table,
-                                       schema.staticColumns.get(0).name));
-
-        waitForIndexesQueryable(schema);
-
-        DataTracker tracker = new DefaultDataTracker();
-        TokenPlacementModel.ReplicationFactor rf = new 
TokenPlacementModel.SimpleReplicationFactor(cluster.size());
-        ReplayingHistoryBuilder history = new ReplayingHistoryBuilder(seed,
-                                                                      
MAX_PARTITION_SIZE,
-                                                                      
MAX_PARTITION_SIZE,
-                                                                      tracker,
-                                                                      sut,
-                                                                      schema,
-                                                                      rf,
-                                                                      
SystemUnderTest.ConsistencyLevel.QUORUM);
-
-        for (int run = 0; run < RUNS; run++)
-        {
-            logger.info("Starting run {}/{}...", run + 1, RUNS);
-            EntropySource random = new JdkRandomEntropySource(run);
-
-            // Populate the array of possible values for all operations in the 
run:
-            long[] values = new long[UNIQUE_CELL_VALUES];
-            for (int i = 0; i < values.length; i++)
-                values[i] = random.next();
-
-            for (int i = 0; i < OPERATIONS_PER_RUN; i++)
-            {
-                int partitionIndex = random.nextInt(0, NUM_PARTITIONS);
-
-                history.visitPartition(partitionIndex)
-                       .insert(random.nextInt(MAX_PARTITION_SIZE),
-                               new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
-                                            random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)],
-                                            random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] },
-                               new long[] { random.nextBoolean() ? 
DataGenerators.UNSET_DESCR : values[random.nextInt(values.length)] });
-
-                if (random.nextFloat() > 0.99f)
-                {
-                    int row1 = random.nextInt(MAX_PARTITION_SIZE);
-                    int row2 = random.nextInt(MAX_PARTITION_SIZE);
-                    
history.visitPartition(partitionIndex).deleteRowRange(Math.min(row1, row2), 
Math.max(row1, row2),
-                                                                          
random.nextBoolean(), random.nextBoolean());
-                }
-                else if (random.nextFloat() > 0.999f)
-                {
-                    history.visitPartition(partitionIndex).deleteRowSlice();
-                }
-
-                if (random.nextFloat() > 0.995f)
-                {
-                    history.visitPartition(partitionIndex).deleteColumns();
-                }
-
-                if (random.nextFloat() > 0.9995f)
-                {
-                    history.visitPartition(partitionIndex).deletePartition();
-                }
-
-                if (i % REPAIR_SKIP == 0)
-                {
-                    logger.debug("Repairing/flushing after operation {}...", 
i);
-                    repair(schema);
-                }
-                else if (i % FLUSH_SKIP == 0)
-                {
-                    logger.debug("Flushing after operation {}...", i);
-                    flush(schema);
-                }
-
-                if (i % VALIDATION_SKIP != 0)
-                    continue;
-
-                logger.debug("Validating partition at index {} after operation 
{} in run {}...", partitionIndex, i, run + 1);
-
-                for (int j = 0; j < 10; j++)
-                {
-                    List<Relation> relations = new ArrayList<>();
-
-                    // For one text column and 2 numeric columns, we can use 
between 1 and 5 total relations.
-                    int num = random.nextInt(1, 5);
-
-                    List<List<Relation.RelationKind>> pick = new ArrayList<>();
-                    //noinspection ArraysAsListWithZeroOrOneArgument
-                    pick.add(new 
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ))); // text column supports 
only EQ
-                    pick.add(new 
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, 
Relation.RelationKind.LT)));
-                    pick.add(new 
ArrayList<>(Arrays.asList(Relation.RelationKind.EQ, Relation.RelationKind.GT, 
Relation.RelationKind.LT)));
-
-                    if (random.nextFloat() > 0.75f)
-                    {
-                        relations.addAll(Query.clusteringSliceQuery(schema,
-                                                                    
partitionIndex,
-                                                                    
random.next(),
-                                                                    
random.next(),
-                                                                    
random.nextBoolean(),
-                                                                    
random.nextBoolean(),
-                                                                    
false).relations);
-                    }
-
-                    for (int k = 0; k < num; k++)
-                    {
-                        int column = 
random.nextInt(schema.regularColumns.size());
-                        Relation.RelationKind relationKind = pickKind(random, 
pick, column);
-
-                        if (relationKind != null)
-                            relations.add(Relation.relation(relationKind,
-                                                            
schema.regularColumns.get(column),
-                                                            
values[random.nextInt(values.length)]));
-                    }
-
-                    if (random.nextFloat() > 0.7f)
-                    {
-                        
relations.add(Relation.relation(Relation.RelationKind.EQ,
-                                                        
schema.staticColumns.get(0),
-                                                        
values[random.nextInt(values.length)]));
-                    }
-
-                    long pd = 
history.pdSelector().pdAtPosition(partitionIndex);
-                    FilteringQuery query = new FilteringQuery(pd, false, 
relations, schema);
-                    Reconciler reconciler = new 
Reconciler(history.pdSelector(), schema, history::visitor);
-                    Set<ColumnSpec<?>> columns = new 
HashSet<>(schema.allColumns);
-
-                    PartitionState modelState = 
reconciler.inflatePartitionState(pd, tracker, query).filter(query);
-
-                    if (modelState.rows().size() > 0)
-                        logger.debug("Model contains {} matching rows for 
query {}.", modelState.rows().size(), query);
-
-                    try
-                    {
-                        QuiescentChecker.validate(schema,
-                                                  tracker,
-                                                  columns,
-                                                  modelState,
-                                                  SelectHelper.execute(sut, 
history.clock(), query),
-                                                  query);
-
-                        // Run the query again to see if the first execution 
caused an issue via read-repair:
-                        QuiescentChecker.validate(schema,
-                                                  tracker,
-                                                  columns,
-                                                  modelState,
-                                                  SelectHelper.execute(sut, 
history.clock(), query),
-                                                  query);
-                    }
-                    catch (Throwable t)
-                    {
-                        logger.debug("Partition index = {}, run = {}, j = {}, 
i = {}", partitionIndex, run, j, i);
-
-                        Query partitionQuery = Query.selectAllColumns(schema, 
pd, false);
-                        QuiescentChecker.validate(schema,
-                                                  tracker,
-                                                  columns,
-                                                  
reconciler.inflatePartitionState(pd, tracker, partitionQuery),
-                                                  SelectHelper.execute(sut, 
history.clock(), partitionQuery),
-                                                  partitionQuery);
-                        logger.debug("Partition state agrees. Throwing 
original error...");
-
-                        throw t;
-                    }
-                }
-            }
-
-            if (run + 1 < RUNS)
-            {
-                logger.debug("Forcing compaction at the end of run {}...", run 
+ 1);
-                compact(schema);
-            }
-        }
-    }
-
-    protected void flush(SchemaSpec schema)
+    public SingleNodeSAITest()
     {
-        cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
-    }
-    
-    protected void compact(SchemaSpec schema)
-    {
-        cluster.get(1).nodetool("compact", schema.keyspace);
-    }
-
-    protected void repair(SchemaSpec schema)
-    {
-        // Repair is nonsensical for a single node, but a repair does flush 
first, so do that at least.
-        cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
-    }
-
-    protected void waitForIndexesQueryable(SchemaSpec schema) {}
-
-    private static Relation.RelationKind pickKind(EntropySource random, 
List<List<Relation.RelationKind>> options, int column)
-    {
-        Relation.RelationKind kind = null;
-
-        if (!options.get(column).isEmpty())
-        {
-            List<Relation.RelationKind> possible = options.get(column);
-            int chosen = random.nextInt(possible.size());
-            kind = possible.remove(chosen);
-
-            if (kind == Relation.RelationKind.EQ)
-                possible.clear(); // EQ precludes LT and GT
-            else
-                possible.remove(Relation.RelationKind.EQ); // LT GT preclude EQ
-        }
-
-        return kind;
+        super(false);
     }
-}
\ No newline at end of file
+}
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java 
b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java
similarity index 87%
copy from test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
copy to 
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java
index bbe4ce8398..bb64848ccc 100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITest.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java
@@ -22,11 +22,14 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.fuzz.harry.integration.model.IntegrationTestBase;
 import org.apache.cassandra.harry.ddl.ColumnSpec;
 import org.apache.cassandra.harry.ddl.SchemaSpec;
@@ -43,10 +46,16 @@ import org.apache.cassandra.harry.operations.Query;
 import org.apache.cassandra.harry.operations.Relation;
 import org.apache.cassandra.harry.sut.SystemUnderTest;
 import org.apache.cassandra.harry.sut.TokenPlacementModel;
+import org.apache.cassandra.harry.sut.injvm.InJvmSut;
+import org.apache.cassandra.harry.sut.injvm.InJvmSutBase;
 import org.apache.cassandra.harry.tracker.DataTracker;
 import org.apache.cassandra.harry.tracker.DefaultDataTracker;
+import org.apache.cassandra.service.consensus.TransactionalMode;
 
-public class SingleNodeSAITest extends IntegrationTestBase
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
+public abstract class SingleNodeSAITestBase extends IntegrationTestBase
 {
     private static final int RUNS = 1;
 
@@ -61,6 +70,36 @@ public class SingleNodeSAITest extends IntegrationTestBase
 
     long seed = 1;
 
+    protected boolean withAccord;
+
+    @BeforeClass
+    public static void before() throws Throwable
+    {
+        cluster = Cluster.build()
+                         .withNodes(1)
+                         // At lower fetch sizes, queries w/ hundreds or thousands of matches can take a very long time.
+                         .withConfig(InJvmSutBase.defaultConfig().andThen(c -> 
c.set("range_request_timeout", "180s")
+                                                                               
 .set("read_request_timeout", "180s")
+                                                                               
 .set("transaction_timeout", "180s")
+                                                                               
 .set("write_request_timeout", "180s")
+                                                                               
 .set("native_transport_timeout", "180s")
+                                                                               
 .set("slow_query_log_timeout", "180s")
+                                                                               
 .with(GOSSIP).with(NETWORK)))
+                         .createWithoutStarting();
+        cluster.setUncaughtExceptionsFilter(t -> {
+            logger.error("Caught exception, reporting during shutdown. Ignoring.", t);
+            return true;
+        });
+        cluster.startup();
+        cluster = init(cluster);
+        sut = new InJvmSut(cluster);
+    }
+
+    public SingleNodeSAITestBase(boolean withAccord)
+    {
+        this.withAccord = withAccord;
+    }
+
     @Test
     public void basicSaiTest()
     {
@@ -75,7 +114,8 @@ public class SingleNodeSAITest extends IntegrationTestBase
                                            
Arrays.asList(ColumnSpec.regularColumn("v1", ColumnSpec.asciiType(40, 100)),
                                                          
ColumnSpec.regularColumn("v2", ColumnSpec.int64Type),
                                                          
ColumnSpec.regularColumn("v3", ColumnSpec.int64Type)),
-                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))))
+                                           
List.of(ColumnSpec.staticColumn("s1", ColumnSpec.asciiType(40, 100))),
+                                           withAccord ? 
Optional.of(TransactionalMode.full) : Optional.empty())
                             
.withCompactionStrategy("LeveledCompactionStrategy");
 
         sut.schemaChange(schema.compile().cql());
diff --git a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
index 6da221862f..da7d9213e8 100644
--- a/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
+++ b/test/unit/org/apache/cassandra/cql3/statements/TransactionStatementTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.cql3.statements;
 
-import org.apache.cassandra.transport.Dispatcher;
-import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -32,14 +30,16 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
+import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.transport.messages.ResultMessage;
+import org.assertj.core.api.Assertions;
 
 import static org.apache.cassandra.cql3.statements.TransactionStatement.DUPLICATE_TUPLE_NAME_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.EMPTY_TRANSACTION_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.ILLEGAL_RANGE_QUERY_MESSAGE;
+import static org.apache.cassandra.cql3.statements.TransactionStatement.INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_CONDITIONS_IN_UPDATES_MESSAGE;
-import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_COUNTERS_IN_TXNS_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.NO_TIMESTAMPS_IN_UPDATES_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.SELECT_REFS_NEED_COLUMN_MESSAGE;
 import static org.apache.cassandra.cql3.statements.TransactionStatement.TRANSACTIONS_DISABLED_ON_TABLE_MESSAGE;
@@ -58,7 +58,6 @@ public class TransactionStatementTest
     private static final TableId TABLE4_ID = TableId.fromString("00000000-0000-0000-0000-000000000004");
     private static final TableId TABLE5_ID = TableId.fromString("00000000-0000-0000-0000-000000000005");
     private static final TableId TABLE6_ID = TableId.fromString("00000000-0000-0000-0000-000000000006");
-    private static final TableId TABLE7_ID = TableId.fromString("00000000-0000-0000-0000-000000000007");
 
     @BeforeClass
     public static void beforeClass() throws Exception
@@ -70,45 +69,7 @@ public class TransactionStatementTest
                                     parse("CREATE TABLE tbl3 (k int PRIMARY 
KEY, \"with spaces\" int, \"with\"\"quote\" int, \"MiXeD_CaSe\" int) WITH 
transactional_mode = 'full'", "ks").id(TABLE3_ID),
                                     parse("CREATE TABLE tbl4 (k int PRIMARY 
KEY, int_list list<int>) WITH transactional_mode = 'full'", "ks").id(TABLE4_ID),
                                     parse("CREATE TABLE tbl5 (k int PRIMARY 
KEY, v int) WITH transactional_mode = 'full'", "ks").id(TABLE5_ID),
-                                    parse("CREATE TABLE tbl6 (k int PRIMARY 
KEY, c counter) WITH transactional_mode = 'full'", "ks").id(TABLE6_ID),
-                                    parse("CREATE TABLE tbl7 (k int PRIMARY 
KEY, v int) WITH transactional_mode = 'off'", "ks").id(TABLE7_ID));
-    }
-
-    @Test
-    public void shouldRejectCounterMutation()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "    UPDATE ks.tbl6 SET c += 100 WHERE k = 0;\n" +
-                       "COMMIT TRANSACTION";
-
-        Assertions.assertThatThrownBy(() -> prepare(query))
-                  .isInstanceOf(InvalidRequestException.class)
-                  
.hasMessageContaining(String.format(NO_COUNTERS_IN_TXNS_MESSAGE, "UPDATE", "at 
[2:5]"));
-    }
-
-    @Test
-    public void shouldRejectCounterReadInLet()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl6 WHERE k=0);\n" +
-                       "  SELECT row1.c;\n" +
-                       "COMMIT TRANSACTION";
-
-        Assertions.assertThatThrownBy(() -> prepare(query))
-                  .isInstanceOf(InvalidRequestException.class)
-                  
.hasMessageContaining(String.format(NO_COUNTERS_IN_TXNS_MESSAGE, "SELECT", "at 
[2:15]"));
-    }
-
-    @Test
-    public void shouldRejectCounterReadInSelect()
-    {
-        String query = "BEGIN TRANSACTION\n" +
-                       "  SELECT * FROM ks.tbl6 WHERE k=0;\n" +
-                       "COMMIT TRANSACTION";
-
-        Assertions.assertThatThrownBy(() -> prepare(query))
-                  .isInstanceOf(InvalidRequestException.class)
-                  
.hasMessageContaining(String.format(NO_COUNTERS_IN_TXNS_MESSAGE, "SELECT", "at 
[2:3]"));
+                                    parse("CREATE TABLE tbl6 (k int PRIMARY 
KEY, v int) WITH transactional_mode = 'off'", "ks").id(TABLE6_ID));
     }
 
     @Test
@@ -236,28 +197,6 @@ public class TransactionStatementTest
                   
.hasMessageContaining(String.format(INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, "LET 
assignment row1", "at [2:15]"));
     }
 
-    @Test
-    public void shouldRejectIllegalLimitInSelect()
-    {
-        String select = "SELECT * FROM ks.tbl1 WHERE k = 1 LIMIT 2";
-        String query = "BEGIN TRANSACTION\n" + select + ";\nCOMMIT TRANSACTION";
-
-        Assertions.assertThatThrownBy(() -> prepare(query))
-                  .isInstanceOf(InvalidRequestException.class)
-                  
.hasMessageContaining(String.format(INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, 
"returning select", "at [2:1]"));
-    }
-
-    @Test
-    public void shouldRejectIncompletePrimaryKeyInSelect()
-    {
-        String select = "SELECT * FROM ks.tbl1 WHERE k = 1";
-        String query = "BEGIN TRANSACTION\n" + select + ";\nCOMMIT TRANSACTION";
-
-        Assertions.assertThatThrownBy(() -> prepare(query))
-                  .isInstanceOf(InvalidRequestException.class)
-                  
.hasMessageContaining(String.format(INCOMPLETE_PRIMARY_KEY_SELECT_MESSAGE, 
"returning select", "at [2:1]"));
-    }
-
     @Test
     public void shouldRejectUpdateWithCondition()
     {
@@ -386,7 +325,7 @@ public class TransactionStatementTest
 
         Assertions.assertThatThrownBy(() -> prepare(query))
                   .isInstanceOf(InvalidRequestException.class)
-                  
.hasMessageContaining(String.format(ILLEGAL_RANGE_QUERY_MESSAGE, "returning 
select", "at [2:1]"));
+                  
.hasMessageContaining(String.format(INCOMPLETE_PARTITION_KEY_SELECT_MESSAGE, 
"returning select", "at [2:1]"));
     }
 
     @Test
@@ -407,7 +346,7 @@ public class TransactionStatementTest
     public void shouldRejectLetSelectOnNonTransactionalTable()
     {
         String query = "BEGIN TRANSACTION\n" +
-                       "  LET row1 = (SELECT * FROM ks.tbl7 WHERE k = 0);\n" +
+                       "  LET row1 = (SELECT * FROM ks.tbl6 WHERE k = 0);\n" +
                        "  INSERT INTO ks.tbl5 (k, v) VALUES (1, 2);\n" +
                        "COMMIT TRANSACTION;";
 
@@ -420,7 +359,7 @@ public class TransactionStatementTest
     public void shouldRejectSelectOnNonTransactionalTable()
     {
         String query = "BEGIN TRANSACTION\n" +
-                       "  SELECT * FROM ks.tbl7 WHERE k = 0;\n" +
+                       "  SELECT * FROM ks.tbl6 WHERE k = 0;\n" +
                        "COMMIT TRANSACTION;";
 
         Assertions.assertThatThrownBy(() -> prepare(query))
@@ -432,7 +371,7 @@ public class TransactionStatementTest
     public void shouldRejectUpdateOnNonTransactionalTable()
     {
         String query = "BEGIN TRANSACTION\n" +
-                       "  INSERT INTO ks.tbl7 (k, v) VALUES (1, 2);\n" +
+                       "  INSERT INTO ks.tbl6 (k, v) VALUES (1, 2);\n" +
                        "COMMIT TRANSACTION;";
 
         Assertions.assertThatThrownBy(() -> prepare(query))
diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
index 452bb0bc36..b8ea6445d4 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/entities/TupleTypeTest.java
@@ -171,7 +171,7 @@ public class TupleTypeTest extends CQLTester
 
         createIndex("CREATE INDEX tuple_index ON %s (t)");
         // select using unset
-        assertInvalidMessage("Invalid unset value for tuple field number 0", "SELECT * FROM %s WHERE k = ? and t = (?,?,?)", unset(), unset(), unset(), unset());
+        assertInvalidMessage("Invalid unset value for tuple field number 0", "SELECT * FROM %s WHERE k = ? and t = (?,?,?)", 42, unset(), unset(), unset());
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to