This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 953ab6cf64 Avoid possible consistency violations for SAI intersection
queries over repaired index matches and multiple non-indexed column matches
953ab6cf64 is described below
commit 953ab6cf64088614ada26e05d664b6a25b1d561d
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue Jan 7 16:39:34 2025 -0600
Avoid possible consistency violations for SAI intersection queries over
repaired index matches and multiple non-indexed column matches
patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-20189
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/filter/RowFilter.java | 14 +++-
.../cassandra/index/sai/plan/FilterTree.java | 74 ++++++++++++------
.../apache/cassandra/index/sai/plan/Operation.java | 87 ++++++++++++++++++----
.../cassandra/index/sai/plan/QueryController.java | 2 +-
.../cassandra/index/sai/utils/IndexTermType.java | 5 ++
.../distributed/test/sai/StrictFilteringTest.java | 57 ++++++++++++++
.../cassandra/index/sai/plan/OperationTest.java | 5 +-
8 files changed, 204 insertions(+), 41 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 769e4dddbe..13ae3fddc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.4
+ * Avoid possible consistency violations for SAI intersection queries over
repaired index matches and multiple non-indexed column matches (CASSANDRA-20189)
* Skip check for DirectIO when initializing tools (CASSANDRA-20289)
* Avoid under-skipping during intersections when an iterator has mixed STATIC
and WIDE keys (CASSANDRA-20258)
* Correct the default behavior of compareTo() when comparing WIDE and STATIC
PrimaryKeys (CASSANDRA-20238)
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 0742f4ee9f..483c163164 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -169,11 +169,21 @@ public class RowFilter implements
Iterable<RowFilter.Expression>
}
/**
- * @return true if this filter contains an intersection on two or more
mutable columns
+ * @return true if this filter contains an intersection on either any
static column or two regular mutable columns
*/
public boolean isMutableIntersection()
{
- return expressions.stream().filter(e ->
!e.column.isPrimaryKeyColumn()).count() > 1;
+ int count = 0;
+ for (Expression e : expressions)
+ {
+ if (e.column.isStatic() && expressions.size() > 1)
+ return true;
+
+ if (!e.column.isPrimaryKeyColumn())
+ if (++count > 1)
+ return true;
+ }
+ return false;
}
/**
diff --git a/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
b/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
index 4107fad2d2..15ea273145 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
@@ -23,8 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
-import com.google.common.collect.ListMultimap;
-
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.index.sai.QueryContext;
@@ -44,12 +42,12 @@ import static
org.apache.cassandra.index.sai.plan.Operation.BooleanOperator;
public class FilterTree
{
protected final BooleanOperator baseOperator;
- protected final ListMultimap<ColumnMetadata, Expression> expressions;
+ protected final Operation.Expressions expressions;
protected final List<FilterTree> children = new ArrayList<>();
private final boolean isStrict;
private final QueryContext context;
- FilterTree(BooleanOperator baseOperator, ListMultimap<ColumnMetadata,
Expression> expressions, boolean isStrict, QueryContext context)
+ FilterTree(BooleanOperator baseOperator, Operation.Expressions
expressions, boolean isStrict, QueryContext context)
{
this.baseOperator = baseOperator;
this.expressions = expressions;
@@ -67,7 +65,7 @@ public class FilterTree
*/
public boolean restrictsNonStaticRow()
{
- for (ColumnMetadata column : expressions.keySet())
+ for (ColumnMetadata column : expressions.columns())
if (!column.isStatic())
return true;
@@ -98,7 +96,12 @@ public class FilterTree
BooleanOperator localOperator = (isStrict ||
!context.hasUnrepairedMatches) ? baseOperator : BooleanOperator.OR;
boolean result = localOperator == BooleanOperator.AND;
- Iterator<ColumnMetadata> columnIterator =
expressions.keySet().iterator();
+ // If all matches on indexed columns are repaired, strict filtering is
not allowed, and there are multiple
+ // unindexed column expressions, isolate the expressions on unindexed
columns and union their results:
+ boolean isolateUnindexed = !context.hasUnrepairedMatches && !isStrict
&& expressions.hasMultipleUnindexedColumns();
+ boolean unindexedResult = false;
+
+ Iterator<ColumnMetadata> columnIterator =
expressions.columns().iterator();
while (columnIterator.hasNext())
{
ColumnMetadata column = columnIterator.next();
@@ -106,38 +109,65 @@ public class FilterTree
// If there is a column with multiple expressions that can mean an
OR, or (in the case of map
// collections) it can mean different map indexes.
- List<Expression> filters = expressions.get(column);
+ List<Expression> filters = expressions.expressionsFor(column);
// We do a reverse iteration over the filters because NOT_EQ
operations will be at the end
// of the filter list, and we want to check them first.
ListIterator<Expression> filterIterator =
filters.listIterator(filters.size());
- while (filterIterator.hasPrevious())
+
+ if (isolateUnindexed && expressions.isUnindexed(column))
{
- Expression filter = filterIterator.previous();
+ // If we isolate unindexed column expressions, we're
implicitly calculating the union of those
+ // expressions. Once we've matched on any column, we can skip
the rest, if any exist.
+ if (unindexedResult)
+ continue;
- if (filter.getIndexTermType().isNonFrozenCollection())
+ while (filterIterator.hasPrevious())
{
- Iterator<ByteBuffer> valueIterator =
filter.getIndexTermType().valuesOf(localRow, now);
- result = localOperator.apply(result,
collectionMatch(valueIterator, filter));
+ Expression filter = filterIterator.previous();
+ unindexedResult = applyFilter(key, now,
BooleanOperator.OR, unindexedResult, localRow, filter);
}
- else
+ }
+ else
+ {
+ while (filterIterator.hasPrevious())
{
- ByteBuffer value = filter.getIndexTermType().valueOf(key,
localRow, now);
- result = localOperator.apply(result, singletonMatch(value,
filter));
- }
+ Expression filter = filterIterator.previous();
+ result = applyFilter(key, now, localOperator, result,
localRow, filter);
- // If the operation is an AND then exit early if we get a
single false
- if ((localOperator == BooleanOperator.AND) && !result)
- return false;
+ // If the operation is an AND then exit early if we get a
single false
+ if ((localOperator == BooleanOperator.AND) && !result)
+ return false;
- // If the operation is an OR then exit early if we get a
single true
- if (localOperator == BooleanOperator.OR && result)
- return true;
+ // If the operation is an OR then exit early if we get a
single true
+ if (localOperator == BooleanOperator.OR && result)
+ return true;
+ }
}
}
+
+ if (isolateUnindexed)
+ // If we had to isolate the unindexed column expressions, combine
with the indexed column result. Note that
+ // the indexed result must be true at this point if it was
evaluated with the AND operator:
+ return localOperator == BooleanOperator.AND ? unindexedResult :
result || unindexedResult;
+
return result;
}
+ private boolean applyFilter(DecoratedKey key, long now, BooleanOperator
operator, boolean result, Row row, Expression expression)
+ {
+ if (expression.getIndexTermType().isNonFrozenCollection())
+ {
+ Iterator<ByteBuffer> valueIterator =
expression.getIndexTermType().valuesOf(row, now);
+ return operator.apply(result, collectionMatch(valueIterator,
expression));
+ }
+ else
+ {
+ ByteBuffer value = expression.getIndexTermType().valueOf(key, row,
now);
+ return operator.apply(result, singletonMatch(value, expression));
+ }
+ }
+
private boolean singletonMatch(ByteBuffer value, Expression filter)
{
return value != null && filter.isSatisfiedBy(value);
diff --git a/src/java/org/apache/cassandra/index/sai/plan/Operation.java
b/src/java/org/apache/cassandra/index/sai/plan/Operation.java
index 5b18241843..eb45722af5 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/Operation.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/Operation.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.index.sai.plan;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
@@ -63,11 +66,58 @@ public class Operation
}
}
+ public static class Expressions
+ {
+ final ListMultimap<ColumnMetadata, Expression> expressions;
+ final Set<ColumnMetadata> unindexedColumns;
+
+ Expressions(ListMultimap<ColumnMetadata, Expression> expressions,
Set<ColumnMetadata> unindexedColumns)
+ {
+ this.expressions = expressions;
+ this.unindexedColumns = unindexedColumns;
+ }
+
+ Set<ColumnMetadata> columns()
+ {
+ return expressions.keySet();
+ }
+
+ Collection<Expression> all()
+ {
+ return expressions.values();
+ }
+
+ List<Expression> expressionsFor(ColumnMetadata column)
+ {
+ return expressions.get(column);
+ }
+
+ boolean isEmpty()
+ {
+ return expressions.isEmpty();
+ }
+
+ int size()
+ {
+ return expressions.size();
+ }
+
+ boolean isUnindexed(ColumnMetadata column)
+ {
+ return unindexedColumns.contains(column);
+ }
+
+ boolean hasMultipleUnindexedColumns()
+ {
+ return unindexedColumns.size() > 1;
+ }
+ }
+
@VisibleForTesting
- protected static ListMultimap<ColumnMetadata, Expression>
buildIndexExpressions(QueryController queryController,
-
List<RowFilter.Expression> expressions)
+ protected static Expressions buildIndexExpressions(QueryController
queryController, List<RowFilter.Expression> expressions)
{
ListMultimap<ColumnMetadata, Expression> analyzed =
ArrayListMultimap.create();
+ Set<ColumnMetadata> unindexedColumns = Collections.emptySet();
// sort all the expressions in the operation by name and priority of
the logical operator
// this gives us an efficient way to handle inequality and combining
into ranges without extra processing
@@ -82,17 +132,28 @@ public class Operation
if (Expression.supportsOperator(expression.operator()))
{
StorageAttachedIndex index =
queryController.indexFor(expression);
-
List<Expression> perColumn = analyzed.get(expression.column());
if (index == null)
+ {
buildUnindexedExpression(queryController, expression,
perColumn);
+
+ if (!expression.column().isPrimaryKeyColumn())
+ {
+ if (unindexedColumns.isEmpty())
+ unindexedColumns = new HashSet<>(3);
+
+ unindexedColumns.add(expression.column());
+ }
+ }
else
+ {
buildIndexedExpression(index, expression, perColumn);
+ }
}
}
- return analyzed;
+ return new Expressions(analyzed, unindexedColumns);
}
private static void buildUnindexedExpression(QueryController
queryController,
@@ -286,11 +347,11 @@ public class Operation
static abstract class Node
{
- ListMultimap<ColumnMetadata, Expression> expressionMap;
+ Expressions expressions;
boolean canFilter()
{
- return (expressionMap != null && !expressionMap.isEmpty()) ||
!children().isEmpty();
+ return (expressions != null && !expressions.isEmpty()) ||
!children().isEmpty();
}
List<Node> children()
@@ -382,19 +443,19 @@ public class Operation
@Override
public void analyze(List<RowFilter.Expression> expressionList,
QueryController controller)
{
- expressionMap = buildIndexExpressions(controller, expressionList);
+ expressions = buildIndexExpressions(controller, expressionList);
}
@Override
FilterTree filterTree(boolean isStrict, QueryContext context)
{
- return new FilterTree(BooleanOperator.AND, expressionMap,
isStrict, context);
+ return new FilterTree(BooleanOperator.AND, expressions, isStrict,
context);
}
@Override
KeyRangeIterator rangeIterator(QueryController controller)
{
- KeyRangeIterator.Builder builder =
controller.getIndexQueryResults(expressionMap.values());
+ KeyRangeIterator.Builder builder =
controller.getIndexQueryResults(expressions.all());
for (Node child : children)
{
boolean canFilter = child.canFilter();
@@ -412,15 +473,15 @@ public class Operation
@Override
public void analyze(List<RowFilter.Expression> expressionList,
QueryController controller)
{
- expressionMap = buildIndexExpressions(controller, expressionList);
- assert expressionMap.size() == 1 : "Expression nodes should only
have a single expression!";
+ expressions = buildIndexExpressions(controller, expressionList);
+ assert expressions.size() == 1 : "Expression nodes should only
have a single expression!";
}
@Override
FilterTree filterTree(boolean isStrict, QueryContext context)
{
// There should only be one expression, so AND/OR would both work
here.
- return new FilterTree(BooleanOperator.AND, expressionMap,
isStrict, context);
+ return new FilterTree(BooleanOperator.AND, expressions, isStrict,
context);
}
public ExpressionNode(RowFilter.Expression expression)
@@ -439,7 +500,7 @@ public class Operation
{
assert canFilter() : "Cannot process query with no expressions";
- return
controller.getIndexQueryResults(expressionMap.values()).build();
+ return controller.getIndexQueryResults(expressions.all()).build();
}
}
}
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index b462fa3ad5..614456c493 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -423,7 +423,7 @@ public class QueryController
"PrimaryKey " + firstKey + " clustering does not match table.
There should be a clustering of size " + cfs.metadata().comparator.size();
ClusteringIndexFilter clusteringIndexFilter =
command.clusteringIndexFilter(firstKey.partitionKey());
-
+
// If we have skinny partitions or the key is for a static row then we
need to get the partition as
// requested by the original query.
if (cfs.metadata().comparator.size() == 0 || firstKey.kind() ==
PrimaryKey.Kind.STATIC)
diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
index 51600f9d79..f5a00b184e 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
@@ -319,6 +319,11 @@ public class IndexTermType
return this.columnMetadata.compareTo(columnMetadata) == 0;
}
+ public static boolean isEqOnlyType(AbstractType<?> type)
+ {
+ return EQ_ONLY_TYPES.contains(type);
+ }
+
/**
* Indicates if the type encoding supports rounding of the raw value.
* <p>
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 301336f862..cd753d6e01 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -54,6 +54,26 @@ public class StrictFilteringTest extends TestBaseImpl
CLUSTER = init(Cluster.build(2).withConfig(config ->
config.set("hinted_handoff_enabled",
false).with(GOSSIP).with(NETWORK)).start());
}
+ @Test
+ public void shouldDegradeToUnionOnSingleStatic()
+ {
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.single_static (pk0
int, ck0 int, ck1 int, s0 int static, v0 int, PRIMARY KEY (pk0, ck0, ck1)) " +
+ "WITH read_repair = 'NONE' AND
CLUSTERING ORDER BY (ck0 ASC, ck1 DESC)"));
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON
%s.single_static(ck0) USING 'sai'"));
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON
%s.single_static(s0) USING 'sai'"));
+ SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+ // To present the coordinator with enough data to find a row match,
both replicas must degrade to OR at query
+ // time. The static column match from node 2 and the clustering key
match from node 1 must be merged.
+ CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO
%s.single_static (pk0, ck0, ck1, s0, v0) VALUES (0, 1, 2, 3, 4)"));
+ CLUSTER.get(1).executeInternal(withKeyspace("UPDATE %s.single_static
SET v0 = 5 WHERE pk0 = 0 AND ck0 = 6 AND ck1 = 7"));
+
+ // A static column predicate and ANY other predicate makes strict
filtering impossible, as the static match
+ // applies to the entire partition.
+ String select = withKeyspace("SELECT * FROM %s.single_static WHERE s0
= 3 AND ck0 = 6");
+ assertRows(CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL), row(0, 6, 7, 3, 5));
+ }
+
@Test
public void shouldRejectNonStrictIN()
{
@@ -94,6 +114,43 @@ public class StrictFilteringTest extends TestBaseImpl
assertRows(initialRows, row(0, 1, 2));
}
+ @Test
+ public void testPartialUpdatesOnNonIndexedColumnsAfterRepair()
+ {
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE
%s.partial_updates_non_indexed_columns (k int PRIMARY KEY, a int, b int, c int)
WITH read_repair = 'NONE'"));
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON
%s.partial_updates_non_indexed_columns(a) USING 'sai'"));
+ SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+ CLUSTER.coordinator(1).execute(withKeyspace("INSERT INTO
%s.partial_updates_non_indexed_columns(k, a) VALUES (0, 1) USING TIMESTAMP 1"),
ConsistencyLevel.ALL);
+ CLUSTER.get(1).nodetoolResult("repair", KEYSPACE).asserts().success();
+
+ // insert a split row
+ CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO
%s.partial_updates_non_indexed_columns(k, b) VALUES (0, 2) USING TIMESTAMP 2"));
+ CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO
%s.partial_updates_non_indexed_columns(k, c) VALUES (0, 3) USING TIMESTAMP 3"));
+
+ String select = withKeyspace("SELECT * FROM
%s.partial_updates_non_indexed_columns WHERE a = 1 AND b = 2 AND c = 3 ALLOW
FILTERING");
+ Object[][] initialRows = CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL);
+ assertRows(initialRows, row(0, 1, 2, 3));
+ }
+
+ @Test
+ public void testPartialUpdateOnNonIndexedColumnAfterRepair()
+ {
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE
%s.partial_updates_non_indexed_column (k int PRIMARY KEY, a int, b int) WITH
read_repair = 'NONE'"));
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON
%s.partial_updates_non_indexed_column(a) USING 'sai'"));
+ SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+ CLUSTER.coordinator(1).execute(withKeyspace("INSERT INTO
%s.partial_updates_non_indexed_column(k, a) VALUES (0, 1) USING TIMESTAMP 1"),
ConsistencyLevel.ALL);
+ CLUSTER.get(1).nodetoolResult("repair", KEYSPACE).asserts().success();
+
+ // insert a split row
+ CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO
%s.partial_updates_non_indexed_column(k, b) VALUES (0, 2) USING TIMESTAMP 2"));
+
+ String select = withKeyspace("SELECT * FROM
%s.partial_updates_non_indexed_column WHERE a = 1 AND b = 2 ALLOW FILTERING");
+ Object[][] initialRows = CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL);
+ assertRows(initialRows, row(0, 1, 2));
+ }
+
@Test
public void testPartialUpdatesWithDeleteBetween()
{
diff --git a/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
b/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
index 81292cbda0..b27764115b 100644
--- a/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
@@ -27,7 +27,6 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -324,10 +323,10 @@ public class OperationTest
assertTrue(node.buildFilter(controllerClustering,
true).isSatisfiedBy(key, row, staticRow));
}
- private Map<Expression.IndexOperator, Expression>
convert(Multimap<ColumnMetadata, Expression> expressions)
+ private Map<Expression.IndexOperator, Expression>
convert(Operation.Expressions expressions)
{
Map<Expression.IndexOperator, Expression> converted = new
EnumMap<>(Expression.IndexOperator.class);
- for (Expression expression : expressions.values())
+ for (Expression expression : expressions.all())
{
Expression column = converted.get(expression.getIndexOperator());
assert column == null; // sanity check
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org