http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
index ad46a0f..bbaacc8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java
@@ -17,22 +17,18 @@
  */
 package org.apache.cassandra.cql3.statements;
 
-import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.index.SecondaryIndex;
-import org.apache.cassandra.db.index.SecondaryIndexManager;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
-import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
-
 /**
  * An <code>UPDATE</code> statement parsed from a CQL query statement.
  *
@@ -51,98 +47,52 @@ public class UpdateStatement extends ModificationStatement
         return true;
     }
 
-    public void addUpdateForKey(ColumnFamily cf,
-                                ByteBuffer key,
-                                Composite prefix,
-                                UpdateParameters params) throws 
InvalidRequestException
+    public void addUpdateForKey(PartitionUpdate update, CBuilder cbuilder, 
UpdateParameters params)
+    throws InvalidRequestException
     {
-        addUpdateForKey(cf, key, prefix, params, true);
-    }
+        params.newPartition(update.partitionKey());
 
-    public void addUpdateForKey(ColumnFamily cf,
-                                ByteBuffer key,
-                                Composite prefix,
-                                UpdateParameters params,
-                                boolean validateIndexedColumns) throws 
InvalidRequestException
-    {
-        // Inserting the CQL row marker (see #4361)
-        // We always need to insert a marker for INSERT, because of the 
following situation:
-        //   CREATE TABLE t ( k int PRIMARY KEY, c text );
-        //   INSERT INTO t(k, c) VALUES (1, 1)
-        //   DELETE c FROM t WHERE k = 1;
-        //   SELECT * FROM t;
-        // The last query should return one row (but with c == null). Adding 
the marker with the insert make sure
-        // the semantic is correct (while making sure a 'DELETE FROM t WHERE k 
= 1' does remove the row entirely)
-        //
-        // We do not insert the marker for UPDATE however, as this amount to 
updating the columns in the WHERE
-        // clause which is inintuitive (#6782)
-        //
-        // We never insert markers for Super CF as this would confuse the 
thrift side.
-        if (type == StatementType.INSERT && cfm.isCQL3Table() && 
!prefix.isStatic())
-            cf.addColumn(params.makeColumn(cfm.comparator.rowMarker(prefix), 
ByteBufferUtil.EMPTY_BYTE_BUFFER));
-
-        List<Operation> updates = getOperations();
-
-        if (cfm.comparator.isDense())
+        if (updatesRegularRows())
         {
-            if (prefix.isEmpty())
-                throw new InvalidRequestException(String.format("Missing 
PRIMARY KEY part %s", cfm.clusteringColumns().get(0)));
+            Clustering clustering = cbuilder.build();
+            Row.Writer writer = update.writer();
+            params.writeClustering(clustering, writer);
 
-            // An empty name for the compact value is what we use to recognize 
the case where there is not column
-            // outside the PK, see CreateStatement.
-            if (!cfm.compactValueColumn().name.bytes.hasRemaining())
-            {
-                // There is no column outside the PK. So no operation could 
have passed through validation
-                assert updates.isEmpty();
-                new Constants.Setter(cfm.compactValueColumn(), 
EMPTY).execute(key, cf, prefix, params);
-            }
-            else
+
+            // We update the row timestamp (ex-row marker) only on INSERT 
(#6782)
+            // Further, COMPACT tables semantic differs from "CQL3" ones in 
that a row exists only if it has
+            // a non-null column, so we don't want to set the row timestamp 
for them.
+            if (type == StatementType.INSERT && cfm.isCQLTable())
+                params.writePartitionKeyLivenessInfo(writer);
+
+            List<Operation> updates = getRegularOperations();
+
+            // For compact tables, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update
+            // that only sets the PK unless there are no declared non-PK columns (in the latter case we just set the value empty).
+
+            // For a dense layout, when we translate it to thrift, we don't have a row marker. So we don't accept an insert/update
+            // that only sets the PK unless there are no declared non-PK columns (which we recognize because in that case the compact
+            // value is of type "EmptyType").
+            if (cfm.isCompactTable() && updates.isEmpty())
             {
-                // dense means we don't have a row marker, so don't accept to 
set only the PK. See CASSANDRA-5648.
-                if (updates.isEmpty())
+                if (CompactTables.hasEmptyCompactValue(cfm))
+                    updates = Collections.<Operation>singletonList(new 
Constants.Setter(cfm.compactValueColumn(), EMPTY));
+                else
                     throw new InvalidRequestException(String.format("Column %s 
is mandatory for this COMPACT STORAGE table", cfm.compactValueColumn().name));
-
-                for (Operation update : updates)
-                    update.execute(key, cf, prefix, params);
             }
-        }
-        else
-        {
-            for (Operation update : updates)
-                update.execute(key, cf, prefix, params);
-        }
 
-        // validateIndexedColumns trigger a call to Keyspace.open() which we 
want to be able to avoid in some case
-        //(e.g. when using CQLSSTableWriter)
-        if (validateIndexedColumns)
-            validateIndexedColumns(key, cf);
-    }
+            for (Operation op : updates)
+                op.execute(update.partitionKey(), clustering, writer, params);
 
-    /**
-     * Checks if the values of the indexed columns are valid.
-     *
-     * @param key row key for the column family
-     * @param cf the column family
-     * @throws InvalidRequestException if one of the values of the indexed 
columns is not valid
-     */
-    private void validateIndexedColumns(ByteBuffer key, ColumnFamily cf)
-    {
-        SecondaryIndexManager indexManager = 
Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager;
-        if (indexManager.hasIndexes())
+            writer.endOfRow();
+        }
+
+        if (updatesStaticRow())
         {
-            for (Cell cell : cf)
-            {
-                // Indexed values must be validated by any applicable index. 
See CASSANDRA-3057/4240/8081 for more details
-                SecondaryIndex failedIndex = indexManager.validate(key, cell);
-                if (failedIndex != null)
-                {
-                    throw invalidRequest(String.format("Can't index column 
value of size %d for index %s on %s.%s",
-                                                       
cell.value().remaining(),
-                                                       
failedIndex.getIndexName(),
-                                                       cfm.ksName,
-                                                       cfm.cfName));
-                }
-            }
+            Row.Writer writer = update.staticWriter();
+            for (Operation op : getStaticOperations())
+                op.execute(update.partitionKey(), 
Clustering.STATIC_CLUSTERING, writer, params);
+            writer.endOfRow();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AbstractCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractCell.java 
b/src/java/org/apache/cassandra/db/AbstractCell.java
deleted file mode 100644
index bd63985..0000000
--- a/src/java/org/apache/cassandra/db/AbstractCell.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOError;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.util.Iterator;
-
-import com.google.common.collect.AbstractIterator;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.io.sstable.format.Version;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-
-public abstract class AbstractCell implements Cell
-{
-    public static Iterator<OnDiskAtom> onDiskIterator(final DataInput in,
-                                                      final 
ColumnSerializer.Flag flag,
-                                                      final int expireBefore,
-                                                      final Version version,
-                                                      final CellNameType type)
-    {
-        return new AbstractIterator<OnDiskAtom>()
-        {
-            protected OnDiskAtom computeNext()
-            {
-                OnDiskAtom atom;
-                try
-                {
-                    atom = 
type.onDiskAtomSerializer().deserializeFromSSTable(in, flag, expireBefore, 
version);
-                }
-                catch (IOException e)
-                {
-                    throw new IOError(e);
-                }
-                if (atom == null)
-                    return endOfData();
-
-                return atom;
-            }
-        };
-    }
-
-    public boolean isLive()
-    {
-        return true;
-    }
-
-    public boolean isLive(long now)
-    {
-        return true;
-    }
-
-    public int cellDataSize()
-    {
-        return name().dataSize() + value().remaining() + 
TypeSizes.NATIVE.sizeof(timestamp());
-    }
-
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        /*
-         * Size of a column is =
-         *   size of a name (short + length of the string)
-         * + 1 byte to indicate if the column has been deleted
-         * + 8 bytes for timestamp
-         * + 4 bytes which basically indicates the size of the byte array
-         * + entire byte array.
-        */
-        int valueSize = value().remaining();
-        return ((int)type.cellSerializer().serializedSize(name(), typeSizes)) 
+ 1 + typeSizes.sizeof(timestamp()) + typeSizes.sizeof(valueSize) + valueSize;
-    }
-
-    public int serializationFlags()
-    {
-        return 0;
-    }
-
-    public Cell diff(Cell cell)
-    {
-        if (timestamp() < cell.timestamp())
-            return cell;
-        return null;
-    }
-
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name().toByteBuffer().duplicate());
-        digest.update(value().duplicate());
-
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-    }
-
-    public int getLocalDeletionTime()
-    {
-        return Integer.MAX_VALUE;
-    }
-
-    public Cell reconcile(Cell cell)
-    {
-        long ts1 = timestamp(), ts2 = cell.timestamp();
-        if (ts1 != ts2)
-            return ts1 < ts2 ? cell : this;
-        if (isLive() != cell.isLive())
-            return isLive() ? cell : this;
-        return value().compareTo(cell.value()) < 0 ? cell : this;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        return this == o || (o instanceof Cell && equals((Cell) o));
-    }
-
-    public boolean equals(Cell cell)
-    {
-        return timestamp() == cell.timestamp() && name().equals(cell.name()) 
&& value().equals(cell.value())
-               && serializationFlags() == cell.serializationFlags();
-    }
-
-    public int hashCode()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s:%b:%d@%d",
-                             comparator.getString(name()),
-                             !isLive(),
-                             value().remaining(),
-                             timestamp());
-    }
-
-    public void validateName(CFMetaData metadata) throws MarshalException
-    {
-        metadata.comparator.validate(name());
-    }
-
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-
-        AbstractType<?> valueValidator = metadata.getValueValidator(name());
-        if (valueValidator != null)
-            valueValidator.validateCellValue(value());
-    }
-
-    public static Cell create(CellName name, ByteBuffer value, long timestamp, 
int ttl, CFMetaData metadata)
-    {
-        if (ttl <= 0)
-            ttl = metadata.getDefaultTimeToLive();
-
-        return ttl > 0
-                ? new BufferExpiringCell(name, value, timestamp, ttl)
-                : new BufferCell(name, value, timestamp);
-    }
-
-    public Cell diffCounter(Cell cell)
-    {
-        assert this instanceof CounterCell : "Wrong class type: " + getClass();
-
-        if (timestamp() < cell.timestamp())
-            return cell;
-
-        // Note that if at that point, cell can't be a tombstone. Indeed,
-        // cell is the result of merging us with other nodes results, and
-        // merging a CounterCell with a tombstone never return a tombstone
-        // unless that tombstone timestamp is greater that the CounterCell
-        // one.
-        assert cell instanceof CounterCell : "Wrong class type: " + 
cell.getClass();
-
-        if (((CounterCell) this).timestampOfLastDelete() < ((CounterCell) 
cell).timestampOfLastDelete())
-            return cell;
-
-        CounterContext.Relationship rel = 
CounterCell.contextManager.diff(cell.value(), value());
-        return (rel == CounterContext.Relationship.GREATER_THAN || rel == 
CounterContext.Relationship.DISJOINT) ? cell : null;
-    }
-
-    /** This is temporary until we start creating Cells of the different type 
(buffer vs. native) */
-    public Cell reconcileCounter(Cell cell)
-    {
-        assert this instanceof CounterCell : "Wrong class type: " + getClass();
-
-        // No matter what the counter cell's timestamp is, a tombstone always 
takes precedence. See CASSANDRA-7346.
-        if (cell instanceof DeletedCell)
-            return cell;
-
-        assert (cell instanceof CounterCell) : "Wrong class type: " + 
cell.getClass();
-
-        // live < live last delete
-        if (timestamp() < ((CounterCell) cell).timestampOfLastDelete())
-            return cell;
-
-        long timestampOfLastDelete = ((CounterCell) 
this).timestampOfLastDelete();
-
-        // live last delete > live
-        if (timestampOfLastDelete > cell.timestamp())
-            return this;
-
-        // live + live. return one of the cells if its context is a superset 
of the other's, or merge them otherwise
-        ByteBuffer context = CounterCell.contextManager.merge(value(), 
cell.value());
-        if (context == value() && timestamp() >= cell.timestamp() && 
timestampOfLastDelete >= ((CounterCell) cell).timestampOfLastDelete())
-            return this;
-        else if (context == cell.value() && cell.timestamp() >= timestamp() && 
((CounterCell) cell).timestampOfLastDelete() >= timestampOfLastDelete)
-            return cell;
-        else // merge clocks and timestamps.
-            return new BufferCounterCell(name(),
-                                         context,
-                                         Math.max(timestamp(), 
cell.timestamp()),
-                                         Math.max(timestampOfLastDelete, 
((CounterCell) cell).timestampOfLastDelete()));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java 
b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
new file mode 100644
index 0000000..9ea071d
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractClusteringPrefix.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.Objects;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Skeleton {@code ClusteringPrefix} whose shared operations are expressed purely in terms of
+ * {@code size()}, {@code get(int)} and {@code kind()}.
+ */
+public abstract class AbstractClusteringPrefix implements ClusteringPrefix
+{
+    /**
+     * @return this prefix itself.
+     */
+    public ClusteringPrefix clustering()
+    {
+        return this;
+    }
+
+    /**
+     * @return the sum of the remaining bytes of every non-null component of this prefix.
+     */
+    public int dataSize()
+    {
+        int total = 0;
+        for (int idx = 0; idx < size(); idx++)
+        {
+            ByteBuffer component = get(idx);
+            if (component != null)
+                total += component.remaining();
+        }
+        return total;
+    }
+
+    /**
+     * Feeds every non-null component, followed by the ordinal of {@code kind()}, into the digest.
+     *
+     * @param digest the digest to update.
+     */
+    public void digest(MessageDigest digest)
+    {
+        for (int idx = 0; idx < size(); idx++)
+        {
+            ByteBuffer component = get(idx);
+            if (component == null)
+                continue;
+
+            // Hash a duplicate so that the component's own position is left untouched.
+            digest.update(component.duplicate());
+        }
+        FBUtilities.updateWithByte(digest, kind().ordinal());
+    }
+
+    /**
+     * Hands each component (null ones included), in index order, to the writer.
+     *
+     * @param writer the writer receiving the clustering values.
+     */
+    public void writeTo(Writer writer)
+    {
+        int idx = 0;
+        while (idx < size())
+        {
+            writer.writeClusteringValue(get(idx));
+            idx++;
+        }
+    }
+
+    /**
+     * Not supported by this base class.
+     *
+     * @throws UnsupportedOperationException always.
+     */
+    public long unsharedHeapSize()
+    {
+        // This is only needed by the cache and by memtables. The implementations that are safe to use
+        // there (SimpleClustering, Slice.Bound.SimpleBound and MemtableRow.* classes) override it.
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Hash built from every component and from {@code kind()}. Component contributions are summed,
+     * so the value does not depend on the order of the components.
+     */
+    @Override
+    public final int hashCode()
+    {
+        int hash = 31;
+        for (int idx = 0; idx < size(); idx++)
+        {
+            ByteBuffer component = get(idx);
+            hash = hash + 31 * Objects.hashCode(component);
+        }
+        return 31 * hash + Objects.hashCode(kind());
+    }
+
+    /**
+     * Two prefixes are equal when they have the same kind, the same size and pairwise equal components.
+     */
+    @Override
+    public final boolean equals(Object o)
+    {
+        if (!(o instanceof ClusteringPrefix))
+            return false;
+
+        ClusteringPrefix other = (ClusteringPrefix) o;
+        boolean sameShape = this.kind() == other.kind() && this.size() == other.size();
+        if (!sameShape)
+            return false;
+
+        for (int idx = 0; idx < size(); idx++)
+        {
+            if (!Objects.equals(this.get(idx), other.get(idx)))
+                return false;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java 
b/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
new file mode 100644
index 0000000..bbda598
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractLivenessInfo.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.util.Objects;
+import java.security.MessageDigest;
+
+import org.apache.cassandra.serializers.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * Base abstract class for {@code LivenessInfo} implementations.
+ *
+ * All {@code LivenessInfo} implementations should extend this class unless they have a very
+ * good reason not to.
+ */
+public abstract class AbstractLivenessInfo implements LivenessInfo
+{
+    /**
+     * @return whether {@code timestamp()} differs from {@code NO_TIMESTAMP}.
+     */
+    public boolean hasTimestamp()
+    {
+        return timestamp() != NO_TIMESTAMP;
+    }
+
+    /**
+     * @return whether {@code ttl()} differs from {@code NO_TTL}.
+     */
+    public boolean hasTTL()
+    {
+        return ttl() != NO_TTL;
+    }
+
+    /**
+     * @return whether {@code localDeletionTime()} differs from {@code NO_DELETION_TIME}.
+     */
+    public boolean hasLocalDeletionTime()
+    {
+        return localDeletionTime() != NO_DELETION_TIME;
+    }
+
+    /**
+     * @param nowInSec the current time in seconds.
+     * @return {@code localDeletionTime() - nowInSec}, or -1 if this info has no TTL or if that
+     * difference is negative.
+     */
+    public int remainingTTL(int nowInSec)
+    {
+        if (!hasTTL())
+            return -1;
+
+        int remaining = localDeletionTime() - nowInSec;
+        return remaining >= 0 ? remaining : -1;
+    }
+
+    /**
+     * @param nowInSec the current time in seconds.
+     * @return whether this info has a timestamp and either has no local deletion time, or has a TTL
+     * and a local deletion time strictly after {@code nowInSec}.
+     */
+    public boolean isLive(int nowInSec)
+    {
+        // Note that we don't rely on localDeletionTime() only because if we were to, we
+        // could potentially consider a tombstone as a live cell (due to time skew). So
+        // if a cell has a local deletion time and no ttl, it's a tombstone and considered
+        // dead no matter what its actual local deletion value is.
+        return hasTimestamp() && (!hasLocalDeletionTime() || (hasTTL() && nowInSec < localDeletionTime()));
+    }
+
+    /**
+     * Feeds the timestamp, then the local deletion time, then the TTL into the digest.
+     *
+     * @param digest the digest to update.
+     */
+    public void digest(MessageDigest digest)
+    {
+        FBUtilities.updateWithLong(digest, timestamp());
+        FBUtilities.updateWithInt(digest, localDeletionTime());
+        FBUtilities.updateWithInt(digest, ttl());
+    }
+
+    /**
+     * Checks the internal consistency of this info.
+     *
+     * @throws MarshalException if the TTL or the local deletion time is negative, or if there is a
+     * TTL but no local deletion time.
+     */
+    public void validate()
+    {
+        if (ttl() < 0)
+            throw new MarshalException("A TTL should not be negative");
+        if (localDeletionTime() < 0)
+            throw new MarshalException("A local deletion time should not be negative");
+        if (hasTTL() && !hasLocalDeletionTime())
+            throw new MarshalException("Should not have a TTL without an associated local deletion time");
+    }
+
+    /**
+     * @return the sum of {@code TypeSizes.NATIVE.sizeof} over the timestamp, TTL and local deletion
+     * time, counting only those that are present.
+     */
+    public int dataSize()
+    {
+        int size = 0;
+        if (hasTimestamp())
+            size += TypeSizes.NATIVE.sizeof(timestamp());
+        if (hasTTL())
+            size += TypeSizes.NATIVE.sizeof(ttl());
+        if (hasLocalDeletionTime())
+            size += TypeSizes.NATIVE.sizeof(localDeletionTime());
+        return size;
+    }
+
+    /**
+     * @param other the info to compare against.
+     * @return whether this info's timestamp is strictly greater than {@code other}'s.
+     */
+    public boolean supersedes(LivenessInfo other)
+    {
+        return timestamp() > other.timestamp();
+    }
+
+    /**
+     * @param other the info to merge with.
+     * @return this info if it supersedes {@code other}, otherwise {@code other} (so {@code other}
+     * is returned when the timestamps are equal).
+     */
+    public LivenessInfo mergeWith(LivenessInfo other)
+    {
+        return supersedes(other) ? this : other;
+    }
+
+    /**
+     * @return a new {@code SimpleLivenessInfo} built from the current timestamp, TTL and local
+     * deletion time of this info.
+     */
+    public LivenessInfo takeAlias()
+    {
+        return new SimpleLivenessInfo(timestamp(), ttl(), localDeletionTime());
+    }
+
+    /**
+     * @param newTimestamp the timestamp to use in the returned info.
+     * @return this info unchanged if it has no timestamp, otherwise a new {@code SimpleLivenessInfo}
+     * with {@code newTimestamp} and the TTL and local deletion time of this info.
+     */
+    public LivenessInfo withUpdatedTimestamp(long newTimestamp)
+    {
+        if (!hasTimestamp())
+            return this;
+
+        return new SimpleLivenessInfo(newTimestamp, ttl(), localDeletionTime());
+    }
+
+    /**
+     * @param maxPurgeableTimestamp exclusive upper bound on the timestamp.
+     * @param gcBefore exclusive upper bound on the local deletion time.
+     * @return whether the timestamp is strictly below {@code maxPurgeableTimestamp} and the local
+     * deletion time is strictly below {@code gcBefore}.
+     */
+    public boolean isPurgeable(long maxPurgeableTimestamp, int gcBefore)
+    {
+        return timestamp() < maxPurgeableTimestamp && localDeletionTime() < gcBefore;
+    }
+
+    /**
+     * Renders the present fields as {@code [ts=... ttl=... ldt=...]}, separated by single spaces;
+     * absent fields are omitted.
+     */
+    @Override
+    public String toString()
+    {
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        boolean needSpace = false;
+        if (hasTimestamp())
+        {
+            sb.append("ts=").append(timestamp());
+            needSpace = true;
+        }
+        if (hasTTL())
+        {
+            if (needSpace)
+                sb.append(' ');
+            sb.append("ttl=").append(ttl());
+            needSpace = true;
+        }
+        if (hasLocalDeletionTime())
+        {
+            if (needSpace)
+                sb.append(' ');
+            sb.append("ldt=").append(localDeletionTime());
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    /**
+     * Two infos are equal when their timestamp, TTL and local deletion time are all equal.
+     */
+    @Override
+    public boolean equals(Object other)
+    {
+        if (this == other)
+            return true;
+        if (!(other instanceof LivenessInfo))
+            return false;
+
+        LivenessInfo that = (LivenessInfo) other;
+        return this.timestamp() == that.timestamp()
+            && this.ttl() == that.ttl()
+            && this.localDeletionTime() == that.localDeletionTime();
+    }
+
+    /**
+     * Hash of the timestamp, TTL and local deletion time, i.e. of the fields used by {@code equals}.
+     */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(timestamp(), ttl(), localDeletionTime());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AbstractNativeCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractNativeCell.java 
b/src/java/org/apache/cassandra/db/AbstractNativeCell.java
deleted file mode 100644
index 207a972..0000000
--- a/src/java/org/apache/cassandra/db/AbstractNativeCell.java
+++ /dev/null
@@ -1,710 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
-import java.security.MessageDigest;
-
-import net.nicoulaj.compilecommand.annotations.Inline;
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.composites.*;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.FastByteOperations;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.*;
-
-
-/**
- * <pre>
- * {@code
- * Packs a CellName AND a Cell into one off-heap representation.
- * Layout is:
- *
- * Note we store the ColumnIdentifier in full as bytes. This seems an okay 
tradeoff for now, as we just
- * look it back up again when we need to, and in the near future we hope to 
switch to ints, longs or
- * UUIDs representing column identifiers on disk, at which point we can switch 
that here as well.
- *
- * [timestamp][value offset][name size]][name extra][name offset deltas][cell 
names][value][Descendants]
- * [   8b    ][     4b     ][    2b   ][     1b    ][     each 2b      ][ arb 
< 64k][ arb ][ arbitrary ]
- *
- * descendants: any overriding classes will put their state here
- * name offsets are deltas from their base offset, and don't include the first 
offset, or the end position of the final entry,
- * i.e. there will be size - 1 entries, and each is a delta that is added to 
the offset of the position of the first name
- * (which is always CELL_NAME_OFFSETS_OFFSET + (2 * (size - 1))). The length 
of the final name fills up any remaining
- * space upto the value offset
- * name extra:  lowest 2 bits indicate the clustering size delta (i.e. how 
many name items are NOT part of the clustering key)
- *              the next 2 bits indicate the CellNameType
- *              the next bit indicates if the column is a static or 
clustered/dynamic column
- * }
- * </pre>
- */
-public abstract class AbstractNativeCell extends AbstractCell implements 
CellName
-{
-    static final int TIMESTAMP_OFFSET = 4;
-    private static final int VALUE_OFFSET_OFFSET = 12;
-    private static final int CELL_NAME_SIZE_OFFSET = 16;
-    private static final int CELL_NAME_EXTRA_OFFSET = 18;
-    private static final int CELL_NAME_OFFSETS_OFFSET = 19;
-    private static final int CELL_NAME_SIZE_DELTA_MASK = 3;
-    private static final int CELL_NAME_TYPE_SHIFT = 2;
-    private static final int CELL_NAME_TYPE_MASK = 7;
-
-    private static enum NameType
-    {
-        COMPOUND_DENSE(0 << 2), COMPOUND_SPARSE(1 << 2), 
COMPOUND_SPARSE_STATIC(2 << 2), SIMPLE_DENSE(3 << 2), SIMPLE_SPARSE(4 << 2);
-        static final NameType[] TYPES = NameType.values();
-        final int bits;
-
-        NameType(int bits)
-        {
-            this.bits = bits;
-        }
-
-        static NameType typeOf(CellName name)
-        {
-            if (name instanceof CompoundDenseCellName)
-            {
-                assert !name.isStatic();
-                return COMPOUND_DENSE;
-            }
-
-            if (name instanceof CompoundSparseCellName)
-                return name.isStatic() ? COMPOUND_SPARSE_STATIC : 
COMPOUND_SPARSE;
-
-            if (name instanceof SimpleDenseCellName)
-            {
-                assert !name.isStatic();
-                return SIMPLE_DENSE;
-            }
-
-            if (name instanceof SimpleSparseCellName)
-            {
-                assert !name.isStatic();
-                return SIMPLE_SPARSE;
-            }
-
-            if (name instanceof NativeCell)
-                return ((NativeCell) name).nametype();
-
-            throw new AssertionError();
-        }
-    }
-
-    private final long peer; // peer is assigned by peer updater in setPeer 
method
-
-    AbstractNativeCell()
-    {
-        peer = -1;
-    }
-
-    public AbstractNativeCell(NativeAllocator allocator, OpOrder.Group 
writeOp, Cell copyOf)
-    {
-        int size = sizeOf(copyOf);
-        peer = allocator.allocate(size, writeOp);
-
-        MemoryUtil.setInt(peer, size);
-        construct(copyOf);
-    }
-
-    protected int sizeOf(Cell cell)
-    {
-        int size = CELL_NAME_OFFSETS_OFFSET + Math.max(0, cell.name().size() - 
1) * 2 + cell.value().remaining();
-        CellName name = cell.name();
-        for (int i = 0; i < name.size(); i++)
-            size += name.get(i).remaining();
-        return size;
-    }
-
-    protected void construct(Cell from)
-    {
-        setLong(TIMESTAMP_OFFSET, from.timestamp());
-        CellName name = from.name();
-        int nameSize = name.size();
-        int offset = CELL_NAME_SIZE_OFFSET;
-        setShort(offset, (short) nameSize);
-        assert nameSize - name.clusteringSize() <= 2;
-        byte cellNameExtraBits = (byte) ((nameSize - name.clusteringSize()) | 
NameType.typeOf(name).bits);
-        setByte(offset += 2, cellNameExtraBits);
-        offset += 1;
-        short cellNameDelta = 0;
-        for (int i = 1; i < nameSize; i++)
-        {
-            cellNameDelta += name.get(i - 1).remaining();
-            setShort(offset, cellNameDelta);
-            offset += 2;
-        }
-        for (int i = 0; i < nameSize; i++)
-        {
-            ByteBuffer bb = name.get(i);
-            setBytes(offset, bb);
-            offset += bb.remaining();
-        }
-        setInt(VALUE_OFFSET_OFFSET, offset);
-        setBytes(offset, from.value());
-    }
-
-    // the offset at which to read the short that gives the names
-    private int nameDeltaOffset(int i)
-    {
-        return CELL_NAME_OFFSETS_OFFSET + ((i - 1) * 2);
-    }
-
-    int valueStartOffset()
-    {
-        return getInt(VALUE_OFFSET_OFFSET);
-    }
-
-    private int valueEndOffset()
-    {
-        return (int) (internalSize() - postfixSize());
-    }
-
-    protected int postfixSize()
-    {
-        return 0;
-    }
-
-    @Override
-    public ByteBuffer value()
-    {
-        long offset = valueStartOffset();
-        return getByteBuffer(offset, (int) (internalSize() - (postfixSize() + 
offset))).order(ByteOrder.BIG_ENDIAN);
-    }
-
-    private int clusteringSizeDelta()
-    {
-        return getByte(CELL_NAME_EXTRA_OFFSET) & CELL_NAME_SIZE_DELTA_MASK;
-    }
-
-    public boolean isStatic()
-    {
-        return nametype() == NameType.COMPOUND_SPARSE_STATIC;
-    }
-
-    NameType nametype()
-    {
-        return NameType.TYPES[(((int) this.getByte(CELL_NAME_EXTRA_OFFSET)) >> 
CELL_NAME_TYPE_SHIFT) & CELL_NAME_TYPE_MASK];
-    }
-
-    public long minTimestamp()
-    {
-        return timestamp();
-    }
-
-    public long maxTimestamp()
-    {
-        return timestamp();
-    }
-
-    public int clusteringSize()
-    {
-        return size() - clusteringSizeDelta();
-    }
-
-    @Override
-    public ColumnIdentifier cql3ColumnName(CFMetaData metadata)
-    {
-        switch (nametype())
-        {
-            case SIMPLE_SPARSE:
-                return getIdentifier(metadata, get(clusteringSize()));
-            case COMPOUND_SPARSE_STATIC:
-            case COMPOUND_SPARSE:
-                ByteBuffer buffer = get(clusteringSize());
-                if (buffer.remaining() == 0)
-                    return CompoundSparseCellNameType.rowMarkerId;
-
-                return getIdentifier(metadata, buffer);
-            case SIMPLE_DENSE:
-            case COMPOUND_DENSE:
-                return null;
-            default:
-                throw new AssertionError();
-        }
-    }
-
-    public ByteBuffer collectionElement()
-    {
-        return isCollectionCell() ? get(size() - 1) : null;
-    }
-
-    // we always have a collection element if our clustering size is 2 less 
than our total size,
-    // and we never have one otherwiss
-    public boolean isCollectionCell()
-    {
-        return clusteringSizeDelta() == 2;
-    }
-
-    public boolean isSameCQL3RowAs(CellNameType type, CellName other)
-    {
-        switch (nametype())
-        {
-            case SIMPLE_DENSE:
-            case COMPOUND_DENSE:
-                return type.compare(this, other) == 0;
-            case COMPOUND_SPARSE_STATIC:
-            case COMPOUND_SPARSE:
-                int clusteringSize = clusteringSize();
-                if (clusteringSize != other.clusteringSize() || 
other.isStatic() != isStatic())
-                    return false;
-                for (int i = 0; i < clusteringSize; i++)
-                    if (type.subtype(i).compare(get(i), other.get(i)) != 0)
-                        return false;
-                return true;
-            case SIMPLE_SPARSE:
-                return true;
-            default:
-                throw new AssertionError();
-        }
-    }
-
-    public int size()
-    {
-        return getShort(CELL_NAME_SIZE_OFFSET);
-    }
-
-    public boolean isEmpty()
-    {
-        return size() == 0;
-    }
-
-    public ByteBuffer get(int i)
-    {
-        return get(i, null);
-    }
-
-    private ByteBuffer get(int i, AbstractAllocator copy)
-    {
-        // remember to take dense/sparse into account, and only return EOC 
when not dense
-        int size = size();
-        assert i >= 0 && i < size();
-        int cellNamesOffset = nameDeltaOffset(size);
-        int startDelta = i == 0 ? 0 : getShort(nameDeltaOffset(i));
-        int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : 
valueStartOffset() - cellNamesOffset;
-        int length = endDelta - startDelta;
-        if (copy == null)
-            return getByteBuffer(cellNamesOffset + startDelta, 
length).order(ByteOrder.BIG_ENDIAN);
-        ByteBuffer result = copy.allocate(length);
-        FastByteOperations.UnsafeOperations.copy(null, peer + cellNamesOffset 
+ startDelta, result, 0, length);
-        return result;
-    }
-
-    private static final ThreadLocal<byte[]> BUFFER = new ThreadLocal<byte[]>()
-    {
-        protected byte[] initialValue()
-        {
-            return new byte[256];
-        }
-    };
-
-    protected void writeComponentTo(MessageDigest digest, int i, boolean 
includeSize)
-    {
-        // remember to take dense/sparse into account, and only return EOC 
when not dense
-        int size = size();
-        assert i >= 0 && i < size();
-        int cellNamesOffset = nameDeltaOffset(size);
-        int startDelta = i == 0 ? 0 : getShort(nameDeltaOffset(i));
-        int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : 
valueStartOffset() - cellNamesOffset;
-
-        int componentStart = cellNamesOffset + startDelta;
-        int count = endDelta - startDelta;
-
-        if (includeSize)
-            FBUtilities.updateWithShort(digest, count);
-
-        writeMemoryTo(digest, componentStart, count);
-    }
-
-    protected void writeMemoryTo(MessageDigest digest, int from, int count)
-    {
-        // only batch if we have more than 16 bytes remaining to transfer, 
otherwise fall-back to single-byte updates
-        int i = 0, batchEnd = count - 16;
-        if (i < batchEnd)
-        {
-            byte[] buffer = BUFFER.get();
-            while (i < batchEnd)
-            {
-                int transfer = Math.min(count - i, 256);
-                getBytes(from + i, buffer, 0, transfer);
-                digest.update(buffer, 0, transfer);
-                i += transfer;
-            }
-        }
-        while (i < count)
-            digest.update(getByte(from + i++));
-    }
-
-    public EOC eoc()
-    {
-        return EOC.NONE;
-    }
-
-    public Composite withEOC(EOC eoc)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Composite start()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Composite end()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public ColumnSlice slice()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public boolean isPrefixOf(CType type, Composite c)
-    {
-        if (size() > c.size() || isStatic() != c.isStatic())
-            return false;
-
-        for (int i = 0; i < size(); i++)
-        {
-            if (type.subtype(i).compare(get(i), c.get(i)) != 0)
-                return false;
-        }
-        return true;
-    }
-
-    public ByteBuffer toByteBuffer()
-    {
-        // for simple sparse we just return our one name buffer
-        switch (nametype())
-        {
-            case SIMPLE_DENSE:
-            case SIMPLE_SPARSE:
-                return get(0);
-            case COMPOUND_DENSE:
-            case COMPOUND_SPARSE_STATIC:
-            case COMPOUND_SPARSE:
-                // This is the legacy format of composites.
-                // See org.apache.cassandra.db.marshal.CompositeType for 
details.
-                ByteBuffer result = ByteBuffer.allocate(cellDataSize());
-                if (isStatic())
-                    ByteBufferUtil.writeShortLength(result, 
CompositeType.STATIC_MARKER);
-
-                for (int i = 0; i < size(); i++)
-                {
-                    ByteBuffer bb = get(i);
-                    ByteBufferUtil.writeShortLength(result, bb.remaining());
-                    result.put(bb);
-                    result.put((byte) 0);
-                }
-                result.flip();
-                return result;
-            default:
-                throw new AssertionError();
-        }
-    }
-
-    protected void updateWithName(MessageDigest digest)
-    {
-        // for simple sparse we just return our one name buffer
-        switch (nametype())
-        {
-            case SIMPLE_DENSE:
-            case SIMPLE_SPARSE:
-                writeComponentTo(digest, 0, false);
-                break;
-
-            case COMPOUND_DENSE:
-            case COMPOUND_SPARSE_STATIC:
-            case COMPOUND_SPARSE:
-                // This is the legacy format of composites.
-                // See org.apache.cassandra.db.marshal.CompositeType for 
details.
-                if (isStatic())
-                    FBUtilities.updateWithShort(digest, 
CompositeType.STATIC_MARKER);
-
-                for (int i = 0; i < size(); i++)
-                {
-                    writeComponentTo(digest, i, true);
-                    digest.update((byte) 0);
-                }
-                break;
-
-            default:
-                throw new AssertionError();
-        }
-    }
-
-    protected void updateWithValue(MessageDigest digest)
-    {
-        int offset = valueStartOffset();
-        int length = valueEndOffset() - offset;
-        writeMemoryTo(digest, offset, length);
-    }
-
-    @Override // this is the NAME dataSize, only!
-    public int dataSize()
-    {
-        switch (nametype())
-        {
-            case SIMPLE_DENSE:
-            case SIMPLE_SPARSE:
-                return valueStartOffset() - nameDeltaOffset(size());
-            case COMPOUND_DENSE:
-            case COMPOUND_SPARSE_STATIC:
-            case COMPOUND_SPARSE:
-                int size = size();
-                return valueStartOffset() - nameDeltaOffset(size) + 3 * size + 
(isStatic() ? 2 : 0);
-            default:
-                throw new AssertionError();
-        }
-    }
-
-    public boolean equals(Object obj)
-    {
-        if (obj == this)
-            return true;
-        if (obj instanceof CellName)
-            return equals((CellName) obj);
-        if (obj instanceof Cell)
-            return equals((Cell) obj);
-        return false;
-    }
-
-    public boolean equals(CellName that)
-    {
-        int size = this.size();
-        if (size != that.size())
-            return false;
-
-        for (int i = 0 ; i < size ; i++)
-            if (!get(i).equals(that.get(i)))
-                return false;
-        return true;
-    }
-
-    private static final ByteBuffer[] EMPTY = new ByteBuffer[0];
-
-    @Override
-    public CellName copy(CFMetaData cfm, AbstractAllocator allocator)
-    {
-        ByteBuffer[] r;
-        switch (nametype())
-        {
-            case SIMPLE_DENSE:
-                return CellNames.simpleDense(get(0, allocator));
-
-            case COMPOUND_DENSE:
-                r = new ByteBuffer[size()];
-                for (int i = 0; i < r.length; i++)
-                    r[i] = get(i, allocator);
-                return CellNames.compositeDense(r);
-
-            case COMPOUND_SPARSE_STATIC:
-            case COMPOUND_SPARSE:
-                int clusteringSize = clusteringSize();
-                r = clusteringSize == 0 ? EMPTY : new 
ByteBuffer[clusteringSize()];
-                for (int i = 0; i < clusteringSize; i++)
-                    r[i] = get(i, allocator);
-
-                ByteBuffer nameBuffer = get(r.length);
-                ColumnIdentifier name;
-
-                if (nameBuffer.remaining() == 0)
-                {
-                    name = CompoundSparseCellNameType.rowMarkerId;
-                }
-                else
-                {
-                    name = getIdentifier(cfm, nameBuffer);
-                }
-
-                if (clusteringSizeDelta() == 2)
-                {
-                    ByteBuffer element = allocator.clone(get(size() - 1));
-                    return CellNames.compositeSparseWithCollection(r, element, 
name, isStatic());
-                }
-                return CellNames.compositeSparse(r, name, isStatic());
-
-            case SIMPLE_SPARSE:
-                return CellNames.simpleSparse(getIdentifier(cfm, get(0)));
-        }
-        throw new IllegalStateException();
-    }
-
-    private static ColumnIdentifier getIdentifier(CFMetaData cfMetaData, 
ByteBuffer name)
-    {
-        ColumnDefinition def = cfMetaData.getColumnDefinition(name);
-        if (def != null)
-        {
-            return def.name;
-        }
-        else
-        {
-            // it's safe to simply grab based on clusteringPrefixSize() as we 
are only called if not a dense type
-            AbstractType<?> type = 
cfMetaData.comparator.subtype(cfMetaData.comparator.clusteringPrefixSize());
-            return new ColumnIdentifier(HeapAllocator.instance.clone(name), 
type);
-        }
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    protected long internalSize()
-    {
-        return MemoryUtil.getInt(peer);
-    }
-
-    private void checkPosition(long offset, long size)
-    {
-        assert size >= 0;
-        assert peer > 0 : "Memory was freed";
-        assert offset >= 0 && offset + size <= internalSize() : 
String.format("Illegal range: [%d..%d), size: %s", offset, offset + size, 
internalSize());
-    }
-
-    protected final void setByte(long offset, byte b)
-    {
-        checkPosition(offset, 1);
-        MemoryUtil.setByte(peer + offset, b);
-    }
-
-    protected final void setShort(long offset, short s)
-    {
-        checkPosition(offset, 1);
-        MemoryUtil.setShort(peer + offset, s);
-    }
-
-    protected final void setInt(long offset, int l)
-    {
-        checkPosition(offset, 4);
-        MemoryUtil.setInt(peer + offset, l);
-    }
-
-    protected final void setLong(long offset, long l)
-    {
-        checkPosition(offset, 8);
-        MemoryUtil.setLong(peer + offset, l);
-    }
-
-    protected final void setBytes(long offset, ByteBuffer buffer)
-    {
-        int start = buffer.position();
-        int count = buffer.limit() - start;
-        if (count == 0)
-            return;
-
-        checkPosition(offset, count);
-        MemoryUtil.setBytes(peer + offset, buffer);
-    }
-
-    protected final byte getByte(long offset)
-    {
-        checkPosition(offset, 1);
-        return MemoryUtil.getByte(peer + offset);
-    }
-
-    protected final void getBytes(long offset, byte[] trg, int trgOffset, int 
count)
-    {
-        checkPosition(offset, count);
-        MemoryUtil.getBytes(peer + offset, trg, trgOffset, count);
-    }
-
-    protected final int getShort(long offset)
-    {
-        checkPosition(offset, 2);
-        return MemoryUtil.getShort(peer + offset);
-    }
-
-    protected final int getInt(long offset)
-    {
-        checkPosition(offset, 4);
-        return MemoryUtil.getInt(peer + offset);
-    }
-
-    protected final long getLong(long offset)
-    {
-        checkPosition(offset, 8);
-        return MemoryUtil.getLong(peer + offset);
-    }
-
-    protected final ByteBuffer getByteBuffer(long offset, int length)
-    {
-        checkPosition(offset, length);
-        return MemoryUtil.getByteBuffer(peer + offset, length);
-    }
-
-    // requires isByteOrderComparable to be true. Compares the name components 
only; ; may need to compare EOC etc still
-    @Inline
-    public final int compareTo(final Composite that)
-    {
-        if (isStatic() != that.isStatic())
-        {
-            // Static sorts before non-static no matter what, except for empty 
which
-            // always sort first
-            if (isEmpty())
-                return that.isEmpty() ? 0 : -1;
-            if (that.isEmpty())
-                return 1;
-            return isStatic() ? -1 : 1;
-        }
-
-        int size = size();
-        int size2 = that.size();
-        int minSize = Math.min(size, size2);
-        int startDelta = 0;
-        int cellNamesOffset = nameDeltaOffset(size);
-        for (int i = 0 ; i < minSize ; i++)
-        {
-            int endDelta = i < size - 1 ? getShort(nameDeltaOffset(i + 1)) : 
valueStartOffset() - cellNamesOffset;
-            long offset = peer + cellNamesOffset + startDelta;
-            int length = endDelta - startDelta;
-            int cmp = FastByteOperations.UnsafeOperations.compareTo(null, 
offset, length, that.get(i));
-            if (cmp != 0)
-                return cmp;
-            startDelta = endDelta;
-        }
-
-        EOC eoc = that.eoc();
-        if (size == size2)
-            return this.eoc().compareTo(eoc);
-
-        return size < size2 ? this.eoc().prefixComparisonResult : 
-eoc.prefixComparisonResult;
-    }
-
-    public final int compareToSimple(final Composite that)
-    {
-        assert size() == 1 && that.size() == 1;
-        int length = valueStartOffset() - nameDeltaOffset(1);
-        long offset = peer + nameDeltaOffset(1);
-        return FastByteOperations.UnsafeOperations.compareTo(null, offset, 
length, that.get(0));
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java 
b/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
deleted file mode 100644
index 8bcb5b3..0000000
--- a/src/java/org/apache/cassandra/db/AbstractRangeCommand.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.index.*;
-import org.apache.cassandra.dht.AbstractBounds;
-import org.apache.cassandra.net.MessageOut;
-import org.apache.cassandra.service.IReadCommand;
-
-public abstract class AbstractRangeCommand implements IReadCommand
-{
-    public final String keyspace;
-    public final String columnFamily;
-    public final long timestamp;
-
-    public final AbstractBounds<RowPosition> keyRange;
-    public final IDiskAtomFilter predicate;
-    public final List<IndexExpression> rowFilter;
-
-    public final SecondaryIndexSearcher searcher;
-
-    public AbstractRangeCommand(String keyspace, String columnFamily, long 
timestamp, AbstractBounds<RowPosition> keyRange, IDiskAtomFilter predicate, 
List<IndexExpression> rowFilter)
-    {
-        this.keyspace = keyspace;
-        this.columnFamily = columnFamily;
-        this.timestamp = timestamp;
-        this.keyRange = keyRange;
-        this.predicate = predicate;
-        this.rowFilter = rowFilter;
-        SecondaryIndexManager indexManager = 
Keyspace.open(keyspace).getColumnFamilyStore(columnFamily).indexManager;
-        this.searcher = 
indexManager.getHighestSelectivityIndexSearcher(rowFilter);
-    }
-
-    public boolean requiresScanningAllRanges()
-    {
-        return searcher != null && 
searcher.requiresScanningAllRanges(rowFilter);
-    }
-
-    public List<Row> postReconciliationProcessing(List<Row> rows)
-    {
-        return searcher == null ? trim(rows) : 
trim(searcher.postReconciliationProcessing(rowFilter, rows));
-    }
-
-    private List<Row> trim(List<Row> rows)
-    {
-        if (countCQL3Rows() || ignoredTombstonedPartitions())
-            return rows;
-        else
-            return rows.size() > limit() ? rows.subList(0, limit()) : rows;
-    }
-
-    public String getKeyspace()
-    {
-        return keyspace;
-    }
-
-    public abstract MessageOut<? extends AbstractRangeCommand> createMessage();
-    public abstract AbstractRangeCommand 
forSubRange(AbstractBounds<RowPosition> range);
-    public abstract AbstractRangeCommand withUpdatedLimit(int newLimit);
-
-    public abstract int limit();
-    public abstract boolean countCQL3Rows();
-
-    /**
-     * Returns true if tombstoned partitions should not be included in results 
or count towards the limit.
-     * See CASSANDRA-8490 for more details on why this is needed (and done 
this way).
-     * */
-    public boolean ignoredTombstonedPartitions()
-    {
-        if (!(predicate instanceof SliceQueryFilter))
-            return false;
-
-        return ((SliceQueryFilter) predicate).compositesToGroup == 
SliceQueryFilter.IGNORE_TOMBSTONED_PARTITIONS;
-    }
-
-    public abstract List<Row> executeLocally();
-
-    public long getTimeout()
-    {
-        return DatabaseDescriptor.getRangeRpcTimeout();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Aliasable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Aliasable.java 
b/src/java/org/apache/cassandra/db/Aliasable.java
new file mode 100644
index 0000000..a4396fc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Aliasable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+/**
+ * This interface marks objects that are only valid in a restricted scope and
+ * shouldn't be simply aliased outside of this scope (in other words, you should
+ * not keep a reference to the object that escaped said scope as the object will
+ * likely become invalid).
+ *
+ * For instance, most {@link RowIterator} implementations reuse the same {@link
+ * Row} object during iteration. This means that the following code would be
+ * incorrect.
+ * <pre>
+ *    RowIterator iter = ...;
+ *    Row someRow = null;
+ *    while (iter.hasNext())
+ *    {
+ *        Row row = iter.next();
+ *        if (someCondition(row))
+ *           someRow = row;         // This isn't safe
+ *        doSomethingElse();
+ *    }
+ *    useRow(someRow);
+ * </pre>
+ * The problem is that, because the row iterator reuses the same object,
+ * {@code someRow} will not point to the row that had met {@code someCondition}
+ * at the end of the iteration ({@code someRow} will point to the last iterated
+ * row in practice).
+ *
+ * When code does need to alias such an {@code Aliasable} object, it should call the
+ * {@code takeAlias} method that will make a copy of the object if necessary.
+ *
+ * Of course, the {@code takeAlias} method should not be abused, as it defeats the purpose
+ * of sharing objects in the first place.
+ *
+ * Also note that some implementations of an {@code Aliasable} object may be
+ * safe to alias, in which case its {@code takeAlias} method will be a no-op.
+ */
+public interface Aliasable<T>
+{
+    /**
+     * Returns either this object (if it's safe to alias) or a copy of it
+     * (if it isn't safe to alias).
+     */
+    public T takeAlias();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java 
b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
deleted file mode 100644
index 1beb982..0000000
--- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java
+++ /dev/null
@@ -1,774 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.*;
-
-import com.google.common.base.Function;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.utils.BatchRemoveIterator;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.SearchIterator;
-
-/**
- * A ColumnFamily backed by an array.
- * This implementation is not synchronized and should only be used when
- * thread-safety is not required. This implementation makes sense when the
- * main operations performed are iterating over the cells and adding cells
- * (especially if insertion is in sorted order).
- */
-public class ArrayBackedSortedColumns extends ColumnFamily
-{
-    private static final Cell[] EMPTY_ARRAY = new Cell[0];
-    private static final int MINIMAL_CAPACITY = 10;
-
-    private final boolean reversed;
-
-    private DeletionInfo deletionInfo;
-    private Cell[] cells;
-    private int size;
-    private int sortedSize;
-    private volatile boolean isSorted;
-
-    public static final ColumnFamily.Factory<ArrayBackedSortedColumns> factory 
= new Factory<ArrayBackedSortedColumns>()
-    {
-        public ArrayBackedSortedColumns create(CFMetaData metadata, boolean 
insertReversed, int initialCapacity)
-        {
-            return new ArrayBackedSortedColumns(metadata, insertReversed, 
initialCapacity == 0 ? EMPTY_ARRAY : new Cell[initialCapacity], 0, 0);
-        }
-    };
-
-    private ArrayBackedSortedColumns(CFMetaData metadata, boolean reversed, 
Cell[] cells, int size, int sortedSize)
-    {
-        super(metadata);
-        this.reversed = reversed;
-        this.deletionInfo = DeletionInfo.live();
-        this.cells = cells;
-        this.size = size;
-        this.sortedSize = sortedSize;
-        this.isSorted = size == sortedSize;
-    }
-
-    protected ArrayBackedSortedColumns(CFMetaData metadata, boolean reversed)
-    {
-        this(metadata, reversed, EMPTY_ARRAY, 0, 0);
-    }
-
-    private ArrayBackedSortedColumns(ArrayBackedSortedColumns original)
-    {
-        super(original.metadata);
-        this.reversed = original.reversed;
-        this.deletionInfo = DeletionInfo.live(); // this is INTENTIONALLY not 
set to original.deletionInfo.
-        this.cells = Arrays.copyOf(original.cells, original.size);
-        this.size = original.size;
-        this.sortedSize = original.sortedSize;
-        this.isSorted = original.isSorted;
-    }
-
-    public static ArrayBackedSortedColumns localCopy(ColumnFamily original, 
AbstractAllocator allocator)
-    {
-        ArrayBackedSortedColumns copy = new 
ArrayBackedSortedColumns(original.metadata, false, new 
Cell[original.getColumnCount()], 0, 0);
-        for (Cell cell : original)
-            copy.internalAdd(cell.localCopy(original.metadata, allocator));
-        copy.sortedSize = copy.size; // internalAdd doesn't update sortedSize.
-        copy.delete(original);
-        return copy;
-    }
-
-    public ColumnFamily.Factory getFactory()
-    {
-        return factory;
-    }
-
-    public ColumnFamily cloneMe()
-    {
-        return new ArrayBackedSortedColumns(this);
-    }
-
-    public boolean isInsertReversed()
-    {
-        return reversed;
-    }
-
-    public BatchRemoveIterator<Cell> batchRemoveIterator()
-    {
-        maybeSortCells();
-
-        return new BatchRemoveIterator<Cell>()
-        {
-            private final Iterator<Cell> iter = iterator();
-            private BitSet removedIndexes = new BitSet(size);
-            private int idx = -1;
-            private boolean shouldCallNext = false;
-            private boolean isCommitted = false;
-            private boolean removedAnything = false;
-
-            public void commit()
-            {
-                if (isCommitted)
-                    throw new IllegalStateException();
-                isCommitted = true;
-
-                if (!removedAnything)
-                    return;
-
-                int retainedCount = 0;
-                int clearIdx, setIdx = -1;
-
-                // shift all [clearIdx, setIdx) segments to the left, skipping 
any removed columns
-                while (true)
-                {
-                    clearIdx = removedIndexes.nextClearBit(setIdx + 1);
-                    if (clearIdx >= size)
-                        break; // nothing left to retain
-
-                    setIdx = removedIndexes.nextSetBit(clearIdx + 1);
-                    if (setIdx < 0)
-                        setIdx = size; // no removals past retainIdx - copy 
all remaining cells
-
-                    if (retainedCount != clearIdx)
-                        System.arraycopy(cells, clearIdx, cells, 
retainedCount, setIdx - clearIdx);
-
-                    retainedCount += (setIdx - clearIdx);
-                }
-
-                for (int i = retainedCount; i < size; i++)
-                    cells[i] = null;
-
-                size = sortedSize = retainedCount;
-            }
-
-            public boolean hasNext()
-            {
-                return iter.hasNext();
-            }
-
-            public Cell next()
-            {
-                idx++;
-                shouldCallNext = false;
-                return iter.next();
-            }
-
-            public void remove()
-            {
-                if (shouldCallNext)
-                    throw new IllegalStateException();
-
-                removedIndexes.set(reversed ? size - idx - 1 : idx);
-                removedAnything = true;
-                shouldCallNext = true;
-            }
-        };
-    }
-
-    private Comparator<Composite> internalComparator()
-    {
-        return reversed ? getComparator().reverseComparator() : 
getComparator();
-    }
-
-    private void maybeSortCells()
-    {
-        if (!isSorted)
-            sortCells();
-    }
-
-    /**
-     * synchronized so that concurrent (read-only) accessors don't mess the 
internal state.
-     */
-    private synchronized void sortCells()
-    {
-        if (isSorted)
-            return; // Just sorted by a previous call
-
-        Comparator<Cell> comparator = reversed
-                                    ? getComparator().columnReverseComparator()
-                                    : getComparator().columnComparator(false);
-
-        // Sort the unsorted segment - will still potentially contain 
duplicate (non-reconciled) cells
-        Arrays.sort(cells, sortedSize, size, comparator);
-
-        // Determine the merge start position for that segment
-        int pos = binarySearch(0, sortedSize, cells[sortedSize].name(), 
internalComparator());
-        if (pos < 0)
-            pos = -pos - 1;
-
-        // Copy [pos, lastSortedCellIndex] cells into a separate array
-        Cell[] leftCopy = pos == sortedSize
-                        ? EMPTY_ARRAY
-                        : Arrays.copyOfRange(cells, pos, sortedSize);
-
-        // Store the beginning (inclusive) and the end (exclusive) indexes of 
the right segment
-        int rightStart = sortedSize;
-        int rightEnd = size;
-
-        // 'Trim' the sizes to what's left without the leftCopy
-        size = sortedSize = pos;
-
-        // Merge the cells from both segments. When adding from the left 
segment we can rely on it not having any
-        // duplicate cells, and thus omit the comparison with the previously 
entered cell - we'll never need to reconcile.
-        int l = 0, r = rightStart;
-        while (l < leftCopy.length && r < rightEnd)
-        {
-            int cmp = comparator.compare(leftCopy[l], cells[r]);
-            if (cmp < 0)
-                append(leftCopy[l++]);
-            else if (cmp == 0)
-                append(leftCopy[l++].reconcile(cells[r++]));
-            else
-                appendOrReconcile(cells[r++]);
-        }
-        while (l < leftCopy.length)
-            append(leftCopy[l++]);
-        while (r < rightEnd)
-            appendOrReconcile(cells[r++]);
-
-        // Nullify the remainder of the array (in case we had duplicate cells 
that got reconciled)
-        for (int i = size; i < rightEnd; i++)
-            cells[i] = null;
-
-        // Fully sorted at this point
-        isSorted = true;
-    }
-
-    private void appendOrReconcile(Cell cell)
-    {
-        if (size > 0 && cells[size - 1].name().equals(cell.name()))
-            reconcileWith(size - 1, cell);
-        else
-            append(cell);
-    }
-
-    private void append(Cell cell)
-    {
-        cells[size] = cell;
-        size++;
-        sortedSize++;
-    }
-
-    public Cell getColumn(CellName name)
-    {
-        maybeSortCells();
-        int pos = binarySearch(name);
-        return pos >= 0 ? cells[pos] : null;
-    }
-
-    /**
-      * Adds a cell, assuming that:
-      * - it's non-gc-able (if a tombstone) or not a tombstone
-      * - it has a more recent timestamp than any partition/range tombstone 
shadowing it
-      * - it sorts *strictly after* the current-last cell in the array.
-      */
-    public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester 
tester, int gcBefore)
-    {
-        if (cell.getLocalDeletionTime() >= gcBefore && !tester.isDeleted(cell))
-            appendColumn(cell);
-    }
-
-    /**
-     * Adds a cell, assuming that it sorts *strictly after* the current-last 
cell in the array.
-     */
-    public void appendColumn(Cell cell)
-    {
-        internalAdd(cell);
-        sortedSize++;
-    }
-
-    public void addColumn(Cell cell)
-    {
-        if (size == 0)
-        {
-            internalAdd(cell);
-            sortedSize++;
-            return;
-        }
-
-        if (!isSorted)
-        {
-            internalAdd(cell);
-            return;
-        }
-
-        int c = internalComparator().compare(cells[size - 1].name(), 
cell.name());
-        if (c < 0)
-        {
-            // Append to the end
-            internalAdd(cell);
-            sortedSize++;
-        }
-        else if (c == 0)
-        {
-            // Resolve against the last cell
-            reconcileWith(size - 1, cell);
-        }
-        else
-        {
-            int pos = binarySearch(cell.name());
-            if (pos >= 0) // Reconcile with an existing cell
-            {
-                reconcileWith(pos, cell);
-            }
-            else
-            {
-                internalAdd(cell); // Append to the end, making cells unsorted 
from now on
-                isSorted = false;
-            }
-        }
-    }
-
-    public void addAll(ColumnFamily other)
-    {
-        delete(other.deletionInfo());
-
-        if (!other.hasColumns())
-            return;
-
-        // In reality, with ABSC being the only remaining container (aside 
from ABTC), other will aways be ABSC.
-        if (size == 0 && other instanceof ArrayBackedSortedColumns)
-        {
-            fastAddAll((ArrayBackedSortedColumns) other);
-        }
-        else
-        {
-            Iterator<Cell> iterator = reversed ? other.reverseIterator() : 
other.iterator();
-            while (iterator.hasNext())
-                addColumn(iterator.next());
-        }
-    }
-
-    // Fast path, when this ABSC is empty.
-    private void fastAddAll(ArrayBackedSortedColumns other)
-    {
-        if (other.isInsertReversed() == isInsertReversed())
-        {
-            cells = Arrays.copyOf(other.cells, other.cells.length);
-            size = other.size;
-            sortedSize = other.sortedSize;
-            isSorted = other.isSorted;
-        }
-        else
-        {
-            if (cells.length < other.getColumnCount())
-                cells = new Cell[Math.max(MINIMAL_CAPACITY, 
other.getColumnCount())];
-            Iterator<Cell> iterator = reversed ? other.reverseIterator() : 
other.iterator();
-            while (iterator.hasNext())
-                cells[size++] = iterator.next();
-            sortedSize = size;
-            isSorted = true;
-        }
-    }
-
-    /**
-     * Add a cell to the array, 'resizing' it first if necessary (if it 
doesn't fit).
-     */
-    private void internalAdd(Cell cell)
-    {
-        if (cells.length == size)
-            cells = Arrays.copyOf(cells, Math.max(MINIMAL_CAPACITY, size * 3 / 
2 + 1));
-        cells[size++] = cell;
-    }
-
-    /**
-     * Remove the cell at a given index, shifting the rest of the array to the 
left if needed.
-     * Please note that we mostly remove from the end, so the shifting should 
be rare.
-     */
-    private void internalRemove(int index)
-    {
-        int moving = size - index - 1;
-        if (moving > 0)
-            System.arraycopy(cells, index + 1, cells, index, moving);
-        cells[--size] = null;
-    }
-
-    /**
-     * Reconcile with a cell at position i.
-     * Assume that i is a valid position.
-     */
-    private void reconcileWith(int i, Cell cell)
-    {
-        cells[i] = cell.reconcile(cells[i]);
-    }
-
-    private int binarySearch(CellName name)
-    {
-        return binarySearch(0, size, name, internalComparator());
-    }
-
-    /**
-     * Simple binary search for a given cell name.
-     * The return value has the exact same meaning that the one of 
Collections.binarySearch().
-     * (We don't use Collections.binarySearch() directly because it would 
require us to create
-     * a fake Cell (as well as an Cell comparator) to do the search, which is 
ugly.
-     */
-    private int binarySearch(int fromIndex, int toIndex, Composite name, 
Comparator<Composite> comparator)
-    {
-        int low = fromIndex;
-        int mid = toIndex;
-        int high = mid - 1;
-        int result = -1;
-        while (low <= high)
-        {
-            mid = (low + high) >> 1;
-            if ((result = comparator.compare(name, cells[mid].name())) > 0)
-                low = mid + 1;
-            else if (result == 0)
-                return mid;
-            else
-                high = mid - 1;
-        }
-        return -mid - (result < 0 ? 1 : 2);
-    }
-
-    public Collection<Cell> getSortedColumns()
-    {
-        return new CellCollection(reversed);
-    }
-
-    public Collection<Cell> getReverseSortedColumns()
-    {
-        return new CellCollection(!reversed);
-    }
-
-    public int getColumnCount()
-    {
-        maybeSortCells();
-        return size;
-    }
-
-    public boolean hasColumns()
-    {
-        return size > 0;
-    }
-
-    public void clear()
-    {
-        setDeletionInfo(DeletionInfo.live());
-        for (int i = 0; i < size; i++)
-            cells[i] = null;
-        size = sortedSize = 0;
-        isSorted = true;
-    }
-
-    public DeletionInfo deletionInfo()
-    {
-        return deletionInfo;
-    }
-
-    public void delete(DeletionTime delTime)
-    {
-        deletionInfo.add(delTime);
-    }
-
-    public void delete(DeletionInfo newInfo)
-    {
-        deletionInfo.add(newInfo);
-    }
-
-    protected void delete(RangeTombstone tombstone)
-    {
-        deletionInfo.add(tombstone, getComparator());
-    }
-
-    public void setDeletionInfo(DeletionInfo newInfo)
-    {
-        deletionInfo = newInfo;
-    }
-
-    /**
-     * Purges any tombstones with a local deletion time before gcBefore.
-     * @param gcBefore a timestamp (in seconds) before which tombstones should 
be purged
-     */
-    public void purgeTombstones(int gcBefore)
-    {
-        deletionInfo.purge(gcBefore);
-    }
-
-    public Iterable<CellName> getColumnNames()
-    {
-        return Iterables.transform(new CellCollection(false), new 
Function<Cell, CellName>()
-        {
-            public CellName apply(Cell cell)
-            {
-                return cell.name();
-            }
-        });
-    }
-
-    public Iterator<Cell> iterator(ColumnSlice[] slices)
-    {
-        maybeSortCells();
-        return slices.length == 1
-             ? slice(slices[0], reversed, null)
-             : new SlicesIterator(slices, reversed);
-    }
-
-    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
-    {
-        maybeSortCells();
-        return slices.length == 1
-             ? slice(slices[0], !reversed, null)
-             : new SlicesIterator(slices, !reversed);
-    }
-
-    public SearchIterator<CellName, Cell> searchIterator()
-    {
-        maybeSortCells();
-
-        return new SearchIterator<CellName, Cell>()
-        {
-            // the first index that we could find the next key at, i.e. one 
larger
-            // than the last key's location
-            private int i = 0;
-
-            // We assume a uniform distribution of keys,
-            // so we keep track of how many keys were skipped to satisfy last 
lookup, and only look at twice that
-            // many keys for next lookup initially, extending to whole range 
only if we couldn't find it in that subrange
-            private int range = size / 2;
-
-            public boolean hasNext()
-            {
-                return i < size;
-            }
-
-            public Cell next(CellName name)
-            {
-                if (!isSorted || !hasNext())
-                    throw new IllegalStateException();
-
-                // optimize for runs of sequential matches, as in 
CollationController
-                // checking to see if we've found the desired cells yet 
(CASSANDRA-6933)
-                int c = metadata.comparator.compare(name, cells[i].name());
-                if (c <= 0)
-                    return c < 0 ? null : cells[i++];
-
-                // use range to manually force a better bsearch "pivot" by 
breaking it into two calls:
-                // first for i..i+range, then i+range..size if necessary.
-                // 
https://issues.apache.org/jira/browse/CASSANDRA-6933?focusedCommentId=13958264&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13958264
-                int limit = Math.min(size, i + range);
-                int i2 = binarySearch(i + 1, limit, name, 
internalComparator());
-                if (-1 - i2 == limit)
-                    i2 = binarySearch(limit, size, name, internalComparator());
-                // i2 can't be zero since we already checked cells[i] above
-                if (i2 > 0)
-                {
-                    range = i2 - i;
-                    i = i2 + 1;
-                    return cells[i2];
-                }
-                i2 = -1 - i2;
-                range = i2 - i;
-                i = i2;
-                return null;
-            }
-        };
-    }
-
-    private class SlicesIterator extends AbstractIterator<Cell>
-    {
-        private final ColumnSlice[] slices;
-        private final boolean invert;
-
-        private int idx = 0;
-        private int previousSliceEnd;
-        private Iterator<Cell> currentSlice;
-
-        public SlicesIterator(ColumnSlice[] slices, boolean invert)
-        {
-            this.slices = slices;
-            this.invert = invert;
-            previousSliceEnd = invert ? size : 0;
-        }
-
-        protected Cell computeNext()
-        {
-            if (currentSlice == null)
-            {
-                if (idx >= slices.length)
-                    return endOfData();
-                currentSlice = slice(slices[idx++], invert, this);
-            }
-
-            if (currentSlice.hasNext())
-                return currentSlice.next();
-
-            currentSlice = null;
-            return computeNext();
-        }
-    }
-
-    /**
-     * @return a sub-range of our cells as an Iterator, between the provided 
composites (inclusive)
-     *
-     * @param slice  The slice with the inclusive start and finish bounds
-     * @param invert If the sort order of our collection is opposite to the 
desired sort order of the result;
-     *               this results in swapping the start/finish (since they are 
provided based on the desired
-     *               sort order, not our sort order), to normalise to our sort 
order, and a backwards iterator is returned
-     * @param iter   If this slice is part of a multi-slice, the iterator will 
be updated to ensure cells are visited only once
-     */
-    private Iterator<Cell> slice(ColumnSlice slice, boolean invert, 
SlicesIterator iter)
-    {
-        Composite start = invert ? slice.finish : slice.start;
-        Composite finish = invert ? slice.start : slice.finish;
-
-        int lowerBound = 0, upperBound = size;
-        if (iter != null)
-        {
-            if (invert)
-                upperBound = iter.previousSliceEnd;
-            else
-                lowerBound = iter.previousSliceEnd;
-        }
-
-        if (!start.isEmpty())
-        {
-            lowerBound = binarySearch(lowerBound, upperBound, start, 
internalComparator());
-            if (lowerBound < 0)
-                lowerBound = -lowerBound - 1;
-        }
-
-        if (!finish.isEmpty())
-        {
-            upperBound = binarySearch(lowerBound, upperBound, finish, 
internalComparator());
-            upperBound = upperBound < 0
-                       ? -upperBound - 1
-                       : upperBound + 1; // upperBound is exclusive for the 
iterators
-        }
-
-        // If we're going backwards (wrt our sort order) we store the startIdx 
and use it as our upper bound next round
-        if (iter != null)
-            iter.previousSliceEnd = invert ? lowerBound : upperBound;
-
-        return invert
-             ? new BackwardsCellIterator(lowerBound, upperBound)
-             : new ForwardsCellIterator(lowerBound, upperBound);
-    }
-
-    private final class BackwardsCellIterator implements Iterator<Cell>
-    {
-        private int idx, end;
-        private boolean shouldCallNext = true;
-
-        // lowerBound inclusive, upperBound exclusive
-        private BackwardsCellIterator(int lowerBound, int upperBound)
-        {
-            idx = upperBound - 1;
-            end = lowerBound - 1;
-        }
-
-        public boolean hasNext()
-        {
-            return idx > end;
-        }
-
-        public Cell next()
-        {
-            try
-            {
-                shouldCallNext = false;
-                return cells[idx--];
-            }
-            catch (ArrayIndexOutOfBoundsException e)
-            {
-                NoSuchElementException ne = new 
NoSuchElementException(e.getMessage());
-                ne.initCause(e);
-                throw ne;
-            }
-        }
-
-        public void remove()
-        {
-            if (shouldCallNext)
-                throw new IllegalStateException();
-            shouldCallNext = true;
-            internalRemove(idx + 1);
-            sortedSize--;
-        }
-    }
-
-    private final class ForwardsCellIterator implements Iterator<Cell>
-    {
-        private int idx, end;
-        private boolean shouldCallNext = true;
-
-        // lowerBound inclusive, upperBound exclusive
-        private ForwardsCellIterator(int lowerBound, int upperBound)
-        {
-            idx = lowerBound;
-            end = upperBound;
-        }
-
-        public boolean hasNext()
-        {
-            return idx < end;
-        }
-
-        public Cell next()
-        {
-            try
-            {
-                shouldCallNext = false;
-                return cells[idx++];
-            }
-            catch (ArrayIndexOutOfBoundsException e)
-            {
-                NoSuchElementException ne = new 
NoSuchElementException(e.getMessage());
-                ne.initCause(e);
-                throw ne;
-            }
-        }
-
-        public void remove()
-        {
-            if (shouldCallNext)
-                throw new IllegalStateException();
-            shouldCallNext = true;
-            internalRemove(--idx);
-            sortedSize--;
-            end--;
-        }
-    }
-
-    private final class CellCollection extends AbstractCollection<Cell>
-    {
-        private final boolean invert;
-
-        private CellCollection(boolean invert)
-        {
-            this.invert = invert;
-        }
-
-        public int size()
-        {
-            return getColumnCount();
-        }
-
-        public Iterator<Cell> iterator()
-        {
-            maybeSortCells();
-            return invert
-                 ? new BackwardsCellIterator(0, size)
-                 : new ForwardsCellIterator(0, size);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AtomDeserializer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomDeserializer.java 
b/src/java/org/apache/cassandra/db/AtomDeserializer.java
deleted file mode 100644
index 74f1946..0000000
--- a/src/java/org/apache/cassandra/db/AtomDeserializer.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.io.DataInput;
-import java.io.IOException;
-
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.io.sstable.format.Version;
-
-/**
- * Helper class to deserialize OnDiskAtom efficiently.
- *
- * More precisely, this class is used by the low-level readers
- * (IndexedSliceReader and SSTableNamesIterator) to ensure we don't
- * do more work than necessary (i.e. we don't allocate/deserialize
- * objects for things we don't care about).
- */
-public class AtomDeserializer
-{
-    private final CellNameType type;
-    private final CellNameType.Deserializer nameDeserializer;
-    private final DataInput in;
-    private final ColumnSerializer.Flag flag;
-    private final int expireBefore;
-    private final Version version;
-
-    // The "flag" for the next name (which correspond to the "masks" in 
ColumnSerializer) if it has been
-    // read already, Integer.MIN_VALUE otherwise;
-    private int nextFlags = Integer.MIN_VALUE;
-
-    public AtomDeserializer(CellNameType type, DataInput in, 
ColumnSerializer.Flag flag, int expireBefore, Version version)
-    {
-        this.type = type;
-        this.nameDeserializer = type.newDeserializer(in);
-        this.in = in;
-        this.flag = flag;
-        this.expireBefore = expireBefore;
-        this.version = version;
-    }
-
-    /**
-     * Whether or not there is more atom to read.
-     */
-    public boolean hasNext() throws IOException
-    {
-        return nameDeserializer.hasNext();
-    }
-
-    /**
-     * Whether or not some atom has been read but not processed (neither 
readNext() nor
-     * skipNext() has been called for that atom) yet.
-     */
-    public boolean hasUnprocessed() throws IOException
-    {
-        return nameDeserializer.hasUnprocessed();
-    }
-
-    /**
-     * Compare the provided composite to the next atom to read on disk.
-     *
-     * This will not read/deserialize the whole atom but only what is 
necessary for the
-     * comparison. Whenever we know what to do with this atom (read it or skip 
it),
-     * readNext or skipNext should be called.
-     */
-    public int compareNextTo(Composite composite) throws IOException
-    {
-        return nameDeserializer.compareNextTo(composite);
-    }
-
-    /**
-     * Returns whether the next atom is a range tombstone or not.
-     *
-     * Please note that this should only be called after compareNextTo() has 
been called.
-     */
-    public boolean nextIsRangeTombstone() throws IOException
-    {
-        nextFlags = in.readUnsignedByte();
-        return (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0;
-    }
-
-    /**
-     * Returns the next atom.
-     */
-    public OnDiskAtom readNext() throws IOException
-    {
-        Composite name = nameDeserializer.readNext();
-        assert !name.isEmpty(); // This would imply hasNext() hasn't been 
called
-
-        nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : 
nextFlags;
-        OnDiskAtom atom = (nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) 
!= 0
-                        ? type.rangeTombstoneSerializer().deserializeBody(in, 
name, version)
-                        : type.columnSerializer().deserializeColumnBody(in, 
(CellName)name, nextFlags, flag, expireBefore);
-        nextFlags = Integer.MIN_VALUE;
-        return atom;
-    }
-
-    /**
-     * Skips the next atom.
-     */
-    public void skipNext() throws IOException
-    {
-        nameDeserializer.skipNext();
-        nextFlags = nextFlags == Integer.MIN_VALUE ? in.readUnsignedByte() : 
nextFlags;
-        if ((nextFlags & ColumnSerializer.RANGE_TOMBSTONE_MASK) != 0)
-            type.rangeTombstoneSerializer().skipBody(in, version);
-        else
-            type.columnSerializer().skipColumnBody(in, nextFlags);
-        nextFlags = Integer.MIN_VALUE;
-    }
-}

Reply via email to