Fix potentially returning deleted row with range tombstones (2.1 version) patch by slebresne; reviewed by thobbs for CASSANDRA-8558
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1c9c47d2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1c9c47d2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1c9c47d2 Branch: refs/heads/cassandra-2.1 Commit: 1c9c47d26f2da98d1a923254858bcde550122445 Parents: 19c54a5 Author: Sylvain Lebresne <[email protected]> Authored: Wed Jan 14 11:40:57 2015 +0100 Committer: Sylvain Lebresne <[email protected]> Committed: Wed Jan 14 11:40:57 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/SelectStatement.java | 6 +-- .../apache/cassandra/db/AtomDeserializer.java | 34 +++++++++++---- .../db/columniterator/IndexedSliceReader.java | 46 ++++++++++++++++++-- .../db/columniterator/SSTableNamesIterator.java | 25 ++++++++--- .../cassandra/db/composites/CellNameType.java | 2 +- .../cassandra/cql3/RangeDeletionTest.java | 35 +++++++++++++++ 7 files changed, 129 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e070eaf..175a78a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -52,6 +52,7 @@ * Log failed host when preparing incremental repair (CASSANDRA-8228) * Force config client mode in CQLSSTableWriter (CASSANDRA-8281) Merged from 2.0: + * Fix potentially returning deleted rows with range tombstone (CASSANDRA-8558) * Check for available disk space before starting a compaction (CASSANDRA-8562) * Fix DISTINCT queries with LIMITs or paging when some partitions contain only tombstones (CASSANDRA-8490) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 92a9579..4ef554d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -844,7 +844,7 @@ public class SelectStatement implements CQLStatement // For composites, if there was preceding component and we're computing the end, we must change the last component // End-Of-Component, otherwise we would be selecting only one record. Composite prefix = builder.build(); - return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix); + return Collections.singletonList(!prefix.isEmpty() && eocBound == Bound.END ? prefix.end() : prefix.start()); } if (r.isSlice()) { @@ -869,7 +869,7 @@ public class SelectStatement implements CQLStatement throw new InvalidRequestException(String.format("Invalid null clustering key part %s", def.name)); Composite prefix = builder.buildWith(val); // See below for why this - s.add((eocBound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix); + s.add((eocBound == Bound.END && builder.remainingCount() > 0) ? prefix.end() : prefix.start()); } return new ArrayList<>(s); } @@ -887,7 +887,7 @@ public class SelectStatement implements CQLStatement // case using the eoc would be bad, since for the random partitioner we have no guarantee that // prefix.end() will sort after prefix (see #5240). Composite prefix = builder.build(); - return Collections.singletonList(eocBound == Bound.END && builder.remainingCount() > 0 ? prefix.end() : prefix); + return Collections.singletonList(eocBound == Bound.END && builder.remainingCount() > 0 ? 
prefix.end() : prefix.start()); } private static Composite.EOC eocForRelation(Operator op) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/AtomDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java b/src/java/org/apache/cassandra/db/AtomDeserializer.java index 799ed0e..a103647 100644 --- a/src/java/org/apache/cassandra/db/AtomDeserializer.java +++ b/src/java/org/apache/cassandra/db/AtomDeserializer.java @@ -42,6 +42,10 @@ public class AtomDeserializer private final int expireBefore; private final Descriptor.Version version; + // The "flag" for the next name (which corresponds to the "masks" in ColumnSerializer) if it has been + // read already, Integer.MIN_VALUE otherwise; + private int nextFlags = Integer.MIN_VALUE; + public AtomDeserializer(CellNameType type, DataInput in, ColumnSerializer.Flag flag, int expireBefore, Descriptor.Version version) { this.type = type; @@ -82,17 +86,30 @@ public class AtomDeserializer } /** + * Returns whether the next atom is a range tombstone or not. + * + * Please note that this should only be called after compareNextTo() has been called. + */ + public boolean nextIsRangeTombstone() throws IOException + { + nextFlags = in.readUnsignedByte(); + return (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0; + } + + /** * Returns the next atom. */ public OnDiskAtom readNext() throws IOException { Composite name = nameDeserializer.readNext(); assert !name.isEmpty(); // This would imply hasNext() hasn't been called - int b = in.readUnsignedByte(); - if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0) - return type.rangeTombstoneSerializer().deserializeBody(in, name, version); - else - return type.columnSerializer().deserializeColumnBody(in, (CellName)name, b, flag, expireBefore); + + nextFlags = nextFlags == Integer.MIN_VALUE ?
in.readUnsignedByte() : nextFlags; + OnDiskAtom atom = (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0 + ? type.rangeTombstoneSerializer().deserializeBody(in, name, version) + : type.columnSerializer().deserializeColumnBody(in, (CellName)name, nextFlags, flag, expireBefore); + nextFlags = Integer.MIN_VALUE; + return atom; } /** @@ -101,10 +118,11 @@ public class AtomDeserializer public void skipNext() throws IOException { nameDeserializer.skipNext(); - int b = in.readUnsignedByte(); - if ((b & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0) + nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : nextFlags; + if ((nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0) type.rangeTombstoneSerializer().skipBody(in, version); else - type.columnSerializer().skipColumnBody(in, b); + type.columnSerializer().skipColumnBody(in, nextFlags); + nextFlags = Integer.MIN_VALUE; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java index 7012321..924e9bc 100644 --- a/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java +++ b/src/java/org/apache/cassandra/db/columniterator/IndexedSliceReader.java @@ -290,8 +290,13 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA // If we read blocks in reversed disk order, we may have columns from the previous block to handle. // Note that prefetched keeps columns in reversed disk order. + // Also note that Range Tombstone handling is a bit tricky, because we may run into range tombstones + // that cover a slice *after* we've moved to the previous slice.
To keep it simple, we simply include + // every RT in prefetched: it's only slightly inefficient to do so and there are only so many RTs that + // can be mistakenly added this way. if (reversed && !prefetched.isEmpty()) { + // Whether we've found anything to return in prefetched boolean gotSome = false; // Avoids some comparison when we know it's not useful boolean inSlice = false; @@ -303,8 +308,22 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA if (isColumnBeforeSliceStart(prefetchedCol)) { inSlice = false; - if (!setNextSlice()) - return false; + + // As explained above, we add RT unconditionally + if (prefetchedCol instanceof RangeTombstone) + { + blockColumns.addLast(prefetched.poll()); + gotSome = true; + continue; + } + + // Otherwise, we either move to the next slice or, if we have none (which can happen + // because we unwind prefetched no matter what due to RT), we skip the cell + if (hasMoreSlice()) + setNextSlice(); + else + prefetched.poll(); + } // col is within slice, all columns // (we go in reverse, so as soon as we are in a slice, no need to check @@ -374,6 +393,16 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA Composite start = currentStart(); if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) { + // If it's a rangeTombstone, then we need to read it and include it unless its end + // stops before our slice start.
+ if (deserializer.nextIsRangeTombstone()) + { + RangeTombstone rt = (RangeTombstone)deserializer.readNext(); + if (comparator.compare(rt.max, start) >= 0) + addColumn(rt); + continue; + } + if (reversed) { // the next slice select columns that are before the current one, so it may @@ -451,7 +480,18 @@ class IndexedSliceReader extends AbstractIterator<OnDiskAtom> implements OnDiskA Composite start = currentStart(); if (!inSlice && !start.isEmpty() && deserializer.compareNextTo(start) < 0) { - deserializer.skipNext(); + // If it's a rangeTombstone, then we need to read it and include it unless its end + // stops before our slice start. Otherwise, we can skip it. + if (deserializer.nextIsRangeTombstone()) + { + RangeTombstone rt = (RangeTombstone)deserializer.readNext(); + if (comparator.compare(rt.max, start) >= 0) + addColumn(rt); + } + else + { + deserializer.skipNext(); + } continue; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java index 224b63f..221f499 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableNamesIterator.java @@ -214,16 +214,31 @@ public class SSTableNamesIterator extends AbstractIterator<OnDiskAtom> implement while (file.bytesPastMark(mark) < indexInfo.width && nextToFetch != null) { int cmp = deserializer.compareNextTo(nextToFetch); - if (cmp == 0) + if (cmp < 0) + { + // If it's a rangeTombstone, then we need to read it and include + // it if it includes our target. Otherwise, we can skip it.
+ if (deserializer.nextIsRangeTombstone()) + { + RangeTombstone rt = (RangeTombstone)deserializer.readNext(); + if (comparator.compare(rt.max, nextToFetch) >= 0) + result.add(rt); + } + else + { + deserializer.skipNext(); + } + } + else if (cmp == 0) { nextToFetch = toFetch.hasNext() ? toFetch.next() : null; result.add(deserializer.readNext()); - continue; } - - deserializer.skipNext(); - if (cmp > 0) + else + { + deserializer.skipNext(); nextToFetch = toFetch.hasNext() ? toFetch.next() : null; + } } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/src/java/org/apache/cassandra/db/composites/CellNameType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/composites/CellNameType.java b/src/java/org/apache/cassandra/db/composites/CellNameType.java index 1e87296..7b4fd36 100644 --- a/src/java/org/apache/cassandra/db/composites/CellNameType.java +++ b/src/java/org/apache/cassandra/db/composites/CellNameType.java @@ -197,7 +197,7 @@ public interface CellNameType extends CType public boolean hasUnprocessed() throws IOException; /** - * Comparare the next name to read to the provided Composite. + * Compare the next name to read to the provided Composite. * This does not consume the next name. */ public int compareNextTo(Composite composite) throws IOException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1c9c47d2/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java b/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java new file mode 100644 index 0000000..b31d0c2 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/RangeDeletionTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import org.junit.Test; + +public class RangeDeletionTest extends CQLTester +{ + @Test + public void testCassandra8558() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b, c))"); + + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 1, 1); + flush(); + execute("DELETE FROM %s WHERE a=? AND b=?", 1, 1); + flush(); + assertEmpty(execute("SELECT * FROM %s WHERE a=? AND b=? AND c=?", 1, 1, 1)); + } +}
