http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java 
b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
deleted file mode 100644
index 9ef0c14..0000000
--- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.util.AbstractCollection;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.filter.ColumnSlice;
-import org.apache.cassandra.db.marshal.BytesType;
-import org.apache.cassandra.utils.*;
-import org.apache.cassandra.utils.SearchIterator;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSearchIterator;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-import org.apache.cassandra.utils.concurrent.Locks;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.HeapAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-import org.apache.cassandra.utils.memory.NativePool;
-
-import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
-
-/**
- * A thread-safe and atomic ISortedColumns implementation.
- * Operations (in particular addAll) on this implemenation are atomic and
- * isolated (in the sense of ACID). Typically a addAll is guaranteed that no
- * other thread can see the state where only parts but not all columns have
- * been added.
- * <p>
- * WARNING: removing element through getSortedColumns().iterator() is *not* 
supported
- * </p>
- */
-public class AtomicBTreeColumns extends ColumnFamily
-{
-    static final long EMPTY_SIZE = ObjectSizes.measure(new 
AtomicBTreeColumns(CFMetaData.denseCFMetaData("keyspace", "table", 
BytesType.instance), null))
-            + ObjectSizes.measure(new Holder(null, null));
-
-    // Reserved values for wasteTracker field. These values must not be 
consecutive (see avoidReservedValues)
-    private static final int TRACKER_NEVER_WASTED = 0;
-    private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE;
-
-    // The granularity with which we track wasted allocation/work; we round up
-    private static final int ALLOCATION_GRANULARITY_BYTES = 1024;
-    // The number of bytes we have to waste in excess of our acceptable 
realtime rate of waste (defined below)
-    private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L;
-    private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / 
ALLOCATION_GRANULARITY_BYTES);
-    // Note this is a shift, because dividing a long time and then picking the 
low 32 bits doesn't give correct rollover behavior
-    private static final int CLOCK_SHIFT = 17;
-    // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms
-
-    /**
-     * (clock + allocation) granularity are combined to give us an acceptable 
(waste) allocation rate that is defined by
-     * the passage of real time of 
ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 
7.45Mb/s
-     *
-     * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the 
current time; whenever we waste bytes
-     * we increment the current value if it is within this window, and set it 
to the min of the window plus our waste
-     * otherwise.
-     */
-    private volatile int wasteTracker = TRACKER_NEVER_WASTED;
-
-    private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> 
wasteTrackerUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker");
-
-    private static final Function<Cell, CellName> NAME = new Function<Cell, 
CellName>()
-    {
-        public CellName apply(Cell column)
-        {
-            return column.name();
-        }
-    };
-
-    public static final Factory<AtomicBTreeColumns> factory = new 
Factory<AtomicBTreeColumns>()
-    {
-        public AtomicBTreeColumns create(CFMetaData metadata, boolean 
insertReversed, int initialCapacity)
-        {
-            if (insertReversed)
-                throw new IllegalArgumentException();
-            return new AtomicBTreeColumns(metadata);
-        }
-    };
-
-    private static final DeletionInfo LIVE = DeletionInfo.live();
-    // This is a small optimization: DeletionInfo is mutable, but we know that 
we will always copy it in that class,
-    // so we can safely alias one DeletionInfo.live() reference and avoid some 
allocations.
-    private static final Holder EMPTY = new Holder(BTree.empty(), LIVE);
-
-    private volatile Holder ref;
-
-    private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, 
Holder> refUpdater = 
AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, 
"ref");
-
-    private AtomicBTreeColumns(CFMetaData metadata)
-    {
-        this(metadata, EMPTY);
-    }
-
-    private AtomicBTreeColumns(CFMetaData metadata, Holder holder)
-    {
-        super(metadata);
-        this.ref = holder;
-    }
-
-    public Factory getFactory()
-    {
-        return factory;
-    }
-
-    public ColumnFamily cloneMe()
-    {
-        return new AtomicBTreeColumns(metadata, ref);
-    }
-
-    public DeletionInfo deletionInfo()
-    {
-        return ref.deletionInfo;
-    }
-
-    public void delete(DeletionTime delTime)
-    {
-        delete(new DeletionInfo(delTime));
-    }
-
-    protected void delete(RangeTombstone tombstone)
-    {
-        delete(new DeletionInfo(tombstone, getComparator()));
-    }
-
-    public SearchIterator<CellName, Cell> searchIterator()
-    {
-        return new BTreeSearchIterator<>(ref.tree, asymmetricComparator());
-    }
-
-    public void delete(DeletionInfo info)
-    {
-        if (info.isLive())
-            return;
-
-        // Keeping deletion info for max markedForDeleteAt value
-        while (true)
-        {
-            Holder current = ref;
-            DeletionInfo curDelInfo = current.deletionInfo;
-            DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? 
curDelInfo.copy().add(info) : curDelInfo;
-            if (refUpdater.compareAndSet(this, current, 
current.with(newDelInfo)))
-                break;
-        }
-    }
-
-    public void setDeletionInfo(DeletionInfo newInfo)
-    {
-        ref = ref.with(newInfo);
-    }
-
-    public void purgeTombstones(int gcBefore)
-    {
-        while (true)
-        {
-            Holder current = ref;
-            if (!current.deletionInfo.hasPurgeableTombstones(gcBefore))
-                break;
-
-            DeletionInfo purgedInfo = current.deletionInfo.copy();
-            purgedInfo.purge(gcBefore);
-            if (refUpdater.compareAndSet(this, current, 
current.with(purgedInfo)))
-                break;
-        }
-    }
-
-    /**
-     * This is only called by Memtable.resolve, so only AtomicBTreeColumns 
needs to implement it.
-     *
-     * @return the difference in size seen after merging the given columns
-     */
-    public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, 
MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
-    {
-        ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, 
allocator, writeOp, indexer);
-        DeletionInfo inputDeletionInfoCopy = null;
-
-        boolean monitorOwned = false;
-        try
-        {
-            if (usePessimisticLocking())
-            {
-                Locks.monitorEnterUnsafe(this);
-                monitorOwned = true;
-            }
-            while (true)
-            {
-                Holder current = ref;
-                updater.ref = current;
-                updater.reset();
-
-                DeletionInfo deletionInfo;
-                if (cm.deletionInfo().mayModify(current.deletionInfo))
-                {
-                    if (inputDeletionInfoCopy == null)
-                        inputDeletionInfoCopy = 
cm.deletionInfo().copy(HeapAllocator.instance);
-
-                    deletionInfo = 
current.deletionInfo.copy().add(inputDeletionInfoCopy);
-                    updater.allocated(deletionInfo.unsharedHeapSize() - 
current.deletionInfo.unsharedHeapSize());
-                }
-                else
-                {
-                    deletionInfo = current.deletionInfo;
-                }
-
-                Object[] tree = BTree.update(current.tree, 
metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof 
NativePool), cm, cm.getColumnCount(), true, updater);
-
-                if (tree != null && refUpdater.compareAndSet(this, current, 
new Holder(tree, deletionInfo)))
-                {
-                    indexer.updateRowLevelIndexes();
-                    updater.finish();
-                    return Pair.create(updater.dataSize, 
updater.colUpdateTimeDelta);
-                }
-                else if (!monitorOwned)
-                {
-                    boolean shouldLock = usePessimisticLocking();
-                    if (!shouldLock)
-                    {
-                        shouldLock = 
updateWastedAllocationTracker(updater.heapSize);
-                    }
-                    if (shouldLock)
-                    {
-                        Locks.monitorEnterUnsafe(this);
-                        monitorOwned = true;
-                    }
-                }
-            }
-        }
-        finally
-        {
-            if (monitorOwned)
-                Locks.monitorExitUnsafe(this);
-        }
-    }
-
-    boolean usePessimisticLocking()
-    {
-        return wasteTracker == TRACKER_PESSIMISTIC_LOCKING;
-    }
-
-    /**
-     * Update the wasted allocation tracker state based on newly wasted 
allocation information
-     *
-     * @param wastedBytes the number of bytes wasted by this thread
-     * @return true if the caller should now proceed with pessimistic locking 
because the waste limit has been reached
-     */
-    private boolean updateWastedAllocationTracker(long wastedBytes) {
-        // Early check for huge allocation that exceeds the limit
-        if (wastedBytes < EXCESS_WASTE_BYTES)
-        {
-            // We round up to ensure work < granularity are still accounted for
-            int wastedAllocation = ((int) (wastedBytes + 
ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES;
-
-            int oldTrackerValue;
-            while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = 
wasteTracker))
-            {
-                // Note this time value has an arbitrary offset, but is a 
constant rate 32 bit counter (that may wrap)
-                int time = (int) (System.nanoTime() >>> CLOCK_SHIFT);
-                int delta = oldTrackerValue - time;
-                if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || 
delta < -EXCESS_WASTE_OFFSET)
-                    delta = -EXCESS_WASTE_OFFSET;
-                delta += wastedAllocation;
-                if (delta >= 0)
-                    break;
-                if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, 
avoidReservedValues(time + delta)))
-                    return false;
-            }
-        }
-        // We have definitely reached our waste limit so set the state if it 
isn't already
-        wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING);
-        // And tell the caller to proceed with pessimistic locking
-        return true;
-    }
-
-    private static int avoidReservedValues(int wasteTracker)
-    {
-        if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == 
TRACKER_PESSIMISTIC_LOCKING)
-            return wasteTracker + 1;
-        return wasteTracker;
-    }
-
-    // no particular reason not to implement these next methods, we just 
haven't needed them yet
-
-    public void addColumn(Cell column)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester 
tester, int gcBefore)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void appendColumn(Cell cell)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void addAll(ColumnFamily cf)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public void clear()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    public Cell getColumn(CellName name)
-    {
-        return (Cell) BTree.find(ref.tree, asymmetricComparator(), name);
-    }
-
-    private Comparator<Object> asymmetricComparator()
-    {
-        return 
metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof 
NativePool);
-    }
-
-    public Iterable<CellName> getColumnNames()
-    {
-        return collection(false, NAME);
-    }
-
-    public Collection<Cell> getSortedColumns()
-    {
-        return collection(true, Functions.<Cell>identity());
-    }
-
-    public Collection<Cell> getReverseSortedColumns()
-    {
-        return collection(false, Functions.<Cell>identity());
-    }
-
-    private <V> Collection<V> collection(final boolean forwards, final 
Function<Cell, V> f)
-    {
-        final Holder ref = this.ref;
-        return new AbstractCollection<V>()
-        {
-            public Iterator<V> iterator()
-            {
-                return Iterators.transform(BTree.<Cell>slice(ref.tree, 
forwards), f);
-            }
-
-            public int size()
-            {
-                return BTree.slice(ref.tree, true).count();
-            }
-        };
-    }
-
-    public int getColumnCount()
-    {
-        return BTree.slice(ref.tree, true).count();
-    }
-
-    public boolean hasColumns()
-    {
-        return !BTree.isEmpty(ref.tree);
-    }
-
-    public Iterator<Cell> iterator(ColumnSlice[] slices)
-    {
-        return slices.length == 1
-             ? slice(ref.tree, asymmetricComparator(), slices[0].start, 
slices[0].finish, true)
-             : new SliceIterator(ref.tree, asymmetricComparator(), true, 
slices);
-    }
-
-    public Iterator<Cell> reverseIterator(ColumnSlice[] slices)
-    {
-        return slices.length == 1
-             ? slice(ref.tree, asymmetricComparator(), slices[0].finish, 
slices[0].start, false)
-             : new SliceIterator(ref.tree, asymmetricComparator(), false, 
slices);
-    }
-
-    public boolean isInsertReversed()
-    {
-        return false;
-    }
-
-    public BatchRemoveIterator<Cell> batchRemoveIterator()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    private static final class Holder
-    {
-        final DeletionInfo deletionInfo;
-        // the btree of columns
-        final Object[] tree;
-
-        Holder(Object[] tree, DeletionInfo deletionInfo)
-        {
-            this.tree = tree;
-            this.deletionInfo = deletionInfo;
-        }
-
-        Holder with(DeletionInfo info)
-        {
-            return new Holder(this.tree, info);
-        }
-    }
-
-    // the function we provide to the btree utilities to perform any column 
replacements
-    private static final class ColumnUpdater implements UpdateFunction<Cell>
-    {
-        final AtomicBTreeColumns updating;
-        final CFMetaData metadata;
-        final MemtableAllocator allocator;
-        final OpOrder.Group writeOp;
-        final Updater indexer;
-        Holder ref;
-        long dataSize;
-        long heapSize;
-        long colUpdateTimeDelta = Long.MAX_VALUE;
-        final MemtableAllocator.DataReclaimer reclaimer;
-        List<Cell> inserted; // TODO: replace with walk of aborted BTree
-
-        private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData 
metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer)
-        {
-            this.updating = updating;
-            this.allocator = allocator;
-            this.writeOp = writeOp;
-            this.indexer = indexer;
-            this.metadata = metadata;
-            this.reclaimer = allocator.reclaimer();
-        }
-
-        public Cell apply(Cell insert)
-        {
-            indexer.insert(insert);
-            insert = insert.localCopy(metadata, allocator, writeOp);
-            this.dataSize += insert.cellDataSize();
-            this.heapSize += insert.unsharedHeapSizeExcludingData();
-            if (inserted == null)
-                inserted = new ArrayList<>();
-            inserted.add(insert);
-            return insert;
-        }
-
-        public Cell apply(Cell existing, Cell update)
-        {
-            Cell reconciled = existing.reconcile(update);
-            indexer.update(existing, reconciled);
-            if (existing != reconciled)
-            {
-                reconciled = reconciled.localCopy(metadata, allocator, 
writeOp);
-                dataSize += reconciled.cellDataSize() - 
existing.cellDataSize();
-                heapSize += reconciled.unsharedHeapSizeExcludingData() - 
existing.unsharedHeapSizeExcludingData();
-                if (inserted == null)
-                    inserted = new ArrayList<>();
-                inserted.add(reconciled);
-                discard(existing);
-                //Getting the minimum delta for an update containing multiple 
columns
-                colUpdateTimeDelta =  Math.min(Math.abs(existing.timestamp()  
- update.timestamp()), colUpdateTimeDelta);
-            }
-            return reconciled;
-        }
-
-        protected void reset()
-        {
-            this.dataSize = 0;
-            this.heapSize = 0;
-            if (inserted != null)
-            {
-                for (Cell cell : inserted)
-                    abort(cell);
-                inserted.clear();
-            }
-            reclaimer.cancel();
-        }
-
-        protected void abort(Cell abort)
-        {
-            reclaimer.reclaimImmediately(abort);
-        }
-
-        protected void discard(Cell discard)
-        {
-            reclaimer.reclaim(discard);
-        }
-
-        public boolean abortEarly()
-        {
-            return updating.ref != ref;
-        }
-
-        public void allocated(long heapSize)
-        {
-            this.heapSize += heapSize;
-        }
-
-        protected void finish()
-        {
-            allocator.onHeap().allocate(heapSize, writeOp);
-            reclaimer.commit();
-        }
-    }
-
-    private static class SliceIterator extends AbstractIterator<Cell>
-    {
-        private final Object[] btree;
-        private final boolean forwards;
-        private final Comparator<Object> comparator;
-        private final ColumnSlice[] slices;
-
-        private int idx = 0;
-        private Iterator<Cell> currentSlice;
-
-        SliceIterator(Object[] btree, Comparator<Object> comparator, boolean 
forwards, ColumnSlice[] slices)
-        {
-            this.btree = btree;
-            this.comparator = comparator;
-            this.slices = slices;
-            this.forwards = forwards;
-        }
-
-        protected Cell computeNext()
-        {
-            while (currentSlice != null || idx < slices.length)
-            {
-                if (currentSlice == null)
-                {
-                    ColumnSlice slice = slices[idx++];
-                    if (forwards)
-                        currentSlice = slice(btree, comparator, slice.start, 
slice.finish, true);
-                    else
-                        currentSlice = slice(btree, comparator, slice.finish, 
slice.start, false);
-                }
-
-                if (currentSlice.hasNext())
-                    return currentSlice.next();
-
-                currentSlice = null;
-            }
-
-            return endOfData();
-        }
-    }
-
-    private static Iterator<Cell> slice(Object[] btree, Comparator<Object> 
comparator, Composite start, Composite finish, boolean forwards)
-    {
-        return BTree.slice(btree,
-                           comparator,
-                           start.isEmpty() ? null : start,
-                           true,
-                           finish.isEmpty() ? null : finish,
-                           true,
-                           forwards);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BatchlogManager.java 
b/src/java/org/apache/cassandra/db/BatchlogManager.java
index 6038475..83a0654 100644
--- a/src/java/org/apache/cassandra/db/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.marshal.UUIDType;
 import org.apache.cassandra.dht.Token;
@@ -138,12 +139,12 @@ public class BatchlogManager implements 
BatchlogManagerMBean
     @VisibleForTesting
     static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, 
UUID uuid, int version, long now)
     {
-        ColumnFamily cf = 
ArrayBackedSortedColumns.factory.create(SystemKeyspace.Batchlog);
-        CFRowAdder adder = new CFRowAdder(cf, 
SystemKeyspace.Batchlog.comparator.builder().build(), now);
-        adder.add("data", serializeMutations(mutations, version))
-             .add("written_at", new Date(now / 1000))
-             .add("version", version);
-        return new Mutation(SystemKeyspace.NAME, 
UUIDType.instance.decompose(uuid), cf);
+        return new RowUpdateBuilder(SystemKeyspace.Batchlog, now, uuid)
+               .clustering()
+               .add("data", serializeMutations(mutations, version))
+               .add("written_at", new Date(now / 1000))
+               .add("version", version)
+               .build();
     }
 
     private static ByteBuffer serializeMutations(Collection<Mutation> 
mutations, int version)
@@ -186,7 +187,7 @@ public class BatchlogManager implements BatchlogManagerMBean
                                                  SystemKeyspace.NAME,
                                                  SystemKeyspace.BATCHLOG,
                                                  PAGE_SIZE),
