Repository: cassandra
Updated Branches:
  refs/heads/trunk be2cf1afa -> 65000b3c2


Don't account for unconsumed data when checking if past sstable index

patch by slebresne; reviewed by blambov for CASSANDRA-10590


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4a00438a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4a00438a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4a00438a

Branch: refs/heads/trunk
Commit: 4a00438a27cfa5bcd29cdcd9b30eb6d69e836a1f
Parents: 242b973
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Oct 28 12:21:36 2015 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Thu Nov 5 11:07:09 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/db/UnfilteredDeserializer.java    | 45 +++++++++++++++++++-
 .../columniterator/AbstractSSTableIterator.java |  4 +-
 .../columniterator/SSTableReversedIterator.java | 17 ++------
 4 files changed, 51 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5fdeae5..1ff2fdb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Fix reading of legacy sstables (CASSANDRA-10590)
  * Use CQL type names in schema metadata tables (CASSANDRA-10365)
  * Guard batchlog replay against integer division by zero (CASSANDRA-9223)
 * Fix bug when adding a column to thrift with the same name as a primary 
key (CASSANDRA-10608)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java 
b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 52de159..66f6b71 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.net.MessagingService;
 
 /**
@@ -107,6 +108,20 @@ public abstract class UnfilteredDeserializer
      */
     public abstract void skipNext() throws IOException;
 
+
+    /**
+     * For the legacy layout deserializer, we have to deal with the fact that 
a row can span multiple index blocks and that
+     * the call to hasNext() reads the next element upfront. We must take that 
into account when we check in AbstractSSTableIterator if
+     * we're past the end of an index block boundary as that check expects to 
account for only consumed data (that is, if hasNext has
+     * been called and made us cross an index boundary but neither readNext() 
nor skipNext() has yet been called, we shouldn't consider
+     * the index block boundary crossed yet).
+     *
+     * TODO: we don't care about this for the current file format because a 
row can never span multiple index blocks (further, hasNext()
+     * basically only reads 2 bytes from disk in that case). So once we 
drop backward compatibility with pre-3.0 sstables, we should
+     * remove this.
+     */
+    public abstract long bytesReadForUnconsumedData();
+
     private static class CurrentDeserializer extends UnfilteredDeserializer
     {
         private final ClusteringPrefix.Deserializer clusteringDeserializer;
@@ -216,6 +231,13 @@ public abstract class UnfilteredDeserializer
             isReady = false;
             isDone = false;
         }
+
+        public long bytesReadForUnconsumedData()
+        {
+            // In theory, hasNext() does consume 2-3 bytes, but we don't care 
about this for the current file format, so we return
+            // 0 to mean "do nothing".
+            return 0;
+        }
     }
 
     public static class OldFormatDeserializer extends UnfilteredDeserializer
@@ -233,8 +255,8 @@ public abstract class UnfilteredDeserializer
         // The Unfiltered as read from the old format input
         private final UnfilteredIterator iterator;
 
-        // Tracks which tombstone are opened at any given point of the 
deserialization. Note that this
-        // is directly populated by UnfilteredIterator.
+        // The position in the input after the last data consumption 
(readNext/skipNext).
+        private long lastConsumedPosition;
 
         private OldFormatDeserializer(CFMetaData metadata,
                                       DataInputPlus in,
@@ -245,6 +267,7 @@ public abstract class UnfilteredDeserializer
             super(metadata, in, helper);
             this.iterator = new UnfilteredIterator(partitionDeletion);
             this.readAllAsDynamic = readAllAsDynamic;
+            this.lastConsumedPosition = currentPosition();
         }
 
         public void setSkipStatic()
@@ -322,12 +345,20 @@ public abstract class UnfilteredDeserializer
             return nextIsRow() && ((Row)next).isStatic();
         }
 
+        private long currentPosition()
+        {
+            // We return a bogus value if the input is not file based, but 
check we never rely
+            // on that value in that case in bytesReadForUnconsumedData
+            return in instanceof FileDataInput ? 
((FileDataInput)in).getFilePointer() : 0;
+        }
+
         public Unfiltered readNext() throws IOException
         {
             if (!hasNext())
                 throw new IllegalStateException();
             Unfiltered toReturn = next;
             next = null;
+            lastConsumedPosition = currentPosition();
             return toReturn;
         }
 
@@ -336,6 +367,15 @@ public abstract class UnfilteredDeserializer
             if (!hasNext())
                 throw new UnsupportedOperationException();
             next = null;
+            lastConsumedPosition = currentPosition();
+        }
+
+        public long bytesReadForUnconsumedData()
+        {
+            if (!(in instanceof FileDataInput))
+                throw new AssertionError();
+
+            return currentPosition() - lastConsumedPosition;
         }
 
         public void clearState()
@@ -343,6 +383,7 @@ public abstract class UnfilteredDeserializer
             next = null;
             saved = null;
             iterator.clearState();
+            lastConsumedPosition = currentPosition();
         }
 
         // Groups atoms from the input into proper Unfiltered.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 8900b31..5f280d7 100644
--- 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -486,7 +486,9 @@ abstract class AbstractSSTableIterator implements 
SliceableUnfilteredRowIterator
         // Check if we've crossed an index boundary (based on the mark on the 
beginning of the index block).
         public boolean isPastCurrentBlock()
         {
-            return reader.file.bytesPastMark(mark) >= currentIndex().width;
+            assert reader.deserializer != null;
+            long correction = reader.deserializer.bytesReadForUnconsumedData();
+            return reader.file.bytesPastMark(mark) - correction >= 
currentIndex().width;
         }
 
         public int currentBlockIdx()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a00438a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
index 01a8fb2..66c32ee 100644
--- 
a/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
+++ 
b/src/java/org/apache/cassandra/db/columniterator/SSTableReversedIterator.java
@@ -155,23 +155,17 @@ public class SSTableReversedIterator extends 
AbstractSSTableIterator
             buffer.reset();
 
             boolean isFirst = true;
-            boolean isDone = false;
 
             // If the start might be in this block, skip everything that comes 
before it.
             if (start != null)
             {
-                while (!isDone && deserializer.hasNext() && 
deserializer.compareNextTo(start) <= 0)
+                while (deserializer.hasNext() && 
deserializer.compareNextTo(start) <= 0 && !stopReadingDisk())
                 {
                     isFirst = false;
                     if (deserializer.nextIsRow())
                         deserializer.skipNext();
                     else
                         
updateOpenMarker((RangeTombstoneMarker)deserializer.readNext());
-
-                    // Note that because 'deserializer.hasNext()' may advance 
our file pointer, we need to always check stopReadingDisk() before any call to 
it,
-                    // i.e. just after we've called readNext/skipNext
-                    if (stopReadingDisk())
-                        isDone = true;
                 }
             }
 
@@ -183,17 +177,14 @@ public class SSTableReversedIterator extends 
AbstractSSTableIterator
             }
 
             // Now deserialize everything until we reach our requested end (if 
we have one)
-            while (!isDone
-                   && deserializer.hasNext()
-                   && (end == null || deserializer.compareNextTo(end) <= 0))
+            while (deserializer.hasNext()
+                   && (end == null || deserializer.compareNextTo(end) <= 0)
+                   && !stopReadingDisk())
             {
                 Unfiltered unfiltered = deserializer.readNext();
                 if (!isFirst || includeFirst)
                     buffer.add(unfiltered);
 
-                if (stopReadingDisk())
-                    isDone = true;
-
                 isFirst = false;
 
                 if (unfiltered.isRangeTombstoneMarker())

Reply via email to