This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit e0c7d7b9389fedf9b3f982ede2a93a9710f7fe52
Merge: 0e41c80c81 bd99331bf2
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Feb 12 17:32:50 2025 -0600

    Merge branch 'cassandra-5.0' into trunk
    
    * cassandra-5.0:
      Avoid fetching entire partitions on unresolved static rows in RFP when no static column predicates exist

 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/filter/RowFilter.java  | 11 ++++-
 .../cassandra/index/sai/plan/QueryController.java  |  6 ++-
 .../cassandra/service/reads/DataResolver.java      |  2 +-
 .../service/reads/ReplicaFilteringProtection.java  | 10 ++--
 .../test/ReplicaFilteringProtectionTest.java       | 19 ++++++++
 .../test/cql3/SingleNodeTableWalkTest.java         | 53 +++++-----------------
 .../distributed/test/sai/StrictFilteringTest.java  | 23 +++++++++-
 .../cassandra/fuzz/sai/SingleNodeSAITestBase.java  | 16 ++++---
 .../unit/org/apache/cassandra/cql3/KnownIssue.java |  6 +--
 10 files changed, 86 insertions(+), 61 deletions(-)

diff --cc CHANGES.txt
index 49cce9e160,f659a1f220..aba2b957f7
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,135 -1,5 +1,136 @@@
 -5.0.4
 +5.1
 + * Improve error messages for constraints (CASSANDRA-20266)
 + * Add system_views.partition_key_statistics for querying SSTable metadata 
(CASSANDRA-20161)
 + * CEP-42 - Add Constraints Framework (CASSANDRA-19947)
 + * Add table metric PurgeableTombstoneScannedHistogram and a tracing event 
for scanned purgeable tombstones (CASSANDRA-20132)
 + * Make sure we can parse the expanded CQL before writing it to the log or 
sending it to replicas (CASSANDRA-20218)
 + * Add format_bytes and format_time functions (CASSANDRA-19546)
 + * Fix error when trying to assign a tuple to target type not being a tuple 
(CASSANDRA-20237)
 + * Fail CREATE TABLE LIKE statement if UDTs in target keyspace do not exist 
or they have different structure from ones in source keyspace (CASSANDRA-19966)
 + * Support octet_length and length functions (CASSANDRA-20102)
 + * Make JsonUtils serialize Instant always with the same format 
(CASSANDRA-20209)
 + * Port Harry v2 to trunk (CASSANDRA-20200)
 + * Enable filtering of snapshots on keyspace, table and snapshot name in 
nodetool listsnapshots (CASSANDRA-20151)
 + * Create manifest upon loading where it does not exist or enrich it 
(CASSANDRA-20150)
 + * Propagate true size of snapshot in SnapshotDetailsTabularData to not call 
JMX twice in nodetool listsnapshots (CASSANDRA-20149)
 + * Implementation of CEP-43 - copying a table via CQL by CREATE TABLE LIKE 
(CASSANDRA-19964)
 + * Periodically disconnect roles that are revoked or have LOGIN=FALSE set 
(CASSANDRA-19385)
 + * AST library for CQL-based fuzz tests (CASSANDRA-20198)
 + * Support audit logging for JMX operations (CASSANDRA-20128)
 + * Enable sorting of nodetool status output (CASSANDRA-20104)
 + * Support downgrading after CMS is initialized (CASSANDRA-20145)
 + * Deprecate IEndpointSnitch (CASSANDRA-19488)
 + * Check presence of a snapshot in a case-insensitive manner on macOS 
platform to prevent hardlinking failures (CASSANDRA-20146)
 + * Enable JMX server configuration to be in cassandra.yaml (CASSANDRA-11695)
 + * Parallelized UCS compactions (CASSANDRA-18802)
 + * Avoid prepared statement invalidation race when committing schema changes 
(CASSANDRA-20116)
 + * Restore optimization in MultiCBuilder around building one clustering 
(CASSANDRA-20129)
 + * Consolidate all snapshot management to SnapshotManager and introduce 
SnapshotManagerMBean (CASSANDRA-18111)
 + * Fix RequestFailureReason constants codes (CASSANDRA-20126)
 + * Introduce SSTableSimpleScanner for compaction (CASSANDRA-20092)
 + * Include column drop timestamp in alter table transformation 
(CASSANDRA-18961)
 + * Make JMX SSL configurable in cassandra.yaml (CASSANDRA-18508)
 + * Fix cqlsh CAPTURE command to save query results without trace details when 
TRACING is ON (CASSANDRA-19105)
 + * Optionally prevent tombstone purging during repair (CASSANDRA-20071)
 + * Add post-filtering support for the IN operator in SAI queries 
(CASSANDRA-20025)
 + * Don’t finish ongoing decommission and move operations during startup 
(CASSANDRA-20040)
 + * Nodetool reconfigure cms has correct return code when streaming fails 
(CASSANDRA-19972)
 + * Reintroduce RestrictionSet#iterator() optimization around multi-column 
restrictions (CASSANDRA-20034)
 + * Explicitly localize strings to Locale.US for internal implementation 
(CASSANDRA-19953)
 + * Add -H option for human-friendly output in nodetool compactionhistory 
(CASSANDRA-20015)
 + * Fix type check for referenced duration type for nested types 
(CASSANDRA-19890)
 + * In simulation tests, correctly set the tokens of replacement nodes 
(CASSANDRA-19997)
 + * During TCM upgrade, retain all properties of existing system tables 
(CASSANDRA-19992)
 + * Properly cancel in-flight futures and reject requests in 
EpochAwareDebounce during shutdown (CASSANDRA-19848)
 + * Provide clearer exception message on failing commitlog_disk_access_mode 
combinations (CASSANDRA-19812)
 + * Add total space used for a keyspace to nodetool tablestats 
(CASSANDRA-19671)
 + * Ensure Relation#toRestriction() handles ReversedType properly 
(CASSANDRA-19950)
 + * Add JSON and YAML output option to nodetool gcstats (CASSANDRA-19771)
 + * Introduce metadata serialization version V4 (CASSANDRA-19970)
 + * Allow CMS reconfiguration to work around DOWN nodes (CASSANDRA-19943)
 + * Make TableParams.Serializer set allowAutoSnapshots and incrementalBackups 
(CASSANDRA-19954)
 + * Make sstabledump possible to show tombstones only (CASSANDRA-19939)
 + * Ensure that RFP queries potentially stale replicas even with only key 
columns in the row filter (CASSANDRA-19938)
 + * Allow nodes to change IP address while upgrading to TCM (CASSANDRA-19921)
 + * Retain existing keyspace params on system tables after upgrade 
(CASSANDRA-19916)
 + * Deprecate use of gossip state for paxos electorate verification 
(CASSANDRA-19904)
 + * Update dtest-api to 0.0.17 to fix jvm17 crash in jvm-dtests 
(CASSANDRA-19239)
 + * Add resource leak test and Update Netty to 4.1.113.Final to fix leak 
(CASSANDRA-19783)
 + * Fix incorrect nodetool suggestion when gossip mode is running 
(CASSANDRA-19905)
 + * SAI support for BETWEEN operator (CASSANDRA-19688)
 + * Fix BETWEEN filtering for reversed clustering columns (CASSANDRA-19878)
 + * Retry if node leaves CMS while committing a transformation 
(CASSANDRA-19872)
 + * Add support for NOT operators in WHERE clauses. Fixed Three Valued Logic 
(CASSANDRA-18584)
 + * Allow getendpoints for system tables and make sure getNaturalReplicas work 
for MetaStrategy (CASSANDRA-19846)
 + * On upgrade, handle pre-existing tables with unexpected table ids 
(CASSANDRA-19845)
 + * Reconfigure CMS before assassinate (CASSANDRA-19768)
 + * Warn about unqualified prepared statement only if it is select or 
modification statement (CASSANDRA-18322)
 + * Update legacy peers tables during node replacement (CASSANDRA-19782)
 + * Refactor ColumnCondition (CASSANDRA-19620)
 + * Allow configuring log format for Audit Logs (CASSANDRA-19792)
 + * Support for noboolean rpm (centos7 compatible) packages removed 
(CASSANDRA-19787)
 + * Allow threads waiting for the metadata log follower to be interrupted 
(CASSANDRA-19761)
 + * Support dictionary lookup for CassandraPasswordValidator (CASSANDRA-19762)
 + * Disallow denylisting keys in system_cluster_metadata (CASSANDRA-19713)
 + * Fix gossip status after replacement (CASSANDRA-19712)
 + * Ignore repair requests for system_cluster_metadata (CASSANDRA-19711)
 + * Avoid ClassCastException when verifying tables with reversed partitioner 
(CASSANDRA-19710)
 + * Always repair the full range when repairing system_cluster_metadata 
(CASSANDRA-19709)
 + * Use table-specific partitioners during Paxos repair (CASSANDRA-19714)
 + * Expose current compaction throughput in nodetool (CASSANDRA-13890)
 + * CEP-24 Password validation / generation (CASSANDRA-17457)
 + * Reconfigure CMS after replacement, bootstrap and move operations 
(CASSANDRA-19705)
 + * Support querying LocalStrategy tables with any partitioner 
(CASSANDRA-19692)
 + * Relax slow_query_log_timeout for MultiNodeSAITest (CASSANDRA-19693)
 + * Audit Log entries are missing identity for mTLS connections 
(CASSANDRA-19669)
 + * Add support for the BETWEEN operator in WHERE clauses (CASSANDRA-19604)
 + * Replace Stream iteration with for-loop for 
SimpleRestriction::bindAndGetClusteringElements (CASSANDRA-19679)
 + * Consolidate logging on trace level (CASSANDRA-19632)
 + * Expand DDL statements on coordinator before submission to the CMS 
(CASSANDRA-19592)
 + * Fix number of arguments of String.format() in various classes 
(CASSANDRA-19645)
 + * Remove unused fields from config (CASSANDRA-19599)
 + * Refactor Relation and Restriction hierarchies (CASSANDRA-19341)
 + * Raise priority of TCM internode messages during critical operations 
(CASSANDRA-19517)
 + * Add nodetool command to unregister LEFT nodes (CASSANDRA-19581)
 + * Add cluster metadata id to gossip syn messages (CASSANDRA-19613)
 + * Reduce heap usage occupied by the metrics (CASSANDRA-19567)
 + * Improve handling of transient replicas during range movements 
(CASSANDRA-19344)
 + * Enable debounced internode log requests to be cancelled at shutdown 
(CASSANDRA-19514)
 + * Correctly set last modified epoch when combining multistep operations into 
a single step (CASSANDRA-19538)
 + * Add new TriggersPolicy configuration to allow operators to disable 
triggers (CASSANDRA-19532)
 + * Use Transformation.Kind.id in local and distributed log tables 
(CASSANDRA-19516)
 + * Remove period field from ClusterMetadata and metadata log tables 
(CASSANDRA-19482)
 + * Enrich system_views.pending_hints vtable with hints sizes (CASSANDRA-19486)
 + * Expose all dropwizard metrics in virtual tables (CASSANDRA-14572)
 + * Ensured that PropertyFileSnitchTest do not overwrite cassandra-topology.properties (CASSANDRA-19502)
 + * Add option for MutualTlsAuthenticator to restrict the certificate validity 
period (CASSANDRA-18951)
 + * Fix StorageService::constructRangeToEndpointMap for non-distributed 
keyspaces (CASSANDRA-19255)
 + * Group nodetool cms commands into single command group (CASSANDRA-19393)
 + * Register the measurements of the bootstrap process as Dropwizard metrics 
(CASSANDRA-19447)
 + * Add LIST SUPERUSERS CQL statement (CASSANDRA-19417)
 + * Modernize CQLSH datetime conversions (CASSANDRA-18879)
 + * Harry model and in-JVM tests for partition-restricted 2i queries 
(CASSANDRA-18275)
 + * Refactor cqlshmain global constants (CASSANDRA-19201)
 + * Remove native_transport_port_ssl (CASSANDRA-19397)
 + * Make nodetool reconfigurecms sync by default and add --cancel to be able 
to cancel ongoing reconfigurations (CASSANDRA-19216)
 + * Expose auth mode in system_views.clients, nodetool clientstats, metrics 
(CASSANDRA-19366)
 + * Remove sealed_periods and last_sealed_period tables (CASSANDRA-19189)
 + * Improve setup and initialisation of LocalLog/LogSpec (CASSANDRA-19271)
 + * Refactor structure of caching metrics and expose auth cache metrics via 
JMX (CASSANDRA-17062)
 + * Allow CQL client certificate authentication to work without sending an 
AUTHENTICATE request (CASSANDRA-18857)
 + * Extend nodetool tpstats and system_views.thread_pools with detailed pool 
parameters (CASSANDRA-19289)
 + * Remove dependency on Sigar in favor of OSHI (CASSANDRA-16565)
 + * Simplify the bind marker and Term logic (CASSANDRA-18813)
 + * Limit cassandra startup to supported JDKs, allow higher JDKs by setting 
CASSANDRA_JDK_UNSUPPORTED (CASSANDRA-18688)
 + * Standardize nodetool tablestats formatting of data units (CASSANDRA-19104)
 + * Make nodetool tablestats use number of significant digits for time and 
average values consistently (CASSANDRA-19015)
 + * Upgrade jackson to 2.15.3 and snakeyaml to 2.1 (CASSANDRA-18875)
 + * Transactional Cluster Metadata [CEP-21] (CASSANDRA-18330)
 + * Add ELAPSED command to cqlsh (CASSANDRA-18861)
 + * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
 + * Clean up obsolete functions and simplify cql_version handling in cqlsh 
(CASSANDRA-18787)
 +Merged from 5.0:
+  * Avoid fetching entire partitions on unresolved static rows in RFP when no 
static column predicates exist (CASSANDRA-20243)
   * Avoid indexing empty values for non-literals and types that do not allow 
them (CASSANDRA-20313)
   * Fix incorrect results of min / max in-built functions on clustering 
columns in descending order (CASSANDRA-20295)
   * Avoid possible consistency violations for SAI intersection queries over 
repaired index matches and multiple non-indexed column matches (CASSANDRA-20189)
diff --cc test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
index 6412fdf2d9,0000000000..c18d6cfecf
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/cql3/SingleNodeTableWalkTest.java
@@@ -1,594 -1,0 +1,563 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.test.cql3;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.EnumSet;
 +import java.util.LinkedHashMap;
 +import java.util.List;
 +import java.util.NavigableSet;
 +import java.util.Optional;
 +import java.util.TreeMap;
 +import java.util.stream.Collectors;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.ImmutableList;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Sets;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import accord.utils.Gen;
 +import accord.utils.Gens;
 +import accord.utils.Property;
 +import accord.utils.RandomSource;
 +import org.apache.cassandra.cql3.KnownIssue;
 +import org.apache.cassandra.cql3.ast.Bind;
 +import org.apache.cassandra.cql3.ast.Conditional;
 +import org.apache.cassandra.cql3.ast.CreateIndexDDL;
 +import org.apache.cassandra.cql3.ast.FunctionCall;
 +import org.apache.cassandra.cql3.ast.Mutation;
 +import org.apache.cassandra.cql3.ast.ReferenceExpression;
 +import org.apache.cassandra.cql3.ast.Select;
 +import org.apache.cassandra.cql3.ast.Symbol;
 +import org.apache.cassandra.cql3.ast.TableReference;
 +import org.apache.cassandra.cql3.ast.Value;
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.marshal.AbstractType;
 +import org.apache.cassandra.db.marshal.InetAddressType;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.test.sai.SAIUtil;
 +import org.apache.cassandra.harry.model.BytesPartitionState;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.utils.ASTGenerators;
 +import org.apache.cassandra.utils.AbstractTypeGenerators;
 +import org.apache.cassandra.utils.AbstractTypeGenerators.TypeGenBuilder;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.CassandraGenerators.TableMetadataBuilder;
 +import org.apache.cassandra.utils.ImmutableUniqueList;
 +import org.quicktheories.generators.SourceDSL;
 +
 +import static accord.utils.Property.commands;
 +import static accord.utils.Property.stateful;
 +import static 
org.apache.cassandra.utils.AbstractTypeGenerators.getTypeSupport;
 +import static org.apache.cassandra.utils.Generators.toGen;
 +
 +public class SingleNodeTableWalkTest extends StatefulASTBase
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(SingleNodeTableWalkTest.class);
 +
 +    protected void preCheck(Cluster cluster, Property.StatefulBuilder builder)
 +    {
 +        // if a failing seed is detected, populate here
 +        // Example: builder.withSeed(42L);
 +        // CQL operations may have operators such as +, -, and / (example: 4 + 4); enable this to "apply" them and get a constant value
 +        // CQL_DEBUG_APPLY_OPERATOR = true;
 +    }
 +
 +    protected TypeGenBuilder supportedTypes()
 +    {
 +        return 
AbstractTypeGenerators.withoutUnsafeEquality(AbstractTypeGenerators.builder()
 +                                                                              
    .withTypeKinds(AbstractTypeGenerators.TypeKind.PRIMITIVE));
 +    }
 +
 +    protected List<CreateIndexDDL.Indexer> supportedIndexers()
 +    {
 +        // since legacy is async it's not clear how the test can wait for the 
background write to complete...
 +        return Collections.singletonList(CreateIndexDDL.SAI);
 +    }
 +
 +    public Property.Command<State, Void, ?> selectExisting(RandomSource rs, 
State state)
 +    {
 +        NavigableSet<BytesPartitionState.Ref> keys = 
state.model.partitionKeys();
 +        BytesPartitionState.Ref ref = rs.pickOrderedSet(keys);
 +        Clustering<ByteBuffer> key = ref.key;
 +
 +        Select.Builder builder = Select.builder().table(state.metadata);
 +        ImmutableUniqueList<Symbol> pks = 
state.model.factory.partitionColumns;
 +        ImmutableUniqueList<Symbol> cks = 
state.model.factory.clusteringColumns;
 +        for (Symbol pk : pks)
 +            builder.value(pk, key.bufferAt(pks.indexOf(pk)));
 +
 +        boolean wholePartition = cks.isEmpty() || rs.nextBoolean();
 +        if (!wholePartition)
 +        {
 +            // find a row to select
 +            BytesPartitionState partition = state.model.get(ref);
 +            if (partition.isEmpty())
 +            {
 +                wholePartition = true;
 +            }
 +            else
 +            {
 +                NavigableSet<Clustering<ByteBuffer>> clusteringKeys = 
partition.clusteringKeys();
 +                Clustering<ByteBuffer> clusteringKey = 
rs.pickOrderedSet(clusteringKeys);
 +                for (Symbol ck : cks)
 +                    builder.value(ck, 
clusteringKey.bufferAt(cks.indexOf(ck)));
 +            }
 +        }
 +        Select select = builder.build();
 +        return state.command(rs, select, (wholePartition ? "Whole Partition" 
: "Single Row"));
 +    }
 +
 +    public Property.Command<State, Void, ?> selectToken(RandomSource rs, 
State state)
 +    {
 +        NavigableSet<BytesPartitionState.Ref> keys = 
state.model.partitionKeys();
 +        BytesPartitionState.Ref ref = rs.pickOrderedSet(keys);
 +
 +        Select.Builder builder = Select.builder().table(state.metadata);
 +        
builder.where(FunctionCall.tokenByColumns(state.model.factory.partitionColumns),
 +                      Conditional.Where.Inequality.EQUAL,
 +                      token(state, ref));
 +
 +        Select select = builder.build();
 +        return state.command(rs, select, "by token");
 +    }
 +
 +    public Property.Command<State, Void, ?> selectTokenRange(RandomSource rs, 
State state)
 +    {
 +        NavigableSet<BytesPartitionState.Ref> keys = 
state.model.partitionKeys();
 +        BytesPartitionState.Ref start, end;
 +        switch (keys.size())
 +        {
 +            case 1:
 +                start = end = Iterables.get(keys, 0);
 +                break;
 +            case 2:
 +                start = Iterables.get(keys, 0);
 +                end = Iterables.get(keys, 1);
 +                break;
 +            case 0:
 +                throw new IllegalArgumentException("Unable to select token 
ranges when no partitions exist");
 +            default:
 +            {
 +                int si = rs.nextInt(0, keys.size() - 1);
 +                int ei = rs.nextInt(si + 1, keys.size());
 +                start = Iterables.get(keys, si);
 +                end = Iterables.get(keys, ei);
 +            }
 +            break;
 +        }
 +        Select.Builder builder = Select.builder().table(state.metadata);
 +        FunctionCall pkToken = 
FunctionCall.tokenByColumns(state.model.factory.partitionColumns);
 +        boolean startInclusive = rs.nextBoolean();
 +        boolean endInclusive = rs.nextBoolean();
 +        if (startInclusive && endInclusive && rs.nextBoolean())
 +        {
 +            // between
 +            builder.between(pkToken, token(state, start), token(state, end));
 +        }
 +        else
 +        {
 +            builder.where(pkToken,
 +                          startInclusive ? 
Conditional.Where.Inequality.GREATER_THAN_EQ : 
Conditional.Where.Inequality.GREATER_THAN,
 +                          token(state, start));
 +            builder.where(pkToken,
 +                          endInclusive ? 
Conditional.Where.Inequality.LESS_THAN_EQ : 
Conditional.Where.Inequality.LESS_THAN,
 +                          token(state, end));
 +        }
 +        Select select = builder.build();
 +        return state.command(rs, select, "by token range");
 +    }
 +
 +    public Property.Command<State, Void, ?> 
partitionRestrictedQuery(RandomSource rs, State state)
 +    {
 +        //TODO (now): remove duplicate logic
 +        NavigableSet<BytesPartitionState.Ref> keys = 
state.model.partitionKeys();
 +        BytesPartitionState.Ref ref = rs.pickOrderedSet(keys);
 +        Clustering<ByteBuffer> key = ref.key;
 +
 +        Select.Builder builder = Select.builder().table(state.metadata);
 +        ImmutableUniqueList<Symbol> pks = 
state.model.factory.partitionColumns;
 +        for (Symbol pk : pks)
 +            builder.value(pk, key.bufferAt(pks.indexOf(pk)));
 +
 +
-         Symbol symbol;
 +        List<Symbol> searchableColumns = state.nonPartitionColumns;
-         if (state.hasMultiNodeAllowFilteringWithLocalWritesIssue())
-         {
-             if (state.nonPkIndexedColumns.isEmpty())
-                 throw new AssertionError("Ignoring 
AF_MULTI_NODE_AND_NODE_LOCAL_WRITES is defined, but no non-partition columns 
are indexed");
-             symbol = rs.pick(state.nonPkIndexedColumns);
-         }
-         else
-         {
-             symbol = rs.pick(searchableColumns);
-         }
++        Symbol symbol = rs.pick(searchableColumns);
 +
 +        TreeMap<ByteBuffer, List<BytesPartitionState.PrimaryKey>> universe = 
state.model.index(ref, symbol);
 +        // we need to index 'null' so LT works, but we cannot query it directly, so filter it out when selecting values
 +        NavigableSet<ByteBuffer> allowed = 
Sets.filter(universe.navigableKeySet(), b -> 
!ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(b));
 +        if (allowed.isEmpty())
 +            return Property.ignoreCommand();
 +        ByteBuffer value = rs.pickOrderedSet(allowed);
 +
 +        EnumSet<CreateIndexDDL.QueryType> supported = 
!state.indexes.containsKey(symbol)
 +                                                      ? 
EnumSet.noneOf(CreateIndexDDL.QueryType.class)
 +                                                      : 
state.indexes.get(symbol).supportedQueries();
 +        if (supported.isEmpty() || 
!supported.contains(CreateIndexDDL.QueryType.Range))
 +            builder.allowFiltering();
 +
 +        // there are known SAI bugs, so need to avoid them to stay stable...
 +        if (state.indexes.containsKey(symbol) && 
state.indexes.get(symbol).indexDDL.indexer == CreateIndexDDL.SAI)
 +        {
 +            if (symbol.type() == InetAddressType.instance
 +                && IGNORED_ISSUES.contains(KnownIssue.SAI_INET_MIXED))
 +                return eqSearch(rs, state, symbol, value, builder);
 +        }
 +
 +        if (rs.nextBoolean())
 +            return simpleRangeSearch(rs, state, symbol, value, builder);
 +        //TODO (coverage): define search that has an upper and lower bound: a > and a < | a between ? and ?
 +        return eqSearch(rs, state, symbol, value, builder);
 +    }
 +
 +    public Property.Command<State, Void, ?> nonPartitionQuery(RandomSource 
rs, State state)
 +    {
-         Symbol symbol;
-         if (state.hasMultiNodeAllowFilteringWithLocalWritesIssue())
-         {
-             symbol = rs.pickUnorderedSet(state.indexes.keySet());
-         }
-         else
-         {
-             symbol = rs.pick(state.searchableColumns);
-         }
++        Symbol symbol = rs.pick(state.searchableColumns);
 +        TreeMap<ByteBuffer, List<BytesPartitionState.PrimaryKey>> universe = 
state.model.index(symbol);
 +        // we need to index 'null' so LT works, but we cannot query it directly, so filter it out when selecting values
 +        NavigableSet<ByteBuffer> allowed = 
Sets.filter(universe.navigableKeySet(), b -> 
!ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(b));
 +        if (allowed.isEmpty())
 +            return Property.ignoreCommand();
 +        ByteBuffer value = rs.pickOrderedSet(allowed);
 +        Select.Builder builder = Select.builder().table(state.metadata);
 +
 +        EnumSet<CreateIndexDDL.QueryType> supported = 
!state.indexes.containsKey(symbol) ? 
EnumSet.noneOf(CreateIndexDDL.QueryType.class) : 
state.indexes.get(symbol).supportedQueries();
 +        if (supported.isEmpty() || 
!supported.contains(CreateIndexDDL.QueryType.Range))
 +            builder.allowFiltering();
 +
 +        // there are known SAI bugs, so need to avoid them to stay stable...
 +        if (state.indexes.containsKey(symbol) && 
state.indexes.get(symbol).indexDDL.indexer == CreateIndexDDL.SAI)
 +        {
 +            if (symbol.type() == InetAddressType.instance
 +                && IGNORED_ISSUES.contains(KnownIssue.SAI_INET_MIXED))
 +                return eqSearch(rs, state, symbol, value, builder);
 +        }
 +
 +        if (rs.nextBoolean())
 +            return simpleRangeSearch(rs, state, symbol, value, builder);
 +        //TODO (coverage): define search that has an upper and lower bound: a > and a < | a between ? and ?
 +        return eqSearch(rs, state, symbol, value, builder);
 +    }
 +
 +    public Property.Command<State, Void, ?> multiColumnQuery(RandomSource rs, 
State state)
 +    {
 +        List<Symbol> allowedColumns = state.multiColumnQueryColumns();
 +
 +        if (allowedColumns.size() <= 1)
 +            throw new IllegalArgumentException("Unable to do multiple column 
query when there is only a single column");
 +
 +        int numColumns = rs.nextInt(1, allowedColumns.size()) + 1;
 +
 +        List<Symbol> cols = 
Gens.lists(Gens.pick(allowedColumns)).unique().ofSize(numColumns).next(rs);
 +
 +        Select.Builder builder = 
Select.builder().table(state.metadata).allowFiltering();
 +
 +        for (Symbol symbol : cols)
 +        {
 +            TreeMap<ByteBuffer, List<BytesPartitionState.PrimaryKey>> 
universe = state.model.index(symbol);
 +            NavigableSet<ByteBuffer> allowed = 
Sets.filter(universe.navigableKeySet(), b -> 
!ByteBufferUtil.EMPTY_BYTE_BUFFER.equals(b));
 +            //TODO (now): support
 +            if (allowed.isEmpty())
 +                return Property.ignoreCommand();
 +            ByteBuffer value = rs.pickOrderedSet(allowed);
 +            builder.value(symbol, value);
 +        }
 +
 +        Select select = builder.build();
 +        String annotate = cols.stream().map(symbol -> {
 +            var indexed = state.indexes.get(symbol);
 +            return symbol.detailedName() + (indexed == null ? "" : " (indexed 
with " + indexed.indexDDL.indexer.name() + ")");
 +        }).collect(Collectors.joining(", "));
 +        return state.command(rs, select, annotate);
 +    }
 +
 +    private Property.Command<State, Void, ?> simpleRangeSearch(RandomSource 
rs, State state, Symbol symbol, ByteBuffer value, Select.Builder builder)
 +    {
 +        // do a simple search, like > or <
 +        Conditional.Where.Inequality kind = state.rangeInequalityGen.next(rs);
 +        builder.where(symbol, kind, value);
 +        Select select = builder.build();
 +        var indexed = state.indexes.get(symbol);
 +        return state.command(rs, select, symbol.detailedName() + (indexed == 
null ? "" : ", indexed with " + indexed.indexDDL.indexer.name()));
 +    }
 +
 +    private Property.Command<State, Void, ?> eqSearch(RandomSource rs, State 
state, Symbol symbol, ByteBuffer value, Select.Builder builder)
 +    {
 +        builder.value(symbol, value);
 +
 +        Select select = builder.build();
 +        var indexed = state.indexes.get(symbol);
 +        return state.command(rs, select, symbol.detailedName() + (indexed == 
null ? "" : ", indexed with " + indexed.indexDDL.indexer.name()));
 +    }
 +
 +    protected State createState(RandomSource rs, Cluster cluster)
 +    {
 +        return new State(rs, cluster);
 +    }
 +
 +    protected Cluster createCluster() throws IOException
 +    {
 +        return createCluster(1, i -> {});
 +    }
 +
 +    @Test
 +    public void test() throws IOException
 +    {
 +        try (Cluster cluster = createCluster())
 +        {
 +            Property.StatefulBuilder statefulBuilder = 
stateful().withExamples(10).withSteps(400);
 +            preCheck(cluster, statefulBuilder);
 +            statefulBuilder.check(commands(() -> rs -> createState(rs, 
cluster))
 +                                  .add(StatefulASTBase::insert)
 +                                  .add(StatefulASTBase::fullTableScan)
 +                                  .addIf(State::hasPartitions, 
this::selectExisting)
 +                                  .addAllIf(State::supportTokens, b -> 
b.add(this::selectToken)
 +                                                                        
.add(this::selectTokenRange))
 +                                  .addIf(State::hasEnoughMemtable, 
StatefulASTBase::flushTable)
 +                                  .addIf(State::hasEnoughSSTables, 
StatefulASTBase::compactTable)
 +                                  .addIf(State::allowNonPartitionQuery, 
this::nonPartitionQuery)
 +                                  
.addIf(State::allowNonPartitionMultiColumnQuery, this::multiColumnQuery)
 +                                  .addIf(State::allowPartitionQuery, 
this::partitionRestrictedQuery)
 +                                  .destroyState(State::close)
 +                                  .onSuccess(onSuccess(logger))
 +                                  .build());
 +        }
 +    }
 +
 +    protected TableMetadata defineTable(RandomSource rs, String ks)
 +    {
 +        //TODO (correctness): the id isn't correct... this is what we use to 
create the table, so would miss the actual ID
 +        // Defaults may also be incorrect, but given this is the same version 
it "shouldn't"
 +        //TODO (coverage): partition is defined at the cluster level, so have 
to hard code in this model as the table is changed rather than cluster being 
recreated... this limits coverage
 +        return toGen(new TableMetadataBuilder()
 +                     .withTableKinds(TableMetadata.Kind.REGULAR)
 +                     .withKnownMemtables()
 +                     .withKeyspaceName(ks).withTableName("tbl")
 +                     .withSimpleColumnNames()
 +                     .withDefaultTypeGen(supportedTypes())
 +                     .withPartitioner(Murmur3Partitioner.instance)
 +                     .build())
 +               .next(rs);
 +    }
 +
 +    private List<CreateIndexDDL.Indexer> columnSupportsIndexing(TableMetadata 
metadata, ColumnMetadata col)
 +    {
 +        return supportedIndexers().stream()
 +                                  .filter(i -> i.supported(metadata, col))
 +                                  .collect(Collectors.toList());
 +    }
 +
 +    private static FunctionCall token(State state, BytesPartitionState.Ref 
ref)
 +    {
 +        Preconditions.checkNotNull(ref.key);
 +        List<Value> values = new ArrayList<>(ref.key.size());
 +        for (int i = 0; i < ref.key.size(); i++)
 +        {
 +            ByteBuffer bb = ref.key.bufferAt(i);
 +            Symbol type = state.model.factory.partitionColumns.get(i);
 +            values.add(new Bind(bb, type.type()));
 +        }
 +        return FunctionCall.tokenByValue(values);
 +    }
 +
 +    public class State extends CommonState
 +    {
 +        protected final LinkedHashMap<Symbol, IndexedColumn> indexes;
 +        private final Gen<Mutation> mutationGen;
 +        private final List<Symbol> nonPartitionColumns;
 +        private final List<Symbol> searchableColumns;
++        private final List<Symbol> nonPkIndexedColumns;
 +
 +        public State(RandomSource rs, Cluster cluster)
 +        {
 +            super(rs, cluster, defineTable(rs, nextKeyspace()));
 +
 +            this.indexes = createIndexes(rs, metadata);
 +
 +            cluster.forEach(i -> i.nodetoolResult("disableautocompaction", 
metadata.keyspace, this.metadata.name).asserts().success());
 +
 +            List<LinkedHashMap<Symbol, Object>> uniquePartitions;
 +            {
 +                int unique = rs.nextInt(1, 10);
 +                List<Symbol> columns = model.factory.partitionColumns;
 +                List<Gen<?>> gens = new ArrayList<>(columns.size());
 +                for (int i = 0; i < columns.size(); i++)
 +                    
gens.add(toGen(getTypeSupport(columns.get(i).type()).valueGen));
 +                uniquePartitions = Gens.lists(r2 -> {
 +                    LinkedHashMap<Symbol, Object> vs = new LinkedHashMap<>();
 +                    for (int i = 0; i < columns.size(); i++)
 +                        vs.put(columns.get(i), gens.get(i).next(r2));
 +                    return vs;
 +                }).uniqueBestEffort().ofSize(unique).next(rs);
 +            }
 +
 +            ASTGenerators.MutationGenBuilder mutationGenBuilder = new 
ASTGenerators.MutationGenBuilder(metadata)
 +                                                                  
.withoutTransaction()
 +                                                                  
.withoutTtl()
 +                                                                  
.withoutTimestamp()
 +                                                                  
.withPartitions(SourceDSL.arbitrary().pick(uniquePartitions));
 +            if (IGNORED_ISSUES.contains(KnownIssue.SAI_EMPTY_TYPE))
 +            {
 +                model.factory.regularAndStaticColumns.stream()
 +                                                     // exclude SAI indexed 
columns
 +                                                     .filter(s -> 
!indexes.containsKey(s) || indexes.get(s).indexDDL.indexer != 
CreateIndexDDL.SAI)
 +                                                     
.forEach(mutationGenBuilder::allowEmpty);
 +            }
 +            else
 +            {
 +                
model.factory.regularAndStaticColumns.forEach(mutationGenBuilder::allowEmpty);
 +            }
 +            this.mutationGen = toGen(mutationGenBuilder.build());
 +
 +            nonPartitionColumns = ImmutableList.<Symbol>builder()
 +                                               
.addAll(model.factory.clusteringColumns)
 +                                               
.addAll(model.factory.staticColumns)
 +                                               
.addAll(model.factory.regularColumns)
 +                                               .build();
++            nonPkIndexedColumns = nonPartitionColumns.stream()
++                                                     
.filter(indexes::containsKey)
++                                                     
.collect(Collectors.toList());
 +
 +            searchableColumns = metadata.partitionKeyColumns().size() > 1 ?  
model.factory.selectionOrder : nonPartitionColumns;
 +        }
 +
 +        @Override
 +        protected Gen<Mutation> mutationGen()
 +        {
 +            return mutationGen;
 +        }
 +
 +        private LinkedHashMap<Symbol, IndexedColumn> 
createIndexes(RandomSource rs, TableMetadata metadata)
 +        {
 +            LinkedHashMap<Symbol, IndexedColumn> indexed = new 
LinkedHashMap<>();
 +            // for some test runs, avoid using indexes
 +            if (rs.nextBoolean())
 +                return indexed;
 +            for (ColumnMetadata col : metadata.columnsInFixedOrder())
 +            {
 +                Symbol symbol = Symbol.from(col);
 +                AbstractType<?> type = symbol.type();
 +
 +                if (col.name.toString().length() >= 48
 +                    && 
IGNORED_ISSUES.contains(KnownIssue.CUSTOM_INDEX_MAX_COLUMN_48))
 +                    continue;
 +
 +                if (type.isCollection() && !type.isFrozenCollection()) 
continue; //TODO (coverage): include non-frozen collections; the index part 
works fine, it's the select that fails... basic equality isn't allowed for map 
type... so how do you query?
 +                List<CreateIndexDDL.Indexer> allowed = 
columnSupportsIndexing(metadata, col);
 +                if (allowed.isEmpty()) continue;
 +                CreateIndexDDL.Indexer indexer = rs.pick(allowed);
 +                ReferenceExpression colExpression = Symbol.from(col);
 +                if (type.isFrozenCollection())
 +                    colExpression = new 
CreateIndexDDL.CollectionReference(CreateIndexDDL.CollectionReference.Kind.FULL,
 colExpression);
 +
 +                String name = "tbl_" + col.name;
 +                CreateIndexDDL ddl = new 
CreateIndexDDL(rs.pick(CreateIndexDDL.Version.values()),
 +                                                        indexer,
 +                                                        Optional.of(new 
Symbol(name, UTF8Type.instance)),
 +                                                        
TableReference.from(metadata),
 +                                                        
Collections.singletonList(colExpression),
 +                                                        
Collections.emptyMap());
 +                String stmt = ddl.toCQL();
 +                logger.info(stmt);
 +                cluster.schemaChange(stmt);
 +
 +                //noinspection OptionalGetWithoutIsPresent
 +                SAIUtil.waitForIndexQueryable(cluster, metadata.keyspace, 
ddl.name.get().name());
 +
 +                indexed.put(symbol, new IndexedColumn(symbol, ddl));
 +            }
 +            return indexed;
 +        }
 +
 +        public boolean hasPartitions()
 +        {
 +            return !model.isEmpty();
 +        }
 +
 +        public boolean supportTokens()
 +        {
 +            return hasPartitions();
 +        }
 +
 +        public boolean allowNonPartitionQuery()
 +        {
-             boolean result = !model.isEmpty() && !searchableColumns.isEmpty();
-             if (hasMultiNodeAllowFilteringWithLocalWritesIssue())
-             {
-                 return hasNonPkIndexedColumns() && result;
-             }
-             return result;
++            return !model.isEmpty() && !searchableColumns.isEmpty();
 +        }
 +
 +        public boolean allowNonPartitionMultiColumnQuery()
 +        {
 +            return allowNonPartitionQuery() && 
multiColumnQueryColumns().size() > 1;
 +        }
 +
 +        private List<Symbol> multiColumnQueryColumns()
 +        {
 +            List<Symbol> allowedColumns = searchableColumns;
-             if (hasMultiNodeAllowFilteringWithLocalWritesIssue())
++            if (hasMultiNodeMultiColumnAllowFilteringWithLocalWritesIssue())
 +                allowedColumns = nonPkIndexedColumns;
 +            return allowedColumns;
 +        }
 +
-         private boolean hasMultiNodeAllowFilteringWithLocalWritesIssue()
++        private boolean 
hasMultiNodeMultiColumnAllowFilteringWithLocalWritesIssue()
 +        {
-             return isMultiNode() && 
IGNORED_ISSUES.contains(KnownIssue.AF_MULTI_NODE_AND_NODE_LOCAL_WRITES);
++            return isMultiNode() && 
IGNORED_ISSUES.contains(KnownIssue.AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES);
 +        }
 +
-         public List<Symbol> nonPkIndexedColumns;
 +        public boolean allowPartitionQuery()
 +        {
-             if (model.isEmpty() || nonPartitionColumns.isEmpty()) return 
false;
-             if (hasMultiNodeAllowFilteringWithLocalWritesIssue())
-                 return hasNonPkIndexedColumns();
-             return true;
-         }
- 
-         private boolean hasNonPkIndexedColumns()
-         {
-             nonPkIndexedColumns = nonPartitionColumns.stream()
-                                                      
.filter(indexes::containsKey)
-                                                      
.collect(Collectors.toList());
-             return !nonPkIndexedColumns.isEmpty();
++            return !(model.isEmpty() || nonPartitionColumns.isEmpty());
 +        }
 +
 +        @Override
 +        public String toString()
 +        {
 +            StringBuilder sb = new StringBuilder();
 +            sb.append("\nSetup:\n");
 +            toString(sb);
 +            indexes.values().forEach(c -> 
sb.append('\n').append(c.indexDDL.toCQL()).append(';'));
 +            return sb.toString();
 +        }
 +    }
 +
 +    public static class IndexedColumn
 +    {
 +        public final Symbol symbol;
 +        public final CreateIndexDDL indexDDL;
 +
 +        public IndexedColumn(Symbol symbol, CreateIndexDDL indexDDL)
 +        {
 +            this.symbol = symbol;
 +            this.indexDDL = indexDDL;
 +        }
 +
 +        public EnumSet<CreateIndexDDL.QueryType> supportedQueries()
 +        {
 +            return indexDDL.indexer.supportedQueries(symbol.type());
 +        }
 +    }
 +}
diff --cc 
test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 60646c6630,5ef92bb9ef..5905c4f8a7
--- 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@@ -25,10 -26,11 +25,9 @@@ import org.junit.AfterClass
  import org.junit.BeforeClass;
  import org.junit.Test;
  
 -import org.apache.cassandra.cql3.Operator;
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
- import org.apache.cassandra.distributed.shared.AssertUtils;
  import org.apache.cassandra.distributed.test.TestBaseImpl;
 -import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan;
  
  import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
  import static org.apache.cassandra.distributed.api.Feature.NETWORK;
diff --cc 
test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java
index afdb52e716,0000000000..b915d1748e
mode 100644,000000..100644
--- a/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java
+++ b/test/distributed/org/apache/cassandra/fuzz/sai/SingleNodeSAITestBase.java
@@@ -1,350 -1,0 +1,354 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.fuzz.sai;
 +
 +
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Set;
++import java.util.concurrent.atomic.AtomicInteger;
 +import java.util.function.Consumer;
 +import java.util.function.Supplier;
 +
 +import com.google.common.collect.Streams;
 +import org.junit.AfterClass;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.CassandraRelevantProperties;
 +import org.apache.cassandra.distributed.Cluster;
 +import org.apache.cassandra.distributed.api.ConsistencyLevel;
 +import org.apache.cassandra.distributed.api.IInstanceConfig;
 +import org.apache.cassandra.distributed.test.TestBaseImpl;
 +import org.apache.cassandra.harry.SchemaSpec;
 +import org.apache.cassandra.harry.dsl.HistoryBuilder;
 +import org.apache.cassandra.harry.dsl.HistoryBuilderHelper;
 +import org.apache.cassandra.harry.dsl.ReplayingHistoryBuilder;
 +import org.apache.cassandra.harry.execution.InJvmDTestVisitExecutor;
 +import org.apache.cassandra.harry.gen.EntropySource;
 +import org.apache.cassandra.harry.gen.Generator;
 +import org.apache.cassandra.harry.gen.Generators;
 +import org.apache.cassandra.harry.gen.SchemaGenerators;
 +import org.apache.cassandra.index.sai.utils.IndexTermType;
 +
 +import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 +import static org.apache.cassandra.distributed.api.Feature.NETWORK;
 +import static org.apache.cassandra.harry.checker.TestHelper.withRandom;
 +import static 
org.apache.cassandra.harry.dsl.SingleOperationBuilder.IdxRelation;
 +
 +// TODO: "WITH OPTIONS = {'case_sensitive': 'false', 'normalize': 'true', 
'ascii': 'true'};",
 +public abstract class SingleNodeSAITestBase extends TestBaseImpl
 +{
 +    private static final int ITERATIONS = 5;
 +
 +    private static final int VALIDATION_SKIP = 739;
 +    private static final int QUERIES_PER_VALIDATION = 8;
 +
-     private static final int FLUSH_SKIP = 1499;
-     private static final int COMPACTION_SKIP = 1503;
-     private static final int DEFAULT_REPAIR_SKIP = 6101;
++    private static final int FLUSH_SKIP = 2217;
++    private static final int COMPACTION_SKIP = 4435;
++    private static final int DEFAULT_REPAIR_SKIP = 8869;
 +
-     private static final int OPERATIONS_PER_RUN = DEFAULT_REPAIR_SKIP * 5;
++    private static final int OPERATIONS_PER_RUN = 30_000;
 +
 +    private static final int NUM_PARTITIONS = 64;
 +    private static final int NUM_VISITED_PARTITIONS = 16;
 +    private static final int MAX_PARTITION_SIZE = 2000;
 +
 +    private static final int UNIQUE_CELL_VALUES = 5;
 +
 +    protected static final Logger logger = 
LoggerFactory.getLogger(SingleNodeSAITest.class);
 +    protected static Cluster cluster;
 +
 +    protected SingleNodeSAITestBase() {}
 +
 +    @BeforeClass
 +    public static void before() throws Throwable
 +    {
 +        init(1,
 +             // At lower fetch sizes, queries w/ hundreds or thousands of 
matches can take a very long time.
 +             defaultConfig().andThen(c -> c.set("range_request_timeout", 
"180s")
 +                                           .set("read_request_timeout", 
"180s")
 +                                           .set("write_request_timeout", 
"180s")
 +                                           .set("native_transport_timeout", 
"180s")
 +                                           .set("slow_query_log_timeout", 
"180s")
 +                                           .with(GOSSIP).with(NETWORK))
 +        );
 +    }
 +
 +    protected static void init(int nodes, Consumer<IInstanceConfig> cfg) 
throws Throwable
 +    {
 +        cluster = Cluster.build()
 +                         .withNodes(nodes)
 +                         .withConfig(cfg)
 +                         .createWithoutStarting();
 +        cluster.startup();
 +        cluster = init(cluster);
 +    }
 +    @AfterClass
 +    public static void afterClass()
 +    {
 +        cluster.close();
 +    }
 +
 +    @Before
 +    public void beforeEach()
 +    {
 +        cluster.schemaChange("DROP KEYSPACE IF EXISTS " + KEYSPACE);
 +        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf() + 
"};");
 +    }
 +
 +    protected int rf()
 +    {
 +        return 1;
 +    }
 +
 +    @Test
 +    public void simplifiedSaiTest()
 +    {
 +        withRandom(rng -> saiTest(rng,
 +                                  SchemaGenerators.trivialSchema(KEYSPACE, 
"simplified", 1000).generate(rng),
 +                                  () -> true,
 +                                  DEFAULT_REPAIR_SKIP));
 +    }
 +
 +    @Test
 +    public void indexOnlySaiTest()
 +    {
 +        for (int i = 0; i < ITERATIONS; i++)
 +        {
 +            logger.info("Starting iteration {}...", i);
 +            withRandom(rng -> saiTest(rng,
 +                                      
schemaGenerator(rng.nextBoolean()).generate(rng),
 +                                      () -> true,
 +                                      rng.nextBoolean() ? DEFAULT_REPAIR_SKIP 
: Integer.MAX_VALUE));
 +        }
 +    }
 +
 +    @Test
 +    public void mixedFilteringSaiTest()
 +    {
 +        for (int i = 0; i < ITERATIONS; i++)
 +        {
 +            logger.info("Starting iteration {}...", i);
 +            withRandom(rng -> saiTest(rng,
 +                                      
schemaGenerator(rng.nextBoolean()).generate(rng),
 +                                      () -> rng.nextFloat() < 0.7f,
 +                                      rng.nextBoolean() ? DEFAULT_REPAIR_SKIP 
: Integer.MAX_VALUE));
 +        }
 +    }
 +
 +    private void saiTest(EntropySource rng, SchemaSpec schema, 
Supplier<Boolean> createIndex, int repairSkip)
 +    {
 +        logger.info(schema.compile());
 +
 +        Generator<Integer> globalPkGen = Generators.int32(0, 
Math.min(NUM_PARTITIONS, schema.valueGenerators.pkPopulation()));
 +        Generator<Integer> ckGen = Generators.int32(0, 
schema.valueGenerators.ckPopulation());
 +
-         CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(100);
 +        beforeEach();
 +        cluster.forEach(i -> i.nodetool("disableautocompaction"));
 +
 +        cluster.schemaChange(schema.compile());
 +        cluster.schemaChange(schema.compile().replace(schema.keyspace + '.' + 
schema.table, schema.keyspace + ".debug_table"));
 +
++        AtomicInteger indexCount = new AtomicInteger();
++
 +        Streams.concat(schema.clusteringKeys.stream(), 
schema.regularColumns.stream(), schema.staticColumns.stream())
 +               .forEach(column -> {
 +                   if (createIndex.get())
 +                   {
 +                       logger.info("Adding index to column {}...", 
column.name);
 +                       cluster.schemaChange(String.format("CREATE INDEX 
%s_sai_idx ON %s.%s (%s) USING 'sai' ",
-                                column.name, schema.keyspace, schema.table, 
column.name));
++                                                          column.name, 
schema.keyspace, schema.table, column.name));
++                       indexCount.incrementAndGet();
 +                   }
 +                   else
 +                   {
 +                       logger.info("Leaving column {} unindexed...", 
column.name);
 +                   }
 +               });
 +
++        
CassandraRelevantProperties.SAI_INTERSECTION_CLAUSE_LIMIT.setInt(indexCount.get());
 +        waitForIndexesQueryable(schema);
 +
 +        HistoryBuilder history = new 
ReplayingHistoryBuilder(schema.valueGenerators,
 +                                                             (hb) -> 
InJvmDTestVisitExecutor.builder()
 +                                                                              
              .pageSizeSelector(pageSizeSelector(rng))
 +                                                                              
              .consistencyLevel(consistencyLevelSelector())
 +                                                                              
              .doubleWriting(schema, hb, cluster, "debug_table"));
 +        Set<Integer> partitions = new HashSet<>();
 +        int attempts = 0;
 +        while (partitions.size() < NUM_VISITED_PARTITIONS && attempts < 
NUM_VISITED_PARTITIONS * 10)
 +        {
 +            partitions.add(globalPkGen.generate(rng));
 +            attempts++;
 +        }
 +
 +        if (partitions.size() < NUM_VISITED_PARTITIONS)
 +            logger.warn("Unable to generate {} partitions to visit. 
Continuing with {}...", NUM_VISITED_PARTITIONS, partitions.size());
 +
 +        Generator<Integer> pkGen = Generators.pick(List.copyOf(partitions));
 +
 +        // Ensure that we don't attempt to use range queries against SAI 
indexes that don't support them:
 +        Set<Integer> eqOnlyRegularColumns = new HashSet<>();
 +        for (int i = 0; i < schema.regularColumns.size(); i++)
 +            if 
(IndexTermType.isEqOnlyType(schema.regularColumns.get(i).type.asServerType()))
 +                eqOnlyRegularColumns.add(i);
 +
 +        Set<Integer> eqOnlyStaticColumns = new HashSet<>();
 +        for (int i = 0; i < schema.staticColumns.size(); i++)
 +            if 
(IndexTermType.isEqOnlyType(schema.staticColumns.get(i).type.asServerType()))
 +                eqOnlyStaticColumns.add(i);
 +
 +        Set<Integer> eqOnlyClusteringColumns = new HashSet<>();
 +        for (int i = 0; i < schema.clusteringKeys.size(); i++)
 +            if 
(IndexTermType.isEqOnlyType(schema.clusteringKeys.get(i).type.asServerType()))
 +                eqOnlyClusteringColumns.add(i);
 +
 +        for (int i = 0; i < OPERATIONS_PER_RUN; i++)
 +        {
 +            int partitionIndex = pkGen.generate(rng);
 +            HistoryBuilderHelper.insertRandomData(schema, partitionIndex, 
ckGen.generate(rng), rng, 0.5d, history);
 +
 +            if (rng.nextFloat() > 0.99f)
 +            {
 +                int row1 = ckGen.generate(rng);
 +                int row2 = ckGen.generate(rng);
 +                history.deleteRowRange(partitionIndex,
 +                                       Math.min(row1, row2),
 +                                       Math.max(row1, row2),
 +                                       
rng.nextInt(schema.clusteringKeys.size()),
 +                                       rng.nextBoolean(),
 +                                       rng.nextBoolean());
 +            }
 +
 +            if (rng.nextFloat() > 0.995f)
 +                HistoryBuilderHelper.deleteRandomColumns(schema, 
partitionIndex, ckGen.generate(rng), rng, history);
 +
 +            if (rng.nextFloat() > 0.9995f)
 +                history.deletePartition(partitionIndex);
 +
 +            if (i % FLUSH_SKIP == 0)
 +                history.custom(() -> flush(schema), "Flush");
 +            else if (i % COMPACTION_SKIP == 0)
 +                history.custom(() -> compact(schema), "Compact");
 +            else if (i % repairSkip == 0)
 +                history.custom(() -> repair(schema), "Repair");
 +
 +            if (i > 0 && i % VALIDATION_SKIP == 0)
 +            {
 +                for (int j = 0; j < QUERIES_PER_VALIDATION; j++)
 +                {
 +                    List<IdxRelation> regularRelations = 
 +                            HistoryBuilderHelper.generateValueRelations(rng,
 +                                                                        
schema.regularColumns.size(),
 +                                                                        
column -> Math.min(schema.valueGenerators.regularPopulation(column), 
UNIQUE_CELL_VALUES),
 +                                                                        
eqOnlyRegularColumns::contains);
 +
 +                    List<IdxRelation> staticRelations = 
 +                            HistoryBuilderHelper.generateValueRelations(rng,
 +                                                                        
schema.staticColumns.size(),
 +                                                                        
column -> Math.min(schema.valueGenerators.staticPopulation(column), 
UNIQUE_CELL_VALUES),
 +                                                                        
eqOnlyStaticColumns::contains);
 +
 +                    Integer pk = pkGen.generate(rng);
 +
 +                    IdxRelation[] ckRelations = 
 +                            
HistoryBuilderHelper.generateClusteringRelations(rng,
 +                                                                             
schema.clusteringKeys.size(),
 +                                                                             
ckGen,
 +                                                                             
eqOnlyClusteringColumns).toArray(new IdxRelation[0]);
 +
 +                    IdxRelation[] regularRelationsArray = 
regularRelations.toArray(new IdxRelation[regularRelations.size()]);
 +                    IdxRelation[] staticRelationsArray = 
staticRelations.toArray(new IdxRelation[staticRelations.size()]);
 +
 +                    history.select(pk, ckRelations, regularRelationsArray, 
staticRelationsArray);
 +                }
 +            }
 +        }
 +    }
 +
 +    protected Generator<SchemaSpec> schemaGenerator(boolean disableReadRepair)
 +    {
 +        SchemaSpec.OptionsBuilder builder = 
SchemaSpec.optionsBuilder().disableReadRepair(disableReadRepair)
 +                                                                       
.compactionStrategy("LeveledCompactionStrategy");
 +        return SchemaGenerators.schemaSpecGen(KEYSPACE, "basic_sai", 
MAX_PARTITION_SIZE, builder);
 +    }
 +
 +    protected void flush(SchemaSpec schema)
 +    {
 +        cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
 +    }
 +
 +    protected void compact(SchemaSpec schema)
 +    {
 +        cluster.get(1).nodetool("compact", schema.keyspace);
 +    }
 +
 +    protected void repair(SchemaSpec schema)
 +    {
 +        // Repair is nonsensical for a single node, but a repair does flush 
first, so do that at least.
 +        cluster.get(1).nodetool("flush", schema.keyspace, schema.table);
 +    }
 +
 +    protected void waitForIndexesQueryable(SchemaSpec schema) {}
 +
 +    public static Consumer<IInstanceConfig> defaultConfig()
 +    {
 +        return (cfg) -> {
 +            cfg.set("row_cache_size", "50MiB")
 +               .set("index_summary_capacity", "50MiB")
 +               .set("counter_cache_size", "50MiB")
 +               .set("key_cache_size", "50MiB")
 +               .set("file_cache_size", "50MiB")
 +               .set("index_summary_capacity", "50MiB")
 +               .set("memtable_heap_space", "128MiB")
 +               .set("memtable_offheap_space", "128MiB")
 +               .set("memtable_flush_writers", 1)
 +               .set("concurrent_compactors", 1)
 +               .set("concurrent_reads", 5)
 +               .set("concurrent_writes", 5)
 +               .set("compaction_throughput_mb_per_sec", 10)
 +               .set("hinted_handoff_enabled", false);
 +        };
 +    }
 +
 +    protected InJvmDTestVisitExecutor.ConsistencyLevelSelector 
consistencyLevelSelector()
 +    {
 +        return visit -> {
 +            if (visit.selectOnly)
 +                return ConsistencyLevel.ALL;
 +
 +            // The goal here is to make replicas as out of date as possible, 
modulo the efforts of repair
 +            // and read-repair in the test itself.
 +            return ConsistencyLevel.NODE_LOCAL;
 +
 +        };
 +    }
 +
 +    protected InJvmDTestVisitExecutor.PageSizeSelector 
pageSizeSelector(EntropySource rng)
 +    {
 +        // Choosing a fetch size has implications for how well this test will 
exercise paging, short-read protection, and
 +        // other important parts of the distributed query apparatus. This 
should be set low enough to ensure a significant
 +        // number of queries during validation page, but not so low that 
more expensive queries time out and fail the test.
 +        return lts -> rng.nextInt(1, 20);
 +    }
 +}
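
Note on the new skip intervals: the sketch below is not part of the commit. It replays only the flush/compact/repair dispatch chain from saiTest() with the constants from the diff above (FLUSH_SKIP = 2217, COMPACTION_SKIP = 4435, DEFAULT_REPAIR_SKIP = 8869, OPERATIONS_PER_RUN = 30_000) to count how often each maintenance action would fire in a single run. The class and method names (SkipSchedule, count) are hypothetical, and it assumes repairSkip is DEFAULT_REPAIR_SKIP rather than Integer.MAX_VALUE.

```java
final class SkipSchedule
{
    /** Returns {flushes, compactions, repairs} for operations i = 0 .. operations - 1. */
    static int[] count(int operations, int flushSkip, int compactionSkip, int repairSkip)
    {
        int flushes = 0, compactions = 0, repairs = 0;
        for (int i = 0; i < operations; i++)
        {
            // Same precedence as the if / else-if chain in saiTest():
            // at most one maintenance action is scheduled per operation.
            if (i % flushSkip == 0)
                flushes++;
            else if (i % compactionSkip == 0)
                compactions++;
            else if (i % repairSkip == 0)
                repairs++;
        }
        return new int[]{ flushes, compactions, repairs };
    }

    public static void main(String[] args)
    {
        int[] c = count(30_000, 2217, 4435, 8869);
        System.out.printf("flushes=%d compactions=%d repairs=%d%n", c[0], c[1], c[2]);
    }
}
```

Because i = 0 is divisible by every interval, the first operation always takes the flush branch. For the same reason, passing Integer.MAX_VALUE as repairSkip (as the tests do at random) yields no repairs at all in a 30,000-operation run.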
diff --cc test/unit/org/apache/cassandra/cql3/KnownIssue.java
index 37ef255e54,0000000000..a1924a91f9
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/cql3/KnownIssue.java
+++ b/test/unit/org/apache/cassandra/cql3/KnownIssue.java
@@@ -1,53 -1,0 +1,53 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.cql3;
 +
 +import java.util.EnumSet;
 +
 +/**
 + * In order to have a clean CI some known issues must be excluded from some 
tests until those issues are addressed.
 + *
 + * This type exists to make it easier to discover known issues and places in 
the code that account for them
 + */
 +public enum KnownIssue
 +{
 +    
BETWEEN_START_LARGER_THAN_END("https://issues.apache.org/jira/browse/CASSANDRA-20154",
 +                                  "BETWEEN is matching values when start > 
end, which should never return anything"),
 +    SAI_INET_MIXED("https://issues.apache.org/jira/browse/CASSANDRA-19492",
 +                   "SAI converts ipv4 to ipv6 to simplify the index, this 
causes issues with range search as it starts to mix the values, which isn't 
always desirable or intuitive"),
 +    
CUSTOM_INDEX_MAX_COLUMN_48("https://issues.apache.org/jira/browse/CASSANDRA-19897",
 +                               "Columns can be up to 50 chars, but CREATE 
CUSTOM INDEX only allows up to 48"),
-     
AF_MULTI_NODE_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-20243",
-                                         "When writes are done at NODE_LOCAL 
and the select is ALL, AF should be able to return the correct data but it 
doesn't"),
 +    
SHORT_AND_VARINT_GET_INT_FUNCTIONS("https://issues.apache.org/jira/browse/CASSANDRA-19874",
 +                                       "Function inference may be unable to 
infer the correct function or chooses one for a smaller type"),
 +    SAI_EMPTY_TYPE("ML: Meaningless emptiness and filtering",
-                    "Some types allow empty bytes, but define them as 
meaningless.  AF can be used to query them using <, <=, and =; but SAI can not")
++                   "Some types allow empty bytes, but define them as 
meaningless.  AF can be used to query them using <, <=, and =; but SAI can 
not"),
++    
AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES("https://issues.apache.org/jira/browse/CASSANDRA-19007",
++                                                     "When doing multi 
node/multi column queries, AF can miss data when the nodes are not in-sync"),
 +    ;
 +
 +    KnownIssue(String url, String description)
 +    {
 +        // don't actually care to save the values, just there to force 
documentation
 +    }
 +
 +    public static EnumSet<KnownIssue> ignoreAll()
 +    {
 +        return EnumSet.allOf(KnownIssue.class);
 +    }
 +}
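
Note on the KnownIssue gating: the sketch below is not part of the commit. It illustrates, under simplifying assumptions, the pattern that State.multiColumnQueryColumns() follows in the diff above: when the test runs multi-node and AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES is among the ignored issues, multi-column queries are restricted to the indexed non-partition columns, otherwise every searchable column is eligible. The Issue enum, the KnownIssueGate class, and the column names are hypothetical stand-ins, and columns are plain strings rather than Symbol instances.

```java
import java.util.EnumSet;
import java.util.List;

final class KnownIssueGate
{
    // Hypothetical stand-in for org.apache.cassandra.cql3.KnownIssue, reduced to two constants.
    enum Issue { SAI_EMPTY_TYPE, AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES }

    /** Picks the columns a multi-column query may use, mirroring the gate in the diff. */
    static List<String> multiColumnQueryColumns(EnumSet<Issue> ignored,
                                                boolean multiNode,
                                                List<String> searchable,
                                                List<String> nonPkIndexed)
    {
        boolean restricted = multiNode
                             && ignored.contains(Issue.AF_MULTI_NODE_MULTI_COLUMN_AND_NODE_LOCAL_WRITES);
        return restricted ? nonPkIndexed : searchable;
    }

    /** A multi-column query only makes sense when more than one column is eligible. */
    static boolean allowMultiColumnQuery(List<String> eligible)
    {
        return eligible.size() > 1;
    }

    public static void main(String[] args)
    {
        List<String> searchable = List.of("ck0", "s0", "v0", "v1");
        List<String> nonPkIndexed = List.of("v0");
        EnumSet<Issue> ignored = EnumSet.allOf(Issue.class);

        // Single node: the known issue does not apply, so all searchable columns are eligible.
        System.out.println(multiColumnQueryColumns(ignored, false, searchable, nonPkIndexed));
        // Multi node with the issue ignored: only indexed non-partition columns remain.
        System.out.println(multiColumnQueryColumns(ignored, true, searchable, nonPkIndexed));
    }
}
```

With a single indexed column, as in this example, the multi-node case leaves too few eligible columns, which is why allowNonPartitionMultiColumnQuery() in the diff checks that the returned list has more than one element.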