-                                   id);
+                                                 id);
         }
 
         cleanup();
@@ -196,8 +197,8 @@ public class BatchlogManager implements BatchlogManagerMBean
 
     private void deleteBatch(UUID id)
     {
-        Mutation mutation = new Mutation(SystemKeyspace.NAME, 
UUIDType.instance.decompose(id));
-        mutation.delete(SystemKeyspace.BATCHLOG, 
FBUtilities.timestampMicros());
+        Mutation mutation = new Mutation(SystemKeyspace.NAME, 
StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(id)));
+        
mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, 
mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
         mutation.apply();
     }
 
@@ -382,7 +383,7 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             Set<InetAddress> liveEndpoints = new HashSet<>();
             String ks = mutation.getKeyspaceName();
-            Token tk = 
StorageService.getPartitioner().getToken(mutation.key());
+            Token tk = mutation.key().getToken();
 
             for (InetAddress endpoint : 
Iterables.concat(StorageService.instance.getNaturalEndpoints(ks, tk),
                                                          
StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, ks)))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCell.java 
b/src/java/org/apache/cassandra/db/BufferCell.java
deleted file mode 100644
index a7d632d..0000000
--- a/src/java/org/apache/cassandra/db/BufferCell.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNames;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public class BufferCell extends AbstractCell
-{
-    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
BufferCell(CellNames.simpleDense(ByteBuffer.allocate(1))));
-
-    protected final CellName name;
-    protected final ByteBuffer value;
-    protected final long timestamp;
-
-    BufferCell(CellName name)
-    {
-        this(name, ByteBufferUtil.EMPTY_BYTE_BUFFER);
-    }
-
-    public BufferCell(CellName name, ByteBuffer value)
-    {
-        this(name, value, 0);
-    }
-
-    public BufferCell(CellName name, ByteBuffer value, long timestamp)
-    {
-        assert name != null;
-        assert value != null;
-
-        this.name = name;
-        this.value = value;
-        this.timestamp = timestamp;
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new BufferCell(newName, value, timestamp);
-    }
-
-    @Override
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        return new BufferCell(name, value, newTimestamp);
-    }
-
-    @Override
-    public CellName name() {
-        return name;
-    }
-
-    @Override
-    public ByteBuffer value() {
-        return value;
-    }
-
-    @Override
-    public long timestamp() {
-        return timestamp;
-    }
-
-    @Override
-    public long unsharedHeapSizeExcludingData()
-    {
-        return EMPTY_SIZE + name.unsharedHeapSizeExcludingData() + 
ObjectSizes.sizeOnHeapExcludingData(value);
-    }
-
-    @Override
-    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        return new BufferCell(name.copy(metadata, allocator), 
allocator.clone(value), timestamp);
-    }
-
-    @Override
-    public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, 
OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferCounterCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCounterCell.java 
b/src/java/org/apache/cassandra/db/BufferCounterCell.java
deleted file mode 100644
index 827182a..0000000
--- a/src/java/org/apache/cassandra/db/BufferCounterCell.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public class BufferCounterCell extends BufferCell implements CounterCell
-{
-    private final long timestampOfLastDelete;
-
-    public BufferCounterCell(CellName name, ByteBuffer value, long timestamp)
-    {
-        this(name, value, timestamp, Long.MIN_VALUE);
-    }
-
-    public BufferCounterCell(CellName name, ByteBuffer value, long timestamp, 
long timestampOfLastDelete)
-    {
-        super(name, value, timestamp);
-        this.timestampOfLastDelete = timestampOfLastDelete;
-    }
-
-    public static CounterCell create(CellName name, ByteBuffer value, long 
timestamp, long timestampOfLastDelete, ColumnSerializer.Flag flag)
-    {
-        if (flag == ColumnSerializer.Flag.FROM_REMOTE || (flag == 
ColumnSerializer.Flag.LOCAL && contextManager.shouldClearLocal(value)))
-            value = contextManager.clearAllLocal(value);
-        return new BufferCounterCell(name, value, timestamp, 
timestampOfLastDelete);
-    }
-
-    // For use by tests of compatibility with pre-2.1 counter only.
-    public static CounterCell createLocal(CellName name, long value, long 
timestamp, long timestampOfLastDelete)
-    {
-        return new BufferCounterCell(name, contextManager.createLocal(value), 
timestamp, timestampOfLastDelete);
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new BufferCounterCell(newName, value, timestamp, 
timestampOfLastDelete);
-    }
-
-    @Override
-    public long timestampOfLastDelete()
-    {
-        return timestampOfLastDelete;
-    }
-
-    @Override
-    public long total()
-    {
-        return contextManager.total(value);
-    }
-
-    @Override
-    public int cellDataSize()
-    {
-        // A counter column adds 8 bytes for timestampOfLastDelete to Cell.
-        return super.cellDataSize() + 
TypeSizes.NATIVE.sizeof(timestampOfLastDelete);
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        return super.serializedSize(type, typeSizes) + 
typeSizes.sizeof(timestampOfLastDelete);
-    }
-
-    @Override
-    public Cell diff(Cell cell)
-    {
-        return diffCounter(cell);
-    }
-
-    /*
-     * We have to special case digest creation for counter column because
-     * we don't want to include the information about which shard of the
-     * context is a delta or not, since this information differs from node to
-     * node.
-     */
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name().toByteBuffer().duplicate());
-        // We don't take the deltas into account in a digest
-        contextManager.updateDigest(digest, value());
-
-        FBUtilities.updateWithLong(digest, timestamp);
-        FBUtilities.updateWithByte(digest, serializationFlags());
-        FBUtilities.updateWithLong(digest, timestampOfLastDelete);
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        return reconcileCounter(cell);
-    }
-
-    @Override
-    public boolean hasLegacyShards()
-    {
-        return contextManager.hasLegacyShards(value);
-    }
-
-    @Override
-    public CounterCell localCopy(CFMetaData metadata, AbstractAllocator 
allocator)
-    {
-        return new BufferCounterCell(name.copy(metadata, allocator), 
allocator.clone(value), timestamp, timestampOfLastDelete);
-    }
-
-    @Override
-    public CounterCell localCopy(CFMetaData metadata, MemtableAllocator 
allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s:false:%s@%d!%d",
-                             comparator.getString(name()),
-                             contextManager.toString(value()),
-                             timestamp(),
-                             timestampOfLastDelete);
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        // We cannot use the value validator as for other columns as the 
CounterColumnType validate a long,
-        // which is not the internal representation of counters
-        contextManager.validateContext(value());
-    }
-
-    @Override
-    public Cell markLocalToBeCleared()
-    {
-        ByteBuffer marked = contextManager.markLocalToBeCleared(value());
-        return marked == value() ? this : new BufferCounterCell(name(), 
marked, timestamp(), timestampOfLastDelete);
-    }
-
-    @Override
-    public boolean equals(Cell cell)
-    {
-        return super.equals(cell) && timestampOfLastDelete == ((CounterCell) 
cell).timestampOfLastDelete();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java 
b/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
deleted file mode 100644
index f7df3ea..0000000
--- a/src/java/org/apache/cassandra/db/BufferCounterUpdateCell.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public class BufferCounterUpdateCell extends BufferCell implements 
CounterUpdateCell
-{
-    public BufferCounterUpdateCell(CellName name, long value, long timestamp)
-    {
-        this(name, ByteBufferUtil.bytes(value), timestamp);
-    }
-
-    public BufferCounterUpdateCell(CellName name, ByteBuffer value, long 
timestamp)
-    {
-        super(name, value, timestamp);
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new BufferCounterUpdateCell(newName, value, timestamp);
-    }
-
-    public long delta()
-    {
-        return value().getLong(value.position());
-    }
-
-    @Override
-    public Cell diff(Cell cell)
-    {
-        // Diff is used during reads, but we should never read those columns
-        throw new UnsupportedOperationException("This operation is unsupported 
on CounterUpdateCell.");
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        // No matter what the counter cell's timestamp is, a tombstone always 
takes precedence. See CASSANDRA-7346.
-        if (cell instanceof DeletedCell)
-            return cell;
-
-        assert cell instanceof CounterUpdateCell : "Wrong class type.";
-
-        // The only time this could happen is if a batch ships two increments 
for the same cell. Hence we simply sum the deltas.
-        return new BufferCounterUpdateCell(name, delta() + 
((CounterUpdateCell) cell).delta(), Math.max(timestamp, cell.timestamp()));
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.COUNTER_UPDATE_MASK;
-    }
-
-    @Override
-    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public Cell localCopy(CFMetaData metadata, MemtableAllocator allocator, 
OpOrder.Group opGroup)
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s:%s@%d", comparator.getString(name()), 
ByteBufferUtil.toLong(value), timestamp());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferDeletedCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferDeletedCell.java 
b/src/java/org/apache/cassandra/db/BufferDeletedCell.java
deleted file mode 100644
index a38f322..0000000
--- a/src/java/org/apache/cassandra/db/BufferDeletedCell.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public class BufferDeletedCell extends BufferCell implements DeletedCell
-{
-    public BufferDeletedCell(CellName name, int localDeletionTime, long 
timestamp)
-    {
-        this(name, ByteBufferUtil.bytes(localDeletionTime), timestamp);
-    }
-
-    public BufferDeletedCell(CellName name, ByteBuffer value, long timestamp)
-    {
-        super(name, value, timestamp);
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new BufferDeletedCell(newName, value, timestamp);
-    }
-
-    @Override
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        return new BufferDeletedCell(name, value, newTimestamp);
-    }
-
-    @Override
-    public boolean isLive()
-    {
-        return false;
-    }
-
-    @Override
-    public boolean isLive(long now)
-    {
-        return false;
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-       return value().getInt(value.position());
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        if (cell instanceof DeletedCell)
-            return super.reconcile(cell);
-        return cell.reconcile(this);
-    }
-
-    @Override
-    public DeletedCell localCopy(CFMetaData metadata, AbstractAllocator 
allocator)
-    {
-        return new BufferDeletedCell(name.copy(metadata, allocator), 
allocator.clone(value), timestamp);
-    }
-
-    @Override
-    public DeletedCell localCopy(CFMetaData metadata, MemtableAllocator 
allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.DELETION_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        validateName(metadata);
-        if (value().remaining() != 4)
-            throw new MarshalException("A tombstone value should be 4 bytes 
long");
-        if (getLocalDeletionTime() < 0)
-            throw new MarshalException("The local deletion time should not be 
negative");
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        digest.update(name().toByteBuffer().duplicate());
-
-        FBUtilities.updateWithLong(digest, timestamp());
-        FBUtilities.updateWithByte(digest, serializationFlags());
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/BufferExpiringCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/BufferExpiringCell.java 
b/src/java/org/apache/cassandra/db/BufferExpiringCell.java
deleted file mode 100644
index efb56d5..0000000
--- a/src/java/org/apache/cassandra/db/BufferExpiringCell.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.serializers.MarshalException;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-public class BufferExpiringCell extends BufferCell implements ExpiringCell
-{
-    private final int localExpirationTime;
-    private final int timeToLive;
-
-    public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, 
int timeToLive)
-    {
-        this(name, value, timestamp, timeToLive, (int) 
(System.currentTimeMillis() / 1000) + timeToLive);
-    }
-
-    public BufferExpiringCell(CellName name, ByteBuffer value, long timestamp, 
int timeToLive, int localExpirationTime)
-    {
-        super(name, value, timestamp);
-        assert timeToLive > 0 : timeToLive;
-        assert localExpirationTime > 0 : localExpirationTime;
-        this.timeToLive = timeToLive;
-        this.localExpirationTime = localExpirationTime;
-    }
-
-    public int getTimeToLive()
-    {
-        return timeToLive;
-    }
-
-    @Override
-    public Cell withUpdatedName(CellName newName)
-    {
-        return new BufferExpiringCell(newName, value(), timestamp(), 
timeToLive, localExpirationTime);
-    }
-
-    @Override
-    public Cell withUpdatedTimestamp(long newTimestamp)
-    {
-        return new BufferExpiringCell(name(), value(), newTimestamp, 
timeToLive, localExpirationTime);
-    }
-
-    @Override
-    public int cellDataSize()
-    {
-        return super.cellDataSize() + 
TypeSizes.NATIVE.sizeof(localExpirationTime) + 
TypeSizes.NATIVE.sizeof(timeToLive);
-    }
-
-    @Override
-    public int serializedSize(CellNameType type, TypeSizes typeSizes)
-    {
-        /*
-         * An expired column adds to a Cell :
-         *    4 bytes for the localExpirationTime
-         *  + 4 bytes for the timeToLive
-        */
-        return super.serializedSize(type, typeSizes) + 
typeSizes.sizeof(localExpirationTime) + typeSizes.sizeof(timeToLive);
-    }
-
-    @Override
-    public void updateDigest(MessageDigest digest)
-    {
-        super.updateDigest(digest);
-        FBUtilities.updateWithInt(digest, timeToLive);
-    }
-
-    @Override
-    public int getLocalDeletionTime()
-    {
-        return localExpirationTime;
-    }
-
-    @Override
-    public ExpiringCell localCopy(CFMetaData metadata, AbstractAllocator 
allocator)
-    {
-        return new BufferExpiringCell(name.copy(metadata, allocator), 
allocator.clone(value), timestamp, timeToLive, localExpirationTime);
-    }
-
-    @Override
-    public ExpiringCell localCopy(CFMetaData metadata, MemtableAllocator 
allocator, OpOrder.Group opGroup)
-    {
-        return allocator.clone(this, metadata, opGroup);
-    }
-
-    @Override
-    public String getString(CellNameType comparator)
-    {
-        return String.format("%s!%d", super.getString(comparator), timeToLive);
-    }
-
-    @Override
-    public boolean isLive()
-    {
-        return isLive(System.currentTimeMillis());
-    }
-
-    @Override
-    public boolean isLive(long now)
-    {
-        return (int) (now / 1000) < getLocalDeletionTime();
-    }
-
-    @Override
-    public int serializationFlags()
-    {
-        return ColumnSerializer.EXPIRATION_MASK;
-    }
-
-    @Override
-    public void validateFields(CFMetaData metadata) throws MarshalException
-    {
-        super.validateFields(metadata);
-
-        if (timeToLive <= 0)
-            throw new MarshalException("A column TTL should be > 0, but was " 
+ timeToLive);
-        if (localExpirationTime < 0)
-            throw new MarshalException("The local expiration time should not 
be negative but was " + localExpirationTime);
-    }
-
-    @Override
-    public Cell reconcile(Cell cell)
-    {
-        long ts1 = timestamp(), ts2 = cell.timestamp();
-        if (ts1 != ts2)
-            return ts1 < ts2 ? cell : this;
-        // we should prefer tombstones
-        if (cell instanceof DeletedCell)
-            return cell;
-        int c = value().compareTo(cell.value());
-        if (c != 0)
-            return c < 0 ? cell : this;
-        // If we have same timestamp and value, prefer the longest ttl
-        if (cell instanceof ExpiringCell)
-        {
-            int let1 = localExpirationTime, let2 = cell.getLocalDeletionTime();
-            if (let1 < let2)
-                return cell;
-        }
-        return this;
-    }
-
-    @Override
-    public boolean equals(Cell cell)
-    {
-        if (!super.equals(cell))
-            return false;
-        ExpiringCell that = (ExpiringCell) cell;
-        return getLocalDeletionTime() == that.getLocalDeletionTime() && 
getTimeToLive() == that.getTimeToLive();
-    }
-
-    /** @return Either a DeletedCell, or an ExpiringCell. */
-    public static Cell create(CellName name, ByteBuffer value, long timestamp, 
int timeToLive, int localExpirationTime, int expireBefore, 
ColumnSerializer.Flag flag)
-    {
-        if (localExpirationTime >= expireBefore || flag == 
ColumnSerializer.Flag.PRESERVE_SIZE)
-            return new BufferExpiringCell(name, value, timestamp, timeToLive, 
localExpirationTime);
-        // The column is now expired, we can safely return a simple tombstone. 
Note that
-        // as long as the expiring column and the tombstone put together live 
longer than GC grace seconds,
-        // we'll fulfil our responsibility to repair.  See discussion at
-        // 
http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
-        return new BufferDeletedCell(name, localExpirationTime - timeToLive, 
timestamp);
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CBuilder.java 
b/src/java/org/apache/cassandra/db/CBuilder.java
new file mode 100644
index 0000000..56cabf1
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/CBuilder.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+/**
+ * Allows to build ClusteringPrefixes, either Clustering or Slice.Bound.
+ */
+public abstract class CBuilder
+{
+    public static CBuilder STATIC_BUILDER = new CBuilder()
+    {
+        public int count()
+        {
+            return 0;
+        }
+
+        public int remainingCount()
+        {
+            return 0;
+        }
+
+        public ClusteringComparator comparator()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public CBuilder add(ByteBuffer value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public CBuilder add(Object value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Clustering build()
+        {
+            return Clustering.STATIC_CLUSTERING;
+        }
+
+        public Slice.Bound buildBound(boolean isStart, boolean isInclusive)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Slice buildSlice()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Clustering buildWith(ByteBuffer value)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Clustering buildWith(List<ByteBuffer> newValues)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, 
boolean isInclusive)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public Slice.Bound buildBoundWith(List<ByteBuffer> newValues, boolean 
isStart, boolean isInclusive)
+        {
+            throw new UnsupportedOperationException();
+        }
+    };
+
+    public static CBuilder create(ClusteringComparator comparator)
+    {
+        return new ArrayBackedBuilder(comparator);
+    }
+
+    public abstract int count();
+    public abstract int remainingCount();
+    public abstract ClusteringComparator comparator();
+    public abstract CBuilder add(ByteBuffer value);
+    public abstract CBuilder add(Object value);
+    public abstract Clustering build();
+    public abstract Slice.Bound buildBound(boolean isStart, boolean 
isInclusive);
+    public abstract Slice buildSlice();
+    public abstract Clustering buildWith(ByteBuffer value);
+    public abstract Clustering buildWith(List<ByteBuffer> newValues);
+    public abstract Slice.Bound buildBoundWith(ByteBuffer value, boolean 
isStart, boolean isInclusive);
+    public abstract Slice.Bound buildBoundWith(List<ByteBuffer> newValues, 
boolean isStart, boolean isInclusive);
+
+    private static class ArrayBackedBuilder extends CBuilder
+    {
+        private final ClusteringComparator type;
+        private final ByteBuffer[] values;
+        private int size;
+        private boolean built;
+
+        public ArrayBackedBuilder(ClusteringComparator type)
+        {
+            this.type = type;
+            this.values = new ByteBuffer[type.size()];
+        }
+
+        public int count()
+        {
+            return size;
+        }
+
+        public int remainingCount()
+        {
+            return values.length - size;
+        }
+
+        public ClusteringComparator comparator()
+        {
+            return type;
+        }
+
+        public CBuilder add(ByteBuffer value)
+        {
+            if (isDone())
+                throw new IllegalStateException();
+            values[size++] = value;
+            return this;
+        }
+
+        public CBuilder add(Object value)
+        {
+            return add(((AbstractType)type.subtype(size)).decompose(value));
+        }
+
+        private boolean isDone()
+        {
+            return remainingCount() == 0 || built;
+        }
+
+        public Clustering build()
+        {
+            // We don't allow to add more element to a builder that has been 
built so
+            // that we don't have to copy values.
+            built = true;
+
+            // Currently, only dense table can leave some clustering column 
out (see #7990)
+            return size == 0 ? Clustering.EMPTY : new SimpleClustering(values);
+        }
+
+        public Slice.Bound buildBound(boolean isStart, boolean isInclusive)
+        {
+            // We don't allow to add more element to a builder that has been 
built so
+            // that we don't have to copy values (even though we have to do it 
in most cases).
+            built = true;
+
+            if (size == 0)
+                return isStart ? Slice.Bound.BOTTOM : Slice.Bound.TOP;
+
+            return Slice.Bound.create(Slice.Bound.boundKind(isStart, 
isInclusive),
+                                      size == values.length ? values : 
Arrays.copyOfRange(values, 0, size));
+        }
+
+        public Slice buildSlice()
+        {
+            // We don't allow to add more element to a builder that has been 
built so
+            // that we don't have to copy values.
+            built = true;
+
+            if (size == 0)
+                return Slice.ALL;
+
+            return Slice.make(buildBound(true, true), buildBound(false, true));
+        }
+
+        public Clustering buildWith(ByteBuffer value)
+        {
+            assert size+1 == type.size();
+
+            ByteBuffer[] newValues = Arrays.copyOf(values, size+1);
+            newValues[size] = value;
+            return new SimpleClustering(newValues);
+        }
+
+        public Clustering buildWith(List<ByteBuffer> newValues)
+        {
+            assert size + newValues.size() == type.size();
+            ByteBuffer[] buffers = Arrays.copyOf(values, size + 
newValues.size());
+            int newSize = size;
+            for (ByteBuffer value : newValues)
+                buffers[newSize++] = value;
+
+            return new SimpleClustering(buffers);
+        }
+
+        public Slice.Bound buildBoundWith(ByteBuffer value, boolean isStart, 
boolean isInclusive)
+        {
+            ByteBuffer[] newValues = Arrays.copyOf(values, size+1);
+            newValues[size] = value;
+            return Slice.Bound.create(Slice.Bound.boundKind(isStart, 
isInclusive), newValues);
+        }
+
+        public Slice.Bound buildBoundWith(List<ByteBuffer> newValues, boolean 
isStart, boolean isInclusive)
+        {
+            ByteBuffer[] buffers = Arrays.copyOf(values, size + 
newValues.size());
+            int newSize = size;
+            for (ByteBuffer value : newValues)
+                buffers[newSize++] = value;
+
+            return Slice.Bound.create(Slice.Bound.boundKind(isStart, 
isInclusive), buffers);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java 
b/src/java/org/apache/cassandra/db/CFRowAdder.java
deleted file mode 100644
index 6fab8d5..0000000
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.marshal.CollectionType;
-import org.apache.cassandra.db.marshal.ListType;
-import org.apache.cassandra.db.marshal.MapType;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.UUIDGen;
-
-/**
- * Convenience object to populate a given CQL3 row in a ColumnFamily object.
- *
- * This is meant for when performance is not of the utmost importance. When
- * performance matters, it might be worth allocating such builder.
- */
-public class CFRowAdder
-{
-    public final ColumnFamily cf;
-    public final Composite prefix;
-    public final long timestamp;
-    public final int ttl;
-    private final int ldt;
-
-    public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp)
-    {
-        this(cf, prefix, timestamp, 0);
-    }
-
-    public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp, int 
ttl)
-    {
-        this.cf = cf;
-        this.prefix = prefix;
-        this.timestamp = timestamp;
-        this.ttl = ttl;
-        this.ldt = (int) (System.currentTimeMillis() / 1000);
-
-        // If a CQL3 table, add the row marker
-        if (cf.metadata().isCQL3Table() && !prefix.isStatic())
-            cf.addColumn(new BufferCell(cf.getComparator().rowMarker(prefix), 
ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
-    }
-
-    public CFRowAdder add(String cql3ColumnName, Object value)
-    {
-        ColumnDefinition def = getDefinition(cql3ColumnName);
-        return add(cf.getComparator().create(prefix, def), def, value);
-    }
-
-    public CFRowAdder resetCollection(String cql3ColumnName)
-    {
-        ColumnDefinition def = getDefinition(cql3ColumnName);
-        assert def.type.isCollection() && def.type.isMultiCell();
-        Composite name = cf.getComparator().create(prefix, def);
-        cf.addAtom(new RangeTombstone(name.start(), name.end(), timestamp - 1, 
ldt));
-        return this;
-    }
-
-    public CFRowAdder addMapEntry(String cql3ColumnName, Object key, Object 
value)
-    {
-        ColumnDefinition def = getDefinition(cql3ColumnName);
-        assert def.type instanceof MapType;
-        MapType mt = (MapType)def.type;
-        CellName name = cf.getComparator().create(prefix, def, 
mt.getKeysType().decompose(key));
-        return add(name, def, value);
-    }
-
-    public CFRowAdder addListEntry(String cql3ColumnName, Object value)
-    {
-        ColumnDefinition def = getDefinition(cql3ColumnName);
-        assert def.type instanceof ListType;
-        CellName name = cf.getComparator().create(prefix, def, 
ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()));
-        return add(name, def, value);
-    }
-
-    private ColumnDefinition getDefinition(String name)
-    {
-        return cf.metadata().getColumnDefinition(new ColumnIdentifier(name, 
false));
-    }
-
-    private CFRowAdder add(CellName name, ColumnDefinition def, Object value)
-    {
-        if (value == null)
-        {
-            cf.addColumn(new BufferDeletedCell(name, ldt, timestamp));
-        }
-        else
-        {
-            AbstractType valueType = def.type.isCollection()
-                                   ? ((CollectionType) 
def.type).valueComparator()
-                                   : def.type;
-            ByteBuffer valueBytes = value instanceof ByteBuffer ? 
(ByteBuffer)value : valueType.decompose(value);
-            if (ttl == 0)
-                cf.addColumn(new BufferCell(name, valueBytes, timestamp));
-            else
-                cf.addColumn(new BufferExpiringCell(name, valueBytes, 
timestamp, ttl));
-        }
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Cell.java 
b/src/java/org/apache/cassandra/db/Cell.java
deleted file mode 100644
index 7c3926a..0000000
--- a/src/java/org/apache/cassandra/db/Cell.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db;
-
-import java.nio.ByteBuffer;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.composites.CellName;
-import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.utils.concurrent.OpOrder;
-import org.apache.cassandra.utils.memory.AbstractAllocator;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.memory.MemtableAllocator;
-
-/**
- * Cell is immutable, which prevents all kinds of confusion in a multithreaded 
environment.
- */
-public interface Cell extends OnDiskAtom
-{
-    public static final int MAX_NAME_LENGTH = FBUtilities.MAX_UNSIGNED_SHORT;
-
-    public Cell withUpdatedName(CellName newName);
-
-    public Cell withUpdatedTimestamp(long newTimestamp);
-
-    @Override
-    public CellName name();
-
-    public ByteBuffer value();
-
-    public boolean isLive();
-
-    public boolean isLive(long now);
-
-    public int cellDataSize();
-
-    // returns the size of the Cell and all references on the heap, excluding 
any costs associated with byte arrays
-    // that would be allocated by a localCopy, as these will be accounted for 
by the allocator
-    public long unsharedHeapSizeExcludingData();
-
-    public int serializedSize(CellNameType type, TypeSizes typeSizes);
-
-    public int serializationFlags();
-
-    public Cell diff(Cell cell);
-
-    public Cell reconcile(Cell cell);
-
-    public Cell localCopy(CFMetaData metadata, AbstractAllocator allocator);
-
-    public Cell localCopy(CFMetaData metaData, MemtableAllocator allocator, 
OpOrder.Group opGroup);
-
-    public String getString(CellNameType comparator);
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Clusterable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clusterable.java 
b/src/java/org/apache/cassandra/db/Clusterable.java
new file mode 100644
index 0000000..62ab9dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Clusterable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+/**
+ * Common class for objects that are identified by a clustering prefix, and 
can be thus sorted by a
+ * {@link ClusteringComparator}.
+ */
+public interface Clusterable
+{
+    public ClusteringPrefix clustering();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/Clustering.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Clustering.java 
b/src/java/org/apache/cassandra/db/Clustering.java
new file mode 100644
index 0000000..5ac9671
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/Clustering.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.io.util.DataOutputPlus;
+
+/**
+ * The clustering column values for a row.
+ * <p>
+ * A {@code Clustering} is a {@code ClusteringPrefix} that must always be 
"complete", i.e. have
+ * as many values as there is clustering columns in the table it is part of. 
It is the clustering
+ * prefix used by rows.
+ * <p>
+ * Note however that while it's size must be equal to the table clustering 
size, a clustering can have
+ * {@code null} values, and this mostly for thrift backward compatibility (in 
practice, if a value is null,
+ * all of the following ones will be too because that's what thrift allows, 
but it's never assumed by the
+ * code so we could start generally allowing nulls for clustering columns if 
we wanted to).
+ */
+public abstract class Clustering extends AbstractClusteringPrefix
+{
+    public static final Serializer serializer = new Serializer();
+
+    /**
+     * The special cased clustering used by all static rows. It is a special 
case in the
+     * sense that it's always empty, no matter how many clustering columns the 
table has.
+     */
+    public static final Clustering STATIC_CLUSTERING = new EmptyClustering()
+    {
+        @Override
+        public Kind kind()
+        {
+            return Kind.STATIC_CLUSTERING;
+        }
+
+        @Override
+        public String toString(CFMetaData metadata)
+        {
+            return "STATIC";
+        }
+    };
+
+    /** Empty clustering for tables having no clustering columns. */
+    public static final Clustering EMPTY = new EmptyClustering();
+
+    public Kind kind()
+    {
+        return Kind.CLUSTERING;
+    }
+
+    public Clustering takeAlias()
+    {
+        ByteBuffer[] values = new ByteBuffer[size()];
+        for (int i = 0; i < size(); i++)
+            values[i] = get(i);
+        return new SimpleClustering(values);
+    }
+
+    public String toString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size(); i++)
+        {
+            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            sb.append(i == 0 ? "" : ", 
").append(c.name).append("=").append(get(i) == null ? "null" : 
c.type.getString(get(i)));
+        }
+        return sb.toString();
+    }
+
+    public String toCQLString(CFMetaData metadata)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < size(); i++)
+        {
+            ColumnDefinition c = metadata.clusteringColumns().get(i);
+            sb.append(i == 0 ? "" : ", ").append(c.type.getString(get(i)));
+        }
+        return sb.toString();
+    }
+
+    private static class EmptyClustering extends Clustering
+    {
+        private static final ByteBuffer[] EMPTY_VALUES_ARRAY = new 
ByteBuffer[0];
+
+        public int size()
+        {
+            return 0;
+        }
+
+        public ByteBuffer get(int i)
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        public ByteBuffer[] getRawValues()
+        {
+            return EMPTY_VALUES_ARRAY;
+        }
+
+        @Override
+        public Clustering takeAlias()
+        {
+            return this;
+        }
+
+        @Override
+        public long unsharedHeapSize()
+        {
+            return 0;
+        }
+
+        @Override
+        public String toString(CFMetaData metadata)
+        {
+            return "EMPTY";
+        }
+    }
+
+    /**
+     * Serializer for Clustering object.
+     * <p>
+     * Because every clustering in a given table must have the same size (ant 
that size cannot actually change once the table
+     * has been defined), we don't record that size.
+     */
+    public static class Serializer
+    {
+        public void serialize(Clustering clustering, DataOutputPlus out, int 
version, List<AbstractType<?>> types) throws IOException
+        {
+            ClusteringPrefix.serializer.serializeValuesWithoutSize(clustering, 
out, version, types);
+        }
+
+        public long serializedSize(Clustering clustering, int version, 
List<AbstractType<?>> types, TypeSizes sizes)
+        {
+            return 
ClusteringPrefix.serializer.valuesWithoutSizeSerializedSize(clustering, 
version, types, sizes);
+        }
+
+        public void deserialize(DataInput in, int version, 
List<AbstractType<?>> types, Writer writer) throws IOException
+        {
+            ClusteringPrefix.serializer.deserializeValuesWithoutSize(in, 
types.size(), version, types, writer);
+        }
+
+        public Clustering deserialize(DataInput in, int version, 
List<AbstractType<?>> types) throws IOException
+        {
+            SimpleClustering.Builder builder = 
SimpleClustering.builder(types.size());
+            deserialize(in, version, types, builder);
+            return builder.build();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/ClusteringComparator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ClusteringComparator.java 
b/src/java/org/apache/cassandra/db/ClusteringComparator.java
new file mode 100644
index 0000000..b0e8e5c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/ClusteringComparator.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+
+import com.google.common.base.Joiner;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.io.sstable.IndexHelper.IndexInfo;
+
+/**
+ * A comparator of clustering prefixes (or more generally of {@link 
Clusterable}}.
+ * <p>
+ * This is essentially just a composite comparator that the clustering values 
of the provided
+ * clustering prefixes in lexicographical order, with each component being 
compared based on
+ * the type of the clustering column this is a value of.
+ */
+public class ClusteringComparator implements Comparator<Clusterable>
+{
+    private final List<AbstractType<?>> clusteringTypes;
+    private final boolean isByteOrderComparable;
+
+    private final Comparator<IndexInfo> indexComparator;
+    private final Comparator<IndexInfo> indexReverseComparator;
+    private final Comparator<Clusterable> reverseComparator;
+
+    public ClusteringComparator(AbstractType<?>... clusteringTypes)
+    {
+        this(Arrays.<AbstractType<?>>asList(clusteringTypes));
+    }
+
+    public ClusteringComparator(List<AbstractType<?>> clusteringTypes)
+    {
+        this.clusteringTypes = clusteringTypes;
+        this.isByteOrderComparable = isByteOrderComparable(clusteringTypes);
+
+        this.indexComparator = new Comparator<IndexInfo>()
+        {
+            public int compare(IndexInfo o1, IndexInfo o2)
+            {
+                return ClusteringComparator.this.compare(o1.lastName, 
o2.lastName);
+            }
+        };
+        this.indexReverseComparator = new Comparator<IndexInfo>()
+        {
+            public int compare(IndexInfo o1, IndexInfo o2)
+            {
+                return ClusteringComparator.this.compare(o1.firstName, 
o2.firstName);
+            }
+        };
+        this.reverseComparator = new Comparator<Clusterable>()
+        {
+            public int compare(Clusterable c1, Clusterable c2)
+            {
+                return ClusteringComparator.this.compare(c2, c1);
+            }
+        };
+    }
+
+    private static boolean isByteOrderComparable(Iterable<AbstractType<?>> 
types)
+    {
+        boolean isByteOrderComparable = true;
+        for (AbstractType<?> type : types)
+            isByteOrderComparable &= type.isByteOrderComparable();
+        return isByteOrderComparable;
+    }
+
+    /**
+     * The number of clustering columns for the table this is the comparator 
of.
+     */
+    public int size()
+    {
+        return clusteringTypes.size();
+    }
+
+    /**
+     * The "subtypes" of this clustering comparator, that is the types of the 
clustering
+     * columns for the table this is a comparator of.
+     */
+    public List<AbstractType<?>> subtypes()
+    {
+        return clusteringTypes;
+    }
+
+    /**
+     * Returns the type of the ith clustering column of the table.
+     */
+    public AbstractType<?> subtype(int i)
+    {
+        return clusteringTypes.get(i);
+    }
+
+    /**
+     * Creates a row clustering based on the clustering values.
+     * <p>
+     * Every argument can either be a {@code ByteBuffer}, in which case it is 
used as-is, or a object
+     * corresponding to the type of the corresponding clustering column, in 
which case it will be
+     * converted to a byte buffer using the column type.
+     *
+     * @param values the values to use for the created clustering. There 
should be exactly {@code size()}
+     * values which must be either byte buffers or of the type the column 
expect.
+     *
+     * @return the newly created clustering.
+     */
+    public Clustering make(Object... values)
+    {
+        if (values.length != size())
+            throw new IllegalArgumentException(String.format("Invalid number 
of components, expecting %d but got %d", size(), values.length));
+
+        CBuilder builder = CBuilder.create(this);
+        for (int i = 0; i < values.length; i++)
+        {
+            Object val = values[i];
+            if (val instanceof ByteBuffer)
+                builder.add((ByteBuffer)val);
+            else
+                builder.add(val);
+        }
+        return builder.build();
+    }
+
+    public int compare(Clusterable c1, Clusterable c2)
+    {
+        return compare(c1.clustering(), c2.clustering());
+    }
+
+    public int compare(ClusteringPrefix c1, ClusteringPrefix c2)
+    {
+        int s1 = c1.size();
+        int s2 = c2.size();
+        int minSize = Math.min(s1, s2);
+
+        for (int i = 0; i < minSize; i++)
+        {
+            int cmp = compareComponent(i, c1.get(i), c2.get(i));
+            if (cmp != 0)
+                return cmp;
+        }
+
+        if (s1 == s2)
+            return ClusteringPrefix.Kind.compare(c1.kind(), c2.kind());
+
+        return s1 < s2 ? c1.kind().prefixComparisonResult : 
-c2.kind().prefixComparisonResult;
+    }
+
+    public int compare(Clustering c1, Clustering c2)
+    {
+        for (int i = 0; i < size(); i++)
+        {
+            int cmp = compareComponent(i, c1.get(i), c2.get(i));
+            if (cmp != 0)
+                return cmp;
+        }
+        return 0;
+    }
+
+    public int compareComponent(int i, ByteBuffer v1, ByteBuffer v2)
+    {
+        if (v1 == null)
+            return v1 == null ? 0 : -1;
+        if (v2 == null)
+            return 1;
+
+        return isByteOrderComparable
+             ? ByteBufferUtil.compareUnsigned(v1, v2)
+             : clusteringTypes.get(i).compare(v1, v2);
+    }
+
+    /**
+     * Returns whether this clustering comparator is compatible with the 
provided one,
+     * that is if the provided one can be safely replaced by this new one.
+     *
+     * @param previous the previous comparator that we want to replace and test
+     * compatibility with.
+     *
+     * @return whether {@code previous} can be safely replaced by this 
comparator.
+     */
+    public boolean isCompatibleWith(ClusteringComparator previous)
+    {
+        if (this == previous)
+            return true;
+
+        // Extending with new components is fine, shrinking is not
+        if (size() < previous.size())
+            return false;
+
+        for (int i = 0; i < previous.size(); i++)
+        {
+            AbstractType<?> tprev = previous.subtype(i);
+            AbstractType<?> tnew = subtype(i);
+            if (!tnew.isCompatibleWith(tprev))
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Validates the provided prefix for corrupted data.
+     *
+     * @param clustering the clustering prefix to validate.
+     *
+     * @throws MarshalException if {@code clustering} contains some invalid 
data.
+     */
+    public void validate(ClusteringPrefix clustering)
+    {
+        for (int i = 0; i < clustering.size(); i++)
+        {
+            ByteBuffer value = clustering.get(i);
+            if (value != null)
+                subtype(i).validate(value);
+        }
+    }
+
+    public Comparator<IndexInfo> indexComparator(boolean reversed)
+    {
+        return reversed ? indexReverseComparator : indexComparator;
+    }
+
+    public Comparator<Clusterable> reversed()
+    {
+        return reverseComparator;
+    }
+
+    /**
+     * Whether the two provided clustering prefix are on the same clustering 
values.
+     *
+     * @param c1 the first prefix.
+     * @param c2 the second prefix.
+     * @return whether {@code c1} and {@code c2} have the same clustering 
values (but not necessarily
+     * the same "kind") or not.
+     */
+    public boolean isOnSameClustering(ClusteringPrefix c1, ClusteringPrefix c2)
+    {
+        if (c1.size() != c2.size())
+            return false;
+
+        for (int i = 0; i < c1.size(); i++)
+        {
+            if (compareComponent(i, c1.get(i), c2.get(i)) != 0)
+                return false;
+        }
+        return true;
+    }
+
+    @Override
+    public String toString()
+    {
+        return String.format("comparator(%s)", Joiner.on(", 
").join(clusteringTypes));
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+            return true;
+
+        if (!(o instanceof ClusteringComparator))
+            return false;
+
+        ClusteringComparator that = (ClusteringComparator)o;
+        return this.clusteringTypes.equals(that.clusteringTypes);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hashCode(clusteringTypes);
+    }
+}

Reply via email to