Repository: cassandra Updated Branches: refs/heads/cassandra-3.11 3e95c5b0c -> a85eeefe8 refs/heads/trunk ea662ce21 -> 9330409ac
Bugs handling range tombstones in the sstable iterators patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13340 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a85eeefe Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a85eeefe Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a85eeefe Branch: refs/heads/cassandra-3.11 Commit: a85eeefe88eb036a9cd9fa85a1c8c31c2bfad78a Parents: 3e95c5b Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Thu Mar 16 17:05:15 2017 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Mar 23 17:17:16 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ClusteringPrefix.java | 2 +- .../cassandra/db/UnfilteredDeserializer.java | 1 - .../db/columniterator/SSTableIterator.java | 11 +- .../columniterator/SSTableReversedIterator.java | 126 +++++++++++++++---- .../cql3/validation/operations/DeleteTest.java | 70 +++++++++++ 6 files changed, 181 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index c58fad8..728e3e7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -13,6 +13,7 @@ * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983) * Address message coalescing regression (CASSANDRA-12676) Merged from 3.0: + * Bugs handling range tombstones in the sstable iterators (CASSANDRA-13340) * Fix CONTAINS filtering for null collections (CASSANDRA-13246) * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216) * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320) 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/ClusteringPrefix.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java index 340e237..1ecc92d 100644 --- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java +++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java @@ -482,7 +482,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable } if (bound.size() == nextSize) - return nextKind.compareTo(bound.kind()); + return Kind.compare(nextKind, bound.kind()); // We know that we'll have exited already if nextSize < bound.size return -bound.kind().comparedToClustering; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java index 79b8636..b977907 100644 --- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java +++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java @@ -690,6 +690,5 @@ public abstract class UnfilteredDeserializer } } } - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java index b3c2e94..e21bd72 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java @@ -138,7 +138,14 @@ public class SSTableIterator extends AbstractSSTableIterator { assert 
deserializer != null; - if (!deserializer.hasNext() || deserializer.compareNextTo(end) > 0) + // We use the same reasoning as in handlePreSliceData regarding the strictness of the inequality below. + // We want to exclude deserialized unfiltered equal to end, because 1) we won't miss any rows since those + // wouldn't be equal to a slice bound and 2) an end bound can be equal to a start bound + // (EXCL_END(x) == INCL_START(x) for instance) and in that case we don't want to return the start bound because + // it's fundamentally excluded. And if the bound is an end (for a range tombstone), it means it's exactly + // our slice end, but in that case we will properly close the range tombstone anyway as part of our "close + // an open marker" code in hasNextInternal + if (!deserializer.hasNext() || deserializer.compareNextTo(end) >= 0) return null; Unfiltered next = deserializer.readNext(); @@ -281,7 +288,7 @@ public class SSTableIterator extends AbstractSSTableIterator if (indexState.isDone() || indexState.currentBlockIdx() > lastBlockIdx || !deserializer.hasNext() - || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) > 0)) + || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) >= 0)) return null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java index c74b5db..c4bcd9e 100644 --- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java +++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java @@ -28,6 +28,7 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import
org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileHandle; +import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.btree.BTree; /** @@ -81,6 +82,11 @@ public class SSTableReversedIterator extends AbstractSSTableIterator protected ReusablePartitionData buffer; protected Iterator<Unfiltered> iterator; + // Set in loadFromDisk() and used in setIterator to handle range tombstones extending over multiple index blocks. See + // loadFromDisk for details. Note that those are always false for non-indexed readers. + protected boolean skipFirstIteratedItem; + protected boolean skipLastIteratedItem; + private ReverseReader(FileDataInput file, boolean shouldCloseFile) { super(file, shouldCloseFile); @@ -123,8 +129,8 @@ public class SSTableReversedIterator extends AbstractSSTableIterator buffer = createBuffer(1); // Note that we can reuse that buffer between slices (we could alternatively re-read from disk // every time, but that feels more wasteful) so we want to include everything from the beginning. - // We can stop at the last slice end however since any following slice will be before that. - loadFromDisk(null, slice.end(), true); + // We can stop at the slice end however since any following slice will be before that. + loadFromDisk(null, slice.end(), true, false, false); } setIterator(slice); } @@ -133,6 +139,15 @@ public class SSTableReversedIterator extends AbstractSSTableIterator { assert buffer != null; iterator = buffer.built.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true); + + if (!iterator.hasNext()) + return; + + if (skipFirstIteratedItem) + iterator.next(); + + if (skipLastIteratedItem) + iterator = new SkipLastIterator(iterator); } protected boolean hasNextInternal() throws IOException @@ -158,9 +173,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator // Reads the unfiltered from disk and load them into the reader buffer.
It stops reading when either the partition // is fully read, or when stopReadingDisk() returns true. - protected void loadFromDisk(ClusteringBound start, ClusteringBound end, boolean includeFirst) throws IOException + protected void loadFromDisk(ClusteringBound start, + ClusteringBound end, + boolean includeFirst, + boolean hasPreviousBlock, + boolean hasNextBlock) throws IOException { + // start != null means it's the block covering the beginning of the slice, so it has to be the last block for this slice. + assert start == null || !hasNextBlock; + buffer.reset(); + skipFirstIteratedItem = false; + skipLastIteratedItem = false; boolean isFirst = true; @@ -177,16 +201,30 @@ public class SSTableReversedIterator extends AbstractSSTableIterator } } - // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block. + // If we have an open marker, it's either one from what we just skipped or it's one that opens in the next (or + // one of the next) index block (if openMarker == openMarkerAtStartOfBlock). if (openMarker != null) { + // We have to feed a marker to the buffer, because that marker is likely to be closed later and ImmutableBTreePartition + // doesn't take kindly to markers that come without their counterpart. If that's the last block we're gonna read (for + // the current slice at least) it's easy because we'll want to return that open marker at the end of the data in this + // block anyway, so we have nothing more to do than adding it to the buffer. + // If it's not the last block however, in which case we know we'll have start == null, it means this marker is really + // open in a next block and so while we do need to add it to the buffer for the reason mentioned above, we don't + // want to "return" it just yet, we'll wait until we reach it in the next blocks.
That's why we trigger + // skipLastIteratedItem in that case (this is the first item of the block, but we're iterating in reverse order + // so it will be the last returned by the iterator). ClusteringBound markerStart = start == null ? ClusteringBound.BOTTOM : start; buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker)); + if (hasNextBlock) + skipLastIteratedItem = true; } // Now deserialize everything until we reach our requested end (if we have one) + // See SSTableIterator.ForwardRead.computeNext() for why this is a strict inequality below: this is the same + // reasoning here. while (deserializer.hasNext() - && (end == null || deserializer.compareNextTo(end) <= 0) + && (end == null || deserializer.compareNextTo(end) < 0) && !stopReadingDisk()) { Unfiltered unfiltered = deserializer.readNext(); @@ -202,9 +240,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator // If we have an open marker, we should close it before finishing if (openMarker != null) { - // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block. + // This is the reverse of the problem at the start of the block. Namely, if it's the first block + // we deserialize for the slice (the one covering the slice end basically), then it's easy, we just want + // to add the close marker to the buffer and return it normally. + // If it's not our first block (for the slice) however, it means that marker was closed in a previously read + // block and we have already returned it. So while we should still add it to the buffer for the sake of + // not breaking ImmutableBTreePartition, we should skip it when returning from the iterator, hence the + // skipFirstIteratedItem (this is the last item of the block, but we're iterating in reverse order so it will + // be the first returned by the iterator). ClusteringBound markerEnd = end == null ?
ClusteringBound.TOP : end; buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker())); + if (hasPreviousBlock) + skipFirstIteratedItem = true; } buffer.build(); @@ -267,13 +314,13 @@ public class SSTableReversedIterator extends AbstractSSTableIterator if (startIdx >= indexState.blocksCount()) startIdx = indexState.blocksCount() - 1; - if (startIdx != indexState.currentBlockIdx()) - { - indexState.setToBlock(startIdx); - readCurrentBlock(true); - } + // Note that even if we were already set on the proper block (which would happen if the previous slice + // requested ended on the same block this one start), we can't reuse it because when reading the previous + // slice we've only read that block from the previous slice start. Re-reading also handles + // skipFirstIteratedItem/skipLastIteratedItem that we would need to handle otherwise. + indexState.setToBlock(startIdx); - setIterator(slice); + readCurrentBlock(false, startIdx != lastBlockIdx); } @Override @@ -282,15 +329,14 @@ public class SSTableReversedIterator extends AbstractSSTableIterator if (super.hasNextInternal()) return true; - // We have nothing more for our current block, move the previous one. - int previousBlockIdx = indexState.currentBlockIdx() - 1; - if (previousBlockIdx < 0 || previousBlockIdx < lastBlockIdx) + // We have nothing more for our current block, move the next one (so the one before on disk). 
+ int nextBlockIdx = indexState.currentBlockIdx() - 1; + if (nextBlockIdx < 0 || nextBlockIdx < lastBlockIdx) return false; // The slice start can be in - indexState.setToBlock(previousBlockIdx); - readCurrentBlock(false); - setIterator(slice); + indexState.setToBlock(nextBlockIdx); + readCurrentBlock(true, nextBlockIdx != lastBlockIdx); // since that new block is within the bounds we've computed in setToSlice(), we know there will // always be something matching the slice unless we're on the lastBlockIdx (in which case there // may or may not be results, but if there isn't, we're done for the slice). @@ -300,33 +346,42 @@ public class SSTableReversedIterator extends AbstractSSTableIterator /** * Reads the current block, the last one we've set. * - * @param canIncludeSliceEnd whether the block can include the slice end. + * @param hasPreviousBlock is whether we have already read a previous block for the current slice. + * @param hasNextBlock is whether we have more blocks to read for the current slice. */ - private void readCurrentBlock(boolean canIncludeSliceEnd) throws IOException + private void readCurrentBlock(boolean hasPreviousBlock, boolean hasNextBlock) throws IOException { if (buffer == null) buffer = createBuffer(indexState.blocksCount()); int currentBlock = indexState.currentBlockIdx(); - boolean canIncludeSliceStart = currentBlock == lastBlockIdx; + // The slice start (resp. slice end) is only meaningful on the last (resp. first) block read (since again, + // we read blocks in reverse order). + boolean canIncludeSliceStart = !hasNextBlock; + boolean canIncludeSliceEnd = !hasPreviousBlock; // When dealing with old format sstable, we have the problem that a row can span 2 index block, i.e. it can // start at the end of a block and end at the beginning of the next one. 
That's not a problem per se for // UnfilteredDeserializer.OldFormatSerializer, since it always read rows entirely, even if they span index // blocks, but as we reading index block in reverse we must be careful to not read the end of the row at // beginning of a block before we're reading the beginning of that row. So what we do is that if we detect - // that the row starting this block is also the row ending the previous one, we skip that first result and - // let it be read when we'll read the previous block. + // that the row starting this block is also the row ending the next one we'll read (previous on disk), then + // we'll skip that first result and let it be read with the next block. boolean includeFirst = true; if (!sstable.descriptor.version.storeRows() && currentBlock > 0) { - ClusteringPrefix lastOfPrevious = indexState.index(currentBlock - 1).lastName; + ClusteringPrefix lastOfNext = indexState.index(currentBlock - 1).lastName; ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName; - includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0; + includeFirst = metadata().comparator.compare(lastOfNext, firstOfCurrent) != 0; } - loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst); + loadFromDisk(canIncludeSliceStart ? slice.start() : null, + canIncludeSliceEnd ?
slice.end() : null, + includeFirst, + hasPreviousBlock, + hasNextBlock); + setIterator(slice); } @Override @@ -382,4 +437,23 @@ public class SSTableReversedIterator extends AbstractSSTableIterator deletionBuilder = null; } } + + private static class SkipLastIterator extends AbstractIterator<Unfiltered> + { + private final Iterator<Unfiltered> iterator; + + private SkipLastIterator(Iterator<Unfiltered> iterator) + { + this.iterator = iterator; + } + + protected Unfiltered computeNext() + { + if (!iterator.hasNext()) + return endOfData(); + + Unfiltered next = iterator.next(); + return iterator.hasNext() ? next : endOfData(); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java index 4694ffc..6edca38 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java @@ -1345,6 +1345,76 @@ public class DeleteTest extends CQLTester assertTrue("The memtable should be empty but is not", isMemtableEmpty()); } + @Test + public void testQueryingOnRangeTombstoneBoundForward() throws Throwable + { + createTable("CREATE TABLE %s (k text, i int, PRIMARY KEY (k, i))"); + + execute("INSERT INTO %s (k, i) VALUES (?, ?)", "a", 0); + + execute("DELETE FROM %s WHERE k = ? AND i > ? AND i <= ?", "a", 0, 1); + execute("DELETE FROM %s WHERE k = ? AND i > ?", "a", 1); + + flush(); + + assertEmpty(execute("SELECT i FROM %s WHERE k = ? 
AND i = ?", "a", 1)); + } + + @Test + public void testQueryingOnRangeTombstoneBoundReverse() throws Throwable + { + createTable("CREATE TABLE %s (k text, i int, PRIMARY KEY (k, i))"); + + execute("INSERT INTO %s (k, i) VALUES (?, ?)", "a", 0); + + execute("DELETE FROM %s WHERE k = ? AND i > ? AND i <= ?", "a", 0, 1); + execute("DELETE FROM %s WHERE k = ? AND i > ?", "a", 1); + + flush(); + + assertRows(execute("SELECT i FROM %s WHERE k = ? AND i <= ? ORDER BY i DESC", "a", 1), row(0)); + } + + @Test + public void testReverseQueryWithRangeTombstoneOnMultipleBlocks() throws Throwable + { + createTable("CREATE TABLE %s (k text, i int, v text, PRIMARY KEY (k, i))"); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 1200; i++) + sb.append('a'); + String longText = sb.toString(); + + for (int i = 0; i < 10; i++) + execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 3", "a", i*2, longText); + + execute("DELETE FROM %s USING TIMESTAMP 1 WHERE k = ? AND i >= ? AND i <= ?", "a", 12, 16); + + flush(); + + execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 3, longText); + execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 3", "a", 11, longText); + execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 15, longText); + execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 17, longText); + + flush(); + + assertRows(execute("SELECT i FROM %s WHERE k = ? ORDER BY i DESC", "a"), + row(18), + row(17), + row(16), + row(14), + row(12), + row(11), + row(10), + row(8), + row(6), + row(4), + row(3), + row(2), + row(0)); + } + /** * Test for CASSANDRA-13305 */