This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
     new 5483165938 Avoid limit on RFP fetch in the case of an unresolved 
static row
5483165938 is described below

commit 548316593898c3b2169986d27eae7a6624bd873d
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Feb 20 17:54:29 2025 -0600

    Avoid limit on RFP fetch in the case of an unresolved static row
    
    patch by Caleb Rackliffe; reviewed by David Capwell and Zhao Yang for 
CASSANDRA-20323
---
 CHANGES.txt                                        |   1 +
 .../service/reads/ReplicaFilteringProtection.java  |  15 ++-
 .../test/ReplicaFilteringProtectionTest.java       |  19 ---
 .../test/sai/ReplicaFilteringWithStaticsTest.java  | 131 +++++++++++++++++++++
 .../distributed/test/sai/StrictFilteringTest.java  |  20 ----
 5 files changed, 142 insertions(+), 44 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 06c119f1ff..b01f18babc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.0.4
+ * Avoid limit on RFP fetch in the case of an unresolved static row 
(CASSANDRA-20323)
  * Include materialized views to the output of DESCRIBE TABLE statements 
(CASSANDRA-20365)
  * Heap and GC jvm flags improvements (CASSANDRA-20296)
  * Fix unparseable YAML in default cassandra.yaml when uncommented for 
downstream tooling (CASSANDRA-20359)
diff --git 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index 7c57245bb1..c66c2007d6 100644
--- 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -230,7 +230,7 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
 
                         // Even if there are no completely missing rows, 
replicas may still be silent about individual
                         // columns, so we need to check for divergence at the 
column level:
-                        for (ColumnMetadata column : columns)
+                        for (ColumnMetadata column : merged.isStatic() ? 
columns.statics : columns.regulars)
                         {
                             Arrays.fill(silentColumnAt, false);
                             boolean allSilent = true;
@@ -538,14 +538,19 @@ public class ReplicaFilteringProtection<E extends 
Endpoints<E>>
             Tracing.trace("Requesting {} rows in partition {} from {} for 
replica filtering protection",
                           clusterings.size(), key, source);
 
-            // build the read command taking into account that we could be 
requesting only in the static row
-            DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE;
-            ClusteringIndexFilter filter = unresolvedStatic ? 
command.clusteringIndexFilter(key) : new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+            // If there is an unresolved static column, we must fetch the 
entire partition, as static column predicates
+            // may produce row matches across the entire partition. If there 
are only non-static rows to complete, we
+            // query the partition specifically for the corresponding 
clusterings by name. In either case, we do not
+            // provide a limit. (In the unresolved static case, we have no way 
of knowing how many stale rows we might
+            // read on a silent replica before finding a live one.)
+            ClusteringIndexFilter filter = unresolvedStatic ? 
command.clusteringIndexFilter(key)
+                                                            : new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+
             SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(command.metadata(),
                                                                                
command.nowInSec(),
                                                                                
command.columnFilter(),
                                                                                
RowFilter.none(),
-                                                                               
limits,
+                                                                               
DataLimits.NONE,
                                                                                
key,
                                                                                
filter);
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
index 1cff326f41..fd8110cba7 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.SimpleQueryResult;
 import org.apache.cassandra.exceptions.OverloadedException;
@@ -72,24 +71,6 @@ public class ReplicaFilteringProtectionTest extends 
TestBaseImpl
             cluster.close();
     }
 
