Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.0 f6695884e -> 60e45c0ae


Small optimizations of sstable index serialization

patch by slebresne; reviewed by aweisberg for CASSANDRA-10232


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/60e45c0a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/60e45c0a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/60e45c0a

Branch: refs/heads/cassandra-3.0
Commit: 60e45c0ae83f10c6fd9526ce97234701cd5ea308
Parents: f669588
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Wed Aug 26 17:39:54 2015 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Sep 8 10:39:34 2015 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/ColumnIndex.java    |  20 +--
 .../org/apache/cassandra/db/RowIndexEntry.java  | 133 ++++++++++++++-----
 .../org/apache/cassandra/db/Serializers.java    |   5 -
 .../columniterator/AbstractSSTableIterator.java |  13 +-
 .../cassandra/io/sstable/IndexHelper.java       | 109 ++++++---------
 .../cassandra/io/sstable/KeyIterator.java       |   8 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 +-
 .../io/sstable/format/SSTableReader.java        |   6 +-
 .../io/sstable/format/big/BigTableReader.java   |   2 +-
 .../io/sstable/format/big/BigTableScanner.java  |   4 +-
 .../apache/cassandra/service/CacheService.java  |   7 +-
 .../db/SinglePartitionSliceCommandTest.java     |  39 +++---
 .../cassandra/io/sstable/IndexHelperTest.java   |   6 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java |   8 +-
 15 files changed, 212 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index afd45e5..d6ebe7a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.0-rc1
+ * Small optimizations of sstable index serialization (CASSANDRA-10232)
  * Support for both encrypted and unencrypted native transport connections 
(CASSANDRA-9590)
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java 
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 9eef23e..b350f90 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -31,14 +31,16 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class ColumnIndex
 {
+    public final long partitionHeaderLength;
     public final List<IndexHelper.IndexInfo> columnsIndex;
 
-    private static final ColumnIndex EMPTY = new 
ColumnIndex(Collections.<IndexHelper.IndexInfo>emptyList());
+    private static final ColumnIndex EMPTY = new ColumnIndex(-1, 
Collections.<IndexHelper.IndexInfo>emptyList());
 
-    private ColumnIndex(List<IndexHelper.IndexInfo> columnsIndex)
+    private ColumnIndex(long partitionHeaderLength, 
List<IndexHelper.IndexInfo> columnsIndex)
     {
         assert columnsIndex != null;
 
+        this.partitionHeaderLength = partitionHeaderLength;
         this.columnsIndex = columnsIndex;
     }
 
@@ -67,8 +69,10 @@ public class ColumnIndex
         private final SerializationHeader header;
         private final int version;
 
-        private final ColumnIndex result;
+        private final List<IndexHelper.IndexInfo> columnsIndex = new 
ArrayList<>();
         private final long initialPosition;
+        private long headerLength = -1;
+
         private long startPosition = -1;
 
         private int written;
@@ -87,8 +91,6 @@ public class ColumnIndex
             this.writer = writer;
             this.header = header;
             this.version = version;
-
-            this.result = new ColumnIndex(new 
ArrayList<IndexHelper.IndexInfo>());
             this.initialPosition = writer.getFilePointer();
         }
 
@@ -103,6 +105,7 @@ public class ColumnIndex
         public ColumnIndex build() throws IOException
         {
             writePartitionHeader(iterator);
+            this.headerLength = writer.getFilePointer() - initialPosition;
 
             while (iterator.hasNext())
                 add(iterator.next());
@@ -119,10 +122,9 @@ public class ColumnIndex
         {
             IndexHelper.IndexInfo cIndexInfo = new 
IndexHelper.IndexInfo(firstClustering,
                                                                          
lastClustering,
-                                                                         
startPosition,
                                                                          
currentPosition() - startPosition,
                                                                          
openMarker);
-            result.columnsIndex.add(cIndexInfo);
+            columnsIndex.add(cIndexInfo);
             firstClustering = null;
         }
 
@@ -164,8 +166,8 @@ public class ColumnIndex
                 addIndexBlock();
 
             // we should always have at least one computed index block, but we 
only write it out if there is more than that.
-            assert result.columnsIndex.size() > 0;
-            return result;
+            assert columnsIndex.size() > 0 && headerLength >= 0;
+            return new ColumnIndex(headerLength, columnsIndex);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java 
b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index e783508..f63e893 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -47,7 +47,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         this.position = position;
     }
 
