This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new be507c6e99 Warn clients about possible consistency violations for 
filtering queries against multiple mutable columns
be507c6e99 is described below

commit be507c6e996078011c08e36b09d9f34faa454973
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Mon Apr 1 14:18:05 2024 -0500

    Warn clients about possible consistency violations for filtering queries 
against multiple mutable columns
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Berenguer Blasi for 
CASSANDRA-19489
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../apache/cassandra/config/GuardrailsOptions.java |  29 +++++
 .../org/apache/cassandra/cql3/QueryProcessor.java  |   2 +-
 .../apache/cassandra/cql3/UntypedResultSet.java    |   4 +-
 .../cql3/restrictions/StatementRestrictions.java   |   3 +-
 .../cassandra/cql3/statements/SelectStatement.java |  45 ++++----
 .../org/apache/cassandra/db/filter/RowFilter.java  |  10 +-
 .../apache/cassandra/db/guardrails/Guardrails.java |  39 +++++++
 .../cassandra/db/guardrails/GuardrailsConfig.java  |  16 +++
 .../cassandra/db/guardrails/GuardrailsMBean.java   |  16 +++
 .../GuardrailNonPartitionRestrictedQueryTest.java  |  37 ++-----
 .../test/guardrails/GuardrailTester.java           |  41 ++++++-
 .../guardrails/IntersectFilteringQueryTest.java    | 120 +++++++++++++++++++++
 14 files changed, 311 insertions(+), 54 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index e879141be7..cb3e2676cb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0-beta2
+ * Warn clients about possible consistency violations for filtering queries 
against multiple mutable columns (CASSANDRA-19489)
  * Align buffer with commitlog segment size (CASSANDRA-19471)
  * Ensure SAI indexes empty byte buffers for types that allow them as a valid 
input (CASSANDRA-19461)
  * Deprecate Python 3.7 and earlier, but allow cqlsh to run with Python 
3.6-3.11 (CASSANDRA-19467)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index d97f35d759..aa0f3ee476 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -905,6 +905,8 @@ public class Config
     public volatile boolean non_partition_restricted_index_query_enabled = 
true;
     public volatile int sai_sstable_indexes_per_query_warn_threshold = 32;
     public volatile int sai_sstable_indexes_per_query_fail_threshold = -1;
+    public volatile boolean intersect_filtering_query_warned = true;
+    public volatile boolean intersect_filtering_query_enabled = true;
 
     public volatile DurationSpec.LongNanosecondsBound streaming_state_expires 
= new DurationSpec.LongNanosecondsBound("3d");
     public volatile DataStorageSpec.LongBytesBound streaming_state_size = new 
DataStorageSpec.LongBytesBound("40MiB");
diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java 
b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
index 2c36a2a03d..8d7bd52197 100644
--- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java
+++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java
@@ -846,6 +846,35 @@ public class GuardrailsOptions implements GuardrailsConfig
                                   x -> config.zero_ttl_on_twcs_enabled = x);
     }
 
+    @Override
+    public boolean getIntersectFilteringQueryWarned()
+    {
+        return config.intersect_filtering_query_warned;
+    }
+
+    @Override
+    public void setIntersectFilteringQueryWarned(boolean value)
+    {
+        updatePropertyWithLogging("intersect_filtering_query_warned",
+                                  value,
+                                  () -> 
config.intersect_filtering_query_warned,
+                                  x -> config.intersect_filtering_query_warned 
= x);
+    }
+
+    @Override
+    public boolean getIntersectFilteringQueryEnabled()
+    {
+        return config.intersect_filtering_query_enabled;
+    }
+
+    public void setIntersectFilteringQueryEnabled(boolean value)
+    {
+        updatePropertyWithLogging("intersect_filtering_query_enabled",
+                                  value,
+                                  () -> 
config.intersect_filtering_query_enabled,
+                                  x -> 
config.intersect_filtering_query_enabled = x);
+    }
+
     @Override
     public  DurationSpec.LongMicrosecondsBound 