-    @Test
-    public void testMissingStaticRowWithNonStaticExpression()
-    {
-        cluster.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate 
(pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY 
(pk0, ck0, ck1)) " +
-                                          "WITH CLUSTERING ORDER BY (ck0 ASC, 
ck1 DESC) AND read_repair = 'NONE'"));
-
-        cluster.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
-                                                    "VALUES (0, 1, 2, 3, 4, 5) 
USING TIMESTAMP 1"));
-        cluster.get(2).executeInternal(withKeyspace("UPDATE 
%s.single_predicate  USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
-                                                    "WHERE  pk0 = 0 AND ck0 = 
9 AND ck1 = 10"));
-
-        // Node 2 will not produce a match for the static row. Make sure that 
replica filtering protection does not
-        // fetch the entire partition, which could let non-matching rows slip 
through combined with the fact that we 
-        // don't post-filter at the coordinator with no regular column 
predicates in the query.
-        String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM 
%s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
-        assertRows(cluster.coordinator(1).execute(select, 
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
-    }
-
     @Test
     public void testMissedUpdatesBelowCachingWarnThreshold()
     {
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/ReplicaFilteringWithStaticsTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/ReplicaFilteringWithStaticsTest.java
new file mode 100644
index 0000000000..59cc1cc6a5
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/ReplicaFilteringWithStaticsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+public class ReplicaFilteringWithStaticsTest extends TestBaseImpl
+{
+    private static Cluster CLUSTER;
+
+    @BeforeClass
+    public static void setUpCluster() throws IOException
+    {
+        CLUSTER = init(Cluster.build(3).withConfig(config -> 
config.set("hinted_handoff_enabled", 
false).with(GOSSIP).with(NETWORK)).start());
+    }
+
+    @Test
+    public void testStaticMatchWithPartitionDelete()
+    {
+        testStaticMatchWithPartitionDelete(false);
+    }
+
+    @Test
+    public void testStaticMatchWithPartitionDeleteSAI()
+    {
+        testStaticMatchWithPartitionDelete(true);
+    }
+
+    public void testStaticMatchWithPartitionDelete(boolean sai)
+    {
+        String table = "static_and_delete" + (sai ? "_sai" : "");
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s." + table + " (pk0 
boolean, ck0 ascii, s1 tinyint static, v0 boolean, PRIMARY KEY (pk0, ck0)) " +
+                                          "WITH CLUSTERING ORDER BY (ck0 ASC) 
AND read_repair = 'NONE'"));
+
+        if (sai)
+        {
+            CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s." + table + 
"(s1) USING 'sai'"));
+            SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+        }
+
+        CLUSTER.get(3).executeInternal(withKeyspace("UPDATE %s." + table + " 
USING TIMESTAMP 1 SET v0 = false WHERE pk0 = true AND ck0 = 'D'"));
+        CLUSTER.get(1).executeInternal(withKeyspace("DELETE FROM %s." + table 
+ " USING TIMESTAMP 2 WHERE pk0 = true"));
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + table 
+ " (pk0, ck0, s1, v0) VALUES (true, 'G', -114, true) USING TIMESTAMP 3"));
+        CLUSTER.get(3).executeInternal(withKeyspace("INSERT INTO %s." + table 
+ " (pk0, ck0) VALUES (true, 'F') USING TIMESTAMP 4"));
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + table 
+ " (pk0, ck0, s1, v0) VALUES (true, 'C', 17, true) USING TIMESTAMP 5"));
+
+        // This update to the static column creates matches across all 
previously written live rows in the partition.
+        // When RFP sees the unresolved static row, it must read enough data 
from the silent replicas at nodes 1 and 3
+        // to find all potential matches. With a page size of 1, reading only 
1 row from node 3 will return the row at
+        // ck = 'D', as the partition delete never made it to node 3. This 
means we'll ignore the live result node 3 has
+        // at ck = 'F', because node 1 will produce a result at ck = 'G', and 
that determines the next paging cursor.
+        CLUSTER.get(2).executeInternal(withKeyspace("UPDATE %s." + table + " 
USING TIMESTAMP 6 SET s1 = 1, v0 = false WHERE pk0 = true AND ck0 = 'A'"));
+
+        String select = withKeyspace("SELECT ck0 FROM %s." + table + " WHERE 
s1 = 1" + (sai ? "" : " ALLOW FILTERING" ));
+        assertRows(CLUSTER.coordinator(1).executeWithPaging(select, ALL, 1), 
row("A"), row("C"), row("F"), row("G"));
+    }
+
+    @Test
+    public void testMissingStaticRowWithNonStaticExpression()
+    {
+        testMissingStaticRowWithNonStaticExpression(false);
+    }
+
+    @Test
+    public void testMissingStaticRowWithNonStaticExpressionSAI()
+    {
+        testMissingStaticRowWithNonStaticExpression(true);
+    }
+    
+    public void testMissingStaticRowWithNonStaticExpression(boolean sai)
+    {
+        String table = "single_predicate" + (sai ? "_sai" : "");
+        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s." + table + " (pk0 
int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY (pk0, 
ck0, ck1)) " +
+                                          "WITH CLUSTERING ORDER BY (ck0 ASC, 
ck1 DESC) AND read_repair = 'NONE'"));
+        
+        if (sai)
+        {
+            CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s." + table + 
"(ck1) USING 'sai'"));
+            SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+        }
+
+        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + table 
+ " (pk0, ck0, ck1, s0, s1, v0) " +
+                                                    "VALUES (0, 1, 2, 3, 4, 5) 
USING TIMESTAMP 1"));
+        CLUSTER.get(2).executeInternal(withKeyspace("UPDATE %s." + table + "  
USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
+                                                    "WHERE  pk0 = 0 AND ck0 = 
9 AND ck1 = 10"));
+
+        // Node 2 will not produce a match for the static row. Make sure that 
replica filtering protection does not
+        // fetch the entire partition, which could let non-matching rows slip 
through combined with the fact that we 
+        // don't post-filter at the coordinator with no regular column 
predicates in the query.
+        String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM %s." + 
table + " WHERE ck1 = 2" + (sai ? "" : " ALLOW FILTERING"));
+        assertRows(CLUSTER.coordinator(1).execute(select, 
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
+    }
+
+    @AfterClass
+    public static void shutDownCluster()
+    {
+        if (CLUSTER != null)
+            CLUSTER.close();
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 5ef92bb9ef..6ec80fd0ae 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -53,26 +53,6 @@ public class StrictFilteringTest extends TestBaseImpl
         CLUSTER = init(Cluster.build(2).withConfig(config -> 
config.set("hinted_handoff_enabled", 
false).with(GOSSIP).with(NETWORK)).start());
     }
 
-    @Test
-    public void testMissingStaticRowWithNonStaticExpression()
-    {
-        CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate 
(pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY 
(pk0, ck0, ck1)) " +
-                                          "WITH CLUSTERING ORDER BY (ck0 ASC, 
ck1 DESC) AND read_repair = 'NONE'"));
-        CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON 
%s.single_predicate(ck1) USING 'sai'"));
-        SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
-
-        CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO 
%s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
-                                                    "VALUES (0, 1, 2, 3, 4, 5) 
USING TIMESTAMP 1"));
-        CLUSTER.get(2).executeInternal(withKeyspace("UPDATE 
%s.single_predicate  USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
-                                                    "WHERE  pk0 = 0 AND ck0 = 
9 AND ck1 = 10"));
-
-        // Node 2 will not produce a match for the static row. Make sure that 
replica filtering protection does not
-        // fetch the entire partition, which could let non-matching rows slip 
through combined with the fact that we 
-        // don't post-filter at the coordinator with no regular column 
predicates in the query.
-        String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM 
%s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
-        assertRows(CLUSTER.coordinator(1).execute(select, 
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
-    }
-
     @Test
     public void shouldDegradeToUnionOnSingleStatic()
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to