This is an automated email from the ASF dual-hosted git repository.
samt pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 092915a Don't skip sstables with partition deletes
092915a is described below
commit 092915ad19d25c1ba93f7968210b88fb6e4b9180
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Tue Apr 7 16:35:50 2020 +0100
Don't skip sstables with partition deletes
Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-15690
---
CHANGES.txt | 1 +
.../cassandra/db/SinglePartitionReadCommand.java | 103 +++++++++------------
.../distributed/test/SimpleReadWriteTest.java | 101 ++++++++++++++++++++
3 files changed, 148 insertions(+), 57 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 5af0ef3..91b8241 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.21
+ * Don't skip sstables in slice queries based only on local min/max/deletion timestamp (CASSANDRA-15690)
* Memtable memory allocations may deadlock (CASSANDRA-15367)
* Run evictFromMembership in GossipStage (CASSANDRA-15592)
Merged from 2.2:
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 15b74d8..2e014ba 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -705,92 +705,81 @@ public class SinglePartitionReadCommand extends ReadCommand
              * We can't eliminate full sstables based on the timestamp of what we've already read like
              * in collectTimeOrderedData, but we still want to eliminate sstable whose maxTimestamp < mostRecentTombstone
              * we've read. We still rely on the sstable ordering by maxTimestamp since if
-             *   maxTimestamp_s1 > maxTimestamp_s0,
+             *   maxTimestamp_s1 < maxTimestamp_s0,
              * we're guaranteed that s1 cannot have a row tombstone such that
              *   timestamp(tombstone) > maxTimestamp_s0
              * since we necessarily have
              *   timestamp(tombstone) <= maxTimestamp_s1
-             * In other words, iterating in maxTimestamp order allow to do our mostRecentPartitionTombstone elimination
-             * in one pass, and minimize the number of sstables for which we read a partition tombstone.
+             * In other words, iterating in descending maxTimestamp order allow to do our mostRecentPartitionTombstone
+             * elimination in one pass, and minimize the number of sstables for which we read a partition tombstone.
              */
             Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
-            List<SSTableReader> skippedSSTables = null;
             long mostRecentPartitionTombstone = Long.MIN_VALUE;
-            long minTimestamp = Long.MAX_VALUE;
             int nonIntersectingSSTables = 0;
+            int includedDueToTombstones = 0;
             SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
             for (SSTableReader sstable : view.sstables)
             {
-                minTimestamp = Math.min(minTimestamp, sstable.getMinTimestamp());
                 // if we've already seen a partition tombstone with a timestamp greater
                 // than the most recent update to this sstable, we can skip it
                 if (sstable.getMaxTimestamp() < mostRecentPartitionTombstone)
                     break;
-                if (!shouldInclude(sstable))
-                {
-                    nonIntersectingSSTables++;
-                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
-                    if (sstable.hasTombstones())
-                    {
-                        if (skippedSSTables == null)
-                            skippedSSTables = new ArrayList<>();
-                        skippedSSTables.add(sstable);
-                    }
-                    continue;
-                }
-
-                if (!sstable.isRepaired())
-                    oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-
-                // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
-                @SuppressWarnings("resource")
-                UnfilteredRowIterator iter = filter.filter(
-                    sstable.iterator(partitionKey(),
-                                     columnFilter(),
-                                     filter.isReversed(),
-                                     isForThrift(),
-                                     metricsCollector)
-                );
-
-                if (isForThrift())
-                    iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
-
-                iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
-
-                mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, iter.partitionLevelDeletion().markedForDeleteAt());
-            }
-
-            int includedDueToTombstones = 0;
-            // Check for partition tombstones in the skipped sstables
-            if (skippedSSTables != null)
-            {
-                for (SSTableReader sstable : skippedSSTables)
+                if (shouldInclude(sstable))
                 {
-                    if (sstable.getMaxTimestamp() <= minTimestamp)
-                        continue;
+                    if (!sstable.isRepaired())
+                        oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-                    @SuppressWarnings("resource") // 'iter' is either closed right away, or added to iterators which is close on exception, or through the closing of the final merged iterator
+                    // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                    @SuppressWarnings("resource")
                     UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
                                                                                 columnFilter(),
                                                                                 filter.isReversed(),
                                                                                 isForThrift(),
                                                                                 metricsCollector));
-                    if (iter.partitionLevelDeletion().markedForDeleteAt() > minTimestamp)
-                    {
-                        iterators.add(iter);
-                        if (!sstable.isRepaired())
-                            oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
-                        includedDueToTombstones++;
-                    }
-                    else
+                    if (isForThrift())
+                        iter = ThriftResultsMerger.maybeWrap(iter, nowInSec());
+
+                    iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
+                    mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                            iter.partitionLevelDeletion().markedForDeleteAt());
+                }
+                else
+                {
+                    nonIntersectingSSTables++;
+                    // sstable contains no tombstone if maxLocalDeletionTime == Integer.MAX_VALUE, so we can safely skip those entirely
+                    if (sstable.hasTombstones())
                     {
-                        iter.close();
+                        // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
+                        @SuppressWarnings("resource")
+                        UnfilteredRowIterator iter = filter.filter(sstable.iterator(partitionKey(),
+                                                                                    columnFilter(),
+                                                                                    filter.isReversed(),
+                                                                                    isForThrift(),
+                                                                                    metricsCollector));
+                        // if the sstable contains a partition delete, then we must include it regardless of whether it
+                        // shadows any other data seen locally as we can't guarantee that other replicas have seen it
+                        if (!iter.partitionLevelDeletion().isLive())
+                        {
+                            includedDueToTombstones++;
+                            iterators.add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));
+                            if (!sstable.isRepaired())
+                                oldestUnrepairedTombstone = Math.min(oldestUnrepairedTombstone, sstable.getMinLocalDeletionTime());
+                            mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
+                                                                    iter.partitionLevelDeletion().markedForDeleteAt());
+                        }
+                        else
+                        {
+                            iter.close();
+                        }
                     }
                 }
             }
+
             if (Tracing.isTracing())
                 Tracing.trace("Skipped {}/{} non-slice-intersecting sstables, included {} due to tombstones",
                               nonIntersectingSSTables, view.sstables.size(), includedDueToTombstones);
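
Read as a whole, the new loop replaces the old two-pass scheme (skip non-intersecting sstables, then revisit the skipped ones against the local minTimestamp) with a single pass that never drops a partition delete. The following is a minimal, self-contained sketch of that selection logic, not the committed code: the SSTable interface and the names intersectsQuery and partitionDeleteTimestamp are hypothetical stand-ins for SSTableReader, shouldInclude() and partitionLevelDeletion().markedForDeleteAt().

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    class PartitionReadSketch
    {
        // hypothetical stand-in for SSTableReader and friends
        interface SSTable
        {
            long maxTimestamp();
            boolean intersectsQuery();        // stands in for shouldInclude(sstable)
            boolean hasTombstones();
            long partitionDeleteTimestamp();  // Long.MIN_VALUE when no partition delete
        }

        static List<SSTable> selectForMerge(List<SSTable> sstables)
        {
            // newest-first, so a single pass suffices: once an sstable's maxTimestamp
            // drops below the most recent partition tombstone seen so far, every
            // remaining sstable is fully shadowed and the loop can stop
            sstables.sort(Comparator.comparingLong(SSTable::maxTimestamp).reversed());

            List<SSTable> included = new ArrayList<>();
            long mostRecentPartitionTombstone = Long.MIN_VALUE;
            for (SSTable sstable : sstables)
            {
                if (sstable.maxTimestamp() < mostRecentPartitionTombstone)
                    break;

                boolean hasPartitionDelete = sstable.partitionDeleteTimestamp() > Long.MIN_VALUE;
                if (sstable.intersectsQuery() || (sstable.hasTombstones() && hasPartitionDelete))
                {
                    // the fix: a partition delete is included even when it shadows
                    // nothing locally, because other replicas may hold data it covers
                    included.add(sstable);
                    mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
                                                            sstable.partitionDeleteTimestamp());
                }
            }
            return included;
        }
    }

Under the pre-patch logic the second half of that condition did not exist: a non-intersecting sstable was parked in skippedSSTables and only revisited when its partition delete was newer than the local minTimestamp, which is exactly the comparison the tests below defeat.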
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index f1f8674..75e5ba9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -1,12 +1,16 @@
package org.apache.cassandra.distributed.test;
+import java.util.Set;
+
import org.junit.Assert;
import org.junit.Test;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import static org.junit.Assert.assertEquals;
@@ -269,6 +273,103 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
assertEquals(100, readCount1);
}
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+            // expect a single sstable, where minTimestamp equals the timestamp of the partition delete
+            cluster.get(1).runOnInstance(() -> {
+                Set<SSTableReader> sstables = Keyspace.open(KEYSPACE)
+                                                      .getColumnFamilyStore("tbl")
+                                                      .getLiveSSTables();
+                assertEquals(1, sstables.size());
+                assertEquals(1, sstables.iterator().next().getMinTimestamp());
+            });
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            assertEquals(0, rows.length);
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 1");
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == maxTimestamp == 1 and is skipped due to its min/max clusterings. Now we
+            // insert a row which is not shadowed by the partition delete and flush to a second sstable. Importantly,
+            // this sstable's minTimestamp is greater than the maxTimestamp of the first sstable. This would cause the
+            // first sstable not to be reincluded in the merge input, but we can't really make that decision as we
+            // don't know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not
+            assertRows(rows, new Object[] {0, 6, 6});
+        }
+    }
+
+    @Test
+    public void skippedSSTableWithPartitionDeletionShadowingDataOnAnotherNode2() throws Throwable
+    {
+        // make sure skipped sstables are still added back even when the partition delete ts is <= the local min ts
+        try (Cluster cluster = init(Cluster.create(2)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY(pk, ck))");
+            // insert a partition tombstone on node 1, the deletion timestamp should end up being the sstable's minTimestamp
+            cluster.get(1).executeInternal("DELETE FROM " + KEYSPACE + ".tbl USING TIMESTAMP 1 WHERE pk = 0");
+            // and a row from a different partition, to provide the sstable's min/max clustering
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1) USING TIMESTAMP 3");
+            cluster.get(1).flush(KEYSPACE);
+            // sstable 1 has minTimestamp == 1 and maxTimestamp == 3 and is skipped due to its min/max clusterings. Now
+            // we insert a row which is not shadowed by the partition delete and flush to a second sstable. The first
+            // sstable has a maxTimestamp greater than the min timestamp of all sstables, so it is a candidate for
+            // reinclusion to the merge. However, the second sstable's minTimestamp is greater than the partition
+            // delete. This would cause the first sstable not to be reincluded in the merge input, but we can't really
+            // make that decision as we don't know what data and/or tombstones are present on other nodes
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 6, 6) USING TIMESTAMP 2");
+            cluster.get(1).flush(KEYSPACE);
+
+            // on node 2, add a row for the deleted partition with an older timestamp than the deletion so it should be shadowed
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (0, 10, 10) USING TIMESTAMP 0");
+
+            Object[][] rows = cluster.coordinator(1)
+                                     .execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk=0 AND ck > 5",
+                                              ConsistencyLevel.ALL);
+            // we expect that the row from node 2 (0, 10, 10) was shadowed by the partition delete, but the row from
+            // node 1 (0, 6, 6) was not
+            assertRows(rows, new Object[] {0, 6, 6});
+        }
+    }
+
private long readCount(IInvokableInstance instance)
{
return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
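
To make the failure mode concrete, here is a hedged numeric walkthrough of the first test above (skippedSSTableWithPartitionDeletionTest) under the pre-patch logic; the variable names are illustrative only, not from the codebase.

    // Timestamps taken from skippedSSTableWithPartitionDeletionTest.
    long partitionDeleteTs = 1;  // DELETE ... USING TIMESTAMP 1 on node 1
    long localMinTimestamp = 1;  // the tombstone is also node 1's oldest local write
    long remoteRowTs       = 0;  // INSERT ... USING TIMESTAMP 0 on node 2

    // Old re-inclusion check: a skipped sstable was kept only when its partition
    // delete was newer than the oldest timestamp across the local sstables.
    boolean oldReincluded = partitionDeleteTs > localMinTimestamp;  // 1 > 1 -> false

    // So node 1 dropped the tombstone from its response, node 2 returned its row,
    // and the CL.ALL read surfaced (0, 10, 10) despite remoteRowTs < partitionDeleteTs.
    // With the patch the sstable is kept whenever !partitionLevelDeletion().isLive(),
    // the tombstone reaches the coordinator, and the query correctly returns no rows.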
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]