getMaximumTimestampWarnThreshold()
     {
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java 
b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index fba424d7d9..15cfaa6c0f 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -621,7 +621,7 @@ public class QueryProcessor implements QueryHandler
         try (PartitionIterator iter = partitions)
         {
             SelectStatement ss = (SelectStatement) getStatement(query, null);
-            ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds(), 
true);
+            ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds(), 
true, ClientState.forInternalCalls());
             return UntypedResultSet.create(cqlRows);
         }
     }
diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java 
b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
index 347e2e29b6..92a33c9e02 100644
--- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
+++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java
@@ -228,7 +228,7 @@ public abstract class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
                         try (ReadExecutionController executionController = 
pager.executionController();
                              PartitionIterator iter = 
pager.fetchPageInternal(pageSize, executionController))
                         {
-                            currentPage = select.process(iter, nowInSec, 
true).rows.iterator();
+                            currentPage = select.process(iter, nowInSec, true, 
ClientState.forInternalCalls()).rows.iterator();
                         }
                     }
                     return new Row(metadata, currentPage.next());
@@ -293,7 +293,7 @@ public abstract class UntypedResultSet implements 
Iterable<UntypedResultSet.Row>
 
                         try (PartitionIterator iter = 
pager.fetchPage(pageSize, cl, clientState, nanoTime()))
                         {
-                            currentPage = select.process(iter, nowInSec, 
true).rows.iterator();
+                            currentPage = select.process(iter, nowInSec, true, 
clientState).rows.iterator();
                         }
                     }
                     return new Row(metadata, currentPage.next());
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 088ac2c94d..4f6b829191 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -780,7 +780,8 @@ public final class StatementRestrictions
             return RowFilter.none();
 
         // If there is only one replica, we don't need reconciliation at any 
consistency level. 
-        boolean needsReconciliation = 
options.getConsistency().needsReconciliation()
+        boolean needsReconciliation = !table.isVirtual() 
+                                      && 
options.getConsistency().needsReconciliation()
                                       && 
Keyspace.open(table.keyspace).getReplicationStrategy().getReplicationFactor().allReplicas
 > 1;
 
         RowFilter filter = RowFilter.create(needsReconciliation);
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index f934232d23..5418159ad6 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -423,7 +423,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
     {
         try (PartitionIterator data = query.execute(options.getConsistency(), 
state, queryStartNanoTime))
         {
-            return processResults(data, options, selectors, nowInSec, 
userLimit, aggregationSpec, unmask);
+            return processResults(data, options, selectors, nowInSec, 
userLimit, aggregationSpec, unmask, state);
         }
     }
 
@@ -538,7 +538,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
         ResultMessage.Rows msg;
         try (PartitionIterator page = pager.fetchPage(pageSize, 
queryStartNanoTime))
         {
-            msg = processResults(page, options, selectors, nowInSec, 
userLimit, aggregationSpec, unmask);
+            msg = processResults(page, options, selectors, nowInSec, 
userLimit, aggregationSpec, unmask, state.getClientState());
         }
 
         // Please note that the isExhausted state of the pager only gets 
updated when we've closed the page, so this
@@ -561,9 +561,10 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
                                               long nowInSec,
                                               int userLimit,
                                               AggregationSpecification 
aggregationSpec,
-                                              boolean unmask) throws 
RequestValidationException
+                                              boolean unmask,
+                                              ClientState state) throws 
RequestValidationException
     {
-        ResultSet rset = process(partitions, options, selectors, nowInSec, 
userLimit, aggregationSpec, unmask);
+        ResultSet rset = process(partitions, options, selectors, nowInSec, 
userLimit, aggregationSpec, unmask, state);
         return new ResultMessage.Rows(rset);
     }
 
@@ -599,7 +600,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
             {
                 try (PartitionIterator data = 
query.executeInternal(executionController))
                 {
-                    return processResults(data, options, selectors, nowInSec, 
userLimit, null, unmask);
+                    return processResults(data, options, selectors, nowInSec, 
userLimit, null, unmask, state.getClientState());
                 }
             }
 
