http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java 
b/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
deleted file mode 100644
index 55826f5..0000000
--- a/src/java/org/apache/cassandra/db/RangeSliceVerbHandler.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import org.apache.cassandra.io.IVersionedSerializer;
-
-public class RangeSliceVerbHandler extends ReadCommandVerbHandler
-{
-    @Override
-    protected IVersionedSerializer<ReadResponse> serializer()
-    {
-        return ReadResponse.rangeSliceSerializer;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index d8051fe..0bda184 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -37,7 +37,6 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.IndexNotAvailableException;
-import org.apache.cassandra.io.ForwardingVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -64,43 +63,6 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
     protected static final Logger logger = 
LoggerFactory.getLogger(ReadCommand.class);
     public static final IVersionedSerializer<ReadCommand> serializer = new 
Serializer();
 
-    // For READ verb: will either dispatch on 'serializer' for 3.0 or 
'legacyReadCommandSerializer' for earlier version.
-    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 
backward compatibility.
-    public static final IVersionedSerializer<ReadCommand> readSerializer = new 
ForwardingVersionedSerializer<ReadCommand>()
-    {
-        protected IVersionedSerializer<ReadCommand> delegate(int version)
-        {
-            return version < MessagingService.VERSION_30
-                    ? legacyReadCommandSerializer : serializer;
-        }
-    };
-
-    // For RANGE_SLICE verb: will either dispatch on 'serializer' for 3.0 or 
'legacyRangeSliceCommandSerializer' for earlier version.
-    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 
backward compatibility.
-    public static final IVersionedSerializer<ReadCommand> rangeSliceSerializer 
= new ForwardingVersionedSerializer<ReadCommand>()
-    {
-        protected IVersionedSerializer<ReadCommand> delegate(int version)
-        {
-            return version < MessagingService.VERSION_30
-                    ? legacyRangeSliceCommandSerializer : serializer;
-        }
-    };
-
-    // For PAGED_RANGE verb: will either dispatch on 'serializer' for 3.0 or 
'legacyPagedRangeCommandSerializer' for earlier version.
-    // Can be removed (and replaced by 'serializer') once we drop pre-3.0 
backward compatibility.
-    public static final IVersionedSerializer<ReadCommand> pagedRangeSerializer 
= new ForwardingVersionedSerializer<ReadCommand>()
-    {
-        protected IVersionedSerializer<ReadCommand> delegate(int version)
-        {
-            return version < MessagingService.VERSION_30
-                    ? legacyPagedRangeCommandSerializer : serializer;
-        }
-    };
-
-    public static final IVersionedSerializer<ReadCommand> 
legacyRangeSliceCommandSerializer = new LegacyRangeSliceCommandSerializer();
-    public static final IVersionedSerializer<ReadCommand> 
legacyPagedRangeCommandSerializer = new LegacyPagedRangeCommandSerializer();
-    public static final IVersionedSerializer<ReadCommand> 
legacyReadCommandSerializer = new LegacyReadCommandSerializer();
-
     private final Kind kind;
     private final CFMetaData metadata;
     private final int nowInSec;
@@ -580,7 +542,7 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
     /**
      * Creates a message for this command.
      */
-    public abstract MessageOut<ReadCommand> createMessage(int version);
+    public abstract MessageOut<ReadCommand> createMessage();
 
     protected abstract void appendCQLWhereClause(StringBuilder sb);
 
@@ -666,8 +628,6 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
 
         public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
         {
-            assert version >= MessagingService.VERSION_30;
-
             out.writeByte(command.kind.ordinal());
             out.writeByte(digestFlag(command.isDigestQuery()) | 
thriftFlag(command.isForThrift()) | indexFlag(command.index.isPresent()));
             if (command.isDigestQuery())
@@ -685,8 +645,6 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
 
         public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
         {
-            assert version >= MessagingService.VERSION_30;
-
             Kind kind = Kind.values()[in.readByte()];
             int flags = in.readByte();
             boolean isDigest = isDigest(flags);
@@ -699,8 +657,8 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
             RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, metadata);
             DataLimits limits = DataLimits.serializer.deserialize(in, version, 
 metadata.comparator);
             Optional<IndexMetadata> index = hasIndex
-                                            ? deserializeIndexMetadata(in, 
version, metadata)
-                                            : Optional.empty();
+                                          ? deserializeIndexMetadata(in, 
version, metadata)
+                                          : Optional.empty();
 
             return kind.selectionDeserializer.deserialize(in, version, 
isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, 
rowFilter, limits, index);
         }
@@ -724,8 +682,6 @@ public abstract class ReadCommand extends MonitorableImpl 
implements ReadQuery
 
         public long serializedSize(ReadCommand command, int version)
         {
-            assert version >= MessagingService.VERSION_30;
-
             return 2 // kind + flags
                  + (command.isDigestQuery() ? 
TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0)
                  + CFMetaData.serializer.serializedSize(command.metadata(), 
version)
@@ -737,1015 +693,4 @@ public abstract class ReadCommand extends 
MonitorableImpl implements ReadQuery
                  + command.indexSerializedSize(version);
         }
     }
-
-    private enum LegacyType
-    {
-        GET_BY_NAMES((byte)1),
-        GET_SLICES((byte)2);
-
-        public final byte serializedValue;
-
-        LegacyType(byte b)
-        {
-            this.serializedValue = b;
-        }
-
-        public static LegacyType 
fromPartitionFilterKind(ClusteringIndexFilter.Kind kind)
-        {
-            return kind == ClusteringIndexFilter.Kind.SLICE
-                   ? GET_SLICES
-                   : GET_BY_NAMES;
-        }
-
-        public static LegacyType fromSerializedValue(byte b)
-        {
-            return b == 1 ? GET_BY_NAMES : GET_SLICES;
-        }
-    }
-
-    /**
-     * Serializer for pre-3.0 RangeSliceCommands.
-     */
-    private static class LegacyRangeSliceCommandSerializer implements 
IVersionedSerializer<ReadCommand>
-    {
-        public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
-        {
-            assert version < MessagingService.VERSION_30;
-
-            PartitionRangeReadCommand rangeCommand = 
(PartitionRangeReadCommand) command;
-            assert !rangeCommand.dataRange().isPaging();
-
-            // convert pre-3.0 incompatible names filters to slice filters
-            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
-
-            CFMetaData metadata = rangeCommand.metadata();
-
-            out.writeUTF(metadata.ksName);
-            out.writeUTF(metadata.cfName);
-            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from 
seconds to millis
-
-            // begin DiskAtomFilterSerializer.serialize()
-            if (rangeCommand.isNamesQuery())
-            {
-                out.writeByte(1);  // 0 for slices, 1 for names
-                ClusteringIndexNamesFilter filter = 
(ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
-                LegacyReadCommandSerializer.serializeNamesFilter(rangeCommand, 
filter, out);
-            }
-            else
-            {
-                out.writeByte(0);  // 0 for slices, 1 for names
-
-                // slice filter serialization
-                ClusteringIndexSliceFilter filter = 
(ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
-
-                boolean makeStaticSlice = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
!filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-                LegacyReadCommandSerializer.serializeSlices(out, 
filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
-
-                out.writeBoolean(filter.isReversed());
-
-                // limit
-                DataLimits limits = rangeCommand.limits();
-                if (limits.isDistinct())
-                    out.writeInt(1);
-                else
-                    
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().count(),
 filter.requestedSlices()));
-
-                int compositesToGroup;
-                boolean selectsStatics = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-                if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT)
-                    compositesToGroup = -1;
-                else if (limits.isDistinct() && !selectsStatics)
-                    compositesToGroup = -2;  // for DISTINCT queries 
(CASSANDRA-8490)
-                else
-                    compositesToGroup = metadata.isDense() ? -1 : 
metadata.clusteringColumns().size();
-
-                out.writeInt(compositesToGroup);
-            }
-
-            serializeRowFilter(out, rangeCommand.rowFilter());
-            
AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(),
 out, version);
-
-            // maxResults
-            out.writeInt(rangeCommand.limits().count());
-
-            // countCQL3Rows
-            if (rangeCommand.isForThrift() || 
rangeCommand.limits().perPartitionCount() == 1)  // if for Thrift or DISTINCT
-                out.writeBoolean(false);
-            else
-                out.writeBoolean(true);
-
-            // isPaging
-            out.writeBoolean(false);
-        }
-
-        public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
-        {
-            assert version < MessagingService.VERSION_30;
-
-            String keyspace = in.readUTF();
-            String columnFamily = in.readUTF();
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, 
columnFamily);
-            if (metadata == null)
-            {
-                String message = String.format("Got legacy range command for 
nonexistent table %s.%s.", keyspace, columnFamily);
-                throw new UnknownColumnFamilyException(message, null);
-            }
-
-            int nowInSec = (int) (in.readLong() / 1000);  // convert from 
millis to seconds
-
-            ClusteringIndexFilter filter;
-            ColumnFilter selection;
-            int compositesToGroup = 0;
-            int perPartitionLimit = -1;
-            byte readType = in.readByte();  // 0 for slices, 1 for names
-            if (readType == 1)
-            {
-                Pair<ColumnFilter, ClusteringIndexNamesFilter> 
selectionAndFilter = 
LegacyReadCommandSerializer.deserializeNamesSelectionAndFilter(in, metadata);
-                selection = selectionAndFilter.left;
-                filter = selectionAndFilter.right;
-            }
-            else
-            {
-                Pair<ClusteringIndexSliceFilter, Boolean> p = 
LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
-                filter = p.left;
-                perPartitionLimit = in.readInt();
-                compositesToGroup = in.readInt();
-                selection = getColumnSelectionForSlice(p.right, 
compositesToGroup, metadata);
-            }
-
-            RowFilter rowFilter = deserializeRowFilter(in, metadata);
-
-            AbstractBounds<PartitionPosition> keyRange = 
AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, 
version);
-            int maxResults = in.readInt();
-
-            boolean countCQL3Rows = in.readBoolean();  // countCQL3Rows (not 
needed)
-            in.readBoolean();  // isPaging (not needed)
-
-            boolean selectsStatics = 
(!selection.fetchedColumns().statics.isEmpty() || 
filter.selects(Clustering.STATIC_CLUSTERING));
-            // We have 2 types of DISTINCT queries: ones on only the partition 
key, and ones on the partition key and static columns. For the former,
-            // we can easily detect the case because compositeToGroup is -2 
and that's the only case it can be that. The latter one is slightly less
-            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries 
are the only CQL queries that have countCQL3Rows to false so we use
-            // that fact.
-            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup 
!= -1 && !countCQL3Rows);
-            DataLimits limits;
-            if (isDistinct)
-                limits = DataLimits.distinctLimits(maxResults);
-            else if (compositesToGroup == -1)
-                limits = DataLimits.thriftLimits(maxResults, 
perPartitionLimit);
-            else
-                limits = DataLimits.cqlLimits(maxResults);
-
-            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, new DataRange(keyRange, filter), 
Optional.empty());
-        }
-
-        static void serializeRowFilter(DataOutputPlus out, RowFilter 
rowFilter) throws IOException
-        {
-            ArrayList<RowFilter.Expression> indexExpressions = 
Lists.newArrayList(rowFilter.iterator());
-            out.writeInt(indexExpressions.size());
-            for (RowFilter.Expression expression : indexExpressions)
-            {
-                
ByteBufferUtil.writeWithShortLength(expression.column().name.bytes, out);
-                expression.operator().writeTo(out);
-                
ByteBufferUtil.writeWithShortLength(expression.getIndexValue(), out);
-            }
-        }
-
-        static RowFilter deserializeRowFilter(DataInputPlus in, CFMetaData 
metadata) throws IOException
-        {
-            int numRowFilters = in.readInt();
-            if (numRowFilters == 0)
-                return RowFilter.NONE;
-
-            RowFilter rowFilter = RowFilter.create(numRowFilters);
-            for (int i = 0; i < numRowFilters; i++)
-            {
-                ByteBuffer columnName = ByteBufferUtil.readWithShortLength(in);
-                ColumnDefinition column = 
metadata.getColumnDefinition(columnName);
-                Operator op = Operator.readFrom(in);
-                ByteBuffer indexValue = ByteBufferUtil.readWithShortLength(in);
-                rowFilter.add(column, op, indexValue);
-            }
-            return rowFilter;
-        }
-
-        static long serializedRowFilterSize(RowFilter rowFilter)
-        {
-            long size = TypeSizes.sizeof(0);  // rowFilterCount
-            for (RowFilter.Expression expression : rowFilter)
-            {
-                size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
-                size += TypeSizes.sizeof(0);  // operator int value
-                size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
-            }
-            return size;
-        }
-
-        public long serializedSize(ReadCommand command, int version)
-        {
-            assert version < MessagingService.VERSION_30;
-            assert command.kind == Kind.PARTITION_RANGE;
-
-            PartitionRangeReadCommand rangeCommand = 
(PartitionRangeReadCommand) command;
-            rangeCommand = maybeConvertNamesToSlice(rangeCommand);
-            CFMetaData metadata = rangeCommand.metadata();
-
-            long size = TypeSizes.sizeof(metadata.ksName);
-            size += TypeSizes.sizeof(metadata.cfName);
-            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
-
-            size += 1;  // single byte flag: 0 for slices, 1 for names
-            if (rangeCommand.isNamesQuery())
-            {
-                PartitionColumns columns = 
rangeCommand.columnFilter().fetchedColumns();
-                ClusteringIndexNamesFilter filter = 
(ClusteringIndexNamesFilter) rangeCommand.dataRange().clusteringIndexFilter;
-                size += 
LegacyReadCommandSerializer.serializedNamesFilterSize(filter, metadata, 
columns);
-            }
-            else
-            {
-                ClusteringIndexSliceFilter filter = 
(ClusteringIndexSliceFilter) rangeCommand.dataRange().clusteringIndexFilter;
-                boolean makeStaticSlice = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
!filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-                size += 
LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), 
makeStaticSlice, metadata);
-                size += TypeSizes.sizeof(filter.isReversed());
-                size += 
TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
-                size += TypeSizes.sizeof(0); // compositesToGroup
-            }
-
-            if (rangeCommand.rowFilter().equals(RowFilter.NONE))
-            {
-                size += TypeSizes.sizeof(0);
-            }
-            else
-            {
-                ArrayList<RowFilter.Expression> indexExpressions = 
Lists.newArrayList(rangeCommand.rowFilter().iterator());
-                size += TypeSizes.sizeof(indexExpressions.size());
-                for (RowFilter.Expression expression : indexExpressions)
-                {
-                    size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.column().name.bytes);
-                    size += TypeSizes.sizeof(expression.operator().ordinal());
-                    size += 
ByteBufferUtil.serializedSizeWithShortLength(expression.getIndexValue());
-                }
-            }
-
-            size += 
AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(),
 version);