-    public int promotedSize(CFMetaData metadata, Version version, 
SerializationHeader header)
+    protected int promotedSize(IndexHelper.IndexInfo.Serializer idxSerializer)
     {
         return 0;
     }
@@ -61,7 +61,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         // since if there are insufficient columns to be worth indexing we're 
going to seek to
         // the beginning of the row anyway, so we might as well read the 
tombstone there as well.
         if (index.columnsIndex.size() > 1)
-            return new IndexedEntry(position, deletionTime, 
index.columnsIndex);
+            return new IndexedEntry(position, deletionTime, 
index.partitionHeaderLength, index.columnsIndex);
         else
             return new RowIndexEntry<>(position);
     }
@@ -89,6 +89,16 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         return 0;
     }
 
+    /**
+     * The length of the row header (partition key, partition deletion and 
static row).
+     * This value is only provided for indexed entries and this method will 
throw
+     * {@code UnsupportedOperationException} if {@code !isIndexed()}.
+     */
+    public long headerLength()
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public List<T> columnsIndex()
     {
         return Collections.emptyList();
@@ -108,48 +118,81 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
     public static class Serializer implements 
IndexSerializer<IndexHelper.IndexInfo>
     {
-        private final CFMetaData metadata;
+        private final IndexHelper.IndexInfo.Serializer idxSerializer;
         private final Version version;
-        private final SerializationHeader header;
 
         public Serializer(CFMetaData metadata, Version version, 
SerializationHeader header)
         {
-            this.metadata = metadata;
+            this.idxSerializer = new 
IndexHelper.IndexInfo.Serializer(metadata, version, header);
             this.version = version;
-            this.header = header;
         }
 
         public void serialize(RowIndexEntry<IndexHelper.IndexInfo> rie, 
DataOutputPlus out) throws IOException
         {
-            out.writeLong(rie.position);
-            out.writeInt(rie.promotedSize(metadata, version, header));
+            assert version.storeRows() : "We read old index files but we 
should never write them";
+
+            out.writeUnsignedVInt(rie.position);
+            out.writeUnsignedVInt(rie.promotedSize(idxSerializer));
 
             if (rie.isIndexed())
             {
+                out.writeUnsignedVInt(rie.headerLength());
                 DeletionTime.serializer.serialize(rie.deletionTime(), out);
-                out.writeInt(rie.columnsIndex().size());
-                IndexHelper.IndexInfo.Serializer idxSerializer = 
metadata.serializers().indexSerializer(version);
+                out.writeUnsignedVInt(rie.columnsIndex().size());
                 for (IndexHelper.IndexInfo info : rie.columnsIndex())
-                    idxSerializer.serialize(info, out, header);
+                    idxSerializer.serialize(info, out);
             }
         }
 
         public RowIndexEntry<IndexHelper.IndexInfo> deserialize(DataInputPlus 
in) throws IOException
         {
-            long position = in.readLong();
+            if (!version.storeRows())
+            {
+                long position = in.readLong();
+
+                int size = in.readInt();
+                if (size > 0)
+                {
+                    DeletionTime deletionTime = 
DeletionTime.serializer.deserialize(in);
+
+                    int entries = in.readInt();
+                    List<IndexHelper.IndexInfo> columnsIndex = new 
ArrayList<>(entries);
+
+                    // The old format didn't saved the partition header length 
per-se, but rather for each entry it's
+                    // offset from the beginning of the row. We don't use that 
offset anymore, but we do need the
+                    // header length so we basically need the first entry 
offset. And so we inline the deserialization
+                    // of the first index entry to get that information. While 
this is a bit ugly, we'll get rid of that
+                    // code once pre-3.0 backward compatibility is dropped so 
it feels fine as a temporary hack.
+                    ClusteringPrefix firstName = 
idxSerializer.clusteringSerializer.deserialize(in);
+                    ClusteringPrefix lastName = 
idxSerializer.clusteringSerializer.deserialize(in);
+                    long headerLength = in.readLong();
+                    long width = in.readLong();
+
+                    columnsIndex.add(new IndexHelper.IndexInfo(firstName, 
lastName, width, null));
+                    for (int i = 1; i < entries; i++)
+                        columnsIndex.add(idxSerializer.deserialize(in));
+
+                    return new IndexedEntry(position, deletionTime, 
headerLength, columnsIndex);
+                }
+                else
+                {
+                    return new RowIndexEntry<>(position);
+                }
+            }
 
-            int size = in.readInt();
+            long position = in.readUnsignedVInt();
+
+            int size = (int)in.readUnsignedVInt();
             if (size > 0)
             {
+                long headerLength = in.readUnsignedVInt();
                 DeletionTime deletionTime = 
DeletionTime.serializer.deserialize(in);
-
-                int entries = in.readInt();
-                IndexHelper.IndexInfo.Serializer idxSerializer = 
metadata.serializers().indexSerializer(version);
+                int entries = (int)in.readUnsignedVInt();
                 List<IndexHelper.IndexInfo> columnsIndex = new 
ArrayList<>(entries);
                 for (int i = 0; i < entries; i++)
-                    columnsIndex.add(idxSerializer.deserialize(in, header));
+                    columnsIndex.add(idxSerializer.deserialize(in));
 
-                return new IndexedEntry(position, deletionTime, columnsIndex);
+                return new IndexedEntry(position, deletionTime, headerLength, 
columnsIndex);
             }
             else
             {
@@ -157,15 +200,23 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             }
         }
 
-        public static void skip(DataInput in) throws IOException
+        // Reads only the data 'position' of the index entry and returns it. 
Note that this left 'in' in the middle
+        // of reading an entry, so this is only useful if you know what you 
are doing and in most case 'deserialize'
+        // should be used instead.
+        public static long readPosition(DataInputPlus in, Version version) 
throws IOException
         {
-            in.readLong();
-            skipPromotedIndex(in);
+            return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
         }
 
-        public static void skipPromotedIndex(DataInput in) throws IOException
+        public static void skip(DataInputPlus in, Version version) throws 
IOException
         {
-            int size = in.readInt();
+            readPosition(in, version);
+            skipPromotedIndex(in, version);
+        }
+
+        public static void skipPromotedIndex(DataInputPlus in, Version 
version) throws IOException
+        {
+            int size = version.storeRows() ? (int)in.readUnsignedVInt() : 
in.readInt();
             if (size <= 0)
                 return;
 
@@ -174,21 +225,21 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public int serializedSize(RowIndexEntry<IndexHelper.IndexInfo> rie)
         {
-            int size = TypeSizes.sizeof(rie.position) + 
TypeSizes.sizeof(rie.promotedSize(metadata, version, header));
+            assert version.storeRows() : "We read old index files but we 
should never write them";
+
+            int size = TypeSizes.sizeofUnsignedVInt(rie.position) + 
TypeSizes.sizeofUnsignedVInt(rie.promotedSize(idxSerializer));
 
             if (rie.isIndexed())
             {
                 List<IndexHelper.IndexInfo> index = rie.columnsIndex();
 
+                size += TypeSizes.sizeofUnsignedVInt(rie.headerLength());
                 size += 
DeletionTime.serializer.serializedSize(rie.deletionTime());
-                size += TypeSizes.sizeof(index.size());
+                size += TypeSizes.sizeofUnsignedVInt(index.size());
 
-                IndexHelper.IndexInfo.Serializer idxSerializer = 
metadata.serializers().indexSerializer(version);
                 for (IndexHelper.IndexInfo info : index)
-                    size += idxSerializer.serializedSize(info, header);
+                    size += idxSerializer.serializedSize(info);
             }
-
-
             return size;
         }
     }
@@ -199,17 +250,21 @@ public class RowIndexEntry<T> implements IMeasurableMemory
     private static class IndexedEntry extends 
RowIndexEntry<IndexHelper.IndexInfo>
     {
         private final DeletionTime deletionTime;
+
+        // The offset in the file when the index entry end
+        private final long headerLength;
         private final List<IndexHelper.IndexInfo> columnsIndex;
         private static final long BASE_SIZE =
-                ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 
Arrays.<IndexHelper.IndexInfo>asList(null, null)))
+                ObjectSizes.measure(new IndexedEntry(0, DeletionTime.LIVE, 0, 
Arrays.<IndexHelper.IndexInfo>asList(null, null)))
               + ObjectSizes.measure(new ArrayList<>(1));
 
-        private IndexedEntry(long position, DeletionTime deletionTime, 
List<IndexHelper.IndexInfo> columnsIndex)
+        private IndexedEntry(long position, DeletionTime deletionTime, long 
headerLength, List<IndexHelper.IndexInfo> columnsIndex)
         {
             super(position);
             assert deletionTime != null;
             assert columnsIndex != null && columnsIndex.size() > 1;
             this.deletionTime = deletionTime;
+            this.headerLength = headerLength;
             this.columnsIndex = columnsIndex;
         }
 
@@ -220,19 +275,25 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         }
 
         @Override
