This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 953ab6cf64 Avoid possible consistency violations for SAI intersection queries over repaired index matches and multiple non-indexed column matches
953ab6cf64 is described below

commit 953ab6cf64088614ada26e05d664b6a25b1d561d
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Tue Jan 7 16:39:34 2025 -0600

    Avoid possible consistency violations for SAI intersection queries over repaired index matches and multiple non-indexed column matches
    
    patch by Caleb Rackliffe; reviewed by David Capwell for CASSANDRA-20189
---
 CHANGES.txt                                        |  1 +
 .../org/apache/cassandra/db/filter/RowFilter.java  | 14 +++-
 .../cassandra/index/sai/plan/FilterTree.java       | 74 ++++++++++++------
 .../apache/cassandra/index/sai/plan/Operation.java | 87 ++++++++++++++++++----
 .../cassandra/index/sai/plan/QueryController.java  |  2 +-
 .../cassandra/index/sai/utils/IndexTermType.java   |  5 ++
 .../distributed/test/sai/StrictFilteringTest.java  | 57 ++++++++++++++
 .../cassandra/index/sai/plan/OperationTest.java    |  5 +-
 8 files changed, 204 insertions(+), 41 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 769e4dddbe..13ae3fddc2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.4
+ * Avoid possible consistency violations for SAI intersection queries over 
repaired index matches and multiple non-indexed column matches (CASSANDRA-20189)
  * Skip check for DirectIO when initializing tools (CASSANDRA-20289)
  * Avoid under-skipping during intersections when an iterator has mixed STATIC 
and WIDE keys (CASSANDRA-20258)
  * Correct the default behavior of compareTo() when comparing WIDE and STATIC 