-            size += TypeSizes.sizeof(rangeCommand.limits().count());
-            size += TypeSizes.sizeof(!rangeCommand.isForThrift());
-            return size + 
TypeSizes.sizeof(rangeCommand.dataRange().isPaging());
-        }
-
-        static PartitionRangeReadCommand 
maybeConvertNamesToSlice(PartitionRangeReadCommand command)
-        {
-            if (!command.dataRange().isNamesQuery())
-                return command;
-
-            CFMetaData metadata = command.metadata();
-            if 
(!LegacyReadCommandSerializer.shouldConvertNamesToSlice(metadata, 
command.columnFilter().fetchedColumns()))
-                return command;
-
-            ClusteringIndexNamesFilter filter = (ClusteringIndexNamesFilter) 
command.dataRange().clusteringIndexFilter;
-            ClusteringIndexSliceFilter sliceFilter = 
LegacyReadCommandSerializer.convertNamesFilterToSliceFilter(filter, metadata);
-            DataRange newRange = new DataRange(command.dataRange().keyRange(), 
sliceFilter);
-            return new PartitionRangeReadCommand(
-                    command.isDigestQuery(), command.digestVersion(), 
command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), 
command.limits(), newRange, Optional.empty());
-        }
-
-        static ColumnFilter getColumnSelectionForSlice(boolean selectsStatics, 
int compositesToGroup, CFMetaData metadata)
-        {
-            // A value of -2 indicates this is a DISTINCT query that doesn't 
select static columns, only partition keys.
-            // In that case, we'll basically be querying the first row of the 
partition, but we must make sure we include
-            // all columns so we get at least one cell if there is a live row 
as it would confuse pre-3.0 nodes otherwise.
-            if (compositesToGroup == -2)
-                return ColumnFilter.all(metadata);
-
-            // if a slice query from a pre-3.0 node doesn't cover statics, we 
shouldn't select them at all
-            PartitionColumns columns = selectsStatics
-                                     ? metadata.partitionColumns()
-                                     : 
metadata.partitionColumns().withoutStatics();
-            return ColumnFilter.selectionBuilder().addAll(columns).build();
-        }
-    }
-
-    /**
-     * Serializer for pre-3.0 PagedRangeCommands.
-     */
-    private static class LegacyPagedRangeCommandSerializer implements 
IVersionedSerializer<ReadCommand>
-    {
-        public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
-        {
-            assert version < MessagingService.VERSION_30;
-
-            PartitionRangeReadCommand rangeCommand = 
(PartitionRangeReadCommand) command;
-            assert rangeCommand.dataRange().isPaging();
-
-            CFMetaData metadata = rangeCommand.metadata();
-
-            out.writeUTF(metadata.ksName);
-            out.writeUTF(metadata.cfName);
-            out.writeLong(rangeCommand.nowInSec() * 1000L);  // convert from 
seconds to millis
-
-            
AbstractBounds.rowPositionSerializer.serialize(rangeCommand.dataRange().keyRange(),
 out, version);
-
-            // pre-3.0 nodes don't accept names filters for paged range 
commands
-            ClusteringIndexSliceFilter filter;
-            if (rangeCommand.dataRange().clusteringIndexFilter.kind() == 
ClusteringIndexFilter.Kind.NAMES)
-                filter = 
LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter)
 rangeCommand.dataRange().clusteringIndexFilter, metadata);