+        public long headerLength()
+        {
+            return headerLength;
+        }
+
+        @Override
         public List<IndexHelper.IndexInfo> columnsIndex()
         {
             return columnsIndex;
         }
 
         @Override
-        public int promotedSize(CFMetaData metadata, Version version, 
SerializationHeader header)
+        protected int promotedSize(IndexHelper.IndexInfo.Serializer 
idxSerializer)
         {
-            long size = DeletionTime.serializer.serializedSize(deletionTime);
-            size += TypeSizes.sizeof(columnsIndex.size()); // number of entries
-            IndexHelper.IndexInfo.Serializer idxSerializer = 
metadata.serializers().indexSerializer(version);
+            long size = TypeSizes.sizeofUnsignedVInt(headerLength)
+                      + DeletionTime.serializer.serializedSize(deletionTime)
+                      + TypeSizes.sizeofUnsignedVInt(columnsIndex.size()); // 
number of entries
             for (IndexHelper.IndexInfo info : columnsIndex)
-                size += idxSerializer.serializedSize(info, header);
+                size += idxSerializer.serializedSize(info);
 
             return Ints.checkedCast(size);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java 
b/src/java/org/apache/cassandra/db/Serializers.java
index 2561bbe..9b29d89 100644
--- a/src/java/org/apache/cassandra/db/Serializers.java
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -43,11 +43,6 @@ public class Serializers
         this.metadata = metadata;
     }
 