PrimaryKeys (CASSANDRA-20238)
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 0742f4ee9f..483c163164 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -169,11 +169,21 @@ public class RowFilter implements 
Iterable<RowFilter.Expression>
     }
 
     /**
-     * @return true if this filter contains an intersection on two or more 
mutable columns
+     * @return true if this filter contains an intersection on either any 
static column or two regular mutable columns
      */
     public boolean isMutableIntersection()
     {
-        return expressions.stream().filter(e -> 
!e.column.isPrimaryKeyColumn()).count() > 1;
+        int count = 0;
+        for (Expression e : expressions)
+        {
+            if (e.column.isStatic() && expressions.size() > 1)
+                return true;
+
+            if (!e.column.isPrimaryKeyColumn())
+                if (++count > 1)
+                    return true;
+        }
+        return false;
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java 
b/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
index 4107fad2d2..15ea273145 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/FilterTree.java
@@ -23,8 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 
-import com.google.common.collect.ListMultimap;
-
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.index.sai.QueryContext;
@@ -44,12 +42,12 @@ import static 
org.apache.cassandra.index.sai.plan.Operation.BooleanOperator;
 public class FilterTree
 {
     protected final BooleanOperator baseOperator;
-    protected final ListMultimap<ColumnMetadata, Expression> expressions;
+    protected final Operation.Expressions expressions;
     protected final List<FilterTree> children = new ArrayList<>();
     private final boolean isStrict;
     private final QueryContext context;
 
-    FilterTree(BooleanOperator baseOperator, ListMultimap<ColumnMetadata, 
Expression> expressions, boolean isStrict, QueryContext context)
+    FilterTree(BooleanOperator baseOperator, Operation.Expressions 
expressions, boolean isStrict, QueryContext context)
     {
         this.baseOperator = baseOperator;
         this.expressions = expressions;
@@ -67,7 +65,7 @@ public class FilterTree
      */
     public boolean restrictsNonStaticRow()
     {
-        for (ColumnMetadata column : expressions.keySet())
+        for (ColumnMetadata column : expressions.columns())
             if (!column.isStatic())
                 return true;
 
@@ -98,7 +96,12 @@ public class FilterTree
         BooleanOperator localOperator = (isStrict || 
!context.hasUnrepairedMatches) ? baseOperator : BooleanOperator.OR;
         boolean result = localOperator == BooleanOperator.AND;
 
-        Iterator<ColumnMetadata> columnIterator = 
expressions.keySet().iterator();
+        // If all matches on indexed columns are repaired, strict filtering is 
not allowed, and there are multiple
+        // unindexed column expressions, isolate the expressions on unindexed 
columns and union their results:
+        boolean isolateUnindexed = !context.hasUnrepairedMatches && !isStrict 
&& expressions.hasMultipleUnindexedColumns();
+        boolean unindexedResult = false;
+
+        Iterator<ColumnMetadata> columnIterator = 
expressions.columns().iterator();
         while (columnIterator.hasNext())
         {
             ColumnMetadata column = columnIterator.next();
@@ -106,38 +109,65 @@ public class FilterTree
 
             // If there is a column with multiple expressions that can mean an 
OR, or (in the case of map
             // collections) it can mean different map indexes.
-            List<Expression> filters = expressions.get(column);
+            List<Expression> filters = expressions.expressionsFor(column);
 
             // We do a reverse iteration over the filters because NOT_EQ 
operations will be at the end
             // of the filter list, and we want to check them first.
             ListIterator<Expression> filterIterator = 
filters.listIterator(filters.size());
-            while (filterIterator.hasPrevious())
+
+            if (isolateUnindexed && expressions.isUnindexed(column))
             {
-                Expression filter = filterIterator.previous();
+                // If we isolate unindexed column expressions, we're 
implicitly calculating the union of those 
+                // expressions. Once we've matched on any column, we can skip 
the rest, if any exist.
+                if (unindexedResult)
+                    continue;
 
-                if (filter.getIndexTermType().isNonFrozenCollection())
+                while (filterIterator.hasPrevious())
                 {
-                    Iterator<ByteBuffer> valueIterator = 
filter.getIndexTermType().valuesOf(localRow, now);
-                    result = localOperator.apply(result, 
collectionMatch(valueIterator, filter));
+                    Expression filter = filterIterator.previous();
+                    unindexedResult = applyFilter(key, now, 
BooleanOperator.OR, unindexedResult, localRow, filter);
                 }
-                else
+            }
+            else
+            {
+                while (filterIterator.hasPrevious())
                 {
-                    ByteBuffer value = filter.getIndexTermType().valueOf(key, 
localRow, now);
-                    result = localOperator.apply(result, singletonMatch(value, 
filter));
-                }
+                    Expression filter = filterIterator.previous();
+                    result = applyFilter(key, now, localOperator, result, 
localRow, filter);
 
-                // If the operation is an AND then exit early if we get a 
single false
-                if ((localOperator == BooleanOperator.AND) && !result)
-                    return false;
+                    // If the operation is an AND then exit early if we get a 
single false
+                    if ((localOperator == BooleanOperator.AND) && !result)
+                        return false;
 
-                // If the operation is an OR then exit early if we get a 
single true
-                if (localOperator == BooleanOperator.OR && result)
-                    return true;
+                    // If the operation is an OR then exit early if we get a 
single true
+                    if (localOperator == BooleanOperator.OR && result)
+                        return true;
+                }
             }
         }
+
+        if (isolateUnindexed)
+            // If we had to isolate the unindexed column expressions, combine 
with the indexed column result. Note that
+            // the indexed result must be true at this point if it was 
evaluated with the AND operator:
+            return localOperator == BooleanOperator.AND ? unindexedResult : 
result || unindexedResult;
+
         return result;
     }
 
+    private boolean applyFilter(DecoratedKey key, long now, BooleanOperator 
operator, boolean result, Row row, Expression expression)
+    {
+        if (expression.getIndexTermType().isNonFrozenCollection())
+        {
+            Iterator<ByteBuffer> valueIterator = 
expression.getIndexTermType().valuesOf(row, now);
+            return operator.apply(result, collectionMatch(valueIterator, 
expression));
+        }
+        else
+        {
+            ByteBuffer value = expression.getIndexTermType().valueOf(key, row, 
now);
+            return operator.apply(result, singletonMatch(value, expression));
+        }
+    }
+
     private boolean singletonMatch(ByteBuffer value, Expression filter)
     {
         return value != null && filter.isSatisfiedBy(value);
diff --git a/src/java/org/apache/cassandra/index/sai/plan/Operation.java 
b/src/java/org/apache/cassandra/index/sai/plan/Operation.java
index 5b18241843..eb45722af5 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/Operation.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/Operation.java
@@ -20,8 +20,11 @@ package org.apache.cassandra.index.sai.plan;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 
@@ -63,11 +66,58 @@ public class Operation
         }
     }
 
+    public static class Expressions
+    {
+        final ListMultimap<ColumnMetadata, Expression> expressions;
+        final Set<ColumnMetadata> unindexedColumns;
+
+        Expressions(ListMultimap<ColumnMetadata, Expression> expressions, 
Set<ColumnMetadata> unindexedColumns)
+        {
+            this.expressions = expressions;
+            this.unindexedColumns = unindexedColumns;
+        }
+
+        Set<ColumnMetadata> columns()
+        {
+            return expressions.keySet();
+        }
+
+        Collection<Expression> all()
+        {
+            return expressions.values();
+        }
+
+        List<Expression> expressionsFor(ColumnMetadata column)
+        {
+            return expressions.get(column);
+        }
+
+        boolean isEmpty()
+        {
+            return expressions.isEmpty();
+        }
+
+        int size()
+        {
+            return expressions.size();
+        }
+
+        boolean isUnindexed(ColumnMetadata column)
+        {
+            return unindexedColumns.contains(column);
+        }
+
+        boolean hasMultipleUnindexedColumns()
+        {
+            return unindexedColumns.size() > 1;
+        }
+    }
+
     @VisibleForTesting
-    protected static ListMultimap<ColumnMetadata, Expression> 
buildIndexExpressions(QueryController queryController,
-                                                                               
     List<RowFilter.Expression> expressions)
+    protected static Expressions buildIndexExpressions(QueryController 
queryController, List<RowFilter.Expression> expressions)
     {
         ListMultimap<ColumnMetadata, Expression> analyzed = 
ArrayListMultimap.create();
+        Set<ColumnMetadata> unindexedColumns = Collections.emptySet();
 
         // sort all the expressions in the operation by name and priority of 
the logical operator
         // this gives us an efficient way to handle inequality and combining 
into ranges without extra processing
@@ -82,17 +132,28 @@ public class Operation
             if (Expression.supportsOperator(expression.operator()))
             {
                 StorageAttachedIndex index = 
queryController.indexFor(expression);
-
                 List<Expression> perColumn = analyzed.get(expression.column());
 
                 if (index == null)
+                {
                     buildUnindexedExpression(queryController, expression, 
perColumn);
+
+                    if (!expression.column().isPrimaryKeyColumn())
+                    {
+                        if (unindexedColumns.isEmpty())
+                            unindexedColumns = new HashSet<>(3);
+
+                        unindexedColumns.add(expression.column());
+                    }
+                }
                 else
+                {
                     buildIndexedExpression(index, expression, perColumn);
+                }
             }
         }
 
-        return analyzed;
+        return new Expressions(analyzed, unindexedColumns);
     }
 
     private static void buildUnindexedExpression(QueryController 
queryController,
@@ -286,11 +347,11 @@ public class Operation
 
     static abstract class Node
     {
-        ListMultimap<ColumnMetadata, Expression> expressionMap;
+        Expressions expressions;
 
         boolean canFilter()
         {
-            return (expressionMap != null && !expressionMap.isEmpty()) || 
!children().isEmpty();
+            return (expressions != null && !expressions.isEmpty()) || 
!children().isEmpty();
         }
 
         List<Node> children()
@@ -382,19 +443,19 @@ public class Operation
         @Override
         public void analyze(List<RowFilter.Expression> expressionList, 
QueryController controller)
         {
-            expressionMap = buildIndexExpressions(controller, expressionList);
+            expressions = buildIndexExpressions(controller, expressionList);
         }
 
         @Override
         FilterTree filterTree(boolean isStrict, QueryContext context)
         {
-            return new FilterTree(BooleanOperator.AND, expressionMap, 
isStrict, context);
+            return new FilterTree(BooleanOperator.AND, expressions, isStrict, 
context);
         }
 
         @Override
         KeyRangeIterator rangeIterator(QueryController controller)
         {
-            KeyRangeIterator.Builder builder = 
controller.getIndexQueryResults(expressionMap.values());
+            KeyRangeIterator.Builder builder = 
controller.getIndexQueryResults(expressions.all());
             for (Node child : children)
             {
                 boolean canFilter = child.canFilter();
@@ -412,15 +473,15 @@ public class Operation
         @Override
         public void analyze(List<RowFilter.Expression> expressionList, 
QueryController controller)
         {
-            expressionMap = buildIndexExpressions(controller, expressionList);
-            assert expressionMap.size() == 1 : "Expression nodes should only 
have a single expression!";
+            expressions = buildIndexExpressions(controller, expressionList);
+            assert expressions.size() == 1 : "Expression nodes should only 
have a single expression!";
         }
 
         @Override
         FilterTree filterTree(boolean isStrict, QueryContext context)
         {
             // There should only be one expression, so AND/OR would both work 
here. 
-            return new FilterTree(BooleanOperator.AND, expressionMap, 
isStrict, context);
+            return new FilterTree(BooleanOperator.AND, expressions, isStrict, 
context);
         }
 
         public ExpressionNode(RowFilter.Expression expression)
@@ -439,7 +500,7 @@ public class Operation
         {
             assert canFilter() : "Cannot process query with no expressions";
 
-            return 
controller.getIndexQueryResults(expressionMap.values()).build();
+            return controller.getIndexQueryResults(expressions.all()).build();
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java 
b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
index b462fa3ad5..614456c493 100644
--- a/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
+++ b/src/java/org/apache/cassandra/index/sai/plan/QueryController.java
@@ -423,7 +423,7 @@ public class QueryController
                "PrimaryKey " + firstKey + " clustering does not match table. 
There should be a clustering of size " + cfs.metadata().comparator.size();
 
         ClusteringIndexFilter clusteringIndexFilter = 
command.clusteringIndexFilter(firstKey.partitionKey());
-        
+
         // If we have skinny partitions or the key is for a static row then we 
need to get the partition as
         // requested by the original query.
         if (cfs.metadata().comparator.size() == 0 || firstKey.kind() == 
PrimaryKey.Kind.STATIC)
diff --git a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java 
b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
index 51600f9d79..f5a00b184e 100644
--- a/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
+++ b/src/java/org/apache/cassandra/index/sai/utils/IndexTermType.java
@@ -319,6 +319,11 @@ public class IndexTermType
         return this.columnMetadata.compareTo(columnMetadata) == 0;
     }
 
+    public static boolean isEqOnlyType(AbstractType<?> type)
+    {
+        return EQ_ONLY_TYPES.contains(type);
+    }
+
     /**
      * Indicates if the type encoding supports rounding of the raw value.
      * <p>
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 301336f862..cd753d6e01 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -54,6 +54,26 @@ public class StrictFilteringTest extends TestBaseImpl
         CLUSTER = init(Cluster.build(2).withConfig(config -> 
config.set("hinted_handoff_enabled", 
false).with(GOSSIP).with(NETWORK)).start());
     }
 
+    @Test
+    public void shouldDegradeToUnionOnSingleStatic()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.single_static (pk0 
int, ck0 int, ck1 int, s0 int static, v0 int, PRIMARY KEY (pk0, ck0, ck1)) " +
+                                          "WITH read_repair = 'NONE' AND 
CLUSTERING ORDER BY (ck0 ASC, ck1 DESC)"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.single_static(ck0) USING 'sai'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.single_static(s0) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        // To present the coordinator with enough data to find a row match, 
both replicas must degrade to OR at query
+        // time. The static column match from node 2 and the clustering key 
match from node 1 must be merged.
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.single_static (pk0, ck0, ck1, s0, v0) VALUES (0, 1, 2, 3, 4)"));
+        CLUSTER.get(1).executeInternal(withKeyspace("UPDATE %s.single_static 
SET v0 = 5 WHERE pk0 = 0 AND ck0 = 6 AND ck1 = 7"));
+
+        // A static column predicate and ANY other predicate makes strict 
filtering impossible, as the static match
+        // applies to the entire partition.
+        String select = withKeyspace("SELECT * FROM %s.single_static WHERE s0 
= 3 AND ck0 = 6");
+        assertRows(CLUSTER.coordinator(1).execute(select, 
ConsistencyLevel.ALL), row(0, 6, 7, 3, 5));
+    }
+
     @Test
     public void shouldRejectNonStrictIN()
     {
@@ -94,6 +114,43 @@ public class StrictFilteringTest extends TestBaseImpl
         assertRows(initialRows, row(0, 1, 2));
     }
 
+    @Test
+    public void testPartialUpdatesOnNonIndexedColumnsAfterRepair()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE 
%s.partial_updates_non_indexed_columns (k int PRIMARY KEY, a int, b int, c int) 
WITH read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.partial_updates_non_indexed_columns(a) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        CLUSTER.coordinator(1).execute(withKeyspace("INSERT INTO 
%s.partial_updates_non_indexed_columns(k, a) VALUES (0, 1) USING TIMESTAMP 1"), 
ConsistencyLevel.ALL);
+        CLUSTER.get(1).nodetoolResult("repair", KEYSPACE).asserts().success();
+        
+        // insert a split row
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.partial_updates_non_indexed_columns(k, b) VALUES (0, 2) USING TIMESTAMP 2"));
+        CLUSTER.get(2).executeInternal(withKeyspace("INSERT INTO 
%s.partial_updates_non_indexed_columns(k, c) VALUES (0, 3) USING TIMESTAMP 3"));
+
+        String select = withKeyspace("SELECT * FROM 
%s.partial_updates_non_indexed_columns WHERE a = 1 AND b = 2 AND c = 3 ALLOW 
FILTERING");
+        Object[][] initialRows = CLUSTER.coordinator(1).execute(select, 
ConsistencyLevel.ALL);
+        assertRows(initialRows, row(0, 1, 2, 3));
+    }
+
+    @Test
+    public void testPartialUpdateOnNonIndexedColumnAfterRepair()
+    {
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE 
%s.partial_updates_non_indexed_column (k int PRIMARY KEY, a int, b int) WITH 
read_repair = 'NONE'"));
+        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.partial_updates_non_indexed_column(a) USING 'sai'"));
+        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+
+        CLUSTER.coordinator(1).execute(withKeyspace("INSERT INTO 
%s.partial_updates_non_indexed_column(k, a) VALUES (0, 1) USING TIMESTAMP 1"), 
ConsistencyLevel.ALL);
+        CLUSTER.get(1).nodetoolResult("repair", KEYSPACE).asserts().success();
+
+        // insert a split row
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.partial_updates_non_indexed_column(k, b) VALUES (0, 2) USING TIMESTAMP 2"));
+
+        String select = withKeyspace("SELECT * FROM 
%s.partial_updates_non_indexed_column WHERE a = 1 AND b = 2 ALLOW FILTERING");
+        Object[][] initialRows = CLUSTER.coordinator(1).execute(select, 
ConsistencyLevel.ALL);
+        assertRows(initialRows, row(0, 1, 2));
+    }
+
     @Test
     public void testPartialUpdatesWithDeleteBetween()
     {
diff --git a/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java 
b/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
index 81292cbda0..b27764115b 100644
--- a/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
+++ b/test/unit/org/apache/cassandra/index/sai/plan/OperationTest.java
@@ -27,7 +27,6 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -324,10 +323,10 @@ public class OperationTest
         assertTrue(node.buildFilter(controllerClustering, 
true).isSatisfiedBy(key, row, staticRow));
     }
 
-    private Map<Expression.IndexOperator, Expression> 
convert(Multimap<ColumnMetadata, Expression> expressions)
+    private Map<Expression.IndexOperator, Expression> 
convert(Operation.Expressions expressions)
     {
         Map<Expression.IndexOperator, Expression> converted = new 
EnumMap<>(Expression.IndexOperator.class);
-        for (Expression expression : expressions.values())
+        for (Expression expression : expressions.all())
         {
             Expression column = converted.get(expression.getIndexOperator());
             assert column == null; // sanity check


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to