@@ -672,11 +673,11 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
         }
     }
 
-    public ResultSet process(PartitionIterator partitions, long nowInSec, 
boolean unmask) throws InvalidRequestException
+    public ResultSet process(PartitionIterator partitions, long nowInSec, 
boolean unmask, ClientState state) throws InvalidRequestException
     {
         QueryOptions options = QueryOptions.DEFAULT;
         Selectors selectors = selection.newSelectors(options);
-        return process(partitions, options, selectors, nowInSec, 
getLimit(options), getAggregationSpec(options), unmask);
+        return process(partitions, options, selectors, nowInSec, 
getLimit(options), getAggregationSpec(options), unmask, state);
     }
 
     @Override
@@ -722,7 +723,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
         if (filter == null || filter.isEmpty(table.comparator))
             return ReadQuery.empty(table);
 
-        RowFilter rowFilter = getRowFilter(options);
+        RowFilter rowFilter = getRowFilter(options, state);
 
         List<DecoratedKey> decoratedKeys = new ArrayList<>(keys.size());
         for (ByteBuffer key : keys)
@@ -767,7 +768,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
         ClientState state = ClientState.forInternalCalls();
         ColumnFilter columnFilter = 
selection.newSelectors(options).getColumnFilter();
         ClusteringIndexFilter filter = makeClusteringIndexFilter(options, 
state, columnFilter);
-        RowFilter rowFilter = getRowFilter(options);
+        RowFilter rowFilter = getRowFilter(options, state);
         return SinglePartitionReadCommand.create(table, nowInSec, 
columnFilter, rowFilter, DataLimits.NONE, key, filter);
     }
 
@@ -776,7 +777,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
      */
     public RowFilter rowFilterForInternalCalls()
     {
-        return 
getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList()));
+        return 
getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList()), 
ClientState.forInternalCalls());
     }
 
     private ReadQuery getRangeCommand(QueryOptions options, ClientState state, 
ColumnFilter columnFilter, DataLimits limit, long nowInSec)
@@ -785,7 +786,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
         if (clusteringIndexFilter == null)
             return ReadQuery.empty(table);
 
-        RowFilter rowFilter = getRowFilter(options);
+        RowFilter rowFilter = getRowFilter(options, state);
 
         // The LIMIT provided by the user is the number of CQL row he wants 
returned.
         // We want to have getRangeSlice to count the number of columns, not 
the number of keys.
@@ -973,10 +974,15 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
     /**
      * May be used by custom QueryHandler implementations
      */
-    public RowFilter getRowFilter(QueryOptions options) throws 
InvalidRequestException
+    public RowFilter getRowFilter(QueryOptions options, ClientState state) 
throws InvalidRequestException
     {
         IndexRegistry indexRegistry = IndexRegistry.obtain(table);
-        return restrictions.getRowFilter(indexRegistry, options);
+        RowFilter filter = restrictions.getRowFilter(indexRegistry, options);
+
+        if (filter.needsReconciliation() && filter.isMutableIntersection() && 
restrictions.needFiltering(table))
+            Guardrails.intersectFilteringQueryEnabled.ensureEnabled(state);
+
+        return filter;
     }
 
     private ResultSet process(PartitionIterator partitions,
@@ -985,7 +991,8 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
                               long nowInSec,
                               int userLimit,
                               AggregationSpecification aggregationSpec,
-                              boolean unmask) throws InvalidRequestException
+                              boolean unmask,
+                              ClientState state) throws InvalidRequestException
     {
         GroupMaker groupMaker = aggregationSpec == null ? null : 
aggregationSpec.newGroupMaker();
         ResultSetBuilder result = new ResultSetBuilder(getResultMetadata(), 
selectors, unmask, groupMaker);
@@ -1001,7 +1008,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
         ResultSet cqlRows = result.build();
         maybeWarn(result, options);
 
-        orderResults(cqlRows, options);
+        orderResults(cqlRows, options, state);
 
         cqlRows.trim(userLimit);
 
@@ -1156,12 +1163,12 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
      * <p>
      * In the case of ANN ordering the rows are first ordered in index column 
order and then by primary key.
      */
-    private void orderResults(ResultSet cqlRows, QueryOptions options)
+    private void orderResults(ResultSet cqlRows, QueryOptions options, 
ClientState state)
     {
         if (cqlRows.size() == 0 || !needsPostQueryOrdering())
             return;
 
-        Comparator<List<ByteBuffer>> comparator = 
orderingComparator.prepareFor(table, getRowFilter(options), options);
+        Comparator<List<ByteBuffer>> comparator = 
orderingComparator.prepareFor(table, getRowFilter(options, state), options);
         if (comparator != null)
             cqlRows.rows.sort(comparator);
     }
@@ -1832,7 +1839,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
             if (clusteringIndexFilter == null)
                 return "EMPTY";
 
-            RowFilter rowFilter = getRowFilter(options);
+            RowFilter rowFilter = getRowFilter(options, state);
 
             // The LIMIT provided by the user is the number of CQL row he 
wants returned.
             // We want to have getRangeSlice to count the number of columns, 
not the number of keys.
@@ -1898,7 +1905,7 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
                 sb.append(')');
             }
 