-    public IndexInfo.Serializer indexSerializer(Version version)
-    {
-        return new IndexInfo.Serializer(metadata, version);
-    }
-
     // TODO: Once we drop support for old (pre-3.0) sstables, we can drop this 
method and inline the calls to
     // ClusteringPrefix.serializer in IndexHelper directly. At which point 
this whole class probably becomes
     // unecessary (since IndexInfo.Serializer won't depend on the metadata 
either).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index 87a57c6..c075a2b 100644
--- 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -412,6 +412,7 @@ abstract class AbstractSSTableIterator implements 
SliceableUnfilteredRowIterator
 
         private final RowIndexEntry indexEntry;
         private final List<IndexHelper.IndexInfo> indexes;
+        private final long[] blockOffsets;
         private final boolean reversed;
 
         private int currentIndexIdx;
@@ -427,6 +428,14 @@ abstract class AbstractSSTableIterator implements 
SliceableUnfilteredRowIterator
             this.indexes = indexEntry.columnsIndex();
             this.reversed = reversed;
             this.currentIndexIdx = reversed ? indexEntry.columnsIndex().size() 
: -1;
+
+            this.blockOffsets = new long[indexes.size()];
+            long offset = indexEntry.position + indexEntry.headerLength();
+            for (int i = 0; i < blockOffsets.length; i++)
+            {
+                blockOffsets[i] = offset;
+                offset += indexes.get(i).width;
+            }
         }
 
         public boolean isDone()