-            else
-                filter = (ClusteringIndexSliceFilter) 
rangeCommand.dataRange().clusteringIndexFilter;
-
-            // slice filter
-            boolean makeStaticSlice = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
!filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-            LegacyReadCommandSerializer.serializeSlices(out, 
filter.requestedSlices(), filter.isReversed(), makeStaticSlice, metadata);
-            out.writeBoolean(filter.isReversed());
-
-            // slice filter's count
-            DataLimits.Kind kind = rangeCommand.limits().kind();
-            boolean isDistinct = (kind == DataLimits.Kind.CQL_LIMIT || kind == 
DataLimits.Kind.CQL_PAGING_LIMIT) && rangeCommand.limits().perPartitionCount() 
== 1;
-            if (isDistinct)
-                out.writeInt(1);
-            else
-                
out.writeInt(LegacyReadCommandSerializer.updateLimitForQuery(rangeCommand.limits().perPartitionCount(),
 filter.requestedSlices()));
-
-            // compositesToGroup
-            boolean selectsStatics = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() || 
filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-            int compositesToGroup;
-            if (kind == DataLimits.Kind.THRIFT_LIMIT)
-                compositesToGroup = -1;
-            else if (isDistinct && !selectsStatics)
-                compositesToGroup = -2;  // for DISTINCT queries 
(CASSANDRA-8490)
-            else
-                compositesToGroup = metadata.isDense() ? -1 : 
metadata.clusteringColumns().size();
-
-            out.writeInt(compositesToGroup);
-
-            // command-level "start" and "stop" composites.  The start is the 
last-returned cell name if there is one,
-            // otherwise it's the same as the slice filter's start.  The stop 
appears to always be the same as the
-            // slice filter's stop.
-            DataRange.Paging pagingRange = (DataRange.Paging) 
rangeCommand.dataRange();
-            Clustering lastReturned = pagingRange.getLastReturned();
-            ClusteringBound newStart = 
ClusteringBound.inclusiveStartOf(lastReturned);
-            Slice lastSlice = 
filter.requestedSlices().get(filter.requestedSlices().size() - 1);
-            
ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeBound(metadata, 
newStart, true), out);
-            
ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeClustering(metadata, 
lastSlice.end().clustering()), out);
-
-            LegacyRangeSliceCommandSerializer.serializeRowFilter(out, 
rangeCommand.rowFilter());
-
-            // command-level limit
-            // Pre-3.0 we would always request one more row than we actually 
needed and the command-level "start" would
-            // be the last-returned cell name, so the response would always 
include it.
-            int maxResults = rangeCommand.limits().count() + 1;
-            out.writeInt(maxResults);
-
-            // countCQL3Rows
-            if (rangeCommand.isForThrift() || 
rangeCommand.limits().perPartitionCount() == 1)  // for Thrift or DISTINCT
-                out.writeBoolean(false);
-            else
-                out.writeBoolean(true);
-        }
-
-        public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
-        {
-            assert version < MessagingService.VERSION_30;
-
-            String keyspace = in.readUTF();
-            String columnFamily = in.readUTF();
-
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspace, 
columnFamily);
-            if (metadata == null)
-            {
-                String message = String.format("Got legacy paged range command 
for nonexistent table %s.%s.", keyspace, columnFamily);
-                throw new UnknownColumnFamilyException(message, null);
-            }
-
-            int nowInSec = (int) (in.readLong() / 1000);  // convert from 
millis to seconds
-            AbstractBounds<PartitionPosition> keyRange = 
AbstractBounds.rowPositionSerializer.deserialize(in, metadata.partitioner, 
version);
-
-            Pair<ClusteringIndexSliceFilter, Boolean> p = 
LegacyReadCommandSerializer.deserializeSlicePartitionFilter(in, metadata);
-            ClusteringIndexSliceFilter filter = p.left;
-            boolean selectsStatics = p.right;
-
-            int perPartitionLimit = in.readInt();
-            int compositesToGroup = in.readInt();
-
-            // command-level Composite "start" and "stop"
-            LegacyLayout.LegacyBound startBound = 
LegacyLayout.decodeBound(metadata, ByteBufferUtil.readWithShortLength(in), 
true);
-
-            ByteBufferUtil.readWithShortLength(in);  // the composite "stop", 
which isn't actually needed
-
-            ColumnFilter selection = 
LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, 
compositesToGroup, metadata);
-
-            RowFilter rowFilter = 
LegacyRangeSliceCommandSerializer.deserializeRowFilter(in, metadata);
-            int maxResults = in.readInt();
-            boolean countCQL3Rows = in.readBoolean();
-
-            // We have 2 types of DISTINCT queries: ones on only the partition 
key, and ones on the partition key and static columns. For the former,
-            // we can easily detect the case because compositeToGroup is -2 
and that's the only case it can be that. The latter one is slightly less
-            // direct, but we know that on 2.1/2.2 queries, DISTINCT queries 
are the only CQL queries that have countCQL3Rows to false so we use
-            // that fact.
-            boolean isDistinct = compositesToGroup == -2 || (compositesToGroup 
!= -1 && !countCQL3Rows);
-            DataLimits limits;
-            if (isDistinct)
-                limits = DataLimits.distinctLimits(maxResults);
-            else
-                limits = DataLimits.cqlLimits(maxResults);
-
-            limits = limits.forPaging(maxResults);
-
-            // The pagedRangeCommand is used in pre-3.0 for both the first 
page and the following ones. On the first page, the startBound will be
-            // the start of the overall slice and will not be a proper 
Clustering. So detect that case and just return a non-paging DataRange, which
-            // is what 3.0 does.
-            DataRange dataRange = new DataRange(keyRange, filter);
-            Slices slices = filter.requestedSlices();
-            if (!isDistinct && startBound != LegacyLayout.LegacyBound.BOTTOM 
&& !startBound.bound.equals(slices.get(0).start()))
-            {
-                // pre-3.0 nodes normally expect pages to include the last 
cell from the previous page, but they handle it
-                // missing without any problems, so we can safely always set 
"inclusive" to false in the data range
-                dataRange = dataRange.forPaging(keyRange, metadata.comparator, 
startBound.getAsClustering(metadata), false);
-            }
-            return new PartitionRangeReadCommand(false, 0, true, metadata, 
nowInSec, selection, rowFilter, limits, dataRange, Optional.empty());
-        }
-
-        public long serializedSize(ReadCommand command, int version)
-        {
-            assert version < MessagingService.VERSION_30;
-            assert command.kind == Kind.PARTITION_RANGE;
-
-            PartitionRangeReadCommand rangeCommand = 
(PartitionRangeReadCommand) command;
-            CFMetaData metadata = rangeCommand.metadata();
-            assert rangeCommand.dataRange().isPaging();
-
-            long size = TypeSizes.sizeof(metadata.ksName);
-            size += TypeSizes.sizeof(metadata.cfName);
-            size += TypeSizes.sizeof((long) rangeCommand.nowInSec());
-
-            size += 
AbstractBounds.rowPositionSerializer.serializedSize(rangeCommand.dataRange().keyRange(),
 version);
-
-            // pre-3.0 nodes only accept slice filters for paged range commands
-            ClusteringIndexSliceFilter filter;
-            if (rangeCommand.dataRange().clusteringIndexFilter.kind() == 
ClusteringIndexFilter.Kind.NAMES)
-                filter = 
LegacyReadCommandSerializer.convertNamesFilterToSliceFilter((ClusteringIndexNamesFilter)
 rangeCommand.dataRange().clusteringIndexFilter, metadata);
-            else
-                filter = (ClusteringIndexSliceFilter) 
rangeCommand.dataRange().clusteringIndexFilter;
-
-            // slice filter
-            boolean makeStaticSlice = 
!rangeCommand.columnFilter().fetchedColumns().statics.isEmpty() && 
!filter.requestedSlices().selects(Clustering.STATIC_CLUSTERING);
-            size += 
LegacyReadCommandSerializer.serializedSlicesSize(filter.requestedSlices(), 
makeStaticSlice, metadata);
-            size += TypeSizes.sizeof(filter.isReversed());
-
-            // slice filter's count
-            size += 
TypeSizes.sizeof(rangeCommand.limits().perPartitionCount());
-
-            // compositesToGroup
-            size += TypeSizes.sizeof(0);
-
-            // command-level Composite "start" and "stop"
-            DataRange.Paging pagingRange = (DataRange.Paging) 
rangeCommand.dataRange();
-            Clustering lastReturned = pagingRange.getLastReturned();
-            Slice lastSlice = 
filter.requestedSlices().get(filter.requestedSlices().size() - 1);
-            size += 
ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata,
 lastReturned));
-            size += 
ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeClustering(metadata,
 lastSlice.end().clustering()));