-            RowFilter rowFilter = getRowFilter(options);
+            RowFilter rowFilter = getRowFilter(options, state);
             if (!rowFilter.isEmpty())
                 sb.append(" AND ").append(rowFilter);
 
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 588db7fb46..beb3edfcd7 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -165,7 +165,15 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
      */
     public boolean isStrict()
     {
-        return !needsReconciliation || expressions.stream().filter(e -> 
!e.column.isPrimaryKeyColumn()).count() <= 1;
+        return !needsReconciliation || !isMutableIntersection();
+    }
+
+    /**
+     * @return true if this filter contains an intersection on two or more 
mutable columns
+     */
+    public boolean isMutableIntersection()
+    {
+        return expressions.stream().filter(e -> 
!e.column.isPrimaryKeyColumn()).count() > 1;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java 
b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
index 971c635c67..01157bf345 100644
--- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
+++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java
@@ -216,6 +216,21 @@ public final class Guardrails implements GuardrailsMBean
                    state -> 
CONFIG_PROVIDER.getOrCreate(state).getZeroTTLOnTWCSEnabled(),
                    "0 default_time_to_live on a table with " + 
TimeWindowCompactionStrategy.class.getSimpleName() + " compaction strategy");
 
+    /**
+     * Guardrail to warn on or fail filtering queries that contain 
intersections on mutable columns at consistency
+     * levels that require coordinator reconciliation.
+     * 
+     * @see <a 
href="https://issues.apache.org/jira/browse/CASSANDRA-19007";>CASSANDRA-19007</a>
+     */
+    public static final EnableFlag intersectFilteringQueryEnabled =
+            new EnableFlag("intersect_filtering_query",
+                           "Filtering queries involving an intersection on 
multiple mutable (i.e. non-key) columns " +
+                           "over unrepaired data at read consistency levels 
that would require coordinator " +
+                           "reconciliation may violate the guarantees of those 
consistency levels.",
+                           state -> 
CONFIG_PROVIDER.getOrCreate(state).getIntersectFilteringQueryWarned(),
+                           state -> 
CONFIG_PROVIDER.getOrCreate(state).getIntersectFilteringQueryEnabled(),
+                           "Filtering query with intersection on mutable 
columns at consistency level requiring coordinator reconciliation");
+
     /**
      * Guardrail on the number of elements returned within page.
      */
@@ -1245,6 +1260,30 @@ public final class Guardrails implements GuardrailsMBean
         DEFAULT_CONFIG.setNonPartitionRestrictedQueryEnabled(enabled);
     }
 
