Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 3e95c5b0c -> a85eeefe8
  refs/heads/trunk ea662ce21 -> 9330409ac


Bugs handling range tombstones in the sstable iterators

patch by Sylvain Lebresne; reviewed by Branimir Lambov for CASSANDRA-13340


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a85eeefe
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a85eeefe
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a85eeefe

Branch: refs/heads/cassandra-3.11
Commit: a85eeefe88eb036a9cd9fa85a1c8c31c2bfad78a
Parents: 3e95c5b
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Thu Mar 16 17:05:15 2017 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Thu Mar 23 17:17:16 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/db/ClusteringPrefix.java   |   2 +-
 .../cassandra/db/UnfilteredDeserializer.java    |   1 -
 .../db/columniterator/SSTableIterator.java      |  11 +-
 .../columniterator/SSTableReversedIterator.java | 126 +++++++++++++++----
 .../cql3/validation/operations/DeleteTest.java  |  70 +++++++++++
 6 files changed, 181 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c58fad8..728e3e7 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -13,6 +13,7 @@
  * NoReplicationTokenAllocator should work with zero replication factor (CASSANDRA-12983)
  * Address message coalescing regression (CASSANDRA-12676)
 Merged from 3.0:
+ * Bugs handling range tombstones in the sstable iterators (CASSANDRA-13340)
  * Fix CONTAINS filtering for null collections (CASSANDRA-13246)
  * Applying: Use a unique metric reservoir per test run when using Cassandra-wide metrics residing in MBeans (CASSANDRA-13216)
  * Propagate row deletions in 2i tables on upgrade (CASSANDRA-13320)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/ClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringPrefix.java b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
index 340e237..1ecc92d 100644
--- a/src/java/org/apache/cassandra/db/ClusteringPrefix.java
+++ b/src/java/org/apache/cassandra/db/ClusteringPrefix.java
@@ -482,7 +482,7 @@ public interface ClusteringPrefix extends IMeasurableMemory, Clusterable
             }
 
             if (bound.size() == nextSize)
-                return nextKind.compareTo(bound.kind());
+                return Kind.compare(nextKind, bound.kind());
 
             // We know that we'll have exited already if nextSize < bound.size
             return -bound.kind().comparedToClustering;
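
A note on the one-line ClusteringPrefix fix above: Enum.compareTo in Java always orders constants by declaration position (ordinal), which is not necessarily the logical ordering wanted when comparing bound kinds, so the patch switches to the explicit Kind.compare. As a purely illustrative sketch (the enum below uses invented names and weights, not Cassandra's actual Kind declaration), this is the kind of disagreement the two orderings can have for bounds that share a clustering position, such as the EXCL_END(x) == INCL_START(x) case discussed in the SSTableIterator change further down:

    // Illustrative toy only, not Cassandra's actual Kind declaration: names and weights
    // are invented to show ordinal order and a position-based order disagreeing.
    enum ToyBoundKind
    {
        INCL_START(0), EXCL_END(0), CLUSTERING(1);

        final int position;

        ToyBoundKind(int position) { this.position = position; }

        static int compare(ToyBoundKind a, ToyBoundKind b)
        {
            return Integer.compare(a.position, b.position);
        }
    }

    public class KindCompareSketch
    {
        public static void main(String[] args)
        {
            // Enum natural order: INCL_START < EXCL_END simply because it is declared first...
            System.out.println(ToyBoundKind.INCL_START.compareTo(ToyBoundKind.EXCL_END)); // negative
            // ...while the position-based comparison treats the two bounds as equal.
            System.out.println(ToyBoundKind.compare(ToyBoundKind.INCL_START, ToyBoundKind.EXCL_END)); // 0
        }
    }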

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 79b8636..b977907 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -690,6 +690,5 @@ public abstract class UnfilteredDeserializer
                 }
             }
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
index b3c2e94..e21bd72 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableIterator.java
@@ -138,7 +138,14 @@ public class SSTableIterator extends AbstractSSTableIterator
         {
             assert deserializer != null;
 
-            if (!deserializer.hasNext() || deserializer.compareNextTo(end) > 0)
+            // We use the same reasoning as in handlePreSliceData regarding the strictness of the inequality below.
+            // We want to exclude deserialized unfiltered equal to end, because 1) we won't miss any rows since those
+            // wouldn't be equal to a slice bound and 2) an end bound can be equal to a start bound
+            // (EXCL_END(x) == INCL_START(x) for instance) and in that case we don't want to return the start bound because
+            // it's fundamentally excluded. And if the bound is an end (for a range tombstone), it means it's exactly
+            // our slice end, but in that case we will properly close the range tombstone anyway as part of our "close
+            // an open marker" code in hasNextInternal
+            if (!deserializer.hasNext() || deserializer.compareNextTo(end) >= 0)
                 return null;
 
             Unfiltered next = deserializer.readNext();
@@ -281,7 +288,7 @@ public class SSTableIterator extends AbstractSSTableIterator
             if (indexState.isDone()
                 || indexState.currentBlockIdx() > lastBlockIdx
                 || !deserializer.hasNext()
-                || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) > 0))
+                || (indexState.currentBlockIdx() == lastBlockIdx && deserializer.compareNextTo(end) >= 0))
                 return null;
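
To make the relaxed-to-strict inequality change in this file concrete: an unfiltered that compares equal to the slice end cannot be a row we would otherwise miss (rows never compare equal to a slice bound), so it can only be a bound, typically a range tombstone open marker whose start shares the end's position (EXCL_END(x) == INCL_START(x)), and that marker must not be returned for this slice. The following standalone sketch, with invented names that are not Cassandra's API, shows the strict stop condition in isolation:

    // Toy sketch, for illustration only: none of these names are Cassandra's API.
    // Remaining items "on disk": a live row at clustering 0, then a range tombstone whose
    // inclusive start bound at 1 occupies the same position as the slice end EXCL_END(1).
    public class SliceEndSketch
    {
        public static void main(String[] args)
        {
            int[] nextClusterings = { 0, 1 };
            String[] descriptions = { "row at 0", "range tombstone open marker at 1" };
            int sliceEndClustering = 1;

            // The patched, strict check: stop as soon as the next item compares >= 0 to the
            // slice end. Only the row at 0 is returned; the open marker at 1 stays unread here.
            for (int i = 0; i < nextClusterings.length; i++)
            {
                if (Integer.compare(nextClusterings[i], sliceEndClustering) >= 0)
                    break;
                System.out.println("returning " + descriptions[i]);
            }
            // With the previous check (stop only when the comparison is strictly greater), the
            // loop would also have returned the open marker: an unmatched range tombstone bound
            // for a slice that contains nothing at position 1.
        }
    }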
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index c74b5db..c4bcd9e 100644
--- a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.io.util.FileHandle;
+import org.apache.cassandra.utils.AbstractIterator;
 import org.apache.cassandra.utils.btree.BTree;
 
 /**
@@ -81,6 +82,11 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         protected ReusablePartitionData buffer;
         protected Iterator<Unfiltered> iterator;
 
+        // Set in loadFromDisk() and used in setIterator to handle range tombstones extending over multiple index blocks. See
+        // loadFromDisk for details. Note that those are always false for non-indexed readers.
+        protected boolean skipFirstIteratedItem;
+        protected boolean skipLastIteratedItem;
+
         private ReverseReader(FileDataInput file, boolean shouldCloseFile)
         {
             super(file, shouldCloseFile);
@@ -123,8 +129,8 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                 buffer = createBuffer(1);
                 // Note that we can reuse that buffer between slices (we could alternatively re-read from disk
                 // every time, but that feels more wasteful) so we want to include everything from the beginning.
-                // We can stop at the last slice end however since any following slice will be before that.
+                // We can stop at the slice end however since any following slice will be before that.
slice will be before that.
+                loadFromDisk(null, slice.end(), true, false, false);
             }
             setIterator(slice);
         }
@@ -133,6 +139,15 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         {
             assert buffer != null;
             iterator = buffer.built.unfilteredIterator(columns, Slices.with(metadata().comparator, slice), true);
+
+            if (!iterator.hasNext())
+                return;
+
+            if (skipFirstIteratedItem)
+                iterator.next();
+
+            if (skipLastIteratedItem)
+                iterator = new SkipLastIterator(iterator);
         }
 
         protected boolean hasNextInternal() throws IOException
@@ -158,9 +173,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
 
         // Reads the unfiltered from disk and load them into the reader buffer. It stops reading when either the partition
         // is fully read, or when stopReadingDisk() returns true.
-        protected void loadFromDisk(ClusteringBound start, ClusteringBound end, boolean includeFirst) throws IOException
+        protected void loadFromDisk(ClusteringBound start,
+                                    ClusteringBound end,
+                                    boolean includeFirst,
+                                    boolean hasPreviousBlock,
+                                    boolean hasNextBlock) throws IOException
         {
+            // start != null means it's the block covering the beginning of the slice, so it has to be the last block for this slice.
+            assert start == null || !hasNextBlock;
+
             buffer.reset();
+            skipFirstIteratedItem = false;
+            skipLastIteratedItem = false;
 
             boolean isFirst = true;
 
@@ -177,16 +201,30 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
                 }
             }
 
-            // If we have an open marker, it's either one from what we just skipped (if start != null), or it's from the previous index block.
+            // If we have an open marker, it's either one from what we just skipped or it's one that opens in the next (or
+            // one of the next) index block (if openMarker == openMarkerAtStartOfBlock).
             if (openMarker != null)
             {
+                // We have to feed a marker to the buffer, because that marker is likely to be closed later and ImmutableBTreePartition
+                // doesn't take kindly to markers that come without their counterpart. If that's the last block we're going to read (for
+                // the current slice at least) it's easy because we'll want to return that open marker at the end of the data in this
+                // block anyway, so we have nothing more to do than adding it to the buffer.
+                // If it's not the last block however, in which case we know we'll have start == null, it means this marker is really
+                // open in a next block and so while we do need to add it to the buffer for the reason mentioned above, we don't
+                // want to "return" it just yet, we'll wait until we reach it in the next blocks. That's why we trigger
+                // skipLastIteratedItem in that case (this is the first item of the block, but we're iterating in reverse order
+                // so it will be the last returned by the iterator).
                 ClusteringBound markerStart = start == null ? ClusteringBound.BOTTOM : start;
                 buffer.add(new RangeTombstoneBoundMarker(markerStart, openMarker));
+                if (hasNextBlock)
+                    skipLastIteratedItem = true;
             }
 
             // Now deserialize everything until we reach our requested end (if we have one)
+            // See SSTableIterator.ForwardRead.computeNext() for why the inequality below is strict: the same
+            // reasoning applies here.
             while (deserializer.hasNext()
-                   && (end == null || deserializer.compareNextTo(end) <= 0)
+                   && (end == null || deserializer.compareNextTo(end) < 0)
                    && !stopReadingDisk())
             {
                 Unfiltered unfiltered = deserializer.readNext();
@@ -202,9 +240,18 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             // If we have an open marker, we should close it before finishing
             if (openMarker != null)
             {
-                // If we have no end and still an openMarker, this means we're indexed and the marker is closed in a following block.
+                // This is the reverse of the problem at the start of the block. Namely, if it's the first block
+                // we deserialize for the slice (the one covering the slice end basically), then it's easy, we just want
+                // to add the close marker to the buffer and return it normally.
+                // If it's not our first block (for the slice) however, it means that marker was closed in a previously read
+                // block and we have already returned it. So while we should still add it to the buffer for the sake of
+                // not breaking ImmutableBTreePartition, we should skip it when returning from the iterator, hence the
+                // skipFirstIteratedItem (this is the last item of the block, but we're iterating in reverse order so it will
+                // be the first returned by the iterator).
                 ClusteringBound markerEnd = end == null ? ClusteringBound.TOP : end;
                 buffer.add(new RangeTombstoneBoundMarker(markerEnd, getAndClearOpenMarker()));
+                if (hasPreviousBlock)
+                    skipFirstIteratedItem = true;
             }
 
             buffer.build();
@@ -267,13 +314,13 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             if (startIdx >= indexState.blocksCount())
                 startIdx = indexState.blocksCount() - 1;
 
-            if (startIdx != indexState.currentBlockIdx())
-            {
-                indexState.setToBlock(startIdx);
-                readCurrentBlock(true);
-            }
+            // Note that even if we were already set on the proper block (which would happen if the previously
+            // requested slice ended on the same block this one starts on), we can't reuse it because when reading the previous
+            // slice we've only read that block from the previous slice start. Re-reading also handles
+            // skipFirstIteratedItem/skipLastIteratedItem that we would otherwise need to handle ourselves.
+            indexState.setToBlock(startIdx);
 
-            setIterator(slice);
+            readCurrentBlock(false, startIdx != lastBlockIdx);
         }
 
         @Override
@@ -282,15 +329,14 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             if (super.hasNextInternal())
                 return true;
 
-            // We have nothing more for our current block, move the previous one.
-            int previousBlockIdx = indexState.currentBlockIdx() - 1;
-            if (previousBlockIdx < 0 || previousBlockIdx < lastBlockIdx)
+            // We have nothing more for our current block, move to the next one (so the one before on disk).
+            int nextBlockIdx = indexState.currentBlockIdx() - 1;
+            if (nextBlockIdx < 0 || nextBlockIdx < lastBlockIdx)
                 return false;
 
             // The slice start can be in
-            indexState.setToBlock(previousBlockIdx);
-            readCurrentBlock(false);
-            setIterator(slice);
+            indexState.setToBlock(nextBlockIdx);
+            readCurrentBlock(true, nextBlockIdx != lastBlockIdx);
             // since that new block is within the bounds we've computed in setToSlice(), we know there will
             // always be something matching the slice unless we're on the lastBlockIdx (in which case there
             // may or may not be results, but if there isn't, we're done for the slice).
@@ -300,33 +346,42 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
         /**
          * Reads the current block, the last one we've set.
          *
-         * @param canIncludeSliceEnd whether the block can include the slice end.
+         * @param hasPreviousBlock whether we have already read a previous block for the current slice.
+         * @param hasNextBlock whether we have more blocks to read for the current slice.
          */
-        private void readCurrentBlock(boolean canIncludeSliceEnd) throws IOException
+        private void readCurrentBlock(boolean hasPreviousBlock, boolean hasNextBlock) throws IOException
         {
             if (buffer == null)
                 buffer = createBuffer(indexState.blocksCount());
 
             int currentBlock = indexState.currentBlockIdx();
 
-            boolean canIncludeSliceStart = currentBlock == lastBlockIdx;
+            // The slice start (resp. slice end) is only meaningful on the last (resp. first) block read (since again,
+            // we read blocks in reverse order).
+            boolean canIncludeSliceStart = !hasNextBlock;
+            boolean canIncludeSliceEnd = !hasPreviousBlock;
 
             // When dealing with old format sstable, we have the problem that a row can span 2 index block, i.e. it can
             // start at the end of a block and end at the beginning of the next one. That's not a problem per se for
             // UnfilteredDeserializer.OldFormatSerializer, since it always read rows entirely, even if they span index
             // blocks, but as we reading index block in reverse we must be careful to not read the end of the row at
             // beginning of a block before we're reading the beginning of that row. So what we do is that if we detect
-            // that the row starting this block is also the row ending the previous one, we skip that first result and
-            // let it be read when we'll read the previous block.
+            // that the row starting this block is also the row ending the next one we'll read (previous on disk), then
+            // we'll skip that first result and let it be read with the next block.
             boolean includeFirst = true;
             if (!sstable.descriptor.version.storeRows() && currentBlock > 0)
             {
-                ClusteringPrefix lastOfPrevious = indexState.index(currentBlock - 1).lastName;
                 ClusteringPrefix firstOfCurrent = indexState.index(currentBlock).firstName;
-                includeFirst = metadata().comparator.compare(lastOfPrevious, firstOfCurrent) != 0;
+                ClusteringPrefix lastOfNext = indexState.index(currentBlock - 1).lastName;
+                includeFirst = metadata().comparator.compare(lastOfNext, firstOfCurrent) != 0;
             }
 
-            loadFromDisk(canIncludeSliceStart ? slice.start() : null, canIncludeSliceEnd ? slice.end() : null, includeFirst);
+            loadFromDisk(canIncludeSliceStart ? slice.start() : null,
+                         canIncludeSliceEnd ? slice.end() : null,
+                         includeFirst,
+                         hasPreviousBlock,
+                         hasNextBlock);
+            setIterator(slice);
         }
 
         @Override
@@ -382,4 +437,23 @@ public class SSTableReversedIterator extends AbstractSSTableIterator
             deletionBuilder = null;
         }
     }
+
+    private static class SkipLastIterator extends AbstractIterator<Unfiltered>
+    {
+        private final Iterator<Unfiltered> iterator;
+
+        private SkipLastIterator(Iterator<Unfiltered> iterator)
+        {
+            this.iterator = iterator;
+        }
+
+        protected Unfiltered computeNext()
+        {
+            if (!iterator.hasNext())
+                return endOfData();
+
+            Unfiltered next = iterator.next();
+            return iterator.hasNext() ? next : endOfData();
+        }
+    }
 }
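
The skipFirstIteratedItem/skipLastIteratedItem handling above comes down to two small iterator tricks: skipping the first returned item only needs the single extra next() call made in setIterator, while skipping the last one needs a look-ahead wrapper such as the SkipLastIterator added at the end of this file. Below is a minimal, standalone sketch of that look-ahead pattern over plain java.util.Iterator (an illustration, not the class above, which builds on Cassandra's AbstractIterator):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Generic "skip the last element" wrapper: an element is only handed out once another one
    // is known to follow, so the final element is silently dropped. This mirrors the intent of
    // SkipLastIterator above, but is written against plain java.util.Iterator for illustration.
    public class SkipLastSketch<T> implements Iterator<T>
    {
        private final Iterator<T> delegate;
        private T buffered;
        private boolean hasBuffered;

        public SkipLastSketch(Iterator<T> delegate)
        {
            this.delegate = delegate;
            if (delegate.hasNext())
            {
                buffered = delegate.next();
                hasBuffered = true;
            }
        }

        public boolean hasNext()
        {
            // The buffered element is only surfaced if the delegate has something after it.
            return hasBuffered && delegate.hasNext();
        }

        public T next()
        {
            if (!hasNext())
                throw new NoSuchElementException();
            T result = buffered;
            buffered = delegate.next();
            return result;
        }

        public static void main(String[] args)
        {
            Iterator<String> it = new SkipLastSketch<>(Arrays.asList("a", "b", "c").iterator());
            while (it.hasNext())
                System.out.println(it.next()); // prints "a" then "b"; the trailing "c" is skipped
        }
    }

Because the wrapper only buffers a single element, it stays lazy: it never needs to know in advance how many items the underlying block iterator will produce.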

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a85eeefe/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
index 4694ffc..6edca38 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/DeleteTest.java
@@ -1345,6 +1345,76 @@ public class DeleteTest extends CQLTester
         assertTrue("The memtable should be empty but is not", isMemtableEmpty());
     }
 
+    @Test
+    public void testQueryingOnRangeTombstoneBoundForward() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, i int, PRIMARY KEY (k, i))");
+
+        execute("INSERT INTO %s (k, i) VALUES (?, ?)", "a", 0);
+
+        execute("DELETE FROM %s WHERE k = ? AND i > ? AND i <= ?", "a", 0, 1);
+        execute("DELETE FROM %s WHERE k = ? AND i > ?", "a", 1);
+
+        flush();
+
+        assertEmpty(execute("SELECT i FROM %s WHERE k = ? AND i = ?", "a", 1));
+    }
+
+    @Test
+    public void testQueryingOnRangeTombstoneBoundReverse() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, i int, PRIMARY KEY (k, i))");
+
+        execute("INSERT INTO %s (k, i) VALUES (?, ?)", "a", 0);
+
+        execute("DELETE FROM %s WHERE k = ? AND i > ? AND i <= ?", "a", 0, 1);
+        execute("DELETE FROM %s WHERE k = ? AND i > ?", "a", 1);
+
+        flush();
+
+        assertRows(execute("SELECT i FROM %s WHERE k = ? AND i <= ? ORDER BY i DESC", "a", 1), row(0));
+    }
+
+    @Test
+    public void testReverseQueryWithRangeTombstoneOnMultipleBlocks() throws Throwable
+    {
+        createTable("CREATE TABLE %s (k text, i int, v text, PRIMARY KEY (k, i))");
+
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < 1200; i++)
+            sb.append('a');
+        String longText = sb.toString();
+
+        for (int i = 0; i < 10; i++)
+            execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 3", "a", i*2, longText);
+
+        execute("DELETE FROM %s USING TIMESTAMP 1 WHERE k = ? AND i >= ? AND i <= ?", "a", 12, 16);
+
+        flush();
+
+        execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 3, longText);
+        execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 3", "a", 11, longText);
+        execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 15, longText);
+        execute("INSERT INTO %s(k, i, v) VALUES (?, ?, ?) USING TIMESTAMP 0", "a", 17, longText);
+
+        flush();
+
+        assertRows(execute("SELECT i FROM %s WHERE k = ? ORDER BY i DESC", "a"),
+                   row(18),
+                   row(17),
+                   row(16),
+                   row(14),
+                   row(12),
+                   row(11),
+                   row(10),
+                   row(8),
+                   row(6),
+                   row(4),
+                   row(3),
+                   row(2),
+                   row(0));
+    }
+
     /**
      * Test for CASSANDRA-13305
      */