@@ -438,7 +447,7 @@ abstract class AbstractSSTableIterator implements 
SliceableUnfilteredRowIterator
         public void setToBlock(int blockIdx) throws IOException
         {
             if (blockIdx >= 0 && blockIdx < indexes.size())
-                reader.seekToPosition(indexEntry.position + 
indexes.get(blockIdx).offset);
+                reader.seekToPosition(blockOffsets[blockIdx]);
 
             currentIndexIdx = blockIdx;
             reader.openMarker = blockIdx > 0 ? indexes.get(blockIdx - 
1).endOpenMarker : null;
@@ -461,7 +470,7 @@ abstract class AbstractSSTableIterator implements 
SliceableUnfilteredRowIterator
 
                 // We have to set the mark, and we have to set it at the 
beginning of the block. So if we're not at the beginning of the block, this 
forces us to a weird seek dance.
                 // This can only happen when reading old file however.
-                long startOfBlock = indexEntry.position + 
indexes.get(currentIndexIdx).offset;
+                long startOfBlock = blockOffsets[currentIndexIdx];
                 long currentFilePointer = reader.file.getFilePointer();
                 if (startOfBlock == currentFilePointer)
                 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java 
b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
index 4dabe69..e95af29 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexHelper.java
@@ -37,34 +37,6 @@ import org.apache.cassandra.utils.*;
  */
 public class IndexHelper
 {
-    public static void skipBloomFilter(DataInput in) throws IOException
-    {
-        int size = in.readInt();
-        FileUtils.skipBytesFully(in, size);
-    }
-
-    /**
-     * Skip the index
-     * @param in the data input from which the index should be skipped
-     * @throws IOException if an I/O error occurs.
-     */
-    public static void skipIndex(DataInput in) throws IOException
-    {
-        /* read only the column index list */
-        int columnIndexSize = in.readInt();
-        /* skip the column index data */
-        if (in instanceof FileDataInput)
-        {
-            FileUtils.skipBytesFully(in, columnIndexSize);
-        }
-        else
-        {
-            // skip bytes
-            byte[] skip = new byte[columnIndexSize];
-            in.readFully(skip);
-        }
-    }
-
     /**
      * The index of the IndexInfo in which a scan starting with @name should 
begin.
      *
@@ -78,7 +50,7 @@ public class IndexHelper
      */
     public static int indexFor(ClusteringPrefix name, List<IndexInfo> 
indexList, ClusteringComparator comparator, boolean reversed, int lastIndex)
     {
-        IndexInfo target = new IndexInfo(name, name, 0, 0, null);
+        IndexInfo target = new IndexInfo(name, name, 0, null);
         /*
         Take the example from the unit test, and say your index looks like 
this:
         [0..5][10..15][20..25]
@@ -115,12 +87,11 @@ public class IndexHelper
 
     public static class IndexInfo
     {
-        private static final long EMPTY_SIZE = ObjectSizes.measure(new 
IndexInfo(null, null, 0, 0, null));
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new 
IndexInfo(null, null, 0, null));
 
         public final long width;
-        public final ClusteringPrefix lastName;
         public final ClusteringPrefix firstName;
-        public final long offset;
+        public final ClusteringPrefix lastName;
 
         // If at the end of the index block there is an open range tombstone 
marker, this marker
         // deletion infos. null otherwise.
@@ -128,73 +99,77 @@ public class IndexHelper
 
         public IndexInfo(ClusteringPrefix firstName,
                          ClusteringPrefix lastName,
-                         long offset,
                          long width,
                          DeletionTime endOpenMarker)
         {
             this.firstName = firstName;
             this.lastName = lastName;
-            this.offset = offset;
             this.width = width;
             this.endOpenMarker = endOpenMarker;
         }
 
         public static class Serializer
         {
-            private final CFMetaData metadata;
+            // This is the default index size that we use to delta-encode 
width when serializing so we get better vint-encoding.
+            // This is imperfect as user can change the index size and ideally 
we would save the index size used with each index file
+            // to use as base. However, that's a bit more involved a change 
that we want for now and very seldom do use change the index
+            // size so using the default is almost surely better than using no 
base at all.
+            private static final long WIDTH_BASE = 64 * 1024;
+
+            // TODO: Only public for use in RowIndexEntry for backward 
compatibility code. Can be made private once backward compatibility is dropped.
+            public final ISerializer<ClusteringPrefix> clusteringSerializer;
             private final Version version;
 
-            public Serializer(CFMetaData metadata, Version version)
+            public Serializer(CFMetaData metadata, Version version, 
SerializationHeader header)
             {
-                this.metadata = metadata;
+                this.clusteringSerializer = 
metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
                 this.version = version;
             }
 
-            public void serialize(IndexInfo info, DataOutputPlus out, 
SerializationHeader header) throws IOException
+            public void serialize(IndexInfo info, DataOutputPlus out) throws 
IOException
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = 
metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
+                assert version.storeRows() : "We read old index files but we 
should never write them";
+
                 clusteringSerializer.serialize(info.firstName, out);
                 clusteringSerializer.serialize(info.lastName, out);
-                out.writeLong(info.offset);
-                out.writeLong(info.width);
+                out.writeVInt(info.width - WIDTH_BASE);
 
-                if (version.storeRows())
-                {
-                    out.writeBoolean(info.endOpenMarker != null);
-                    if (info.endOpenMarker != null)
-                        DeletionTime.serializer.serialize(info.endOpenMarker, 
out);
-                }
+                out.writeBoolean(info.endOpenMarker != null);
+                if (info.endOpenMarker != null)
+                    DeletionTime.serializer.serialize(info.endOpenMarker, out);
             }
 
-            public IndexInfo deserialize(DataInputPlus in, SerializationHeader 
header) throws IOException
+            public IndexInfo deserialize(DataInputPlus in) throws IOException
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = 
metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
-
                 ClusteringPrefix firstName = 
clusteringSerializer.deserialize(in);
                 ClusteringPrefix lastName = 
clusteringSerializer.deserialize(in);
-                long offset = in.readLong();
-                long width = in.readLong();
-                DeletionTime endOpenMarker = version.storeRows() && 
in.readBoolean()
-                                           ? 
DeletionTime.serializer.deserialize(in)
-                                           : null;
-
-                return new IndexInfo(firstName, lastName, offset, width, 
endOpenMarker);
+                long width;
+                DeletionTime endOpenMarker = null;
+                if (version.storeRows())
+                {
+                    width = in.readVInt() + WIDTH_BASE;
+                    if (in.readBoolean())
+                        endOpenMarker = 
DeletionTime.serializer.deserialize(in);
+                }
+                else
+                {
+                    in.readLong(); // skip offset
+                    width = in.readLong();
+                }
+                return new IndexInfo(firstName, lastName, width, 
endOpenMarker);
             }
 
-            public long serializedSize(IndexInfo info, SerializationHeader 
header)
+            public long serializedSize(IndexInfo info)
             {
-                ISerializer<ClusteringPrefix> clusteringSerializer = 
metadata.serializers().indexEntryClusteringPrefixSerializer(version, header);
+                assert version.storeRows() : "We read old index files but we 
should never write them";
+
                 long size = clusteringSerializer.serializedSize(info.firstName)
                           + clusteringSerializer.serializedSize(info.lastName)
-                          + TypeSizes.sizeof(info.offset)
-                          + TypeSizes.sizeof(info.width);
+                          + TypeSizes.sizeofVInt(info.width - WIDTH_BASE)
+                          + TypeSizes.sizeof(info.endOpenMarker != null);
 
-                if (version.storeRows())
-                {
-                    size += TypeSizes.sizeof(info.endOpenMarker != null);
-                    if (info.endOpenMarker != null)
-                        size += 
DeletionTime.serializer.serializedSize(info.endOpenMarker);
-                }
+                if (info.endOpenMarker != null)
+                    size += 
DeletionTime.serializer.serializedSize(info.endOpenMarker);
                 return size;
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java 
b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
index 6f1e2f4..f02b9d1 100644
--- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.DataInput;
 import java.io.File;
 import java.io.IOException;
 
@@ -27,6 +26,7 @@ import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CloseableIterator;
@@ -49,7 +49,7 @@ public class KeyIterator extends 
AbstractIterator<DecoratedKey> implements Close
                 in = RandomAccessReader.open(path);
         }
 
-        public DataInput get()
+        public DataInputPlus get()
         {
             maybeInit();
             return in;
@@ -80,12 +80,14 @@ public class KeyIterator extends 
AbstractIterator<DecoratedKey> implements Close
         }
     }
 
+    private final Descriptor desc;
     private final In in;
     private final IPartitioner partitioner;
 
 
     public KeyIterator(Descriptor desc, CFMetaData metadata)
     {
+        this.desc = desc;
         in = new In(new File(desc.filenameFor(Component.PRIMARY_INDEX)));
         partitioner = metadata.partitioner;
     }
@@ -98,7 +100,7 @@ public class KeyIterator extends 
AbstractIterator<DecoratedKey> implements Close
                 return endOfData();
 
             DecoratedKey key = 
partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in.get()));
-            RowIndexEntry.Serializer.skip(in.get()); // skip remainder of the 
entry
+            RowIndexEntry.Serializer.skip(in.get(), desc.version); // skip 
remainder of the entry
             return key;
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java 
b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index b86d9b4..63b8f3e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -237,7 +237,7 @@ public abstract class SSTable
         while (ifile.getFilePointer() < BYTES_CAP && keys < SAMPLES_CAP)
         {
             ByteBufferUtil.skipShortLength(ifile);
-            RowIndexEntry.Serializer.skip(ifile);
+            RowIndexEntry.Serializer.skip(ifile, descriptor.version);
             keys++;
         }
         assert keys > 0 && ifile.getFilePointer() > 0 && ifile.length() > 0 : 
"Unexpected empty index file: " + ifile;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 5d8ab50..b958240 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -922,7 +922,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                     if (summaryEntriesChecked == 
Downsampling.BASE_SAMPLING_LEVEL)
                         return true;
                 }
-                RowIndexEntry.Serializer.skip(in);
+                RowIndexEntry.Serializer.skip(in, descriptor.version);
                 i++;
             }
         }
@@ -1199,7 +1199,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                 while ((indexPosition = primaryIndex.getFilePointer()) != 
indexSize)
                 {
                     
summaryBuilder.maybeAddEntry(decorateKey(ByteBufferUtil.readWithShortLength(primaryIndex)),
 indexPosition);
-                    RowIndexEntry.Serializer.skip(primaryIndex);
+                    RowIndexEntry.Serializer.skip(primaryIndex, 
descriptor.version);
                 }
 
                 return summaryBuilder.build(getPartitioner());
@@ -1605,7 +1605,7 @@ public abstract class SSTableReader extends SSTable 
implements SelfRefCounted<SS
                 if (indexDecoratedKey.compareTo(token) > 0)
                     return indexDecoratedKey;
 
-                RowIndexEntry.Serializer.skip(in);
+                RowIndexEntry.Serializer.skip(in, descriptor.version);
             }
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
index 4b66942..efd1057 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java
@@ -239,7 +239,7 @@ public class BigTableReader extends SSTableReader
                     return indexEntry;
                 }
 
-                RowIndexEntry.Serializer.skip(in);
+                RowIndexEntry.Serializer.skip(in, descriptor.version);
             }
         }
         catch (IOException e)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
index d135df0..1a4ac21 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java
@@ -176,14 +176,14 @@ public class BigTableScanner implements ISSTableScanner
                 if (indexDecoratedKey.compareTo(currentRange.left) > 0 || 
currentRange.contains(indexDecoratedKey))
                 {
                     // Found, just read the dataPosition and seek into index 
and data files
-                    long dataPosition = ifile.readLong();
+                    long dataPosition = 
RowIndexEntry.Serializer.readPosition(ifile, sstable.descriptor.version);
                     ifile.seek(indexPosition);
                     dfile.seek(dataPosition);
                     break;
                 }
                 else
                 {
-                    RowIndexEntry.Serializer.skip(ifile);
+                    RowIndexEntry.Serializer.skip(ifile, 
sstable.descriptor.version);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java 
b/src/java/org/apache/cassandra/service/CacheService.java
index 9213b20..a48466a 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -53,6 +53,7 @@ import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
@@ -469,7 +470,11 @@ public class CacheService implements CacheServiceMBean
             input.readBoolean(); // backwards compatibility for "promoted 
indexes" boolean
             if (reader == null)
             {
-                RowIndexEntry.Serializer.skipPromotedIndex(input);
+                // The sstable doesn't exist anymore, so we can't be sure of 
the exact version and assume its the current version. The only case where we'll 
be
+                // wrong is during upgrade, in which case we fail at 
deserialization. This is not a huge deal however since 1) this is unlikely 
enough that
+                // this won't affect many users (if any) and only once, 2) 
this doesn't prevent the node from starting and 3) CASSANDRA-10219 shows that 
this
+                // part of the code has been broken for a while without anyone 
noticing (it is, btw, still broken until CASSANDRA-10219 is fixed).
+                RowIndexEntry.Serializer.skipPromotedIndex(input, 
BigFormat.instance.getLatestVersion());
                 return null;
             }
             RowIndexEntry.IndexSerializer<?> indexSerializer = 
reader.descriptor.getFormat().getIndexSerializer(reader.metadata,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java 
b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
index 4b7f15a..9f80023 100644
--- a/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/SinglePartitionSliceCommandTest.java
@@ -132,10 +132,10 @@ public class SinglePartitionSliceCommandTest
     @Test
     public void staticColumnsAreReturned() throws IOException
     {
-        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k"));
+        DecoratedKey key = cfm.decorateKey(ByteBufferUtil.bytes("k1"));
 
-        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES ('k', 
's')");
-        Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM 
ks.tbl WHERE k='k'").isEmpty());
+        QueryProcessor.executeInternal("INSERT INTO ks.tbl (k, s) VALUES 
('k1', 's')");
+        Assert.assertFalse(QueryProcessor.executeInternal("SELECT s FROM 
ks.tbl WHERE k='k1'").isEmpty());
 
         ColumnFilter columnFilter = 
ColumnFilter.selection(PartitionColumns.of(s));
         ClusteringIndexSliceFilter sliceFilter = new 
ClusteringIndexSliceFilter(Slices.NONE, false);
@@ -147,11 +147,11 @@ public class SinglePartitionSliceCommandTest
                                                           key,
                                                           sliceFilter);
 
-        UnfilteredPartitionIterator pi;
-
         // check raw iterator for static cell
-        pi = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        checkForS(pi);
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); 
UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        {
+            checkForS(pi);
+        }
 
         ReadResponse response;
         DataOutputBuffer out;
@@ -159,24 +159,33 @@ public class SinglePartitionSliceCommandTest
         ReadResponse dst;
 
         // check (de)serialized iterator for memtable static cell
-        pi = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); 
UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        {
+            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        }
+
         out = new DataOutputBuffer((int) 
ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, 
MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, 
MessagingService.VERSION_30);
-        pi = dst.makeIterator(cfm, cmd);
-        checkForS(pi);
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        {
+            checkForS(pi);
+        }
 
         // check (de)serialized iterator for sstable static cell
         
Schema.instance.getColumnFamilyStoreInstance(cfm.cfId).forceBlockingFlush();
-        pi = cmd.executeLocally(ReadOrderGroup.emptyGroup());
-        response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); 
UnfilteredPartitionIterator pi = cmd.executeLocally(orderGroup))
+        {
+            response = ReadResponse.createDataResponse(pi, cmd.columnFilter());
+        }
         out = new DataOutputBuffer((int) 
ReadResponse.serializer.serializedSize(response, MessagingService.VERSION_30));
         ReadResponse.serializer.serialize(response, out, 
MessagingService.VERSION_30);
         in = new DataInputBuffer(out.buffer(), true);
         dst = ReadResponse.serializer.deserialize(in, 
MessagingService.VERSION_30);
-        pi = dst.makeIterator(cfm, cmd);
-        checkForS(pi);
+        try (UnfilteredPartitionIterator pi = dst.makeIterator(cfm, cmd))
+        {
+            checkForS(pi);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java 
b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
index c9f268a..2c967d0 100644
--- a/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/IndexHelperTest.java
@@ -51,9 +51,9 @@ public class IndexHelperTest
         DeletionTime deletionInfo = new 
DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds());
 
         List<IndexInfo> indexes = new ArrayList<>();
-        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(10L), cn(15L), 0, 0, deletionInfo));
-        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(0L), cn(5L), 0, deletionInfo));
+        indexes.add(new IndexInfo(cn(10L), cn(15L), 0,deletionInfo));
+        indexes.add(new IndexInfo(cn(20L), cn(25L), 0, deletionInfo));
 
 
         assertEquals(0, IndexHelper.indexFor(cn(-1L), indexes, comp, false, 
-1));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60e45c0a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java 
b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
index 4eebdeb..faa9c3e 100644
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java
@@ -167,7 +167,9 @@ public class SSTableLoaderTest
                                                   .withBufferSizeInMB(1)
                                                   .build();
 
-        for (int i = 0; i < 1000; i++) // make sure to write more than 1 MB
+        int NB_PARTITIONS = 5000; // Enough to write >1MB and get at least one 
completed sstable before we've closed the writer
+
+        for (int i = 0; i < NB_PARTITIONS; i++)
         {
             for (int j = 0; j < 100; j++)
                 writer.addRow(String.format("key%d", i), 
String.format("col%d", j), "100");
@@ -183,7 +185,7 @@ public class SSTableLoaderTest
 
         List<FilteredPartition> partitions = 
Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
 
-        assertTrue(partitions.size() > 0 && partitions.size() < 1000);
+        assertTrue(partitions.size() > 0 && partitions.size() < NB_PARTITIONS);
 
         // now we complete the write and the second loader should load the 
last sstable as well
         writer.close();
@@ -192,7 +194,7 @@ public class SSTableLoaderTest
         loader.stream(Collections.emptySet(), 
completionStreamListener(latch)).get();
 
         partitions = 
Util.getAll(Util.cmd(Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD2)).build());
-        assertEquals(1000, partitions.size());
+        assertEquals(NB_PARTITIONS, partitions.size());
 
         // The stream future is signalled when the work is complete but before 
releasing references. Wait for release
         // before cleanup (CASSANDRA-10118).

Reply via email to