-
-            size += 
LegacyRangeSliceCommandSerializer.serializedRowFilterSize(rangeCommand.rowFilter());
-
-            // command-level limit
-            size += TypeSizes.sizeof(rangeCommand.limits().count());
-
-            // countCQL3Rows
-            return size + TypeSizes.sizeof(true);
-        }
-    }
-
-    /**
-     * Serializer for pre-3.0 ReadCommands.
-     */
-    static class LegacyReadCommandSerializer implements 
IVersionedSerializer<ReadCommand>
-    {
-        public void serialize(ReadCommand command, DataOutputPlus out, int 
version) throws IOException
-        {
-            assert version < MessagingService.VERSION_30;
-            assert command.kind == Kind.SINGLE_PARTITION;
-
-            SinglePartitionReadCommand singleReadCommand = 
(SinglePartitionReadCommand) command;
-            singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
-
-            CFMetaData metadata = singleReadCommand.metadata();
-
-            
out.writeByte(LegacyType.fromPartitionFilterKind(singleReadCommand.clusteringIndexFilter().kind()).serializedValue);
-
-            out.writeBoolean(singleReadCommand.isDigestQuery());
-            out.writeUTF(metadata.ksName);
-            
ByteBufferUtil.writeWithShortLength(singleReadCommand.partitionKey().getKey(), 
out);
-            out.writeUTF(metadata.cfName);
-            out.writeLong(singleReadCommand.nowInSec() * 1000L);  // convert 
from seconds to millis
-
-            if (singleReadCommand.clusteringIndexFilter().kind() == 
ClusteringIndexFilter.Kind.SLICE)
-                serializeSliceCommand(singleReadCommand, out);
-            else
-                serializeNamesCommand(singleReadCommand, out);
-        }
-
-        public ReadCommand deserialize(DataInputPlus in, int version) throws 
IOException
-        {
-            assert version < MessagingService.VERSION_30;
-            LegacyType msgType = LegacyType.fromSerializedValue(in.readByte());
-
-            boolean isDigest = in.readBoolean();
-            String keyspaceName = in.readUTF();
-            ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-            String cfName = in.readUTF();
-            long nowInMillis = in.readLong();
-            int nowInSeconds = (int) (nowInMillis / 1000);  // convert from 
millis to seconds
-            CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, 
cfName);
-            DecoratedKey dk = metadata.partitioner.decorateKey(key);
-
-            switch (msgType)
-            {
-                case GET_BY_NAMES:
-                    return deserializeNamesCommand(in, isDigest, metadata, dk, 
nowInSeconds, version);
-                case GET_SLICES:
-                    return deserializeSliceCommand(in, isDigest, metadata, dk, 
nowInSeconds, version);
-                default:
-                    throw new AssertionError();
-            }
-        }
-
-        public long serializedSize(ReadCommand command, int version)
-        {
-            assert version < MessagingService.VERSION_30;
-            assert command.kind == Kind.SINGLE_PARTITION;
-            SinglePartitionReadCommand singleReadCommand = 
(SinglePartitionReadCommand) command;
-            singleReadCommand = maybeConvertNamesToSlice(singleReadCommand);
-
-            int keySize = 
singleReadCommand.partitionKey().getKey().remaining();
-
-            CFMetaData metadata = singleReadCommand.metadata();
-
-            long size = 1;  // message type (single byte)
-            size += TypeSizes.sizeof(command.isDigestQuery());
-            size += TypeSizes.sizeof(metadata.ksName);
-            size += TypeSizes.sizeof((short) keySize) + keySize;
-            size += TypeSizes.sizeof((long) command.nowInSec());
-
-            if (singleReadCommand.clusteringIndexFilter().kind() == 
ClusteringIndexFilter.Kind.SLICE)
-                return size + serializedSliceCommandSize(singleReadCommand);
-            else
-                return size + serializedNamesCommandSize(singleReadCommand);
-        }
-
-        private void serializeNamesCommand(SinglePartitionReadCommand command, 
DataOutputPlus out) throws IOException
-        {
-            serializeNamesFilter(command, 
(ClusteringIndexNamesFilter)command.clusteringIndexFilter(), out);
-        }
-
-        private static void serializeNamesFilter(ReadCommand command, 
ClusteringIndexNamesFilter filter, DataOutputPlus out) throws IOException
-        {
-            PartitionColumns columns = command.columnFilter().fetchedColumns();
-            CFMetaData metadata = command.metadata();
-            SortedSet<Clustering> requestedRows = filter.requestedRows();
-
-            if (requestedRows.isEmpty())
-            {
-                // only static columns are requested
-                out.writeInt(columns.size());
-                for (ColumnDefinition column : columns)
-                    ByteBufferUtil.writeWithShortLength(column.name.bytes, 
out);
-            }
-            else
-            {
-                out.writeInt(requestedRows.size() * columns.size());
-                for (Clustering clustering : requestedRows)
-                {
-                    for (ColumnDefinition column : columns)
-                        
ByteBufferUtil.writeWithShortLength(LegacyLayout.encodeCellName(metadata, 
clustering, column.name.bytes, null), out);
-                }
-            }
-
-            // countCql3Rows should be true if it's not for Thrift or a 
DISTINCT query
-            if (command.isForThrift() || (command.limits().kind() == 
DataLimits.Kind.CQL_LIMIT && command.limits().perPartitionCount() == 1))
-                out.writeBoolean(false);  // it's compact and not a DISTINCT 
query
-            else
-                out.writeBoolean(true);
-        }
-
-        static long serializedNamesFilterSize(ClusteringIndexNamesFilter 
filter, CFMetaData metadata, PartitionColumns fetchedColumns)
-        {
-            SortedSet<Clustering> requestedRows = filter.requestedRows();
-
-            long size = 0;
-            if (requestedRows.isEmpty())
-            {
-                // only static columns are requested
-                size += TypeSizes.sizeof(fetchedColumns.size());
-                for (ColumnDefinition column : fetchedColumns)
-                    size += 
ByteBufferUtil.serializedSizeWithShortLength(column.name.bytes);
-            }
-            else
-            {
-                size += TypeSizes.sizeof(requestedRows.size() * 
fetchedColumns.size());
-                for (Clustering clustering : requestedRows)
-                {
-                    for (ColumnDefinition column : fetchedColumns)
-                        size += 
ByteBufferUtil.serializedSizeWithShortLength(LegacyLayout.encodeCellName(metadata,
 clustering, column.name.bytes, null));
-                }
-            }
-
-            return size + TypeSizes.sizeof(true);  // countCql3Rows
-        }
-
-        private SinglePartitionReadCommand 
deserializeNamesCommand(DataInputPlus in, boolean isDigest, CFMetaData 
metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
-        {
-            Pair<ColumnFilter, ClusteringIndexNamesFilter> selectionAndFilter 
= deserializeNamesSelectionAndFilter(in, metadata);
-
-            // messages from old nodes will expect the thrift format, so 
always use 'true' for isForThrift
-            return new SinglePartitionReadCommand(
-                    isDigest, version, true, metadata, nowInSeconds, 
selectionAndFilter.left, RowFilter.NONE, DataLimits.NONE,
-                    key, selectionAndFilter.right);
-        }
-
-        static Pair<ColumnFilter, ClusteringIndexNamesFilter> 
deserializeNamesSelectionAndFilter(DataInputPlus in, CFMetaData metadata) 
throws IOException
-        {
-            int numCellNames = in.readInt();
-
-            // The names filter could include either a) static columns or b) 
normal columns with the clustering columns
-            // fully specified.  We need to handle those cases differently in 
3.0.
-            NavigableSet<Clustering> clusterings = new 
TreeSet<>(metadata.comparator);
-
-            ColumnFilter.Builder selectionBuilder = 
ColumnFilter.selectionBuilder();
-            for (int i = 0; i < numCellNames; i++)
-            {
-                ByteBuffer buffer = ByteBufferUtil.readWithShortLength(in);
-                LegacyLayout.LegacyCellName cellName;
-                try
-                {
-                    cellName = LegacyLayout.decodeCellName(metadata, buffer);
-                }
-                catch (UnknownColumnException exc)
-                {
-                    // TODO this probably needs a new exception class that 
shares a parent with UnknownColumnFamilyException
-                    throw new UnknownColumnFamilyException(
-                            "Received legacy range read command with names 
filter for unrecognized column name. " +
-                                    "Fill name in filter (hex): " + 
ByteBufferUtil.bytesToHex(buffer), metadata.cfId);
-                }
-
-                // If we're querying for a static column, we may also need to 
read it
-                // as if it were a thrift dynamic column (because the column 
metadata,
-                // which makes it a static column in 3.0+, may have been added 
*after*
-                // some values were written). Note that all cql queries on 
non-compact
-                // tables used slice & not name filters prior to 3.0 so this 
path is
-                // not taken for non-compact tables. It is theoretically 
possible to
-                // get here via thrift, hence the check on 
metadata.isStaticCompactTable.
-                // See CASSANDRA-11087.
-                if (metadata.isStaticCompactTable() && 
cellName.clustering.equals(Clustering.STATIC_CLUSTERING))
-                {
-                    
clusterings.add(Clustering.make(cellName.column.name.bytes));
-                    selectionBuilder.add(metadata.compactValueColumn());
-                }
-                else
-                {
-                    clusterings.add(cellName.clustering);
-                }
-
-                selectionBuilder.add(cellName.column);
-            }
-
-            // for compact storage tables without clustering keys, the column 
holding the selected value is named
-            // 'value' internally we add it to the selection here to prevent 
errors due to unexpected column names
-            // when serializing the initial local data response
-            if (metadata.isStaticCompactTable() && clusterings.isEmpty())
-                selectionBuilder.addAll(metadata.partitionColumns());
-
-            in.readBoolean();  // countCql3Rows
-
-            // clusterings cannot include STATIC_CLUSTERING, so if the names 
filter is for static columns, clusterings
-            // will be empty.  However, by requesting the static columns in 
our ColumnFilter, this will still work.
-            ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(clusterings, false);
-            return Pair.create(selectionBuilder.build(), filter);
-        }
-
-        private long serializedNamesCommandSize(SinglePartitionReadCommand 
command)
-        {
-            ClusteringIndexNamesFilter filter = 
(ClusteringIndexNamesFilter)command.clusteringIndexFilter();
-            PartitionColumns columns = command.columnFilter().fetchedColumns();
-            return serializedNamesFilterSize(filter, command.metadata(), 
columns);
-        }
-
-        private void serializeSliceCommand(SinglePartitionReadCommand command, 
DataOutputPlus out) throws IOException
-        {
-            CFMetaData metadata = command.metadata();
-            ClusteringIndexSliceFilter filter = 
(ClusteringIndexSliceFilter)command.clusteringIndexFilter();
-
-            Slices slices = filter.requestedSlices();
-            boolean makeStaticSlice = 
!command.columnFilter().fetchedColumns().statics.isEmpty() && 
!slices.selects(Clustering.STATIC_CLUSTERING);
-            serializeSlices(out, slices, filter.isReversed(), makeStaticSlice, 
metadata);
-
-            out.writeBoolean(filter.isReversed());
-
-            boolean selectsStatics = 
!command.columnFilter().fetchedColumns().statics.isEmpty() || 
slices.selects(Clustering.STATIC_CLUSTERING);
-            DataLimits limits = command.limits();
-            if (limits.isDistinct())
-                out.writeInt(1);  // the limit is always 1 for DISTINCT queries
-            else
-                out.writeInt(updateLimitForQuery(command.limits().count(), 
filter.requestedSlices()));
-
-            int compositesToGroup;
-            if (limits.kind() == DataLimits.Kind.THRIFT_LIMIT || 
metadata.isDense())
-                compositesToGroup = -1;
-            else if (limits.isDistinct() && !selectsStatics)
-                compositesToGroup = -2;  // for DISTINCT queries 
(CASSANDRA-8490)
-            else
-                compositesToGroup = metadata.clusteringColumns().size();
-
-            out.writeInt(compositesToGroup);
-        }
-
-        private SinglePartitionReadCommand 
deserializeSliceCommand(DataInputPlus in, boolean isDigest, CFMetaData 
metadata, DecoratedKey key, int nowInSeconds, int version) throws IOException
-        {
-            Pair<ClusteringIndexSliceFilter, Boolean> p = 
deserializeSlicePartitionFilter(in, metadata);
-            ClusteringIndexSliceFilter filter = p.left;
-            boolean selectsStatics = p.right;
-            int count = in.readInt();
-            int compositesToGroup = in.readInt();
-
-            // if a slice query from a pre-3.0 node doesn't cover statics, we 
shouldn't select them at all
-            ColumnFilter columnFilter = 
LegacyRangeSliceCommandSerializer.getColumnSelectionForSlice(selectsStatics, 
compositesToGroup, metadata);
-
-            // We have 2 types of DISTINCT queries: ones on only the partition 
key, and ones on the partition key and static columns. For the former,
-            // we can easily detect the case because compositeToGroup is -2 
and that's the only case it can be that. The latter is probablematic
-            // however as we have no way to distinguish it from a normal 
select with a limit of 1 (and this, contrarily to the range query case
-            // were the countCQL3Rows boolean allows us to decide).
-            // So we consider this case not distinct here. This is ok because 
even if it is a distinct (with static), the count will be 1 and
-            // we'll still just query one row (a distinct DataLimits currently 
behave exactly like a CQL limit with a count of 1). The only
-            // drawback is that we'll send back the first row entirely while a 
2.1/2.2 node would return only the first cell in that same
-            // situation. This isn't a problem for 2.1/2.2 code however (it 
would be for a range query, as it would throw off the count for
-            // reasons similar to CASSANDRA-10762, but it's ok for single 
partition queries).
-            // We do _not_ want to do the reverse however and consider a 
'SELECT * FROM foo LIMIT 1' as a DISTINCT query as that would make
-            // us only return the 1st cell rather then 1st row.
-            DataLimits limits;
-            if (compositesToGroup == -2)
-                limits = DataLimits.distinctLimits(count);  // See 
CASSANDRA-8490 for the explanation of this value
-            else if (compositesToGroup == -1)
-                limits = DataLimits.thriftLimits(1, count);
-            else
-                limits = DataLimits.cqlLimits(count);
-
-            // messages from old nodes will expect the thrift format, so 
always use 'true' for isForThrift
-            return new SinglePartitionReadCommand(isDigest, version, true, 
metadata, nowInSeconds, columnFilter, RowFilter.NONE, limits, key, filter);
-        }
-
-        private long serializedSliceCommandSize(SinglePartitionReadCommand 
command)
-        {
-            CFMetaData metadata = command.metadata();
-            ClusteringIndexSliceFilter filter = 
(ClusteringIndexSliceFilter)command.clusteringIndexFilter();
-
-            Slices slices = filter.requestedSlices();
-            boolean makeStaticSlice = 
!command.columnFilter().fetchedColumns().statics.isEmpty() && 
!slices.selects(Clustering.STATIC_CLUSTERING);
-
-            long size = serializedSlicesSize(slices, makeStaticSlice, 
metadata);
-            size += 
TypeSizes.sizeof(command.clusteringIndexFilter().isReversed());
-            size += TypeSizes.sizeof(command.limits().count());
-            return size + TypeSizes.sizeof(0);  // compositesToGroup
-        }
-
-        static void serializeSlices(DataOutputPlus out, Slices slices, boolean 
isReversed, boolean makeStaticSlice, CFMetaData metadata) throws IOException
-        {
-            out.writeInt(slices.size() + (makeStaticSlice ? 1 : 0));
-
-            // In 3.0 we always store the slices in normal comparator order.  
Pre-3.0 nodes expect the slices to
-            // be in reversed order if the query is reversed, so we handle 
that here.
-            if (isReversed)
-            {
-                for (int i = slices.size() - 1; i >= 0; i--)
-                    serializeSlice(out, slices.get(i), true, metadata);
-                if (makeStaticSlice)
-                    serializeStaticSlice(out, true, metadata);
-            }
-            else
-            {
-                if (makeStaticSlice)
-                    serializeStaticSlice(out, false, metadata);
-                for (Slice slice : slices)
-                    serializeSlice(out, slice, false, metadata);
-            }
-        }
-
-        static long serializedSlicesSize(Slices slices, boolean 
makeStaticSlice, CFMetaData metadata)
-        {
-            long size = TypeSizes.sizeof(slices.size());
-
-            for (Slice slice : slices)
-            {
-                ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, 
slice.start(), true);
-                size += 
ByteBufferUtil.serializedSizeWithShortLength(sliceStart);
-                ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, 
slice.end(), false);
-                size += ByteBufferUtil.serializedSizeWithShortLength(sliceEnd);
-            }
-
-            if (makeStaticSlice)
-                size += serializedStaticSliceSize(metadata);
-
-            return size;
-        }
-
-        static long serializedStaticSliceSize(CFMetaData metadata)
-        {
-            // unlike serializeStaticSlice(), but we don't care about reversal 
for size calculations
-            ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, 
ClusteringBound.BOTTOM, false);
-            long size = 
ByteBufferUtil.serializedSizeWithShortLength(sliceStart);
-
-            size += TypeSizes.sizeof((short) (metadata.comparator.size() * 3 + 
2));
-            size += TypeSizes.sizeof((short) LegacyLayout.STATIC_PREFIX);
-            for (int i = 0; i < metadata.comparator.size(); i++)
-            {
-                size += 
ByteBufferUtil.serializedSizeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER);
-                size += 1;  // EOC
-            }
-            return size;
-        }
-
-        private static void serializeSlice(DataOutputPlus out, Slice slice, 
boolean isReversed, CFMetaData metadata) throws IOException
-        {
-            ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, 
isReversed ? slice.end() : slice.start(), !isReversed);
-            ByteBufferUtil.writeWithShortLength(sliceStart, out);
-
-            ByteBuffer sliceEnd = LegacyLayout.encodeBound(metadata, 
isReversed ? slice.start() : slice.end(), isReversed);
-            ByteBufferUtil.writeWithShortLength(sliceEnd, out);
-        }
-
-        private static void serializeStaticSlice(DataOutputPlus out, boolean 
isReversed, CFMetaData metadata) throws IOException
-        {
-            // if reversed, write an empty bound for the slice start; if 
reversed, write out an empty bound for the
-            // slice finish after we've written the static slice start
-            if (!isReversed)
-            {
-                ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, 
ClusteringBound.BOTTOM, false);
-                ByteBufferUtil.writeWithShortLength(sliceStart, out);
-            }
-
-            // write out the length of the composite
-            out.writeShort(2 + metadata.comparator.size() * 3);  // two bytes 
+ EOC for each component, plus static prefix
-            out.writeShort(LegacyLayout.STATIC_PREFIX);
-            for (int i = 0; i < metadata.comparator.size(); i++)
-            {
-                
ByteBufferUtil.writeWithShortLength(ByteBufferUtil.EMPTY_BYTE_BUFFER, out);
-                // write the EOC, using an inclusive end if we're on the final 
component
-                out.writeByte(i == metadata.comparator.size() - 1 ? 1 : 0);
-            }
-
-            if (isReversed)
-            {
-                ByteBuffer sliceStart = LegacyLayout.encodeBound(metadata, 
ClusteringBound.BOTTOM, false);
-                ByteBufferUtil.writeWithShortLength(sliceStart, out);
-            }
-        }
-
-        // Returns the deserialized filter, and whether static columns are 
queried (in pre-3.0, both info are determined by the slices,
-        // but in 3.0 they are separated: whether static columns are queried 
or not depends on the ColumnFilter).
-        static Pair<ClusteringIndexSliceFilter, Boolean> 
deserializeSlicePartitionFilter(DataInputPlus in, CFMetaData metadata) throws 
IOException
-        {
-            int numSlices = in.readInt();
-            ByteBuffer[] startBuffers = new ByteBuffer[numSlices];
-            ByteBuffer[] finishBuffers = new ByteBuffer[numSlices];
-            for (int i = 0; i < numSlices; i++)
-            {
-                startBuffers[i] = ByteBufferUtil.readWithShortLength(in);
-                finishBuffers[i] = ByteBufferUtil.readWithShortLength(in);
-            }
-
-            boolean reversed = in.readBoolean();
-
-            if (reversed)
-            {
-                // pre-3.0, reversed query slices put the greater element at 
the start of the slice
-                ByteBuffer[] tmp = finishBuffers;
-                finishBuffers = startBuffers;
-                startBuffers = tmp;
-            }
-
-            boolean selectsStatics = false;
-            Slices.Builder slicesBuilder = new 
Slices.Builder(metadata.comparator);
-            for (int i = 0; i < numSlices; i++)
-            {
-                LegacyLayout.LegacyBound start = 
LegacyLayout.decodeBound(metadata, startBuffers[i], true);
-                LegacyLayout.LegacyBound finish = 
LegacyLayout.decodeBound(metadata, finishBuffers[i], false);
-
-                if (start.isStatic)
-                {
-                    // If we start at the static block, this means we start at 
the beginning of the partition in 3.0
-                    // terms (since 3.0 handles static outside of the slice).
-                    start = LegacyLayout.LegacyBound.BOTTOM;
-
-                    // Then if we include the static, records it
-                    if (start.bound.isInclusive())
-                        selectsStatics = true;
-                }
-                else if (start == LegacyLayout.LegacyBound.BOTTOM)
-                {
-                    selectsStatics = true;
-                }
-
-                // If the end of the slice is the end of the statics, then 
that mean this slice was just selecting static
-                // columns. We have already recorded that in selectsStatics, 
so we can ignore the slice (which doesn't make
-                // sense for 3.0).
-                if (finish.isStatic)
-                {
-                    assert finish.bound.isInclusive(); // it would make no 
sense for a pre-3.0 node to have a slice that stops
-                                                     // before the static 
columns (since there is nothing before that)
-                    continue;
-                }
-
-                slicesBuilder.add(Slice.make(start.bound, finish.bound));
-            }
-
-            return Pair.create(new 
ClusteringIndexSliceFilter(slicesBuilder.build(), reversed), selectsStatics);
-        }
-
-        private static SinglePartitionReadCommand 
maybeConvertNamesToSlice(SinglePartitionReadCommand command)
-        {
-            if (command.clusteringIndexFilter().kind() != 
ClusteringIndexFilter.Kind.NAMES)
-                return command;
-
-            CFMetaData metadata = command.metadata();
-
-            if (!shouldConvertNamesToSlice(metadata, 
command.columnFilter().fetchedColumns()))
-                return command;
-
-            ClusteringIndexNamesFilter filter = 
(ClusteringIndexNamesFilter)command.clusteringIndexFilter();
-            ClusteringIndexSliceFilter sliceFilter = 
convertNamesFilterToSliceFilter(filter, metadata);
-            return new SinglePartitionReadCommand(
-                    command.isDigestQuery(), command.digestVersion(), 
command.isForThrift(), metadata, command.nowInSec(),
-                    command.columnFilter(), command.rowFilter(), 
command.limits(), command.partitionKey(), sliceFilter);
-        }
-
-        /**
-         * Returns true if a names filter on the given table and column 
selection should be converted to a slice
-         * filter for compatibility with pre-3.0 nodes, false otherwise.
-         */
-        static boolean shouldConvertNamesToSlice(CFMetaData metadata, 
PartitionColumns columns)
-        {
-            // On pre-3.0 nodes, due to CASSANDRA-5762, we always do a slice 
for CQL3 tables (not dense, composite).
-            if (!metadata.isDense() && metadata.isCompound())
-                return true;
-
-            // pre-3.0 nodes don't support names filters for reading 
collections, so if we're requesting any of those,
-            // we need to convert this to a slice filter
-            for (ColumnDefinition column : columns)
-            {
-                if (column.type.isMultiCell())
-                    return true;
-            }
-            return false;
-        }
-
-        /**
-         * Converts a names filter that is incompatible with pre-3.0 nodes to 
a slice filter that is compatible.
-         */
-        private static ClusteringIndexSliceFilter 
convertNamesFilterToSliceFilter(ClusteringIndexNamesFilter filter, CFMetaData 
metadata)
-        {
-            SortedSet<Clustering> requestedRows = filter.requestedRows();
-            Slices slices;
-            if (requestedRows.isEmpty())
-            {
-                slices = Slices.NONE;
-            }
-            else if (requestedRows.size() == 1 && requestedRows.first().size() 
== 0)
-            {
-                slices = Slices.ALL;
-            }
-            else
-            {
-                Slices.Builder slicesBuilder = new 
Slices.Builder(metadata.comparator);
-                for (Clustering clustering : requestedRows)
-                    
slicesBuilder.add(ClusteringBound.inclusiveStartOf(clustering), 
ClusteringBound.inclusiveEndOf(clustering));
-                slices = slicesBuilder.build();
-            }
-
-            return new ClusteringIndexSliceFilter(slices, filter.isReversed());
-        }
-
-        /**
-         * Potentially increases the existing query limit to account for the 
lack of exclusive bounds in pre-3.0 nodes.
-         * @param limit the existing query limit
-         * @param slices the requested slices
-         * @return the updated limit
-         */
-        static int updateLimitForQuery(int limit, Slices slices)
-        {
-            // Pre-3.0 nodes don't support exclusive bounds for slices. 
Instead, we query one more element if necessary
-            // and filter it later (in LegacyRemoteDataResponse)
-            if (!slices.hasLowerBound() && ! slices.hasUpperBound())
-                return limit;
-
-            for (Slice slice : slices)
-            {
-                if (limit == Integer.MAX_VALUE)
-                    return limit;
-
-                if (!slice.start().isInclusive())
-                    limit++;
-                if (!slice.end().isInclusive())
-                    limit++;
-            }
-            return limit;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/ReadResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java 
b/src/java/org/apache/cassandra/db/ReadResponse.java
index cca21f8..c3eae0d 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -32,7 +32,6 @@ import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
-import org.apache.cassandra.io.ForwardingVersionedSerializer;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataInputPlus;
@@ -47,20 +46,6 @@ public abstract class ReadResponse
 {
     // Serializer for single partition read response
     public static final IVersionedSerializer<ReadResponse> serializer = new 
Serializer();
-    // Serializer for the pre-3.0 rang slice responses.
-    public static final IVersionedSerializer<ReadResponse> 
legacyRangeSliceReplySerializer = new LegacyRangeSliceReplySerializer();
-    // Serializer for partition range read response (this actually delegate to 
'serializer' in 3.0 and to
-    // 'legacyRangeSliceReplySerializer' in older version.
-    public static final IVersionedSerializer<ReadResponse> 
rangeSliceSerializer = new ForwardingVersionedSerializer<ReadResponse>()
-    {
-        @Override
-        protected IVersionedSerializer<ReadResponse> delegate(int version)
-        {
-            return version < MessagingService.VERSION_30
-                    ? legacyRangeSliceReplySerializer
-                    : serializer;
-        }
-    };
 
     // This is used only when serializing data responses and we can't it 
easily in other cases. So this can be null, which is slighly
     // hacky, but as this hack doesn't escape this class, and it's easy enough 
to validate that it's not null when we need, it's "good enough".
@@ -95,7 +80,7 @@ public abstract class ReadResponse
     protected static ByteBuffer makeDigest(UnfilteredPartitionIterator 
iterator, ReadCommand command)
     {
         MessageDigest digest = FBUtilities.threadLocalMD5Digest();
-        UnfilteredPartitionIterators.digest(command, iterator, digest, 
command.digestVersion());
+        UnfilteredPartitionIterators.digest(iterator, digest, 
command.digestVersion());
         return ByteBuffer.wrap(digest.digest());
     }
 
@@ -210,130 +195,12 @@ public abstract class ReadResponse
         }
     }
 
-    /**
-     * A remote response from a pre-3.0 node.  This needs a separate class in 
order to cleanly handle trimming and
-     * reversal of results when the read command calls for it.  Pre-3.0 nodes 
always return results in the normal
-     * sorted order, even if the query asks for reversed results.  
Additionally,  pre-3.0 nodes do not have a notion of
-     * exclusive slices on non-composite tables, so extra rows may need to be 
trimmed.
-     */
-    @VisibleForTesting
-    static class LegacyRemoteDataResponse extends ReadResponse
-    {
-        private final List<ImmutableBTreePartition> partitions;
-
-        @VisibleForTesting
-        LegacyRemoteDataResponse(List<ImmutableBTreePartition> partitions)
-        {
-            super(null); // we never serialize LegacyRemoteDataResponses, so 
we don't care about the command
-            this.partitions = partitions;
-        }
-
-        public UnfilteredPartitionIterator makeIterator(final ReadCommand 
command)
-        {
-            // Due to a bug in the serialization of AbstractBounds, anything 
that isn't a Range is understood by pre-3.0 nodes
-            // as a Bound, which means IncludingExcludingBounds and 
ExcludingBounds responses may include keys they shouldn't.
-            // So filter partitions that shouldn't be included here.
-            boolean skipFirst = false;
-            boolean skipLast = false;
-            if (!partitions.isEmpty() && command instanceof 
PartitionRangeReadCommand)
-            {
-                AbstractBounds<PartitionPosition> keyRange = 
((PartitionRangeReadCommand)command).dataRange().keyRange();
-                boolean isExcludingBounds = keyRange instanceof 
ExcludingBounds;
-                skipFirst = isExcludingBounds && 
!keyRange.contains(partitions.get(0).partitionKey());
-                skipLast = (isExcludingBounds || keyRange instanceof 
IncludingExcludingBounds) && 
!keyRange.contains(partitions.get(partitions.size() - 1).partitionKey());
-            }
-
-            final List<ImmutableBTreePartition> toReturn;
-            if (skipFirst || skipLast)
-            {
-                toReturn = partitions.size() == 1
-                         ? Collections.emptyList()
-                         : partitions.subList(skipFirst ? 1 : 0, skipLast ? 
partitions.size() - 1 : partitions.size());
-            }
-            else
-            {
-                toReturn = partitions;
-            }
-
-            return new AbstractUnfilteredPartitionIterator()
-            {
-                private int idx;
-
-                public boolean isForThrift()
-                {
-                    return true;
-                }
-
-                public CFMetaData metadata()
-                {
-                    return command.metadata();
-                }
-
-                public boolean hasNext()
-                {
-                    return idx < toReturn.size();
-                }
-
-                public UnfilteredRowIterator next()
-                {
-                    ImmutableBTreePartition partition = toReturn.get(idx++);
-
-                    ClusteringIndexFilter filter = 
command.clusteringIndexFilter(partition.partitionKey());
-
-                    // Pre-3.0, we would always request one more row than we 
actually needed and the command-level "start" would
-                    // be the last-returned cell name, so the response would 
always include it.
-                    UnfilteredRowIterator iterator = 
partition.unfilteredIterator(command.columnFilter(), 
filter.getSlices(command.metadata()), filter.isReversed());
-
-                    // Wrap results with a ThriftResultMerger only if they're 
intended for the thrift command.
-                    if (command.isForThrift())
-                        return ThriftResultsMerger.maybeWrap(iterator, 
command.nowInSec());
-                    else
-                        return iterator;
-                }
-            };
-        }
-
-        public ByteBuffer digest(ReadCommand command)
-        {
-            try (UnfilteredPartitionIterator iterator = makeIterator(command))
-            {
-                return makeDigest(iterator, command);
-            }
-        }
-
-        public boolean isDigestResponse()
-        {
-            return false;
-        }
-    }
-
     private static class Serializer implements 
IVersionedSerializer<ReadResponse>
     {
         public void serialize(ReadResponse response, DataOutputPlus out, int 
version) throws IOException
         {
             boolean isDigest = response instanceof DigestResponse;
             ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : 
ByteBufferUtil.EMPTY_BYTE_BUFFER;
-            if (version < MessagingService.VERSION_30)
-            {
-                out.writeInt(digest.remaining());
-                out.write(digest);
-                out.writeBoolean(isDigest);
-                if (!isDigest)
-                {
-                    assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
-                    try (UnfilteredPartitionIterator iter = 
response.makeIterator(response.command))
-                    {
-                        assert iter.hasNext();
-                        try (UnfilteredRowIterator partition = iter.next())
-                        {
-                            
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
-                            
LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, 
version);
-                        }
-                        assert !iter.hasNext();
-                    }
-                }
-                return;
-            }
 
             ByteBufferUtil.writeWithVIntLength(digest, out);
             if (!isDigest)
@@ -345,38 +212,12 @@ public abstract class ReadResponse
 
         public ReadResponse deserialize(DataInputPlus in, int version) throws 
IOException
         {
-            if (version < MessagingService.VERSION_30)
-            {
-                byte[] digest = null;
-                int digestSize = in.readInt();
-                if (digestSize > 0)
-                {
-                    digest = new byte[digestSize];
-                    in.readFully(digest, 0, digestSize);
-                }
-                boolean isDigest = in.readBoolean();
-                assert isDigest == digestSize > 0;
-                if (isDigest)
-                {
-                    assert digest != null;
-                    return new DigestResponse(ByteBuffer.wrap(digest));
-                }
-
-                // ReadResponses from older versions are always 
single-partition (ranges are handled by RangeSliceReply)
-                ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-                try (UnfilteredRowIterator rowIterator = 
LegacyLayout.deserializeLegacyPartition(in, version, 
SerializationHelper.Flag.FROM_REMOTE, key))
-                {
-                    if (rowIterator == null)
-                        return new 
LegacyRemoteDataResponse(Collections.emptyList());
-
-                    return new 
LegacyRemoteDataResponse(Collections.singletonList(ImmutableBTreePartition.create(rowIterator)));
-                }
-            }
-
             ByteBuffer digest = ByteBufferUtil.readWithVIntLength(in);
             if (digest.hasRemaining())
                 return new DigestResponse(digest);
 
+            // Note that we can only get there if version == 3.0, which is the 
current_version. When we'll change the
+            // version, we'll have to deserialize/re-serialize the data to be 
in the proper version.
             assert version == MessagingService.VERSION_30;
             ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
             return new RemoteDataResponse(data);
@@ -387,28 +228,6 @@ public abstract class ReadResponse
             boolean isDigest = response instanceof DigestResponse;
             ByteBuffer digest = isDigest ? ((DigestResponse)response).digest : 
ByteBufferUtil.EMPTY_BYTE_BUFFER;
 
-            if (version < MessagingService.VERSION_30)
-            {
-                long size = TypeSizes.sizeof(digest.remaining())
-                        + digest.remaining()
-                        + TypeSizes.sizeof(isDigest);
-                if (!isDigest)
-                {
-                    assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
-                    try (UnfilteredPartitionIterator iter = 
response.makeIterator(response.command))
-                    {
-                        assert iter.hasNext();
-                        try (UnfilteredRowIterator partition = iter.next())
-                        {
-                            size += 
ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
-                            size += 
LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, 
version);
-                        }
-                        assert !iter.hasNext();
-                    }
-                }
-                return size;
-            }
-
             long size = ByteBufferUtil.serializedSizeWithVIntLength(digest);
             if (!isDigest)
             {
@@ -421,81 +240,4 @@ public abstract class ReadResponse
             return size;
         }
     }
-
-    private static class LegacyRangeSliceReplySerializer implements 
IVersionedSerializer<ReadResponse>
-    {
-        public void serialize(ReadResponse response, DataOutputPlus out, int 
version) throws IOException
-        {
-            assert version < MessagingService.VERSION_30;
-
-            // determine the number of partitions upfront for serialization
-            int numPartitions = 0;
-            assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
-            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.command))
-            {
-                while (iterator.hasNext())
-                {
-                    try (UnfilteredRowIterator atomIterator = iterator.next())
-                    {
-                        numPartitions++;
-
-                        // we have to fully exhaust the subiterator
-                        while (atomIterator.hasNext())
-                            atomIterator.next();
-                    }
-                }
-            }
-
-            out.writeInt(numPartitions);
-
-            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.command))
-            {
-                while (iterator.hasNext())
-                {
-                    try (UnfilteredRowIterator partition = iterator.next())
-                    {
-                        
ByteBufferUtil.writeWithShortLength(partition.partitionKey().getKey(), out);
-                        
LegacyLayout.serializeAsLegacyPartition(response.command, partition, out, 
version);
-                    }
-                }
-            }
-        }
-
-        public ReadResponse deserialize(DataInputPlus in, int version) throws 
IOException
-        {
-            assert version < MessagingService.VERSION_30;
-
-            int partitionCount = in.readInt();
-            ArrayList<ImmutableBTreePartition> partitions = new 
ArrayList<>(partitionCount);
-            for (int i = 0; i < partitionCount; i++)
-            {
-                ByteBuffer key = ByteBufferUtil.readWithShortLength(in);
-                try (UnfilteredRowIterator partition = 
LegacyLayout.deserializeLegacyPartition(in, version, 
SerializationHelper.Flag.FROM_REMOTE, key))
-                {
-                    partitions.add(ImmutableBTreePartition.create(partition));
-                }
-            }
-            return new LegacyRemoteDataResponse(partitions);
-        }
-
-        public long serializedSize(ReadResponse response, int version)
-        {
-            assert version < MessagingService.VERSION_30;
-            long size = TypeSizes.sizeof(0);  // number of partitions
-
-            assert response.command != null; // we only serialize 
LocalDataResponse, which always has the command set
-            try (UnfilteredPartitionIterator iterator = 
response.makeIterator(response.command))
-            {
-                while (iterator.hasNext())
-                {
-                    try (UnfilteredRowIterator partition = iterator.next())
-                    {
-                        size += 
ByteBufferUtil.serializedSizeWithShortLength(partition.partitionKey().getKey());
-                        size += 
LegacyLayout.serializedSizeAsLegacyPartition(response.command, partition, 
version);
-                    }
-                }
-            }
-            return size;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4a246419/src/java/org/apache/cassandra/db/RowIndexEntry.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowIndexEntry.java 
b/src/java/org/apache/cassandra/db/RowIndexEntry.java
index e620dc0..a709ec3 100644
--- a/src/java/org/apache/cassandra/db/RowIndexEntry.java
+++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java
@@ -253,7 +253,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public Serializer(CFMetaData metadata, Version version, 
SerializationHeader header)
         {
-            this.idxInfoSerializer = 
metadata.serializers().indexInfoSerializer(version, header);
+            this.idxInfoSerializer = IndexInfo.serializer(version, header);
             this.version = version;
         }
 
@@ -264,22 +264,16 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public void serialize(RowIndexEntry<IndexInfo> rie, DataOutputPlus 
out, ByteBuffer indexInfo) throws IOException
         {
-            assert version.storeRows() : "We read old index files but we 
should never write them";
-
             rie.serialize(out, idxInfoSerializer, indexInfo);
         }
 
         public void serializeForCache(RowIndexEntry<IndexInfo> rie, 
DataOutputPlus out) throws IOException
         {
-            assert version.storeRows();
-
             rie.serializeForCache(out);
         }
 
         public RowIndexEntry<IndexInfo> deserializeForCache(DataInputPlus in) 
throws IOException
         {
-            assert version.storeRows();
-
             long position = in.readUnsignedVInt();
 
             switch (in.readByte())
@@ -297,8 +291,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public static void skipForCache(DataInputPlus in, Version version) 
throws IOException
         {
-            assert version.storeRows();
-
             /* long position = */in.readUnsignedVInt();
             switch (in.readByte())
             {
@@ -317,9 +309,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, long 
indexFilePosition) throws IOException
         {
-            if (!version.storeRows())
-                return LegacyShallowIndexedEntry.deserialize(in, 
indexFilePosition, idxInfoSerializer);
-
             long position = in.readUnsignedVInt();
 
             int size = (int)in.readUnsignedVInt();
@@ -354,9 +343,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         public long deserializePositionAndSkip(DataInputPlus in) throws 
IOException
         {
-            if (!version.storeRows())
-                return 
LegacyShallowIndexedEntry.deserializePositionAndSkip(in);
-
             return ShallowIndexedEntry.deserializePositionAndSkip(in);
         }
 
@@ -367,7 +353,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
          */
         public static long readPosition(DataInputPlus in, Version version) 
throws IOException
         {
-            return version.storeRows() ? in.readUnsignedVInt() : in.readLong();
+            return in.readUnsignedVInt();
         }
 
         public static void skip(DataInputPlus in, Version version) throws 
IOException
@@ -378,7 +364,7 @@ public class RowIndexEntry<T> implements IMeasurableMemory
 
         private static void skipPromotedIndex(DataInputPlus in, Version 
version) throws IOException
         {
-            int size = version.storeRows() ? (int)in.readUnsignedVInt() : 
in.readInt();
+            int size = (int)in.readUnsignedVInt();
             if (size <= 0)
                 return;
 
@@ -413,164 +399,6 @@ public class RowIndexEntry<T> implements IMeasurableMemory
         out.writeByte(CACHE_NOT_INDEXED);
     }
 
-    private static final class LegacyShallowIndexedEntry extends 
RowIndexEntry<IndexInfo>
-    {
-        private static final long BASE_SIZE;
-        static
-        {
-            BASE_SIZE = ObjectSizes.measure(new LegacyShallowIndexedEntry(0, 
0, DeletionTime.LIVE, 0, new int[0], null, 0));
-        }
-
-        private final long indexFilePosition;
-        private final int[] offsets;
-        @Unmetered
-        private final IndexInfo.Serializer idxInfoSerializer;
-        private final DeletionTime deletionTime;
-        private final long headerLength;
-        private final int serializedSize;
-
-        private LegacyShallowIndexedEntry(long dataFilePosition, long 
indexFilePosition,
-                                          DeletionTime deletionTime, long 
headerLength,
-                                          int[] offsets, IndexInfo.Serializer 
idxInfoSerializer,
-                                          int serializedSize)
-        {
-            super(dataFilePosition);
-            this.deletionTime = deletionTime;
-            this.headerLength = headerLength;
-            this.indexFilePosition = indexFilePosition;
-            this.offsets = offsets;
-            this.idxInfoSerializer = idxInfoSerializer;
-            this.serializedSize = serializedSize;
-        }
-
-        @Override
-        public DeletionTime deletionTime()
-        {
-            return deletionTime;
-        }
-
-        @Override
-        public long headerLength()
-        {
-            return headerLength;
-        }
-
-        @Override
-        public long unsharedHeapSize()
-        {
-            return BASE_SIZE + offsets.length * TypeSizes.sizeof(0);
-        }
-
-        @Override
-        public int columnsIndexCount()
-        {
-            return offsets.length;
-        }
-
-        @Override
-        public void serialize(DataOutputPlus out, IndexInfo.Serializer 
idxInfoSerializer, ByteBuffer indexInfo)
-        {
-            throw new UnsupportedOperationException("serializing legacy index 
entries is not supported");
-        }
-
-        @Override
-        public void serializeForCache(DataOutputPlus out)
-        {
-            throw new UnsupportedOperationException("serializing legacy index 
entries is not supported");
-        }
-
-        @Override
-        public IndexInfoRetriever openWithIndex(FileHandle indexFile)
-        {
-            int fieldsSize = (int) 
DeletionTime.serializer.serializedSize(deletionTime)
-                             + TypeSizes.sizeof(0); // columnIndexCount
-            indexEntrySizeHistogram.update(serializedSize);
-            indexInfoCountHistogram.update(offsets.length);
-            return new LegacyIndexInfoRetriever(indexFilePosition +
-                                                TypeSizes.sizeof(0L) + // 
position
-                                                TypeSizes.sizeof(0) + // 
indexInfoSize
-                                                fieldsSize,
-                                                offsets, 
indexFile.createReader(), idxInfoSerializer);
-        }
-
-        public static RowIndexEntry<IndexInfo> deserialize(DataInputPlus in, 
long indexFilePosition,
-                                                IndexInfo.Serializer 
idxInfoSerializer) throws IOException
-        {
-            long dataFilePosition = in.readLong();
-
-            int size = in.readInt();
-            if (size == 0)
-            {
-                return new RowIndexEntry<>(dataFilePosition);
-            }
-            else if (size <= DatabaseDescriptor.getColumnIndexCacheSize())
-            {
-                return new IndexedEntry(dataFilePosition, in, 
idxInfoSerializer);
-            }
-            else
-            {
-                DeletionTime deletionTime = 
DeletionTime.serializer.deserialize(in);
-
-                // For legacy sstables (i.e. sstables pre-"ma", pre-3.0) we 
have to scan all serialized IndexInfo
-                // objects to calculate the offsets array. However, it might 
be possible to deserialize all
-                // IndexInfo objects here - but to just skip feels more gentle 
to the heap/GC.
-
-                int entries = in.readInt();
-                int[] offsets = new int[entries];
-
-                TrackedDataInputPlus tracked = new TrackedDataInputPlus(in);
-                long start = tracked.getBytesRead();
-                long headerLength = 0L;
-                for (int i = 0; i < entries; i++)
-                {
-                    offsets[i] = (int) (tracked.getBytesRead() - start);
-                    if (i == 0)
-                    {
-                        IndexInfo info = 
idxInfoSerializer.deserialize(tracked);
-                        headerLength = info.offset;
-                    }
-                    else
-                        idxInfoSerializer.skip(tracked);
-                }
-
-                return new LegacyShallowIndexedEntry(dataFilePosition, 
indexFilePosition, deletionTime, headerLength, offsets, idxInfoSerializer, 
size);
-            }
-        }
-
-        static long deserializePositionAndSkip(DataInputPlus in) throws 
IOException
-        {
-            long position = in.readLong();
-
-            int size = in.readInt();
-            if (size > 0)
-                in.skipBytesFully(size);
-
-            return position;
-        }
-    }
-
-    private static final class LegacyIndexInfoRetriever extends 
FileIndexInfoRetriever
-    {
-        private final int[] offsets;
-
-        private LegacyIndexInfoRetriever(long indexFilePosition, int[] 
offsets, FileDataInput reader, IndexInfo.Serializer idxInfoSerializer)
-        {
-            super(indexFilePosition, reader, idxInfoSerializer);
-            this.offsets = offsets;
-        }
-
-        IndexInfo fetchIndex(int index) throws IOException
-        {
-            retrievals++;
-
-            // seek to posision of IndexInfo
-            indexReader.seek(indexInfoFilePosition + offsets[index]);
-
-            // deserialize IndexInfo
-            return idxInfoSerializer.deserialize(indexReader);
-        }
-    }
-
     /**
      * An entry in the row index for a row whose columns are indexed - used 
for both legacy and current formats.
      */
@@ -622,14 +450,9 @@ public class RowIndexEntry<T> implements IMeasurableMemory
             for (int i = 0; i < columnsIndexCount; i++)
                 this.columnsIndex[i] = idxInfoSerializer.deserialize(in);
 
-            int[] offsets = null;
-            if (version.storeRows())
-            {
-                offsets = new int[this.columnsIndex.length];
-                for (int i = 0; i < offsets.length; i++)
-                    offsets[i] = in.readInt();
-            }
-            this.offsets = offsets;
+            this.offsets = new int[this.columnsIndex.length];
+            for (int i = 0; i < offsets.length; i++)
+                offsets[i] = in.readInt();
 
             this.indexedPartSize = indexedPartSize;
 

Reply via email to