http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java 
b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
new file mode 100644
index 0000000..c3f3d29
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ListType;
+import org.apache.cassandra.db.marshal.MapType;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.*;
+
+/**
+ * Convenience object to create single row updates.
+ *
+ * This is meant for system table updates, when performance is not of the utmost importance.
+ */
+public class RowUpdateBuilder
+{
+    private final PartitionUpdate update;
+
+    private final LivenessInfo defaultLiveness;
+    private final LivenessInfo deletionLiveness;
+    private final DeletionTime deletionTime;
+
+    private final Mutation mutation;
+
+    private Row.Writer regularWriter;
+    private Row.Writer staticWriter;
+
+    private boolean hasSetClustering;
+    private boolean useRowMarker = true;
+
+    private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, int localDeletionTime, Mutation mutation)
+    {
+        this.update = update;
+
+        this.defaultLiveness = SimpleLivenessInfo.forUpdate(timestamp, ttl, localDeletionTime, update.metadata());
+        this.deletionLiveness = SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime);
+        this.deletionTime = new SimpleDeletionTime(timestamp, localDeletionTime);
+
+        // note that the created mutation may get further updates later on, so we don't use the ctor that creates a singletonMap
+        // underneath (this class is for convenience, not performance)
+        this.mutation = mutation == null ? new Mutation(update.metadata().ksName, update.partitionKey()).add(update) : mutation;
+    }
+
+    private RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl, Mutation mutation)
+    {
+        this(update, timestamp, ttl, FBUtilities.nowInSeconds(), mutation);
+    }
+
+    private Row.Writer writer()
+    {
+        assert staticWriter == null : "Cannot update both static and 
non-static columns with the same RowUpdateBuilder object";
+        if (regularWriter == null)
+        {
+            regularWriter = update.writer();
+
+            // If a CQL table, add the "row marker"
+            if (update.metadata().isCQLTable() && useRowMarker)
+                regularWriter.writePartitionKeyLivenessInfo(defaultLiveness);
+        }
+        return regularWriter;
+    }
+
+    private Row.Writer staticWriter()
+    {
+        assert regularWriter == null : "Cannot update both static and 
non-static columns with the same RowUpdateBuilder object";
+        if (staticWriter == null)
+            staticWriter = update.staticWriter();
+        return staticWriter;
+    }
+
+    private Row.Writer writer(ColumnDefinition c)
+    {
+        return c.isStatic() ? staticWriter() : writer();
+    }
+
+    public RowUpdateBuilder(CFMetaData metadata, long timestamp, Object partitionKey)
+    {
+        this(metadata, FBUtilities.nowInSeconds(), timestamp, partitionKey);
+    }
+
+    public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, Object partitionKey)
+    {
+        this(metadata, localDeletionTime, timestamp, metadata.getDefaultTimeToLive(), partitionKey);
+    }
+
+    public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Object partitionKey)
+    {
+        this(metadata, FBUtilities.nowInSeconds(), timestamp, ttl, partitionKey);
+    }
+
+    public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, int ttl, Object partitionKey)
+    {
+        this(new PartitionUpdate(metadata, makeKey(metadata, partitionKey), metadata.partitionColumns(), 1), timestamp, ttl, localDeletionTime, null);
+    }
+
+    public RowUpdateBuilder(CFMetaData metadata, long timestamp, Mutation mutation)
+    {
+        this(metadata, timestamp, LivenessInfo.NO_TTL, mutation);
+    }
+
+    public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Mutation mutation)
+    {
+        this(getOrAdd(metadata, mutation), timestamp, ttl, mutation);
+    }
+
+    public RowUpdateBuilder(PartitionUpdate update, long timestamp, int ttl)
+    {
+        this(update, timestamp, ttl, null);
+    }
+
+    // This must be called before any addition or deletion if used.
+    public RowUpdateBuilder noRowMarker()
+    {
+        this.useRowMarker = false;
+        return this;
+    }
+
+    public RowUpdateBuilder clustering(Object... clusteringValues)
+    {
+        assert clusteringValues.length == update.metadata().comparator.size()
+            : "Invalid clustering values length. Expected: " + update.metadata().comparator.size() + " got: " + clusteringValues.length;
+        hasSetClustering = true;
+        if (clusteringValues.length > 0)
+            Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer());
+        return this;
+    }
+
+    public Mutation build()
+    {
+        Row.Writer writer = regularWriter == null ? staticWriter : regularWriter;
+        if (writer != null)
+            writer.endOfRow();
+        return mutation;
+    }
+
+    public PartitionUpdate buildUpdate()
+    {
+        build();
+        return update;
+    }
+
+    private static void deleteRow(PartitionUpdate update, long timestamp, Object...clusteringValues)
+    {
+        assert clusteringValues.length == update.metadata().comparator.size() || (clusteringValues.length == 0 && !update.columns().statics.isEmpty());
+
+        Row.Writer writer = clusteringValues.length == update.metadata().comparator.size()
+                          ? update.writer()
+                          : update.staticWriter();
+
+        if (clusteringValues.length > 0)
+            Rows.writeClustering(update.metadata().comparator.make(clusteringValues), writer);
+        writer.writeRowDeletion(new SimpleDeletionTime(timestamp, FBUtilities.nowInSeconds()));
+        writer.endOfRow();
+    }
+
+    public static Mutation deleteRow(CFMetaData metadata, long timestamp, Mutation mutation, Object... clusteringValues)
+    {
+        deleteRow(getOrAdd(metadata, mutation), timestamp, clusteringValues);
+        return mutation;
+    }
+
+    public static Mutation deleteRow(CFMetaData metadata, long timestamp, Object key, Object... clusteringValues)
+    {
+        PartitionUpdate update = new PartitionUpdate(metadata, makeKey(metadata, key), metadata.partitionColumns(), 0);
+        deleteRow(update, timestamp, clusteringValues);
+        // note that the created mutation may get further updates later on, so we don't use the ctor that creates a singletonMap
+        // underneath (this class is for convenience, not performance)
+        return new Mutation(update.metadata().ksName, update.partitionKey()).add(update);
+    }
+
+    private static DecoratedKey makeKey(CFMetaData metadata, Object... partitionKey)
+    {
+        if (partitionKey.length == 1 && partitionKey[0] instanceof DecoratedKey)
+            return (DecoratedKey)partitionKey[0];
+
+        ByteBuffer key = CFMetaData.serializePartitionKey(metadata.getKeyValidatorAsClusteringComparator().make(partitionKey));
+        return StorageService.getPartitioner().decorateKey(key);
+    }
+
+    private static PartitionUpdate getOrAdd(CFMetaData metadata, Mutation mutation)
+    {
+        PartitionUpdate upd = mutation.get(metadata);
+        if (upd == null)
+        {
+            upd = new PartitionUpdate(metadata, mutation.key(), metadata.partitionColumns(), 1);
+            mutation.add(upd);
+        }
+        return upd;
+    }
+
+    public RowUpdateBuilder resetCollection(String columnName)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c != null : "Cannot find column " + columnName;
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type.isCollection() && c.type.isMultiCell();
+        writer(c).writeComplexDeletion(c, new SimpleDeletionTime(defaultLiveness.timestamp() - 1, deletionTime.localDeletionTime()));
+        return this;
+    }
+
+    public RowUpdateBuilder addRangeTombstone(RangeTombstone rt)
+    {
+        update.addRangeTombstone(rt);
+        return this;
+    }
+
+    public RowUpdateBuilder addRangeTombstone(Slice slice)
+    {
+        update.addRangeTombstone(slice, deletionTime);
+        return this;
+    }
+
+    public RowUpdateBuilder addRangeTombstone(Object start, Object end)
+    {
+        ClusteringComparator cmp = update.metadata().comparator;
+        Slice slice = Slice.make(cmp.make(start), cmp.make(end));
+        return addRangeTombstone(slice);
+    }
+
+    public RowUpdateBuilder add(String columnName, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c != null : "Cannot find column " + columnName;
+        return add(c, value);
+    }
+
+    public RowUpdateBuilder add(ColumnDefinition columnDefinition, Object value)
+    {
+        assert columnDefinition.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + columnDefinition + " since no clustering has been provided";
+        if (value == null)
+            writer(columnDefinition).writeCell(columnDefinition, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, deletionLiveness, null);
+        else
+            writer(columnDefinition).writeCell(columnDefinition, false, bb(value, columnDefinition.type), defaultLiveness, null);
+        return this;
+    }
+
+    public RowUpdateBuilder delete(String columnName)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c != null : "Cannot find column " + columnName;
+        return delete(c);
+    }
+
+    public RowUpdateBuilder delete(ColumnDefinition columnDefinition)
+    {
+        return add(columnDefinition, null);
+    }
+
+    private ByteBuffer bb(Object value, AbstractType<?> type)
+    {
+        if (value instanceof ByteBuffer)
+            return (ByteBuffer)value;
+
+        if (type.isCounter())
+        {
+            // See UpdateParameters.addCounter()
+            assert value instanceof Long : "Attempted to adjust Counter cell 
with non-long value.";
+            return 
CounterContext.instance().createGlobal(CounterId.getLocalId(), 1, (Long)value);
+        }
+        return ((AbstractType)type).decompose(value);
+    }
+
+    public RowUpdateBuilder addMapEntry(String columnName, Object key, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c.isStatic() || update.metadata().comparator.size() == 0 || hasSetClustering : "Cannot set non static column " + c + " since no clustering has been provided";
+        assert c.type instanceof MapType;
+        MapType mt = (MapType)c.type;
+        writer(c).writeCell(c, false, bb(value, mt.getValuesType()), defaultLiveness, CellPath.create(bb(key, mt.getKeysType())));
+        return this;
+    }
+
+    public RowUpdateBuilder addListEntry(String columnName, Object value)
+    {
+        ColumnDefinition c = getDefinition(columnName);
+        assert c.isStatic() || hasSetClustering : "Cannot set non static 
column " + c + " since no clustering hasn't been provided";
+        assert c.type instanceof ListType;
+        ListType lt = (ListType)c.type;
+        writer(c).writeCell(c, false, bb(value, lt.getElementsType()), 
defaultLiveness, CellPath.create(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+        return this;
+    }
+
+    private ColumnDefinition getDefinition(String name)
+    {
+        return update.metadata().getColumnDefinition(new ColumnIdentifier(name, true));
+    }
+
+    public UnfilteredRowIterator unfilteredIterator()
+    {
+        return update.unfilteredIterator();
+    }
+}

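For context, a minimal sketch of how the builder above might be used, assuming a table with a single clustering column and a regular column named "v" (the table/column names and "ck1" value are hypothetical, not part of the patch); only constructors and methods defined in the diff are used:

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.Mutation;
    import org.apache.cassandra.db.RowUpdateBuilder;
    import org.apache.cassandra.utils.FBUtilities;

    public class RowUpdateBuilderSketch
    {
        // Insert one row: use the (metadata, timestamp, partitionKey) constructor,
        // set the clustering, add a regular cell by name, and build the Mutation.
        public static Mutation insert(CFMetaData metadata, Object partitionKey)
        {
            return new RowUpdateBuilder(metadata, FBUtilities.timestampMicros(), partitionKey)
                   .clustering("ck1")          // must match the table's number of clustering columns
                   .add("v", "some value")     // regular column addressed by name
                   .build();
        }

        // Delete the same row using the static helper, which builds the Mutation directly.
        public static Mutation delete(CFMetaData metadata, Object partitionKey)
        {
            return RowUpdateBuilder.deleteRow(metadata, FBUtilities.timestampMicros(), partitionKey, "ck1");
        }
    }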
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java 
b/src/java/org/apache/cassandra/db/SerializationHeader.java
new file mode 100644
index 0000000..304332e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.base.Function;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.db.marshal.TypeParser;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.Version;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+public class SerializationHeader
+{
+    private static final int DEFAULT_BASE_DELETION = computeDefaultBaseDeletion();
+
+    public static final Serializer serializer = new Serializer();
+
+    private final AbstractType<?> keyType;
+    private final List<AbstractType<?>> clusteringTypes;
+
+    private final PartitionColumns columns;
+    private final RowStats stats;
+
+    private final Map<ByteBuffer, AbstractType<?>> typeMap;
+
+    private final long baseTimestamp;
+    public final int baseDeletionTime;
+    private final int baseTTL;
+
+    // Whether or not to store cells in a sparse or dense way. See UnfilteredSerializer for details.
+    private final boolean useSparseColumnLayout;
+
+    private SerializationHeader(AbstractType<?> keyType,
+                                List<AbstractType<?>> clusteringTypes,
+                                PartitionColumns columns,
+                                RowStats stats,
+                                Map<ByteBuffer, AbstractType<?>> typeMap)
+    {
+        this.keyType = keyType;
+        this.clusteringTypes = clusteringTypes;
+        this.columns = columns;
+        this.stats = stats;
+        this.typeMap = typeMap;
+
+        // Note that if a given stat is unset, it means that either it's unused (there is
+        // no tombstone whatsoever for instance) or that we have no information on it. In
+        // the former case, it doesn't matter which base we use, but in the latter, we use
+        // bases that are more likely to provide small encoded values than the default
+        // "unset" value.
+        this.baseTimestamp = stats.hasMinTimestamp() ? stats.minTimestamp : 0;
+        this.baseDeletionTime = stats.hasMinLocalDeletionTime() ? stats.minLocalDeletionTime : DEFAULT_BASE_DELETION;
+        this.baseTTL = stats.minTTL;
+
+        // For the dense layout, we have a 1 byte overhead for absent columns. For the sparse layout, it's a 1 byte
+        // overhead for present columns (in fact we use a 2 byte id, but assuming vint encoding, we'll pay 2 bytes
+        // only for the columns after the 128th one and for simplicity we assume that once you have that many columns,
+        // you'll tend towards a clearly dense or clearly sparse case so that the heuristic above shouldn't be
+        // too inappropriate). So if on average more than half of our columns are set per row, we better go for sparse.
+        this.useSparseColumnLayout = stats.avgColumnSetPerRow <= (columns.regulars.columnCount() / 2);
+    }
+
+    public boolean useSparseColumnLayout(boolean isStatic)
+    {
+        // We always use a dense layout for the static row. Having very many static columns with only a few set at
+        // any given time doesn't feel very common at all (and we already optimize the case where no statics at all
+        // are provided).
+        return isStatic ? false : useSparseColumnLayout;
+    }
+
+    public static SerializationHeader forKeyCache(CFMetaData metadata)
+    {
+        // We don't save type information in the key cache (we could change
+        // that but it's easier right now), so instead we simply use BytesType
+        // for both serialization and deserialization. Note that we also only
+        // serialize clustering prefixes in the key cache, so only the clusteringTypes
+        // really matter.
+        int size = metadata.clusteringColumns().size();
+        List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
+        for (int i = 0; i < size; i++)
+            clusteringTypes.add(BytesType.instance);
+        return new SerializationHeader(BytesType.instance,
+                                       clusteringTypes,
+                                       PartitionColumns.NONE,
+                                       RowStats.NO_STATS,
+                                       Collections.<ByteBuffer, AbstractType<?>>emptyMap());
+    }
+
+    public static SerializationHeader make(CFMetaData metadata, Collection<SSTableReader> sstables)
+    {
+        // The serialization header has to be computed before the start of compaction (since it's used to write
+        // the result). This means that when compacting multiple sources, we won't have perfectly accurate stats
+        // (for RowStats) since compaction may delete, purge and generally merge rows in unknown ways. This is
+        // kind of ok because those stats are only used for optimizing the underlying storage format and so we
+        // just have to strive for as good as possible. Currently, we stick to a relatively naive merge of existing
+        // global stats because it's simple and probably good enough in most situations, but we could probably
+        // improve our margin of inaccuracy through the use of more fine-grained stats in the future.
+        // Note however that to avoid seeing our accuracy degrade through successive compactions, we don't base
+        // our stats merging on the compacted files' headers, which as we just said can be somewhat inaccurate,
+        // but rather on their stats stored in StatsMetadata that are fully accurate.
+        RowStats.Collector stats = new RowStats.Collector();
+        PartitionColumns.Builder columns = PartitionColumns.builder();
+        for (SSTableReader sstable : sstables)
+        {
+            stats.updateTimestamp(sstable.getMinTimestamp());
+            stats.updateLocalDeletionTime(sstable.getMinLocalDeletionTime());
+            stats.updateTTL(sstable.getMinTTL());
+            stats.updateColumnSetPerRow(sstable.getTotalColumnsSet(), sstable.getTotalRows());
+            if (sstable.header == null)
+                columns.addAll(metadata.partitionColumns());
+            else
+                columns.addAll(sstable.header.columns());
+        }
+        return new SerializationHeader(metadata, columns.build(), stats.get());
+    }
+
+    public SerializationHeader(CFMetaData metadata,
+                               PartitionColumns columns,
+                               RowStats stats)
+    {
+        this(metadata.getKeyValidator(),
+             typesOf(metadata.clusteringColumns()),
+             columns,
+             stats,
+             null);
+    }
+
+    private static List<AbstractType<?>> typesOf(List<ColumnDefinition> 
columns)
+    {
+        return ImmutableList.copyOf(Lists.transform(columns, new 
Function<ColumnDefinition, AbstractType<?>>()
+        {
+            public AbstractType<?> apply(ColumnDefinition column)
+            {
+                return column.type;
+            }
+        }));
+    }
+
+    public PartitionColumns columns()
+    {
+        return columns;
+    }
+
+    public boolean hasStatic()
+    {
+        return !columns.statics.isEmpty();
+    }
+
+    private static int computeDefaultBaseDeletion()
+    {
+        // We need a fixed default, but one that is likely to provide small values (close to 0) when
+        // subtracted from deletion times. Since deletion times are 'the current time in seconds', we
+        // use as base Jan 1, 2015 (in seconds).
+        Calendar c = Calendar.getInstance(TimeZone.getTimeZone("GMT-0"), Locale.US);
+        c.set(Calendar.YEAR, 2015);
+        c.set(Calendar.MONTH, Calendar.JANUARY);
+        c.set(Calendar.DAY_OF_MONTH, 1);
+        c.set(Calendar.HOUR_OF_DAY, 0);
+        c.set(Calendar.MINUTE, 0);
+        c.set(Calendar.SECOND, 0);
+        c.set(Calendar.MILLISECOND, 0);
+        return (int)(c.getTimeInMillis() / 1000);
+    }
+
+    public RowStats stats()
+    {
+        return stats;
+    }
+
+    public AbstractType<?> keyType()
+    {
+        return keyType;
+    }
+
+    public List<AbstractType<?>> clusteringTypes()
+    {
+        return clusteringTypes;
+    }
+
+    public Columns columns(boolean isStatic)
+    {
+        return isStatic ? columns.statics : columns.regulars;
+    }
+
+    public AbstractType<?> getType(ColumnDefinition column)
+    {
+        return typeMap == null ? column.type : typeMap.get(column.name.bytes);
+    }
+
+    public long encodeTimestamp(long timestamp)
+    {
+        return timestamp - baseTimestamp;
+    }
+
+    public long decodeTimestamp(long timestamp)
+    {
+        return baseTimestamp + timestamp;
+    }
+
+    public int encodeDeletionTime(int deletionTime)
+    {
+        return deletionTime - baseDeletionTime;
+    }
+
+    public int decodeDeletionTime(int deletionTime)
+    {
+        return baseDeletionTime + deletionTime;
+    }
+
+    public int encodeTTL(int ttl)
+    {
+        return ttl - baseTTL;
+    }
+
+    public int decodeTTL(int ttl)
+    {
+        return baseTTL + ttl;
+    }
+
+    public Component toComponent()
+    {
+        Map<ByteBuffer, AbstractType<?>> staticColumns = new LinkedHashMap<>();
+        Map<ByteBuffer, AbstractType<?>> regularColumns = new 
LinkedHashMap<>();
+        for (ColumnDefinition column : columns.statics)
+            staticColumns.put(column.name.bytes, column.type);
+        for (ColumnDefinition column : columns.regulars)
+            regularColumns.put(column.name.bytes, column.type);
+        return new Component(keyType, clusteringTypes, staticColumns, 
regularColumns, stats);
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("SerializationHeader[key=%s, cks=%s, columns=%s, 
stats=%s, typeMap=%s, baseTs=%d, baseDt=%s, baseTTL=%s]",
+                             keyType, clusteringTypes, columns, stats, 
typeMap, baseTimestamp, baseDeletionTime, baseTTL);
+    }
+
+    /**
+     * We need the CFMetaData to properly deserialize a SerializationHeader, but it's clunky to pass that to
+     * an SSTable component, so we use this temporary object to delay the actual need for the metadata.
+     */
+    public static class Component extends MetadataComponent
+    {
+        private final AbstractType<?> keyType;
+        private final List<AbstractType<?>> clusteringTypes;
+        private final Map<ByteBuffer, AbstractType<?>> staticColumns;
+        private final Map<ByteBuffer, AbstractType<?>> regularColumns;
+        private final RowStats stats;
+
+        private Component(AbstractType<?> keyType,
+                          List<AbstractType<?>> clusteringTypes,
+                          Map<ByteBuffer, AbstractType<?>> staticColumns,
+                          Map<ByteBuffer, AbstractType<?>> regularColumns,
+                          RowStats stats)
+        {
+            this.keyType = keyType;
+            this.clusteringTypes = clusteringTypes;
+            this.staticColumns = staticColumns;
+            this.regularColumns = regularColumns;
+            this.stats = stats;
+        }
+
+        public MetadataType getType()
+        {
+            return MetadataType.HEADER;
+        }
+
+        public SerializationHeader toHeader(CFMetaData metadata)
+        {
+            Map<ByteBuffer, AbstractType<?>> typeMap = new 
HashMap<>(staticColumns.size() + regularColumns.size());
+            typeMap.putAll(staticColumns);
+            typeMap.putAll(regularColumns);
+
+            PartitionColumns.Builder builder = PartitionColumns.builder();
+            for (ByteBuffer name : typeMap.keySet())
+            {
+                ColumnDefinition column = metadata.getColumnDefinition(name);
+                if (column == null)
+                {
+                    // TODO: this imply we don't read data for a column we 
don't yet know about, which imply this is theoretically
+                    // racy with column addition. Currently, it is up to the 
user to not write data before the schema has propagated
+                    // and this is far from being the only place that has such 
problem in practice. This doesn't mean we shouldn't
+                    // improve this.
+
+                    // If we don't find the definition, it could be we have 
data for a dropped column, and we shouldn't
+                    // fail deserialization because of that. So we grab a 
"fake" ColumnDefinition that ensure proper
+                    // deserialization. The column will be ignore later on 
anyway.
+                    column = metadata.getDroppedColumnDefinition(name);
+                    if (column == null)
+                        throw new RuntimeException("Unknown column " + 
UTF8Type.instance.getString(name) + " during deserialization");
+                }
+                builder.add(column);
+            }
+            return new SerializationHeader(keyType, clusteringTypes, 
builder.build(), stats, typeMap);
+        }
+
+        @Override
+        public boolean equals(Object o)
+        {
+            if(!(o instanceof Component))
+                return false;
+
+            Component that = (Component)o;
+            return Objects.equals(this.keyType, that.keyType)
+                && Objects.equals(this.clusteringTypes, that.clusteringTypes)
+                && Objects.equals(this.staticColumns, that.staticColumns)
+                && Objects.equals(this.regularColumns, that.regularColumns)
+                && Objects.equals(this.stats, that.stats);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return Objects.hash(keyType, clusteringTypes, staticColumns, 
regularColumns, stats);
+        }
+
+        @Override
+        public String toString()
+        {
+            return String.format("SerializationHeader.Component[key=%s, 
cks=%s, statics=%s, regulars=%s, stats=%s]",
+                                 keyType, clusteringTypes, staticColumns, 
regularColumns, stats);
+        }
+    }
+
+    public static class Serializer implements IMetadataComponentSerializer<Component>
+    {
+        public void serializeForMessaging(SerializationHeader header, DataOutputPlus out, boolean hasStatic) throws IOException
+        {
+            RowStats.serializer.serialize(header.stats, out);
+
+            if (hasStatic)
+                Columns.serializer.serialize(header.columns.statics, out);
+            Columns.serializer.serialize(header.columns.regulars, out);
+        }
+
+        public SerializationHeader deserializeForMessaging(DataInput in, CFMetaData metadata, boolean hasStatic) throws IOException
+        {
+            RowStats stats = RowStats.serializer.deserialize(in);
+
+            AbstractType<?> keyType = metadata.getKeyValidator();
+            List<AbstractType<?>> clusteringTypes = typesOf(metadata.clusteringColumns());
+
+            Columns statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
+            Columns regulars = Columns.serializer.deserialize(in, metadata);
+
+            return new SerializationHeader(keyType, clusteringTypes, new PartitionColumns(statics, regulars), stats, null);
+        }
+
+        public long serializedSizeForMessaging(SerializationHeader header, TypeSizes sizes, boolean hasStatic)
+        {
+            long size = RowStats.serializer.serializedSize(header.stats, sizes);
+
+            if (hasStatic)
+                size += Columns.serializer.serializedSize(header.columns.statics, sizes);
+            size += Columns.serializer.serializedSize(header.columns.regulars, sizes);
+            return size;
+        }
+        }
+
+        // For SSTables
+        public void serialize(Component header, DataOutputPlus out) throws IOException
+        {
+            RowStats.serializer.serialize(header.stats, out);
+
+            writeType(header.keyType, out);
+            out.writeShort(header.clusteringTypes.size());
+            for (AbstractType<?> type : header.clusteringTypes)
+                writeType(type, out);
+
+            writeColumnsWithTypes(header.staticColumns, out);
+            writeColumnsWithTypes(header.regularColumns, out);
+        }
+
+        // For SSTables
+        public Component deserialize(Version version, DataInput in) throws IOException
+        {
+            RowStats stats = RowStats.serializer.deserialize(in);
+
+            AbstractType<?> keyType = readType(in);
+            int size = in.readUnsignedShort();
+            List<AbstractType<?>> clusteringTypes = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+                clusteringTypes.add(readType(in));
+
+            Map<ByteBuffer, AbstractType<?>> staticColumns = new 
LinkedHashMap<>();
+            Map<ByteBuffer, AbstractType<?>> regularColumns = new 
LinkedHashMap<>();
+
+            readColumnsWithType(in, staticColumns);
+            readColumnsWithType(in, regularColumns);
+
+            return new Component(keyType, clusteringTypes, staticColumns, regularColumns, stats);
+        }
+
+        // For SSTables
+        public int serializedSize(Component header)
+        {
+            TypeSizes sizes = TypeSizes.NATIVE;
+            int size = RowStats.serializer.serializedSize(header.stats, sizes);
+
+            size += sizeofType(header.keyType, sizes);
+            size += sizes.sizeof((short)header.clusteringTypes.size());
+            for (AbstractType<?> type : header.clusteringTypes)
+                size += sizeofType(type, sizes);
+
+            size += sizeofColumnsWithTypes(header.staticColumns, sizes);
+            size += sizeofColumnsWithTypes(header.regularColumns, sizes);
+            return size;
+        }
+
+        private void writeColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> 
columns, DataOutputPlus out) throws IOException
+        {
+            out.writeShort(columns.size());
+            for (Map.Entry<ByteBuffer, AbstractType<?>> entry : 
columns.entrySet())
+            {
+                ByteBufferUtil.writeWithShortLength(entry.getKey(), out);
+                writeType(entry.getValue(), out);
+            }
+        }
+
+        private long sizeofColumnsWithTypes(Map<ByteBuffer, AbstractType<?>> 
columns, TypeSizes sizes)
+        {
+            long size = sizes.sizeof((short)columns.size());
+            for (Map.Entry<ByteBuffer, AbstractType<?>> entry : 
columns.entrySet())
+            {
+                size += sizes.sizeofWithShortLength(entry.getKey());
+                size += sizeofType(entry.getValue(), sizes);
+            }
+            return size;
+        }
+
+        private void readColumnsWithType(DataInput in, Map<ByteBuffer, 
AbstractType<?>> typeMap) throws IOException
+        {
+            int length = in.readUnsignedShort();
+            for (int i = 0; i < length; i++)
+            {
+                ByteBuffer name = ByteBufferUtil.readWithShortLength(in);
+                typeMap.put(name, readType(in));
+            }
+        }
+
+        private void writeType(AbstractType<?> type, DataOutputPlus out) 
throws IOException
+        {
+            // TODO: we should have a terser serializaion format. Not a big 
deal though
+            
ByteBufferUtil.writeWithLength(UTF8Type.instance.decompose(type.toString()), 
out);
+        }
+
+        private AbstractType<?> readType(DataInput in) throws IOException
+        {
+            ByteBuffer raw = ByteBufferUtil.readWithLength(in);
+            return TypeParser.parse(UTF8Type.instance.compose(raw));
+        }
+
+        private int sizeofType(AbstractType<?> type, TypeSizes sizes)
+        {
+            return sizes.sizeofWithLength(UTF8Type.instance.decompose(type.toString()));
+        }
+    }
+}

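As a small illustration of the base/delta encoding described in the constructor comment above, the encode*/decode* pairs are exact inverses. This sketch (not part of the patch) just exercises that round trip for a header that was built elsewhere:

    import org.apache.cassandra.db.SerializationHeader;

    final class SerializationHeaderRoundTrip
    {
        // Values are written as deltas against the header's bases (min timestamp, min local
        // deletion time, min TTL), so decode(encode(x)) must give back x.
        static void check(SerializationHeader header, long timestamp, int localDeletionTime, int ttl)
        {
            assert header.decodeTimestamp(header.encodeTimestamp(timestamp)) == timestamp;
            assert header.decodeDeletionTime(header.encodeDeletionTime(localDeletionTime)) == localDeletionTime;
            assert header.decodeTTL(header.encodeTTL(ttl)) == ttl;
        }
    }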
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Serializers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Serializers.java 
b/src/java/org/apache/cassandra/db/Serializers.java
new file mode 100644
index 0000000..862d02e
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Serializers.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.sstable.format.Version;
+
+import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
+
+/**
+ * Holds references to serializers that depend on the table definition.
+ */
+public class Serializers
+{
+    private final CFMetaData metadata;
+
+    public Serializers(CFMetaData metadata)
+    {
+        this.metadata = metadata;
+    }
+
+    public IndexInfo.Serializer indexSerializer(Version version)
+    {
+        return new IndexInfo.Serializer(metadata, version);
+    }
+
+    // Note that for the old layout, this will actually discard the cellname parts that are not strictly
+    // part of the clustering prefix. Don't use this if that's not what you want.
+    public ISerializer<ClusteringPrefix> clusteringPrefixSerializer(final Version version, final SerializationHeader header)
+    {
+        if (!version.storeRows())
+            throw new UnsupportedOperationException();
+
+        return new ISerializer<ClusteringPrefix>()
+        {
+            public void serialize(ClusteringPrefix clustering, DataOutputPlus out) throws IOException
+            {
+                ClusteringPrefix.serializer.serialize(clustering, out, version.correspondingMessagingVersion(), header.clusteringTypes());
+            }
+
+            public ClusteringPrefix deserialize(DataInput in) throws IOException
+            {
+                return ClusteringPrefix.serializer.deserialize(in, version.correspondingMessagingVersion(), header.clusteringTypes());
+            }
+
+            public long serializedSize(ClusteringPrefix clustering, TypeSizes sizes)
+            {
+                return ClusteringPrefix.serializer.serializedSize(clustering, version.correspondingMessagingVersion(), header.clusteringTypes(), sizes);
+            }
+        };
+    }
+}

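A rough sketch of how the clustering prefix serializer above could be obtained and used, assuming a CFMetaData, a Version that stores rows, and a SerializationHeader are already available (the helper class and method names here are hypothetical):

    import java.io.IOException;

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.ClusteringPrefix;
    import org.apache.cassandra.db.SerializationHeader;
    import org.apache.cassandra.db.Serializers;
    import org.apache.cassandra.io.ISerializer;
    import org.apache.cassandra.io.sstable.format.Version;
    import org.apache.cassandra.io.util.DataOutputPlus;

    final class SerializersSketch
    {
        // Build the per-table serializer holder and write one clustering prefix with it.
        // Note that clusteringPrefixSerializer() throws UnsupportedOperationException for
        // sstable formats where version.storeRows() is false.
        static void writePrefix(CFMetaData metadata, Version version, SerializationHeader header,
                                ClusteringPrefix prefix, DataOutputPlus out) throws IOException
        {
            ISerializer<ClusteringPrefix> serializer = new Serializers(metadata).clusteringPrefixSerializer(version, header);
            serializer.serialize(prefix, out);
        }
    }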
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SimpleClustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleClustering.java 
b/src/java/org/apache/cassandra/db/SimpleClustering.java
new file mode 100644
index 0000000..8b1cb7b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SimpleClustering.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.utils.ObjectSizes;
+
+public class SimpleClustering extends Clustering
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleClustering(new ByteBuffer[0]));
+
+    private final ByteBuffer[] values;
+
+    public SimpleClustering(ByteBuffer... values)
+    {
+        this.values = values;
+    }
+
+    public SimpleClustering(ByteBuffer value)
+    {
+        this(new ByteBuffer[]{ value });
+    }
+
+    public int size()
+    {
+        return values.length;
+    }
+
+    public ByteBuffer get(int i)
+    {
+        return values[i];
+    }
+
+    public ByteBuffer[] getRawValues()
+    {
+        return values;
+    }
+
+    @Override
+    public long unsharedHeapSize()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(values);
+    }
+
+    @Override
+    public Clustering takeAlias()
+    {
+        return this;
+    }
+
+    public static Builder builder(int size)
+    {
+        return new Builder(size);
+    }
+
+    public static class Builder implements Writer
+    {
+        private final ByteBuffer[] values;
+        private int idx;
+
+        private Builder(int size)
+        {
+            this.values = new ByteBuffer[size];
+        }
+
+        public void writeClusteringValue(ByteBuffer value)
+        {
+            values[idx++] = value;
+        }
+
+        public SimpleClustering build()
+        {
+            assert idx == values.length;
+            return new SimpleClustering(values);
+        }
+    }
+}

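A small sketch of the two ways to construct the clustering above, either directly from ByteBuffers or through the fixed-size Builder (the literal values are only examples):

    import java.nio.ByteBuffer;

    import org.apache.cassandra.db.SimpleClustering;
    import org.apache.cassandra.utils.ByteBufferUtil;

    final class SimpleClusteringSketch
    {
        static SimpleClustering direct()
        {
            // varargs constructor: one value per clustering column
            return new SimpleClustering(ByteBufferUtil.bytes("a"), ByteBufferUtil.bytes(42));
        }

        static SimpleClustering viaBuilder()
        {
            // the Builder expects exactly 'size' calls to writeClusteringValue before build()
            SimpleClustering.Builder builder = SimpleClustering.builder(2);
            builder.writeClusteringValue(ByteBufferUtil.bytes("a"));
            builder.writeClusteringValue(ByteBufferUtil.bytes(42));
            return builder.build();
        }
    }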
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleDeletionTime.java 
b/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
new file mode 100644
index 0000000..738c5e6
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SimpleDeletionTime.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+
+import org.apache.cassandra.cache.IMeasurableMemory;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.FileUtils;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Simple implementation of DeletionTime.
+ */
+public class SimpleDeletionTime extends DeletionTime
+{
+    public final long markedForDeleteAt;
+    public final int localDeletionTime;
+
+    @VisibleForTesting
+    public SimpleDeletionTime(long markedForDeleteAt, int localDeletionTime)
+    {
+        this.markedForDeleteAt = markedForDeleteAt;
+        this.localDeletionTime = localDeletionTime;
+    }
+
+    public long markedForDeleteAt()
+    {
+        return markedForDeleteAt;
+    }
+
+    public int localDeletionTime()
+    {
+        return localDeletionTime;
+    }
+
+    public DeletionTime takeAlias()
+    {
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java 
b/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
new file mode 100644
index 0000000..fea1b86
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SimpleLivenessInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Objects;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class SimpleLivenessInfo extends AbstractLivenessInfo
+{
+    private final long timestamp;
+    private final int ttl;
+    private final int localDeletionTime;
+
+    // Note that while some code uses this ctor, the two following static creation methods
+    // are usually less error prone.
+    SimpleLivenessInfo(long timestamp, int ttl, int localDeletionTime)
+    {
+        this.timestamp = timestamp;
+        this.ttl = ttl;
+        this.localDeletionTime = localDeletionTime;
+    }
+
+    public static SimpleLivenessInfo forUpdate(long timestamp, int ttl, int nowInSec, CFMetaData metadata)
+    {
+        if (ttl == NO_TTL)
+            ttl = metadata.getDefaultTimeToLive();
+
+        return new SimpleLivenessInfo(timestamp, ttl, ttl == NO_TTL ? NO_DELETION_TIME : nowInSec + ttl);
+    }
+
+    public static SimpleLivenessInfo forDeletion(long timestamp, int localDeletionTime)
+    {
+        return new SimpleLivenessInfo(timestamp, NO_TTL, localDeletionTime);
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+
+    public int ttl()
+    {
+        return ttl;
+    }
+
+    public int localDeletionTime()
+    {
+        return localDeletionTime;
+    }
+
+    @Override
+    public LivenessInfo takeAlias()
+    {
+        return this;
+    }
+}

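To illustrate the comment above about preferring the static factories over the raw constructor, a brief sketch (not part of the patch; method names here are hypothetical):

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.db.LivenessInfo;
    import org.apache.cassandra.db.SimpleLivenessInfo;
    import org.apache.cassandra.utils.FBUtilities;

    final class LivenessInfoSketch
    {
        // forUpdate() fills in the table's default TTL and derives the local expiration
        // time from nowInSec, which is easy to get wrong with the raw constructor.
        static LivenessInfo liveCell(CFMetaData metadata, long timestampMicros, int ttl)
        {
            return SimpleLivenessInfo.forUpdate(timestampMicros, ttl, FBUtilities.nowInSeconds(), metadata);
        }

        // forDeletion() carries only the deletion timestamp and local deletion time.
        static LivenessInfo tombstone(long timestampMicros)
        {
            return SimpleLivenessInfo.forDeletion(timestampMicros, FBUtilities.nowInSeconds());
        }
    }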
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
new file mode 100644
index 0000000..d359b2b
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionNamesCommand.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.*;
+
+import com.google.common.collect.Sets;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics.Sampler;
+import org.apache.cassandra.thrift.ThriftResultsMerger;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.memory.HeapAllocator;
+
+/**
+ * A single partition read command that selects rows by their clustering values (a "names" filter).
+ */
+public class SinglePartitionNamesCommand extends SinglePartitionReadCommand<ClusteringIndexNamesFilter>
+{
+    protected SinglePartitionNamesCommand(boolean isDigest,
+                                          boolean isForThrift,
+                                          CFMetaData metadata,
+                                          int nowInSec,
+                                          ColumnFilter columnFilter,
+                                          RowFilter rowFilter,
+                                          DataLimits limits,
+                                          DecoratedKey partitionKey,
+                                          ClusteringIndexNamesFilter clusteringIndexFilter)
+    {
+        super(isDigest, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+    }
+
+    public SinglePartitionNamesCommand(CFMetaData metadata,
+                                       int nowInSec,
+                                       ColumnFilter columnFilter,
+                                       RowFilter rowFilter,
+                                       DataLimits limits,
+                                       DecoratedKey partitionKey,
+                                       ClusteringIndexNamesFilter clusteringIndexFilter)
+    {
+        this(false, false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter);
+    }
+
+    public SinglePartitionNamesCommand copy()
+    {
+        return new SinglePartitionNamesCommand(isDigestQuery(), isForThrift(), metadata(), nowInSec(), columnFilter(), rowFilter(), limits(), partitionKey(), clusteringIndexFilter());
+    }
+
+    protected UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap)
+    {
+        Tracing.trace("Acquiring sstable references");
+        ColumnFamilyStore.ViewFragment view = cfs.select(cfs.viewFilter(partitionKey()));
+
+        ArrayBackedPartition result = null;
+        ClusteringIndexNamesFilter filter = clusteringIndexFilter();
+
+        Tracing.trace("Merging memtable contents");
+        for (Memtable memtable : view.memtables)
+        {
+            Partition partition = memtable.getPartition(partitionKey());
+            if (partition == null)
+                continue;
+
+            try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
+            {
+                if (iter.isEmpty())
+                    continue;
+
+                UnfilteredRowIterator clonedFilter = copyOnHeap
+                                                   ? UnfilteredRowIterators.cloningIterator(iter, HeapAllocator.instance)
+                                                   : iter;
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(clonedFilter, nowInSec()) : clonedFilter, result);
+            }
+        }
+
+        /* add the SSTables on disk */
+        Collections.sort(view.sstables, SSTableReader.maxTimestampComparator);
+        int sstablesIterated = 0;
+
+        // read sorted sstables
+        for (SSTableReader sstable : view.sstables)
+        {
+            // if we've already seen a partition tombstone with a timestamp greater
+            // than the most recent update to this sstable, we're done, since the rest of the sstables
+            // will also be older
+            if (result != null && sstable.getMaxTimestamp() < result.partitionLevelDeletion().markedForDeleteAt())
+                break;
+
+            long currentMaxTs = sstable.getMaxTimestamp();
+            filter = reduceFilter(filter, result, currentMaxTs);
+            if (filter == null)
+                break;
+
+            Tracing.trace("Merging data from sstable {}", 
sstable.descriptor.generation);
+            sstable.incrementReadCount();
+            try (UnfilteredRowIterator iter = 
filter.filter(sstable.iterator(partitionKey(), columnFilter(), 
filter.isReversed(), isForThrift())))
+            {
+                if (iter.isEmpty())
+                    continue;
+
+                sstablesIterated++;
+                result = add(isForThrift() ? ThriftResultsMerger.maybeWrap(iter, nowInSec()) : iter, result);
+            }
+        }
+
+        cfs.metric.updateSSTableIterated(sstablesIterated);
+
+        if (result == null || result.isEmpty())
+            return UnfilteredRowIterators.emptyIterator(metadata(), partitionKey(), false);
+
+        DecoratedKey key = result.partitionKey();
+        cfs.metric.samplers.get(Sampler.READS).addSample(key.getKey(), key.hashCode(), 1);
+
+        // "hoist up" the requested data into a more recent sstable
+        if (sstablesIterated > cfs.getMinimumCompactionThreshold()
+            && !cfs.isAutoCompactionDisabled()
+            && cfs.getCompactionStrategyManager().shouldDefragment())
+        {
+            // !!WARNING!!   if we stop copying our data to a heap-managed object,
+            //               we will need to track the lifetime of this mutation as well
+            Tracing.trace("Defragmenting requested data");
+
+            try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false))
+            {
+                final Mutation mutation = new Mutation(UnfilteredRowIterators.toUpdate(iter));
+                StageManager.getStage(Stage.MUTATION).execute(new Runnable()
+                {
+                    public void run()
+                    {
+                        // skipping commitlog and index updates is fine since we're just de-fragmenting existing data
+                        Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
+                    }
+                });
+            }
+        }
+
+        return result.unfilteredIterator(columnFilter(), Slices.ALL, clusteringIndexFilter().isReversed());
+    }
+
+    private ArrayBackedPartition add(UnfilteredRowIterator iter, ArrayBackedPartition result)
+    {
+        int maxRows = Math.max(clusteringIndexFilter().requestedRows().size(), 1);
+        if (result == null)
+            return ArrayBackedPartition.create(iter, maxRows);
+
+        try (UnfilteredRowIterator merged = UnfilteredRowIterators.merge(Arrays.asList(iter, result.unfilteredIterator(columnFilter(), Slices.ALL, false)), nowInSec()))
+        {
+            return ArrayBackedPartition.create(merged, maxRows);
+        }
+    }
+
+    private ClusteringIndexNamesFilter reduceFilter(ClusteringIndexNamesFilter filter, Partition result, long sstableTimestamp)
+    {
+        if (result == null)
+            return filter;
+
+        SearchIterator<Clustering, Row> searchIter = result.searchIterator(columnFilter(), false);
+
+        PartitionColumns columns = columnFilter().fetchedColumns();
+        NavigableSet<Clustering> clusterings = filter.requestedRows();
+
+        // We want to remove rows for which we have values for all requested columns. We have to deal with both static and regular rows.
+        // TODO: we could also remove a selected column if we've found values for every requested row but we'll leave
+        // that for later.
+
+        boolean removeStatic = false;
+        if (!columns.statics.isEmpty())
+        {
+            Row staticRow = searchIter.next(Clustering.STATIC_CLUSTERING);
+            removeStatic = staticRow != null && canRemoveRow(staticRow, 
columns.statics, sstableTimestamp);
+        }
+
+        NavigableSet<Clustering> toRemove = null;
+        for (Clustering clustering : clusterings)
+        {
+            if (!searchIter.hasNext())
+                break;
+
+            Row row = searchIter.next(clustering);
+            if (row == null || !canRemoveRow(row, columns.regulars, sstableTimestamp))
+                continue;
+
+            if (toRemove == null)
+                toRemove = new TreeSet<>(result.metadata().comparator);
+            toRemove.add(clustering);
+        }
+
+        if (!removeStatic && toRemove == null)
+            return filter;
+
+        // Check if we have everything we need
+        boolean hasNoMoreStatic = columns.statics.isEmpty() || removeStatic;
+        boolean hasNoMoreClusterings = clusterings.isEmpty() || (toRemove != 
null && toRemove.size() == clusterings.size());
+        if (hasNoMoreStatic && hasNoMoreClusterings)
+            return null;
+
+        NavigableSet<Clustering> newClusterings = clusterings;
+        if (toRemove != null)
+        {
+            newClusterings = new TreeSet<>(result.metadata().comparator);
+            newClusterings.addAll(Sets.difference(clusterings, toRemove));
+        }
+        return new ClusteringIndexNamesFilter(newClusterings, 
filter.isReversed());
+    }
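+
+    // Editorial sketch, not part of the original patch: a worked example of reduceFilter.
+    // Assume the filter requests clusterings {c1, c2, c3} and 'result' (built from more recent
+    // sources) already holds rows for c1 and c3 whose requested cells are all newer than
+    // 'sstableTimestamp'. canRemoveRow() is then true for c1 and c3, so the returned filter only
+    // asks for {c2}. If c2 (and any requested statics) were also covered, the method would return
+    // null, signalling that no further sstable needs to be read for this query.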
+
+    private boolean canRemoveRow(Row row, Columns requestedColumns, long 
sstableTimestamp)
+    {
+        // We can remove a row if it has data that is more recent than the next sstable to consider
+        // for the data that the query cares about. And the data we care about is 1) the row timestamp
+        // (since every query cares whether the row exists or not) and 2) the requested columns.
+        if (!row.primaryKeyLivenessInfo().hasTimestamp() || 
row.primaryKeyLivenessInfo().timestamp() <= sstableTimestamp)
+            return false;
+
+        for (ColumnDefinition column : requestedColumns)
+        {
+            // We can never be sure we have all of a collection, so never 
remove rows in that case.
+            if (column.type.isCollection())
+                return false;
+
+            Cell cell = row.getCell(column);
+            if (cell == null || cell.livenessInfo().timestamp() <= 
sstableTimestamp)
+                return false;
+        }
+        return true;
+    }
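+
+    // Editorial sketch, not part of the original patch: why collections are never "complete".
+    // For a collection column m, a newer sstable may hold only some of its cells (say m['a'])
+    // while an older sstable still holds others (say m['b']); no single recent cell proves we
+    // have the whole value, hence the unconditional 'return false' for collection columns above.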
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
new file mode 100644
index 0000000..38651c1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.util.*;
+
+import org.apache.cassandra.cache.*;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.exceptions.RequestExecutionException;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.metrics.ColumnFamilyMetrics;
+import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.pager.*;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.concurrent.OpOrder;
+
+/**
+ * A read command that selects a (part of a) single partition.
+ */
+public abstract class SinglePartitionReadCommand<F extends 
ClusteringIndexFilter> extends ReadCommand
+{
+    protected static final SelectionDeserializer selectionDeserializer = new 
Deserializer();
+
+    private final DecoratedKey partitionKey;
+    private final F clusteringIndexFilter;
+
+    protected SinglePartitionReadCommand(boolean isDigest,
+                                         boolean isForThrift,
+                                         CFMetaData metadata,
+                                         int nowInSec,
+                                         ColumnFilter columnFilter,
+                                         RowFilter rowFilter,
+                                         DataLimits limits,
+                                         DecoratedKey partitionKey,
+                                         F clusteringIndexFilter)
+    {
+        super(Kind.SINGLE_PARTITION, isDigest, isForThrift, metadata, 
nowInSec, columnFilter, rowFilter, limits);
+        this.partitionKey = partitionKey;
+        this.clusteringIndexFilter = clusteringIndexFilter;
+    }
+
+    /**
+     * Creates a new read command on a single partition.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param columnFilter the column filter to use for the query.
+     * @param rowFilter the row filter to use for the query.
+     * @param limits the limits to use for the query.
+     * @param partitionKey the partition key for the partition to query.
+     * @param clusteringIndexFilter the clustering index filter to use for the 
query.
+     *
+     * @return a newly created read command.
+     */
+    public static SinglePartitionReadCommand<?> create(CFMetaData metadata,
+                                                       int nowInSec,
+                                                       ColumnFilter 
columnFilter,
+                                                       RowFilter rowFilter,
+                                                       DataLimits limits,
+                                                       DecoratedKey 
partitionKey,
+                                                       ClusteringIndexFilter 
clusteringIndexFilter)
+    {
+        return create(false, metadata, nowInSec, columnFilter, rowFilter, 
limits, partitionKey, clusteringIndexFilter);
+    }
+
+    /**
+     * Creates a new read command on a single partition for thrift.
+     *
+     * @param isForThrift whether the query is for thrift or not.
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param columnFilter the column filter to use for the query.
+     * @param rowFilter the row filter to use for the query.
+     * @param limits the limits to use for the query.
+     * @param partitionKey the partition key for the partition to query.
+     * @param clusteringIndexFilter the clustering index filter to use for the 
query.
+     *
+     * @return a newly created read command.
+     */
+    public static SinglePartitionReadCommand<?> create(boolean isForThrift,
+                                                       CFMetaData metadata,
+                                                       int nowInSec,
+                                                       ColumnFilter 
columnFilter,
+                                                       RowFilter rowFilter,
+                                                       DataLimits limits,
+                                                       DecoratedKey 
partitionKey,
+                                                       ClusteringIndexFilter 
clusteringIndexFilter)
+    {
+        if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter)
+            return new SinglePartitionSliceCommand(false, isForThrift, 
metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, 
(ClusteringIndexSliceFilter) clusteringIndexFilter);
+
+        assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter;
+        return new SinglePartitionNamesCommand(false, isForThrift, metadata, 
nowInSec, columnFilter, rowFilter, limits, partitionKey, 
(ClusteringIndexNamesFilter) clusteringIndexFilter);
+    }
+
+    /**
+     * Creates a new read command on a single partition.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     * @param columnFilter the column filter to use for the query.
+     * @param filter the clustering index filter to use for the query.
+     *
+     * @return a newly created read command. The returned command will use no 
row filter and have no limits.
+     */
+    public static SinglePartitionReadCommand<?> create(CFMetaData metadata, 
int nowInSec, DecoratedKey key, ColumnFilter columnFilter, 
ClusteringIndexFilter filter)
+    {
+        return create(metadata, nowInSec, columnFilter, RowFilter.NONE, 
DataLimits.NONE, key, filter);
+    }
+
+    /**
+     * Creates a new read command that queries a single partition in its 
entirety.
+     *
+     * @param metadata the table to query.
+     * @param nowInSec the time in seconds to use as "now" for this query.
+     * @param key the partition key for the partition to query.
+     *
+     * @return a newly created read command that queries all the rows of 
{@code key}.
+     */
+    public static SinglePartitionReadCommand fullPartitionRead(CFMetaData 
metadata, int nowInSec, DecoratedKey key)
+    {
+        return SinglePartitionSliceCommand.create(metadata, nowInSec, key, 
Slices.ALL);
+    }
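+
+    // Editorial sketch, not part of the original patch: typical use of the factory methods.
+    // Assuming a CFMetaData 'metadata' and a DecoratedKey 'key' are in scope (hypothetical names):
+    //
+    //   int nowInSec = FBUtilities.nowInSeconds();
+    //   SinglePartitionReadCommand<?> full = SinglePartitionReadCommand.fullPartitionRead(metadata, nowInSec, key);
+    //   // with a pre-built ColumnFilter 'columns' and ClusteringIndexFilter 'filter' (also hypothetical):
+    //   SinglePartitionReadCommand<?> partial = SinglePartitionReadCommand.create(metadata, nowInSec, key, columns, filter);
+    //
+    // The first reads the whole partition; the second reads only what 'filter' selects, with no
+    // row filter and no limits.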
+
+    public DecoratedKey partitionKey()
+    {
+        return partitionKey;
+    }
+
+    public F clusteringIndexFilter()
+    {
+        return clusteringIndexFilter;
+    }
+
+    public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key)
+    {
+        return clusteringIndexFilter;
+    }
+
+    public long getTimeout()
+    {
+        return DatabaseDescriptor.getReadRpcTimeout();
+    }
+
+    public boolean selects(DecoratedKey partitionKey, Clustering clustering)
+    {
+        if (!partitionKey().equals(partitionKey))
+            return false;
+
+        if (clustering == Clustering.STATIC_CLUSTERING)
+            return !columnFilter().fetchedColumns().statics.isEmpty();
+
+        return clusteringIndexFilter().selects(clustering);
+    }
+
+    /**
+     * Returns a new command suitable for paging from the last returned row.
+     *
+     * @param lastReturned the last row returned by the previous page. The newly created command
+     * will only query rows that come after this (in query order). This can be {@code null} if this
+     * is the first page.
+     * @param pageSize the size to use for the page to query.
+     *
+     * @return the newly created command.
+     */
+    public SinglePartitionReadCommand forPaging(Clustering lastReturned, int 
pageSize)
+    {
+        // We shouldn't have set digest yet when reaching that point
+        assert !isDigestQuery();
+        return create(isForThrift(),
+                      metadata(),
+                      nowInSec(),
+                      columnFilter(),
+                      rowFilter(),
+                      limits().forPaging(pageSize),
+                      partitionKey(),
+                      lastReturned == null ? clusteringIndexFilter() : 
clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false));
+    }
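+
+    // Editorial sketch, not part of the original patch: how a pager would chain pages.
+    // Given a command 'cmd' and the last Clustering returned by the previous page (hypothetical names):
+    //
+    //   SinglePartitionReadCommand nextPage = cmd.forPaging(lastClustering, 100);
+    //
+    // The new command keeps the same key, filters and metadata, caps the limits at a 100-row page,
+    // and restricts the clustering index filter to rows after 'lastClustering' in query order.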
+
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState) throws RequestExecutionException
+    {
+        return StorageProxy.read(Group.one(this), consistency, clientState);
+    }
+
+    public SinglePartitionPager getPager(PagingState pagingState)
+    {
+        return getPager(this, pagingState);
+    }
+
+    private static SinglePartitionPager getPager(SinglePartitionReadCommand 
command, PagingState pagingState)
+    {
+        return new SinglePartitionPager(command, pagingState);
+    }
+
+    protected void recordLatency(ColumnFamilyMetrics metric, long latencyNanos)
+    {
+        metric.readLatency.addNano(latencyNanos);
+    }
+
+    protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore 
cfs, ReadOrderGroup orderGroup)
+    {
+        @SuppressWarnings("resource") // we close the created iterator through 
closing the result of this method (and SingletonUnfilteredPartitionIterator 
ctor cannot fail)
+        UnfilteredRowIterator partition = cfs.isRowCacheEnabled()
+                                        ? getThroughCache(cfs, 
orderGroup.baseReadOpOrderGroup())
+                                        : queryMemtableAndDisk(cfs, 
orderGroup.baseReadOpOrderGroup());
+        return new SingletonUnfilteredPartitionIterator(partition, 
isForThrift());
+    }
+
+    /**
+     * Fetch the requested rows from the cache if present; if not, read them from disk and cache them.
+     * <p>
+     * If the partition is cached, and the filter given is within its bounds, 
we return
+     * from cache, otherwise from disk.
+     * <p>
+     * If the partition is not cached, we figure out which filter is "biggest", read
+     * that from disk, then filter the result and either cache that or return 
it.
+     */
+    private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, 
OpOrder.Group readOp)
+    {
+        assert !cfs.isIndex(); // CASSANDRA-5732
+        assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name);
+
+        UUID cfId = metadata().cfId;
+        RowCacheKey key = new RowCacheKey(cfId, partitionKey());
+
+        // Attempt a sentinel-read-cache sequence. If a write invalidates our sentinel, we'll return our
+        // (now potentially obsolete) data, but won't cache it. See CASSANDRA-3862.
+        // TODO: don't evict entire partitions on writes (#2864)
+        IRowCacheEntry cached = CacheService.instance.rowCache.get(key);
+        if (cached != null)
+        {
+            if (cached instanceof RowCacheSentinel)
+            {
+                // Some other read is trying to cache the value, just do a 
normal non-caching read
+                Tracing.trace("Row cache miss (race)");
+                cfs.metric.rowCacheMiss.inc();
+                return queryMemtableAndDisk(cfs, readOp);
+            }
+
+            CachedPartition cachedPartition = (CachedPartition)cached;
+            if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), 
cachedPartition, nowInSec()))
+            {
+                cfs.metric.rowCacheHit.inc();
+                Tracing.trace("Row cache hit");
+                return 
clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), 
cachedPartition);
+            }
+
+            cfs.metric.rowCacheHitOutOfRange.inc();
+            Tracing.trace("Ignoring row cache as cached value could not 
satisfy query");
+            return queryMemtableAndDisk(cfs, readOp);
+        }
+
+        cfs.metric.rowCacheMiss.inc();
+        Tracing.trace("Row cache miss");
+
+        boolean cacheFullPartitions = 
metadata().getCaching().rowCache.cacheFullPartitions();
+
+        // To be able to cache what we read, what we read must at least cover what the cache holds, that
+        // is the 'rowsToCache' first rows of the partition. We could read 
those 'rowsToCache' first rows
+        // systematically, but we'd have to "extend" that to whatever is 
needed for the user query that the
+        // 'rowsToCache' first rows don't cover and it's not trivial with our 
existing filters. So currently
+        // we settle for caching what we read only if the user query does 
query the head of the partition since
+        // that's the common case of when we'll be able to use the cache 
anyway. One exception is if we cache
+        // full partitions, in which case we just always read it all and cache.
+        if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter())
+        {
+            RowCacheSentinel sentinel = new RowCacheSentinel();
+            boolean sentinelSuccess = 
CacheService.instance.rowCache.putIfAbsent(key, sentinel);
+            boolean sentinelReplaced = false;
+
+            try
+            {
+                int rowsToCache = cacheFullPartitions ? Integer.MAX_VALUE : 
metadata().getCaching().rowCache.rowsToCache;
+                @SuppressWarnings("resource") // we close on exception or upon 
closing the result of this method
+                UnfilteredRowIterator iter = 
SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), 
partitionKey()).queryMemtableAndDisk(cfs, readOp);
+                try
+                {
+                    // We want to cache only rowsToCache rows
+                    CachedPartition toCache = 
ArrayBackedCachedPartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter,
 nowInSec()), nowInSec());
+                    if (sentinelSuccess && !toCache.isEmpty())
+                    {
+                        Tracing.trace("Caching {} rows", toCache.rowCount());
+                        CacheService.instance.rowCache.replace(key, sentinel, 
toCache);
+                        // Whether or not the previous replace has worked, our 
sentinel is not in the cache anymore
+                        sentinelReplaced = true;
+                    }
+
+                    // We then re-filter the result down to just what this query wants.
+                    // Note that in the case where we don't cache full 
partitions, it's possible that the current query is interested in more
+                    // than what we've cached, so we can't just use toCache.
+                    UnfilteredRowIterator cacheIterator = 
clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache);
+                    if (cacheFullPartitions)
+                    {
+                        // Everything is guaranteed to be in 'toCache', we're 
done with 'iter'
+                        assert !iter.hasNext();
+                        iter.close();
+                        return cacheIterator;
+                    }
+                    return UnfilteredRowIterators.concat(cacheIterator, 
clusteringIndexFilter().filterNotIndexed(columnFilter(), iter));
+                }
+                catch (RuntimeException | Error e)
+                {
+                    iter.close();
+                    throw e;
+                }
+            }
+            finally
+            {
+                if (sentinelSuccess && !sentinelReplaced)
+                    cfs.invalidateCachedPartition(key);
+            }
+        }
+
+        Tracing.trace("Fetching data but not populating cache as query does 
not query from the start of the partition");
+        return queryMemtableAndDisk(cfs, readOp);
+    }
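+
+    // Editorial sketch, not part of the original patch: the three outcomes of getThroughCache().
+    //   1) cache hit and the cached head covers the filter -> serve the iterator straight from the cache;
+    //   2) cache miss on a head query (or full-partition caching) -> read, populate the cache behind
+    //      a sentinel, and serve the freshly read data;
+    //   3) anything else (sentinel race, non-head query, filter wider than the cached rows)
+    //      -> fall back to a plain queryMemtableAndDisk() without touching the cache.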
+
+    /**
+     * Queries both memtable and sstables to fetch the result of this query.
+     * <p>
+     * Please note that this method:
+     *   1) does not check the row cache.
+     *   2) does not apply the query limit, nor the row filter (and so ignores secondary indexes).
+     *      Those are applied in {@link ReadCommand#executeLocally}.
+     *   3) does not record some of the read metrics (latency, scanned cells histograms) nor
+     *      throw TombstoneOverwhelmingException.
+     * It is publicly exposed because there are a few places where that is exactly what we want,
+     * but it should be used only where you know you don't need those things.
+     * <p>
+     * Also note that one must have "started" an {@code OpOrder.Group} on the queried table; it is
+     * required as a parameter precisely to enforce that, even though it's not explicitly used by the method.
+     */
+    public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, 
OpOrder.Group readOp)
+    {
+        Tracing.trace("Executing single-partition query on {}", cfs.name);
+
+        boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap();
+        return queryMemtableAndDiskInternal(cfs, copyOnHeap);
+    }
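+
+    // Editorial sketch, not part of the original patch: honoring the OpOrder requirement above.
+    // Assuming 'cmd' is a command on the table backing 'cfs', and that 'cfs.readOrdering' is the
+    // table's read OpOrder (names are assumptions here, not guaranteed by this patch):
+    //
+    //   try (OpOrder.Group op = cfs.readOrdering.start();
+    //        UnfilteredRowIterator iter = cmd.queryMemtableAndDisk(cfs, op))
+    //   {
+    //       while (iter.hasNext())
+    //           process(iter.next());   // 'process' is a placeholder
+    //   }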
+
+    protected abstract UnfilteredRowIterator 
queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap);
+
+    @Override
+    public String toString()
+    {
+        return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s 
key=%s filter=%s, nowInSec=%d)",
+                             metadata().ksName,
+                             metadata().cfName,
+                             columnFilter(),
+                             rowFilter(),
+                             limits(),
+                             
metadata().getKeyValidator().getString(partitionKey().getKey()),
+                             clusteringIndexFilter.toString(metadata()),
+                             nowInSec());
+    }
+
+    protected void appendCQLWhereClause(StringBuilder sb)
+    {
+        sb.append(" WHERE ");
+
+        
sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append("
 = ");
+        DataRange.appendKeyString(sb, metadata().getKeyValidator(), 
partitionKey().getKey());
+
+        // We put the row filter first because the clustering index filter can 
end by "ORDER BY"
+        if (!rowFilter().isEmpty())
+            sb.append(" AND ").append(rowFilter());
+
+        String filterString = clusteringIndexFilter().toCQLString(metadata());
+        if (!filterString.isEmpty())
+            sb.append(" AND ").append(filterString);
+    }
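+
+    // Editorial sketch, not part of the original patch: the kind of clause this builds.
+    // For a table with a single partition key column k queried with key 3 and no row filter, the
+    // appended text would look roughly like " WHERE k = 3" followed by the clustering index
+    // filter's own rendering; the exact output depends on ClusteringIndexFilter.toCQLString(),
+    // so treat this as illustrative only.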
+
+    protected void serializeSelection(DataOutputPlus out, int version) throws 
IOException
+    {
+        metadata().getKeyValidator().writeValue(partitionKey().getKey(), out);
+        ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), 
out, version);
+    }
+
+    protected long selectionSerializedSize(int version)
+    {
+        TypeSizes sizes = TypeSizes.NATIVE;
+        return 
metadata().getKeyValidator().writtenLength(partitionKey().getKey(), sizes)
+             + 
ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), 
version);
+    }
+
+    /**
+     * Groups multiple single partition read commands.
+     */
+    public static class Group implements ReadQuery
+    {
+        public final List<SinglePartitionReadCommand<?>> commands;
+        private final DataLimits limits;
+        private final int nowInSec;
+
+        public Group(List<SinglePartitionReadCommand<?>> commands, DataLimits 
limits)
+        {
+            assert !commands.isEmpty();
+            this.commands = commands;
+            this.limits = limits;
+            this.nowInSec = commands.get(0).nowInSec();
+            for (int i = 1; i < commands.size(); i++)
+                assert commands.get(i).nowInSec() == nowInSec;
+        }
+
+        public static Group one(SinglePartitionReadCommand<?> command)
+        {
+            return new 
Group(Collections.<SinglePartitionReadCommand<?>>singletonList(command), 
command.limits());
+        }
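+
+        // Editorial sketch, not part of the original patch: grouping several key lookups.
+        // Assuming commands 'cmd1' and 'cmd2' read different keys of the same table at the same
+        // nowInSec, and 'limits' caps the overall result (hypothetical names):
+        //
+        //   SinglePartitionReadCommand.Group group =
+        //       new SinglePartitionReadCommand.Group(Arrays.asList(cmd1, cmd2), limits);
+        //   PartitionIterator result = group.execute(ConsistencyLevel.QUORUM, clientState);
+        //
+        // Group.one(cmd) is the single-command shortcut used by SinglePartitionReadCommand.execute().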
+
+        public PartitionIterator execute(ConsistencyLevel consistency, 
ClientState clientState) throws RequestExecutionException
+        {
+            return StorageProxy.read(this, consistency, clientState);
+        }
+
+        public int nowInSec()
+        {
+            return nowInSec;
+        }
+
+        public DataLimits limits()
+        {
+            return limits;
+        }
+
+        public CFMetaData metadata()
+        {
+            return commands.get(0).metadata();
+        }
+
+        public ReadOrderGroup startOrderGroup()
+        {
+            // Note that the only difference between the commands in a group must be the partition key on which
+            // they apply. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one.
+            return commands.get(0).startOrderGroup();
+        }
+
+        public PartitionIterator executeInternal(ReadOrderGroup orderGroup)
+        {
+            List<PartitionIterator> partitions = new 
ArrayList<>(commands.size());
+            for (SinglePartitionReadCommand cmd : commands)
+                partitions.add(cmd.executeInternal(orderGroup));
+
+            // Because we only enforce the limit per command, we need to enforce it globally.
+            return limits.filter(PartitionIterators.concat(partitions), 
nowInSec);
+        }
+
+        public QueryPager getPager(PagingState pagingState)
+        {
+            if (commands.size() == 1)
+                return SinglePartitionReadCommand.getPager(commands.get(0), 
pagingState);
+
+            return new MultiPartitionPager(this, pagingState);
+        }
+
+        @Override
+        public String toString()
+        {
+            return commands.toString();
+        }
+    }
+
+    private static class Deserializer extends SelectionDeserializer
+    {
+        public ReadCommand deserialize(DataInput in, int version, boolean 
isDigest, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter 
columnFilter, RowFilter rowFilter, DataLimits limits)
+        throws IOException
+        {
+            DecoratedKey key = 
StorageService.getPartitioner().decorateKey(metadata.getKeyValidator().readValue(in));
+            ClusteringIndexFilter filter = 
ClusteringIndexFilter.serializer.deserialize(in, version, metadata);
+            if (filter instanceof ClusteringIndexNamesFilter)
+                return new SinglePartitionNamesCommand(isDigest, isForThrift, 
metadata, nowInSec, columnFilter, rowFilter, limits, key, 
(ClusteringIndexNamesFilter)filter);
+            else
+                return new SinglePartitionSliceCommand(isDigest, isForThrift, 
metadata, nowInSec, columnFilter, rowFilter, limits, key, 
(ClusteringIndexSliceFilter)filter);
+        }
+    };
+}