+    @Override
+    public boolean getIntersectFilteringQueryWarned()
+    {
+        return DEFAULT_CONFIG.getIntersectFilteringQueryWarned();
+    }
+
+    @Override
+    public void setIntersectFilteringQueryWarned(boolean value)
+    {
+        DEFAULT_CONFIG.setIntersectFilteringQueryWarned(value);
+    }
+
+    @Override
+    public boolean getIntersectFilteringQueryEnabled()
+    {
+        return DEFAULT_CONFIG.getIntersectFilteringQueryEnabled();
+    }
+
+    @Override
+    public void setIntersectFilteringQueryEnabled(boolean value)
+    {
+        DEFAULT_CONFIG.setIntersectFilteringQueryEnabled(value);
+    }
+
     private static String toCSV(Set<String> values)
     {
         return values == null || values.isEmpty() ? "" : String.join(",", 
values);
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
index 00cc853589..5c6880fde5 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java
@@ -384,6 +384,22 @@ public interface GuardrailsConfig
      */
     void setZeroTTLOnTWCSEnabled(boolean value);
 
+    /**
+     * @return true if a client warning is emitted for a filtering query with 
an intersection on mutable columns at a 
+     *         consistency level requiring coordinator reconciliation
+     */
+    boolean getIntersectFilteringQueryWarned();
+
+    void setIntersectFilteringQueryWarned(boolean value);
+
+    /**
+     * @return true if it is possible to execute a filtering query with an 
intersection on mutable columns at a 
+     *         consistency level requiring coordinator reconciliation
+     */
+    boolean getIntersectFilteringQueryEnabled();
+
+    void setIntersectFilteringQueryEnabled(boolean value);
+
     /**
      * @return A timestamp that if a user supplied timestamp is after will 
trigger a warning
      */
diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java 
b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
index e4c2774a0a..89fe7a70ba 100644
--- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
+++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java
@@ -804,4 +804,20 @@ public interface GuardrailsMBean
      * @param enabled {@code true} if a query without partition key is enabled 
or not
      */
     void setNonPartitionRestrictedQueryEnabled(boolean enabled);
+
+    /**
+     * @return true if a client warning is emitted for a filtering query with 
an intersection on mutable columns at a 
+     *         consistency level requiring coordinator reconciliation
+     */
+    boolean getIntersectFilteringQueryWarned();
+    
+    void setIntersectFilteringQueryWarned(boolean value);
+
+    /**
+     * @return true if it is possible to execute a filtering query with an 
intersection on mutable columns at a 
+     *         consistency level requiring coordinator reconciliation
+     */
+    boolean getIntersectFilteringQueryEnabled();
+    
+    void setIntersectFilteringQueryEnabled(boolean value);
 }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java
index 977527786f..24d13fd025 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java
@@ -30,9 +30,7 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.SimpleStatement;
 import com.datastax.driver.core.exceptions.InvalidQueryException;
 import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -43,7 +41,6 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
-import org.apache.cassandra.distributed.util.Auth;
 import 
org.apache.cassandra.exceptions.QueryReferencesTooManyIndexesAbortException;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.index.Index;
@@ -62,6 +59,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@SuppressWarnings("Convert2MethodRef")
 public class GuardrailNonPartitionRestrictedQueryTest extends GuardrailTester
 {
     private static Cluster cluster;
@@ -78,18 +76,7 @@ public class GuardrailNonPartitionRestrictedQueryTest 
extends GuardrailTester
                          .withDataDirCount(1)
                          .start();
 
-        Auth.waitForExistingRoles(cluster.get(1));
-
-        // create a regular user, since the default superuser is excluded from 
guardrails
-        com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
-        try (com.datastax.driver.core.Cluster c = 
builder.withCredentials("cassandra", "cassandra").build();
-             Session session = c.connect())
-        {
-            session.execute("CREATE USER test WITH PASSWORD 'test'");
-        }
-
-        // connect using that superuser, we use the driver to get access to 
the client warnings
-        driverCluster = builder.withCredentials("test", "test").build();
+        driverCluster = buildDriverCluster(cluster);
     }
 
     @AfterClass
