This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new bd99331bf2 Avoid fetching entire partitions on unresolved static rows
in RFP when no static column predicates exist
bd99331bf2 is described below
commit bd99331bf2a963f77ca7cba1d39efb985b21842f
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue Feb 11 12:06:09 2025 -0600
Avoid fetching entire partitions on unresolved static rows in RFP when no
static column predicates exist
patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-20243
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/filter/RowFilter.java | 18 +++++++++++++++++
.../cassandra/index/sai/plan/QueryController.java | 6 +++++-
.../cassandra/service/reads/DataResolver.java | 6 +++++-
.../service/reads/ReplicaFilteringProtection.java | 10 +++++-----
.../test/ReplicaFilteringProtectionTest.java | 19 ++++++++++++++++++
.../distributed/test/sai/StrictFilteringTest.java | 23 ++++++++++++++++++++--
7 files changed, 74 insertions(+), 9 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index cbbd1767b5..f659a1f220 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.4
+ * Avoid fetching entire partitions on unresolved static rows in RFP when no
static column predicates exist (CASSANDRA-20243)
* Avoid indexing empty values for non-literals and types that do not allow
them (CASSANDRA-20313)
* Fix incorrect results of min / max in-built functions on clustering columns
in descending order (CASSANDRA-20295)
* Avoid possible consistency violations for SAI intersection queries over
repaired index matches and multiple non-indexed column matches (CASSANDRA-20189)
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 483c163164..9b83a56f23 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -394,6 +394,24 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
return withNewExpressions(newExpressions);
}
+ public boolean hasNonKeyExpression()
+ {
+ for (Expression e : expressions)
+ if (!e.column().isPrimaryKeyColumn())
+ return true;
+
+ return false;
+ }
+
+ public boolean hasStaticExpression()
+ {
+ for (Expression e : expressions)
+ if (e.column().isStatic())
+ return true;
+
+ return false;
+ }
+
public RowFilter withoutExpressions()
{
return withNewExpressions(Collections.emptyList());
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index 614456c493..8bd1e25f28 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -271,7 +271,11 @@ public class QueryController
private void maybeTriggerGuardrails(QueryViewBuilder.QueryView queryView)
{
- int referencedIndexes = queryView.referencedIndexes.size();
+ int referencedIndexes = 0;
+
+ // We want to make sure that no individual column expression touches
too many SSTable-attached indexes:
+ for (Pair<Expression, Collection<SSTableIndex>> expressionSSTables :
queryView.view)
+ referencedIndexes = Math.max(referencedIndexes,
expressionSSTables.right.size());
if (Guardrails.saiSSTableIndexesPerQuery.failsOn(referencedIndexes,
null))
{
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java
b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index 64e4c72b01..332a785708 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -138,7 +138,7 @@ public class DataResolver<E extends Endpoints<E>, P extends
ReplicaPlan.ForRead<
return false;
Index.QueryPlan queryPlan = command.indexQueryPlan();
- if (queryPlan == null )
+ if (queryPlan == null)
return true;
return
queryPlan.supportsReplicaFilteringProtection(command.rowFilter());
@@ -275,6 +275,10 @@ public class DataResolver<E extends Endpoints<E>, P
extends ReplicaPlan.ForRead<
private UnaryOperator<PartitionIterator>
preCountFilterForReplicaFilteringProtection()
{
+ // Key columns are immutable and should never need to participate in
replica filtering
+ if (!command.rowFilter().hasNonKeyExpression())
+ return results -> results;
+
return results -> {
Index.Searcher searcher = command.indexSearcher();
// in case of "ALLOW FILTERING" without index
diff --git
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index 9ec02a5b20..7c57245bb1 100644
---
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -427,12 +427,12 @@ public class ReplicaFilteringProtection<E extends
Endpoints<E>>
if (toFetch == null)
toFetch = BTreeSet.builder(command.metadata().comparator);
- // Note that for static, we shouldn't add the clustering to the
clustering set (the
- // ClusteringIndexNamesFilter we'll build from this later does not
expect it), but the fact
- // we created a builder in the first place will act as a marker
that the static row must be
- // fetched, even if no other rows are added for this partition.
if (row.isStatic())
- unresolvedStatic = true;
+ // If there is an expression on a static column, the static
row must be marked unresolved and the
+ // partition fetched, as completing the static row could
produce matches across the entire partition.
+ // The static row itself will still be retrieved and completed
if there is any unresolved non-static
+ // row, however, ensuring the latest static values are
returned from the query.
+ unresolvedStatic = command.rowFilter().hasStaticExpression();
else
toFetch.add(row.clustering());
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
index fd8110cba7..1cff326f41 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
@@ -27,6 +27,7 @@ import org.junit.Test;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.exceptions.OverloadedException;
@@ -71,6 +72,24 @@ public class ReplicaFilteringProtectionTest extends
TestBaseImpl
cluster.close();
}
+ @Test
+ public void testMissingStaticRowWithNonStaticExpression()
+ {
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate
(pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY
(pk0, ck0, ck1)) " +
+ "WITH CLUSTERING ORDER BY (ck0 ASC,
ck1 DESC) AND read_repair = 'NONE'"));
+
+ cluster.get(1).executeInternal(withKeyspace("INSERT INTO
%s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
+ "VALUES (0, 1, 2, 3, 4, 5)
USING TIMESTAMP 1"));
+ cluster.get(2).executeInternal(withKeyspace("UPDATE
%s.single_predicate USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
+ "WHERE pk0 = 0 AND ck0 =
9 AND ck1 = 10"));
+
+ // Node 2 will not produce a match for the static row. Make sure that
replica filtering protection does not
+ // fetch the entire partition, which could let non-matching rows slip
through combined with the fact that we
+ // don't post-filter at the coordinator with no regular column
predicates in the query.
+ String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM
%s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
+ assertRows(cluster.coordinator(1).execute(select,
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
+ }
+
@Test
public void testMissedUpdatesBelowCachingWarnThreshold()
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index cd753d6e01..5ef92bb9ef 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -29,7 +29,6 @@ import org.junit.Test;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.index.sai.plan.StorageAttachedIndexQueryPlan;
@@ -54,6 +53,26 @@ public class StrictFilteringTest extends TestBaseImpl
CLUSTER = init(Cluster.build(2).withConfig(config ->
config.set("hinted_handoff_enabled",
false).with(GOSSIP).with(NETWORK)).start());
}
+ @Test
+ public void testMissingStaticRowWithNonStaticExpression()
+ {
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate
(pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY
(pk0, ck0, ck1)) " +
+ "WITH CLUSTERING ORDER BY (ck0 ASC,
ck1 DESC) AND read_repair = 'NONE'"));
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON
%s.single_predicate(ck1) USING 'sai'"));
+ SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+ CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO
%s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
+ "VALUES (0, 1, 2, 3, 4, 5)
USING TIMESTAMP 1"));
+ CLUSTER.get(2).executeInternal(withKeyspace("UPDATE
%s.single_predicate USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
+ "WHERE pk0 = 0 AND ck0 =
9 AND ck1 = 10"));
+
+ // Node 2 will not produce a match for the static row. Make sure that
replica filtering protection does not
+ // fetch the entire partition, which could let non-matching rows slip
through combined with the fact that we
+ // don't post-filter at the coordinator with no regular column
predicates in the query.
+ String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM
%s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
+ assertRows(CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
+ }
+
@Test
public void shouldDegradeToUnionOnSingleStatic()
{
@@ -257,7 +276,7 @@ public class StrictFilteringTest extends TestBaseImpl
String select = withKeyspace("SELECT * FROM %s.timestamp_collision
WHERE a = 2 AND b = 2");
Object[][] initialRows = CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL);
- assertRows(initialRows, AssertUtils.row(0, 2, 2));
+ assertRows(initialRows, row(0, 2, 2));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org