This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch cassandra-5.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-5.0 by this push:
new 5483165938 Avoid limit on RFP fetch in the case of an unresolved
static row
5483165938 is described below
commit 548316593898c3b2169986d27eae7a6624bd873d
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Feb 20 17:54:29 2025 -0600
Avoid limit on RFP fetch in the case of an unresolved static row
patch by Caleb Rackliffe; reviewed by David Capwell and Zhao Yang for
CASSANDRA-20323
---
CHANGES.txt | 1 +
.../service/reads/ReplicaFilteringProtection.java | 15 ++-
.../test/ReplicaFilteringProtectionTest.java | 19 ---
.../test/sai/ReplicaFilteringWithStaticsTest.java | 131 +++++++++++++++++++++
.../distributed/test/sai/StrictFilteringTest.java | 20 ----
5 files changed, 142 insertions(+), 44 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 06c119f1ff..b01f18babc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0.4
+ * Avoid limit on RFP fetch in the case of an unresolved static row
(CASSANDRA-20323)
* Include materialized views to the output of DESCRIBE TABLE statements
(CASSANDRA-20365)
* Heap and GC jvm flags improvements (CASSANDRA-20296)
* Fix unparseable YAML in default cassandra.yaml when uncommented for
downstream tooling (CASSANDRA-20359)
diff --git
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index 7c57245bb1..c66c2007d6 100644
---
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@ -230,7 +230,7 @@ public class ReplicaFilteringProtection<E extends
Endpoints<E>>
// Even if there are no completely missing rows,
replicas may still be silent about individual
// columns, so we need to check for divergence at the
column level:
- for (ColumnMetadata column : columns)
+ for (ColumnMetadata column : merged.isStatic() ?
columns.statics : columns.regulars)
{
Arrays.fill(silentColumnAt, false);
boolean allSilent = true;
@@ -538,14 +538,19 @@ public class ReplicaFilteringProtection<E extends
Endpoints<E>>
Tracing.trace("Requesting {} rows in partition {} from {} for
replica filtering protection",
clusterings.size(), key, source);
- // build the read command taking into account that we could be
requesting only in the static row
- DataLimits limits = clusterings.isEmpty() ?
DataLimits.cqlLimits(1) : DataLimits.NONE;
- ClusteringIndexFilter filter = unresolvedStatic ?
command.clusteringIndexFilter(key) : new
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+ // If there is an unresolved static column, we must fetch the
entire partition, as static column predicates
+ // may produce row matches across the entire partition. If there
are only non-static rows to complete, we
+ // query the partition specifically for the corresponding
clusterings by name. In either case, we do not
+ // provide a limit. (In the unresolved static case, we have no way
of knowing how many stale rows we might
+ // read on a silent replica before finding a live one.)
+ ClusteringIndexFilter filter = unresolvedStatic ?
command.clusteringIndexFilter(key)
+ : new
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+
SinglePartitionReadCommand cmd =
SinglePartitionReadCommand.create(command.metadata(),
command.nowInSec(),
command.columnFilter(),
RowFilter.none(),
-
limits,
+
DataLimits.NONE,
key,
filter);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
index 1cff326f41..fd8110cba7 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
@@ -27,7 +27,6 @@ import org.junit.Test;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.distributed.Cluster;
-import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.exceptions.OverloadedException;
@@ -72,24 +71,6 @@ public class ReplicaFilteringProtectionTest extends
TestBaseImpl
cluster.close();
}
- @Test
- public void testMissingStaticRowWithNonStaticExpression()
- {
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate
(pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY
(pk0, ck0, ck1)) " +
- "WITH CLUSTERING ORDER BY (ck0 ASC,
ck1 DESC) AND read_repair = 'NONE'"));
-
- cluster.get(1).executeInternal(withKeyspace("INSERT INTO
%s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
- "VALUES (0, 1, 2, 3, 4, 5)
USING TIMESTAMP 1"));
- cluster.get(2).executeInternal(withKeyspace("UPDATE
%s.single_predicate USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
- "WHERE pk0 = 0 AND ck0 =
9 AND ck1 = 10"));
-
- // Node 2 will not produce a match for the static row. Make sure that
replica filtering protection does not
- // fetch the entire partition, which could let non-matching rows slip
through combined with the fact that we
- // don't post-filter at the coordinator with no regular column
predicates in the query.
- String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM
%s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
- assertRows(cluster.coordinator(1).execute(select,
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
- }
-
@Test
public void testMissedUpdatesBelowCachingWarnThreshold()
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/ReplicaFilteringWithStaticsTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/ReplicaFilteringWithStaticsTest.java
new file mode 100644
index 0000000000..59cc1cc6a5
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/ReplicaFilteringWithStaticsTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.sai;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+
+public class ReplicaFilteringWithStaticsTest extends TestBaseImpl
+{
+ private static Cluster CLUSTER;
+
+ @BeforeClass
+ public static void setUpCluster() throws IOException
+ {
+ CLUSTER = init(Cluster.build(3).withConfig(config ->
config.set("hinted_handoff_enabled",
false).with(GOSSIP).with(NETWORK)).start());
+ }
+
+ @Test
+ public void testStaticMatchWithPartitionDelete()
+ {
+ testStaticMatchWithPartitionDelete(false);
+ }
+
+ @Test
+ public void testStaticMatchWithPartitionDeleteSAI()
+ {
+ testStaticMatchWithPartitionDelete(true);
+ }
+
+ public void testStaticMatchWithPartitionDelete(boolean sai)
+ {
+ String table = "static_and_delete" + (sai ? "_sai" : "");
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s." + table + " (pk0
boolean, ck0 ascii, s1 tinyint static, v0 boolean, PRIMARY KEY (pk0, ck0)) " +
+ "WITH CLUSTERING ORDER BY (ck0 ASC)
AND read_repair = 'NONE'"));
+
+ if (sai)
+ {
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s." + table +
"(s1) USING 'sai'"));
+ SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+ }
+
+ CLUSTER.get(3).executeInternal(withKeyspace("UPDATE %s." + table + "
USING TIMESTAMP 1 SET v0 = false WHERE pk0 = true AND ck0 = 'D'"));
+ CLUSTER.get(1).executeInternal(withKeyspace("DELETE FROM %s." + table
+ " USING TIMESTAMP 2 WHERE pk0 = true"));
+
+ CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + table
+ " (pk0, ck0, s1, v0) VALUES (true, 'G', -114, true) USING TIMESTAMP 3"));
+ CLUSTER.get(3).executeInternal(withKeyspace("INSERT INTO %s." + table
+ " (pk0, ck0) VALUES (true, 'F') USING TIMESTAMP 4"));
+ CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + table
+ " (pk0, ck0, s1, v0) VALUES (true, 'C', 17, true) USING TIMESTAMP 5"));
+
+ // This update to the static column creates matches across all
previously written live rows in the partition.
+ // When RFP sees the unresolved static row, it must read enough data
from the silent replicas at nodes 1 and 3
+ // to find all potential matches. With a page size of 1, reading only
1 row from node 3 will return the row at
+ // ck0 = 'D', as the partition delete never made it to node 3. This
means we'll ignore the live result node 3 has
+ // at ck0 = 'F', because node 1 will produce a result at ck0 = 'G', and
that determines the next paging cursor.
+ CLUSTER.get(2).executeInternal(withKeyspace("UPDATE %s." + table + "
USING TIMESTAMP 6 SET s1 = 1, v0 = false WHERE pk0 = true AND ck0 = 'A'"));
+
+ String select = withKeyspace("SELECT ck0 FROM %s." + table + " WHERE
s1 = 1" + (sai ? "" : " ALLOW FILTERING" ));
+ assertRows(CLUSTER.coordinator(1).executeWithPaging(select, ALL, 1),
row("A"), row("C"), row("F"), row("G"));
+ }
+
+ @Test
+ public void testMissingStaticRowWithNonStaticExpression()
+ {
+ testMissingStaticRowWithNonStaticExpression(false);
+ }
+
+ @Test
+ public void testMissingStaticRowWithNonStaticExpressionSAI()
+ {
+ testMissingStaticRowWithNonStaticExpression(true);
+ }
+
+ public void testMissingStaticRowWithNonStaticExpression(boolean sai)
+ {
+ String table = "single_predicate" + (sai ? "_sai" : "");
+ CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s." + table + " (pk0
int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY (pk0,
ck0, ck1)) " +
+ "WITH CLUSTERING ORDER BY (ck0 ASC,
ck1 DESC) AND read_repair = 'NONE'"));
+
+ if (sai)
+ {
+ CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON %s." + table +
"(ck1) USING 'sai'"));
+ SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
+ }
+
+ CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO %s." + table
+ " (pk0, ck0, ck1, s0, s1, v0) " +
+ "VALUES (0, 1, 2, 3, 4, 5)
USING TIMESTAMP 1"));
+ CLUSTER.get(2).executeInternal(withKeyspace("UPDATE %s." + table + "
USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
+ "WHERE pk0 = 0 AND ck0 =
9 AND ck1 = 10"));
+
+ // Node 2 will not produce a match for the static row. Make sure that
replica filtering protection does not
+ // fetch the entire partition, which could let non-matching rows slip
through combined with the fact that we
+ // don't post-filter at the coordinator with no regular column
predicates in the query.
+ String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM %s." +
table + " WHERE ck1 = 2" + (sai ? "" : " ALLOW FILTERING"));
+ assertRows(CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
+ }
+
+ @AfterClass
+ public static void shutDownCluster()
+ {
+ if (CLUSTER != null)
+ CLUSTER.close();
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
index 5ef92bb9ef..6ec80fd0ae 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/sai/StrictFilteringTest.java
@@ -53,26 +53,6 @@ public class StrictFilteringTest extends TestBaseImpl
CLUSTER = init(Cluster.build(2).withConfig(config ->
config.set("hinted_handoff_enabled",
false).with(GOSSIP).with(NETWORK)).start());
}
- @Test
- public void testMissingStaticRowWithNonStaticExpression()
- {
- CLUSTER.schemaChange(withKeyspace("CREATE TABLE %s.single_predicate
(pk0 int, ck0 int, ck1 int, s0 int static, s1 int static, v0 int, PRIMARY KEY
(pk0, ck0, ck1)) " +
- "WITH CLUSTERING ORDER BY (ck0 ASC,
ck1 DESC) AND read_repair = 'NONE'"));
- CLUSTER.schemaChange(withKeyspace("CREATE INDEX ON
%s.single_predicate(ck1) USING 'sai'"));
- SAIUtil.waitForIndexQueryable(CLUSTER, KEYSPACE);
-
- CLUSTER.get(1).executeInternal(withKeyspace("INSERT INTO
%s.single_predicate (pk0, ck0, ck1, s0, s1, v0) " +
- "VALUES (0, 1, 2, 3, 4, 5)
USING TIMESTAMP 1"));
- CLUSTER.get(2).executeInternal(withKeyspace("UPDATE
%s.single_predicate USING TIMESTAMP 2 SET s0 = 6, s1 = 7, v0 = 8 " +
- "WHERE pk0 = 0 AND ck0 =
9 AND ck1 = 10"));
-
- // Node 2 will not produce a match for the static row. Make sure that
replica filtering protection does not
- // fetch the entire partition, which could let non-matching rows slip
through combined with the fact that we
- // don't post-filter at the coordinator with no regular column
predicates in the query.
- String select = withKeyspace("SELECT pk0, ck0, ck1, s0, s1 FROM
%s.single_predicate WHERE ck1 = 2 ALLOW FILTERING");
- assertRows(CLUSTER.coordinator(1).execute(select,
ConsistencyLevel.ALL), row(0, 1, 2, 6, 7));
- }
-
@Test
public void shouldDegradeToUnionOnSingleStatic()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]