This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cassandra-5.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push: new be507c6e99 Warn clients about possible consistency violations for filtering queries against multiple mutable columns be507c6e99 is described below commit be507c6e996078011c08e36b09d9f34faa454973 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Mon Apr 1 14:18:05 2024 -0500 Warn clients about possible consistency violations for filtering queries against multiple mutable columns patch by Caleb Rackliffe; reviewed by David Capwell and Berenguer Blasi for CASSANDRA-19489 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/config/Config.java | 2 + .../apache/cassandra/config/GuardrailsOptions.java | 29 +++++ .../org/apache/cassandra/cql3/QueryProcessor.java | 2 +- .../apache/cassandra/cql3/UntypedResultSet.java | 4 +- .../cql3/restrictions/StatementRestrictions.java | 3 +- .../cassandra/cql3/statements/SelectStatement.java | 45 ++++---- .../org/apache/cassandra/db/filter/RowFilter.java | 10 +- .../apache/cassandra/db/guardrails/Guardrails.java | 39 +++++++ .../cassandra/db/guardrails/GuardrailsConfig.java | 16 +++ .../cassandra/db/guardrails/GuardrailsMBean.java | 16 +++ .../GuardrailNonPartitionRestrictedQueryTest.java | 37 ++----- .../test/guardrails/GuardrailTester.java | 41 ++++++- .../guardrails/IntersectFilteringQueryTest.java | 120 +++++++++++++++++++++ 14 files changed, 311 insertions(+), 54 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index e879141be7..cb3e2676cb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0-beta2 + * Warn clients about possible consistency violations for filtering queries against multiple mutable columns (CASSANDRA-19489) * Align buffer with commitlog segment size (CASSANDRA-19471) * Ensure SAI indexes empty byte buffers for types that allow them as a valid input (CASSANDRA-19461) * Deprecate Python 3.7 and earlier, but allow cqlsh to run with Python 3.6-3.11 (CASSANDRA-19467) diff --git 
a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index d97f35d759..aa0f3ee476 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -905,6 +905,8 @@ public class Config public volatile boolean non_partition_restricted_index_query_enabled = true; public volatile int sai_sstable_indexes_per_query_warn_threshold = 32; public volatile int sai_sstable_indexes_per_query_fail_threshold = -1; + public volatile boolean intersect_filtering_query_warned = true; + public volatile boolean intersect_filtering_query_enabled = true; public volatile DurationSpec.LongNanosecondsBound streaming_state_expires = new DurationSpec.LongNanosecondsBound("3d"); public volatile DataStorageSpec.LongBytesBound streaming_state_size = new DataStorageSpec.LongBytesBound("40MiB"); diff --git a/src/java/org/apache/cassandra/config/GuardrailsOptions.java b/src/java/org/apache/cassandra/config/GuardrailsOptions.java index 2c36a2a03d..8d7bd52197 100644 --- a/src/java/org/apache/cassandra/config/GuardrailsOptions.java +++ b/src/java/org/apache/cassandra/config/GuardrailsOptions.java @@ -846,6 +846,35 @@ public class GuardrailsOptions implements GuardrailsConfig x -> config.zero_ttl_on_twcs_enabled = x); } + @Override + public boolean getIntersectFilteringQueryWarned() + { + return config.intersect_filtering_query_warned; + } + + @Override + public void setIntersectFilteringQueryWarned(boolean value) + { + updatePropertyWithLogging("intersect_filtering_query_warned", + value, + () -> config.intersect_filtering_query_warned, + x -> config.intersect_filtering_query_warned = x); + } + + @Override + public boolean getIntersectFilteringQueryEnabled() + { + return config.intersect_filtering_query_enabled; + } + + public void setIntersectFilteringQueryEnabled(boolean value) + { + updatePropertyWithLogging("intersect_filtering_query_enabled", + value, + () -> 
config.intersect_filtering_query_enabled, + x -> config.intersect_filtering_query_enabled = x); + } + @Override public DurationSpec.LongMicrosecondsBound getMaximumTimestampWarnThreshold() { diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index fba424d7d9..15cfaa6c0f 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -621,7 +621,7 @@ public class QueryProcessor implements QueryHandler try (PartitionIterator iter = partitions) { SelectStatement ss = (SelectStatement) getStatement(query, null); - ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds(), true); + ResultSet cqlRows = ss.process(iter, FBUtilities.nowInSeconds(), true, ClientState.forInternalCalls()); return UntypedResultSet.create(cqlRows); } } diff --git a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java index 347e2e29b6..92a33c9e02 100644 --- a/src/java/org/apache/cassandra/cql3/UntypedResultSet.java +++ b/src/java/org/apache/cassandra/cql3/UntypedResultSet.java @@ -228,7 +228,7 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row> try (ReadExecutionController executionController = pager.executionController(); PartitionIterator iter = pager.fetchPageInternal(pageSize, executionController)) { - currentPage = select.process(iter, nowInSec, true).rows.iterator(); + currentPage = select.process(iter, nowInSec, true, ClientState.forInternalCalls()).rows.iterator(); } } return new Row(metadata, currentPage.next()); @@ -293,7 +293,7 @@ public abstract class UntypedResultSet implements Iterable<UntypedResultSet.Row> try (PartitionIterator iter = pager.fetchPage(pageSize, cl, clientState, nanoTime())) { - currentPage = select.process(iter, nowInSec, true).rows.iterator(); + currentPage = select.process(iter, nowInSec, true, clientState).rows.iterator(); 
} } return new Row(metadata, currentPage.next()); diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index 088ac2c94d..4f6b829191 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -780,7 +780,8 @@ public final class StatementRestrictions return RowFilter.none(); // If there is only one replica, we don't need reconciliation at any consistency level. - boolean needsReconciliation = options.getConsistency().needsReconciliation() + boolean needsReconciliation = !table.isVirtual() + && options.getConsistency().needsReconciliation() && Keyspace.open(table.keyspace).getReplicationStrategy().getReplicationFactor().allReplicas > 1; RowFilter filter = RowFilter.create(needsReconciliation); diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index f934232d23..5418159ad6 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -423,7 +423,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement { try (PartitionIterator data = query.execute(options.getConsistency(), state, queryStartNanoTime)) { - return processResults(data, options, selectors, nowInSec, userLimit, aggregationSpec, unmask); + return processResults(data, options, selectors, nowInSec, userLimit, aggregationSpec, unmask, state); } } @@ -538,7 +538,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement ResultMessage.Rows msg; try (PartitionIterator page = pager.fetchPage(pageSize, queryStartNanoTime)) { - msg = processResults(page, options, selectors, nowInSec, userLimit, aggregationSpec, unmask); + msg = processResults(page, options, 
selectors, nowInSec, userLimit, aggregationSpec, unmask, state.getClientState()); } // Please note that the isExhausted state of the pager only gets updated when we've closed the page, so this @@ -561,9 +561,10 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement long nowInSec, int userLimit, AggregationSpecification aggregationSpec, - boolean unmask) throws RequestValidationException + boolean unmask, + ClientState state) throws RequestValidationException { - ResultSet rset = process(partitions, options, selectors, nowInSec, userLimit, aggregationSpec, unmask); + ResultSet rset = process(partitions, options, selectors, nowInSec, userLimit, aggregationSpec, unmask, state); return new ResultMessage.Rows(rset); } @@ -599,7 +600,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement { try (PartitionIterator data = query.executeInternal(executionController)) { - return processResults(data, options, selectors, nowInSec, userLimit, null, unmask); + return processResults(data, options, selectors, nowInSec, userLimit, null, unmask, state.getClientState()); } } @@ -672,11 +673,11 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement } } - public ResultSet process(PartitionIterator partitions, long nowInSec, boolean unmask) throws InvalidRequestException + public ResultSet process(PartitionIterator partitions, long nowInSec, boolean unmask, ClientState state) throws InvalidRequestException { QueryOptions options = QueryOptions.DEFAULT; Selectors selectors = selection.newSelectors(options); - return process(partitions, options, selectors, nowInSec, getLimit(options), getAggregationSpec(options), unmask); + return process(partitions, options, selectors, nowInSec, getLimit(options), getAggregationSpec(options), unmask, state); } @Override @@ -722,7 +723,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement if (filter == null || 
filter.isEmpty(table.comparator)) return ReadQuery.empty(table); - RowFilter rowFilter = getRowFilter(options); + RowFilter rowFilter = getRowFilter(options, state); List<DecoratedKey> decoratedKeys = new ArrayList<>(keys.size()); for (ByteBuffer key : keys) @@ -767,7 +768,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement ClientState state = ClientState.forInternalCalls(); ColumnFilter columnFilter = selection.newSelectors(options).getColumnFilter(); ClusteringIndexFilter filter = makeClusteringIndexFilter(options, state, columnFilter); - RowFilter rowFilter = getRowFilter(options); + RowFilter rowFilter = getRowFilter(options, state); return SinglePartitionReadCommand.create(table, nowInSec, columnFilter, rowFilter, DataLimits.NONE, key, filter); } @@ -776,7 +777,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement */ public RowFilter rowFilterForInternalCalls() { - return getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList())); + return getRowFilter(QueryOptions.forInternalCalls(Collections.emptyList()), ClientState.forInternalCalls()); } private ReadQuery getRangeCommand(QueryOptions options, ClientState state, ColumnFilter columnFilter, DataLimits limit, long nowInSec) @@ -785,7 +786,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement if (clusteringIndexFilter == null) return ReadQuery.empty(table); - RowFilter rowFilter = getRowFilter(options); + RowFilter rowFilter = getRowFilter(options, state); // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. 
@@ -973,10 +974,15 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement /** * May be used by custom QueryHandler implementations */ - public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException + public RowFilter getRowFilter(QueryOptions options, ClientState state) throws InvalidRequestException { IndexRegistry indexRegistry = IndexRegistry.obtain(table); - return restrictions.getRowFilter(indexRegistry, options); + RowFilter filter = restrictions.getRowFilter(indexRegistry, options); + + if (filter.needsReconciliation() && filter.isMutableIntersection() && restrictions.needFiltering(table)) + Guardrails.intersectFilteringQueryEnabled.ensureEnabled(state); + + return filter; } private ResultSet process(PartitionIterator partitions, @@ -985,7 +991,8 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement long nowInSec, int userLimit, AggregationSpecification aggregationSpec, - boolean unmask) throws InvalidRequestException + boolean unmask, + ClientState state) throws InvalidRequestException { GroupMaker groupMaker = aggregationSpec == null ? null : aggregationSpec.newGroupMaker(); ResultSetBuilder result = new ResultSetBuilder(getResultMetadata(), selectors, unmask, groupMaker); @@ -1001,7 +1008,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement ResultSet cqlRows = result.build(); maybeWarn(result, options); - orderResults(cqlRows, options); + orderResults(cqlRows, options, state); cqlRows.trim(userLimit); @@ -1156,12 +1163,12 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement * <p> * In the case of ANN ordering the rows are first ordered in index column order and then by primary key. 
*/ - private void orderResults(ResultSet cqlRows, QueryOptions options) + private void orderResults(ResultSet cqlRows, QueryOptions options, ClientState state) { if (cqlRows.size() == 0 || !needsPostQueryOrdering()) return; - Comparator<List<ByteBuffer>> comparator = orderingComparator.prepareFor(table, getRowFilter(options), options); + Comparator<List<ByteBuffer>> comparator = orderingComparator.prepareFor(table, getRowFilter(options, state), options); if (comparator != null) cqlRows.rows.sort(comparator); } @@ -1832,7 +1839,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement if (clusteringIndexFilter == null) return "EMPTY"; - RowFilter rowFilter = getRowFilter(options); + RowFilter rowFilter = getRowFilter(options, state); // The LIMIT provided by the user is the number of CQL row he wants returned. // We want to have getRangeSlice to count the number of columns, not the number of keys. @@ -1898,7 +1905,7 @@ public class SelectStatement implements CQLStatement.SingleKeyspaceCqlStatement sb.append(')'); } - RowFilter rowFilter = getRowFilter(options); + RowFilter rowFilter = getRowFilter(options, state); if (!rowFilter.isEmpty()) sb.append(" AND ").append(rowFilter); diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java index 588db7fb46..beb3edfcd7 100644 --- a/src/java/org/apache/cassandra/db/filter/RowFilter.java +++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java @@ -165,7 +165,15 @@ public class RowFilter implements Iterable<RowFilter.Expression> */ public boolean isStrict() { - return !needsReconciliation || expressions.stream().filter(e -> !e.column.isPrimaryKeyColumn()).count() <= 1; + return !needsReconciliation || !isMutableIntersection(); + } + + /** + * @return true if this filter contains an intersection on two or more mutable columns + */ + public boolean isMutableIntersection() + { + return expressions.stream().filter(e -> 
!e.column.isPrimaryKeyColumn()).count() > 1; } /** diff --git a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java index 971c635c67..01157bf345 100644 --- a/src/java/org/apache/cassandra/db/guardrails/Guardrails.java +++ b/src/java/org/apache/cassandra/db/guardrails/Guardrails.java @@ -216,6 +216,21 @@ public final class Guardrails implements GuardrailsMBean state -> CONFIG_PROVIDER.getOrCreate(state).getZeroTTLOnTWCSEnabled(), "0 default_time_to_live on a table with " + TimeWindowCompactionStrategy.class.getSimpleName() + " compaction strategy"); + /** + * Guardrail to warn on or fail filtering queries that contain intersections on mutable columns at consistency + * levels that require coordinator reconciliation. + * + * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19007">CASSANDRA-19007</a> + */ + public static final EnableFlag intersectFilteringQueryEnabled = + new EnableFlag("intersect_filtering_query", + "Filtering queries involving an intersection on multiple mutable (i.e. non-key) columns " + + "over unrepaired data at read consistency levels that would require coordinator " + + "reconciliation may violate the guarantees of those consistency levels.", + state -> CONFIG_PROVIDER.getOrCreate(state).getIntersectFilteringQueryWarned(), + state -> CONFIG_PROVIDER.getOrCreate(state).getIntersectFilteringQueryEnabled(), + "Filtering query with intersection on mutable columns at consistency level requiring coordinator reconciliation"); + /** * Guardrail on the number of elements returned within page. 
*/ @@ -1245,6 +1260,30 @@ public final class Guardrails implements GuardrailsMBean DEFAULT_CONFIG.setNonPartitionRestrictedQueryEnabled(enabled); } + @Override + public boolean getIntersectFilteringQueryWarned() + { + return DEFAULT_CONFIG.getIntersectFilteringQueryWarned(); + } + + @Override + public void setIntersectFilteringQueryWarned(boolean value) + { + DEFAULT_CONFIG.setIntersectFilteringQueryWarned(value); + } + + @Override + public boolean getIntersectFilteringQueryEnabled() + { + return DEFAULT_CONFIG.getIntersectFilteringQueryEnabled(); + } + + @Override + public void setIntersectFilteringQueryEnabled(boolean value) + { + DEFAULT_CONFIG.setIntersectFilteringQueryEnabled(value); + } + private static String toCSV(Set<String> values) { return values == null || values.isEmpty() ? "" : String.join(",", values); diff --git a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java index 00cc853589..5c6880fde5 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsConfig.java @@ -384,6 +384,22 @@ public interface GuardrailsConfig */ void setZeroTTLOnTWCSEnabled(boolean value); + /** + * @return true if a client warning is emitted for a filtering query with an intersection on mutable columns at a + * consistency level requiring coordinator reconciliation + */ + boolean getIntersectFilteringQueryWarned(); + + void setIntersectFilteringQueryWarned(boolean value); + + /** + * @return true if it is possible to execute a filtering query with an intersection on mutable columns at a + * consistency level requiring coordinator reconciliation + */ + boolean getIntersectFilteringQueryEnabled(); + + void setIntersectFilteringQueryEnabled(boolean value); + /** * @return A timestamp that if a user supplied timestamp is after will trigger a warning */ diff --git 
a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java index e4c2774a0a..89fe7a70ba 100644 --- a/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java +++ b/src/java/org/apache/cassandra/db/guardrails/GuardrailsMBean.java @@ -804,4 +804,20 @@ public interface GuardrailsMBean * @param enabled {@code true} if a query without partition key is enabled or not */ void setNonPartitionRestrictedQueryEnabled(boolean enabled); + + /** + * @return true if a client warning is emitted for a filtering query with an intersection on mutable columns at a + * consistency level requiring coordinator reconciliation + */ + boolean getIntersectFilteringQueryWarned(); + + void setIntersectFilteringQueryWarned(boolean value); + + /** + * @return true if it is possible to execute a filtering query with an intersection on mutable columns at a + * consistency level requiring coordinator reconciliation + */ + boolean getIntersectFilteringQueryEnabled(); + + void setIntersectFilteringQueryEnabled(boolean value); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java index 977527786f..24d13fd025 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailNonPartitionRestrictedQueryTest.java @@ -30,9 +30,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; -import com.datastax.driver.core.SimpleStatement; import com.datastax.driver.core.exceptions.InvalidQueryException; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.ColumnFamilyStore; 
@@ -43,7 +41,6 @@ import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.api.Feature; import org.apache.cassandra.distributed.api.IInvokableInstance; import org.apache.cassandra.distributed.api.IIsolatedExecutor; -import org.apache.cassandra.distributed.util.Auth; import org.apache.cassandra.exceptions.QueryReferencesTooManyIndexesAbortException; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.index.Index; @@ -62,6 +59,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@SuppressWarnings("Convert2MethodRef") public class GuardrailNonPartitionRestrictedQueryTest extends GuardrailTester { private static Cluster cluster; @@ -78,18 +76,7 @@ public class GuardrailNonPartitionRestrictedQueryTest extends GuardrailTester .withDataDirCount(1) .start(); - Auth.waitForExistingRoles(cluster.get(1)); - - // create a regular user, since the default superuser is excluded from guardrails - com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1"); - try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build(); - Session session = c.connect()) - { - session.execute("CREATE USER test WITH PASSWORD 'test'"); - } - - // connect using that superuser, we use the driver to get access to the client warnings - driverCluster = builder.withCredentials("test", "test").build(); + driverCluster = buildDriverCluster(cluster); } @AfterClass @@ -124,6 +111,12 @@ public class GuardrailNonPartitionRestrictedQueryTest extends GuardrailTester return cluster; } + @Override + protected Session getSession() + { + return driverSession; + } + @Test public void testGuardrailForLegacy2i() { @@ -305,20 +298,6 @@ public class GuardrailNonPartitionRestrictedQueryTest extends GuardrailTester 
assertThat(totalAborts()).as("aborts").isEqualTo(aborts); } - /** - * Execution of statements via driver will not bypass guardrails as internal queries would do as they are - * done by superuser / they do not have any notion of roles - * - * @return list of warnings - */ - private List<String> executeViaDriver(String query) - { - SimpleStatement stmt = new SimpleStatement(query); - stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM); - ResultSet resultSet = driverSession.execute(stmt); - return resultSet.getExecutionInfo().getWarnings(); - } - private List<String> executeSelect(long valueToQuery, boolean expectToFail) { return cluster.get(1).applyOnInstance((IIsolatedExecutor.SerializableTriFunction<String, String, Long, List<String>>) (keyspace, table, v1) -> { diff --git a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java index b12d9abbb2..376f6a310b 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java +++ b/test/distributed/org/apache/cassandra/distributed/test/guardrails/GuardrailTester.java @@ -21,6 +21,10 @@ package org.apache.cassandra.distributed.test.guardrails; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.SimpleStatement; +import org.apache.cassandra.distributed.util.Auth; import org.junit.After; import org.junit.Before; @@ -41,11 +45,16 @@ public abstract class GuardrailTester extends TestBaseImpl protected abstract Cluster getCluster(); + protected Session getSession() + { + return null; + } + @Before public void beforeTest() { tableName = "t_" + seqNumber.getAndIncrement(); - qualifiedTableName = KEYSPACE + "." + tableName; + qualifiedTableName = KEYSPACE + '.' 
+ tableName; } @After @@ -54,6 +63,36 @@ public abstract class GuardrailTester extends TestBaseImpl schemaChange("DROP TABLE IF EXISTS %s"); } + protected static com.datastax.driver.core.Cluster buildDriverCluster(Cluster cluster) + { + Auth.waitForExistingRoles(cluster.get(1)); + + // create a regular user, since the default superuser is excluded from guardrails + com.datastax.driver.core.Cluster.Builder builder = com.datastax.driver.core.Cluster.builder().addContactPoint("127.0.0.1"); + try (com.datastax.driver.core.Cluster c = builder.withCredentials("cassandra", "cassandra").build(); + Session session = c.connect()) + { + session.execute("CREATE USER test WITH PASSWORD 'test'"); + } + + // connect using that superuser, we use the driver to get access to the client warnings + return builder.withCredentials("test", "test").build(); + } + + /** + * Execution of statements via driver will not bypass guardrails as internal queries would do as they are + * done by superuser / they do not have any notion of roles + * + * @return list of warnings + */ + protected List<String> executeViaDriver(String query) + { + SimpleStatement stmt = new SimpleStatement(query); + stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.QUORUM); + ResultSet resultSet = getSession().execute(stmt); + return resultSet.getExecutionInfo().getWarnings(); + } + protected String format(String query) { return String.format(query, qualifiedTableName); diff --git a/test/distributed/org/apache/cassandra/distributed/test/guardrails/IntersectFilteringQueryTest.java b/test/distributed/org/apache/cassandra/distributed/test/guardrails/IntersectFilteringQueryTest.java new file mode 100644 index 0000000000..aa6f8876e6 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/guardrails/IntersectFilteringQueryTest.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.guardrails; + +import java.io.IOException; +import java.util.List; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import org.apache.cassandra.db.guardrails.Guardrails; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class IntersectFilteringQueryTest extends GuardrailTester +{ + private static Cluster cluster; + private static com.datastax.driver.core.Cluster driverCluster; + private static Session driverSession; + + @BeforeClass + public static void setupCluster() throws IOException + { + cluster = init(Cluster.build(2).withConfig(c -> c.with(Feature.GOSSIP, Feature.NATIVE_PROTOCOL) + .set("read_thresholds_enabled", "true") + .set("authenticator", "PasswordAuthenticator")).start()); + + driverCluster = buildDriverCluster(cluster); + driverSession = driverCluster.connect(); + } + + @AfterClass + public static void 
teardownCluster() + { + if (driverSession != null) + driverSession.close(); + + if (driverCluster != null) + driverCluster.close(); + + if (cluster != null) + cluster.close(); + } + + @Override + protected Cluster getCluster() + { + return cluster; + } + + @Override + protected Session getSession() + { + return driverSession; + } + + @Test + @SuppressWarnings({"DataFlowIssue", "ResultOfMethodCallIgnored"}) + public void shouldWarnOnFilteringQuery() + { + cluster.forEach(instance -> instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> Guardrails.instance.setIntersectFilteringQueryWarned(true))); + cluster.forEach(instance -> instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> Guardrails.instance.setIntersectFilteringQueryEnabled(true))); + + schemaChange("CREATE TABLE IF NOT EXISTS %s (k bigint, c bigint, v1 bigint, v2 bigint, PRIMARY KEY (k, c))"); + List<String> globalWarnings = executeViaDriver(format("SELECT * FROM %s WHERE v1 = 0 AND v2 = 0 ALLOW FILTERING")); + assertThat(globalWarnings).satisfiesOnlyOnce(s -> s.contains(Guardrails.intersectFilteringQueryEnabled.reason)); + List<String> partitionWarnings = executeViaDriver(format("SELECT * FROM %s WHERE k = 0 AND v1 = 0 AND v2 = 0 ALLOW FILTERING")); + assertThat(partitionWarnings).satisfiesOnlyOnce(s -> s.contains(Guardrails.intersectFilteringQueryEnabled.reason)); + } + + @Test + public void shouldFailOnFilteringQuery() + { + cluster.forEach(instance -> instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> Guardrails.instance.setIntersectFilteringQueryEnabled(false))); + + schemaChange("CREATE TABLE IF NOT EXISTS %s (k bigint, c bigint, v1 bigint, v2 bigint, PRIMARY KEY (k, c))"); + + assertThatThrownBy(() -> executeViaDriver(format("SELECT * FROM %s WHERE k = 0 AND v1 = 0 AND v2 = 0 ALLOW FILTERING"))) + .isInstanceOf(InvalidQueryException.class) + .hasMessageContaining(Guardrails.intersectFilteringQueryEnabled.reason); + } + + @Test + public void 
shouldNotWarnOrFailOnIndexQuery() + { + cluster.forEach(instance -> instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> Guardrails.instance.setIntersectFilteringQueryWarned(true))); + cluster.forEach(instance -> instance.runOnInstance((IIsolatedExecutor.SerializableRunnable) () -> Guardrails.instance.setIntersectFilteringQueryEnabled(false))); + + schemaChange("CREATE TABLE %s (k bigint, c bigint, v1 bigint, v2 bigint, PRIMARY KEY (k, c))"); + schemaChange("CREATE INDEX ON %s(v1) USING 'sai'"); + schemaChange("CREATE INDEX ON %s(v2) USING 'sai'"); + List<String> globalWarnings = executeViaDriver(format("SELECT * FROM %s WHERE v1 = 0 AND v2 = 0")); + assertThat(globalWarnings).isEmpty(); + List<String> partitionWarnings = executeViaDriver(format("SELECT * FROM %s WHERE k = 0 AND v1 = 0 AND v2 = 0")); + assertThat(partitionWarnings).isEmpty(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org