@@ -124,6 +111,12 @@ public class GuardrailNonPartitionRestrictedQueryTest 
extends GuardrailTester
         return cluster;
     }
 
+    @Override
+    protected Session getSession()
+    {
+        return driverSession;
+    }
+
     @Test
     public void testGuardrailForLegacy2i()
     {
@@ -305,20 +298,6 @@ public class GuardrailNonPartitionRestrictedQueryTest 
extends GuardrailTester
         assertThat(totalAborts()).as("aborts").isEqualTo(aborts);
     }
 
-    /**
-     * Execution of statements via driver will not bypass guardrails as 
internal queries would do as they are
-     * done by superuser / they do not have any notion of roles
-     *
-     * @return list of warnings
-     */
-    private List<String> executeViaDriver(String query)
-    {
-        SimpleStatement stmt = new SimpleStatement(query);
-        
stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM);
-        ResultSet resultSet = driverSession.execute(stmt);
-        return resultSet.getExecutionInfo().getWarnings();
-    }
-
     private List<String> executeSelect(long valueToQuery, boolean expectToFail)
     {
         return 
cluster.get(1).applyOnInstance((IIsolatedExecutor.SerializableTriFunction<String,
 String, Long, List<String>>) (keyspace, table, v1) -> {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
index b12d9abbb2..376f6a310b 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java
@@ -21,6 +21,10 @@ package org.apache.cassandra.distributed.test.guardrails;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import org.apache.cassandra.distributed.util.Auth;
 import org.junit.After;
 import org.junit.Before;
 
@@ -41,11 +45,16 @@ public abstract class GuardrailTester extends TestBaseImpl
 
     protected abstract Cluster getCluster();
 
+    protected Session getSession()
+    {
+        return null;
+    }
+
     @Before
     public void beforeTest()
     {
         tableName = "t_" + seqNumber.getAndIncrement();
-        qualifiedTableName = KEYSPACE + "." + tableName;
+        qualifiedTableName = KEYSPACE + '.' + tableName;
     }
 
     @After
@@ -54,6 +63,36 @@ public abstract class GuardrailTester extends TestBaseImpl
         schemaChange("DROP TABLE IF EXISTS %s");
     }
 
+    protected static com.datastax.driver.core.Cluster 
buildDriverCluster(Cluster cluster)
+    {
+        Auth.waitForExistingRoles(cluster.get(1));
+
+        // create a regular user, since the default superuser is excluded from 
guardrails
+        com.datastax.driver.core.Cluster.Builder builder = 
com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1");
+        try (com.datastax.driver.core.Cluster c = 
builder.withCredentials("cassandra", "cassandra").build();
+             Session session = c.connect())
+        {
+            session.execute("CREATE USER test WITH PASSWORD 'test'");
+        }
+
+        // connect using that superuser, we use the driver to get access to 
the client warnings
+        return builder.withCredentials("test", "test").build();
+    }
+
+    /**
+     * Execution of statements via driver will not bypass guardrails as 
internal queries would do as they are
+     * done by superuser / they do not have any notion of roles
+     *
+     * @return list of warnings
+     */
+    protected List<String> executeViaDriver(String query)
+    {
+        SimpleStatement stmt = new SimpleStatement(query);
+        
stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM);
+        ResultSet resultSet = getSession().execute(stmt);
+        return resultSet.getExecutionInfo().getWarnings();
+    }
+
     protected String format(String query)
     {
         return String.format(query, qualifiedTableName);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/guardrails/IntersectFilteringQueryTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/IntersectFilteringQueryTest.java
new file mode 100644
index 0000000000..aa6f8876e6
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/guardrails/IntersectFilteringQueryTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.guardrails;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
+import org.apache.cassandra.db.guardrails.Guardrails;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+public class IntersectFilteringQueryTest extends GuardrailTester
+{
+    private static Cluster cluster;
+    private static com.datastax.driver.core.Cluster driverCluster;
+    private static Session driverSession;
+
+    @BeforeClass
+    public static void setupCluster() throws IOException
+    {
+        cluster = init(Cluster.build(2).withConfig(c -> c.with(Feature.GOSSIP, 
Feature.NATIVE_PROTOCOL)
+                                                         
.set("read_thresholds_enabled", "true")
+                                                         .set("authenticator", 
"PasswordAuthenticator")).start());
+
+        driverCluster = buildDriverCluster(cluster);
+        driverSession = driverCluster.connect();
+    }
+
+    @AfterClass
+    public static void teardownCluster()
+    {
+        if (driverSession != null)
+            driverSession.close();
+
+        if (driverCluster != null)
+            driverCluster.close();
+
+        if (cluster != null)
+            cluster.close();
+    }
+
+    @Override
+    protected Cluster getCluster()
+    {
+        return cluster;
+    }
+
+    @Override
+    protected Session getSession()
+    {
+        return driverSession;
+    }
+
+    @Test
+    @SuppressWarnings({"DataFlowIssue", "ResultOfMethodCallIgnored"})
+    public void shouldWarnOnFilteringQuery()
+    {
+        cluster.forEach(instance -> 
instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> 
Guardrails.instance.setIntersectFilteringQueryWarned(true)));
+        cluster.forEach(instance -> 
instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> 
Guardrails.instance.setIntersectFilteringQueryEnabled(true)));
+        
+        schemaChange("CREATE TABLE IF NOT EXISTS %s (k bigint, c bigint, v1 
bigint, v2 bigint, PRIMARY KEY (k, c))");
+        List<String> globalWarnings = executeViaDriver(format("SELECT * FROM 
%s WHERE v1 = 0 AND v2 = 0 ALLOW FILTERING"));
+        assertThat(globalWarnings).satisfiesOnlyOnce(s -> 
s.contains(Guardrails.intersectFilteringQueryEnabled.reason));
+        List<String> partitionWarnings = executeViaDriver(format("SELECT * 
FROM %s WHERE k = 0 AND v1 = 0 AND v2 = 0 ALLOW FILTERING"));
+        assertThat(partitionWarnings).satisfiesOnlyOnce(s -> 
s.contains(Guardrails.intersectFilteringQueryEnabled.reason));
+    }
+
+    @Test
+    public void shouldFailOnFilteringQuery()
+    {
+        cluster.forEach(instance -> 
instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> 
Guardrails.instance.setIntersectFilteringQueryEnabled(false)));
+
+        schemaChange("CREATE TABLE IF NOT EXISTS %s (k bigint, c bigint, v1 
bigint, v2 bigint, PRIMARY KEY (k, c))");
+
+        assertThatThrownBy(() -> executeViaDriver(format("SELECT * FROM %s 
WHERE k = 0 AND v1 = 0 AND v2 = 0 ALLOW FILTERING")))
+                .isInstanceOf(InvalidQueryException.class)
+                
.hasMessageContaining(Guardrails.intersectFilteringQueryEnabled.reason);
+    }
+
+    @Test
+    public void shouldNotWarnOrFailOnIndexQuery()
+    {
+        cluster.forEach(instance -> 
instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> 
Guardrails.instance.setIntersectFilteringQueryWarned(true)));
+        cluster.forEach(instance -> 
instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> 
Guardrails.instance.setIntersectFilteringQueryEnabled(false)));
+        
+        schemaChange("CREATE TABLE %s (k bigint, c bigint, v1 bigint, v2 
bigint, PRIMARY KEY (k, c))");
+        schemaChange("CREATE INDEX ON %s(v1) USING 'sai'");
+        schemaChange("CREATE INDEX ON %s(v2) USING 'sai'");
+        List<String> globalWarnings = executeViaDriver(format("SELECT * FROM 
%s WHERE v1 = 0 AND v2 = 0"));
+        assertThat(globalWarnings).isEmpty();
+        List<String> partitionWarnings = executeViaDriver(format("SELECT * 
FROM %s WHERE k = 0 AND v1 = 0 AND v2 = 0"));
+        assertThat(partitionWarnings).isEmpty();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org


Reply via email to