http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
index 25d1887..0b73292 100644
--- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java
@@ -24,27 +24,20 @@ import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
 import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.concurrent.OpOrder;
 import org.apache.cassandra.utils.concurrent.Locks;
 import org.apache.cassandra.utils.memory.MemtableAllocator;
 import org.apache.cassandra.utils.memory.HeapAllocator;
 import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater;
-import static org.apache.cassandra.utils.btree.BTree.Dir.desc;
 
 /**
  * A thread-safe and atomic Partition implementation.
@@ -54,10 +47,8 @@ import static 
org.apache.cassandra.utils.btree.BTree.Dir.desc;
  * other thread can see the state where only parts but not all rows have
  * been added.
  */
-public class AtomicBTreePartition implements Partition
+public class AtomicBTreePartition extends AbstractBTreePartition
 {
-    private static final Logger logger = 
LoggerFactory.getLogger(AtomicBTreePartition.class);
-
     public static final long EMPTY_SIZE = ObjectSizes.measure(new 
AtomicBTreePartition(CFMetaData.createFake("keyspace", "table"),
                                                                                
        DatabaseDescriptor.getPartitioner().decorateKey(ByteBuffer.allocate(1)),
                                                                                
        null));
@@ -75,6 +66,11 @@ public class AtomicBTreePartition implements Partition
     private static final int CLOCK_SHIFT = 17;
     // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms
 
+    private static final Holder EMPTY = new Holder(BTree.empty(), 
DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+
+    private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> 
wasteTrackerUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, 
"wasteTracker");
+    private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, 
Holder> refUpdater = 
AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, 
Holder.class, "ref");
+
     /**
      * (clock + allocation) granularity are combined to give us an acceptable 
(waste) allocation rate that is defined by
      * the passage of real time of 
ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 
7.45Mb/s
@@ -85,214 +81,25 @@ public class AtomicBTreePartition implements Partition
      */
     private volatile int wasteTracker = TRACKER_NEVER_WASTED;
 
-    private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> 
wasteTrackerUpdater = 
AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, 
"wasteTracker");
-
-    private static final Holder EMPTY = new Holder(BTree.empty(), 
DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
-
-    private final CFMetaData metadata;
-    private final DecoratedKey partitionKey;
     private final MemtableAllocator allocator;
-
     private volatile Holder ref;
 
-    private static final AtomicReferenceFieldUpdater<AtomicBTreePartition, 
Holder> refUpdater = 
AtomicReferenceFieldUpdater.newUpdater(AtomicBTreePartition.class, 
Holder.class, "ref");
-
     public AtomicBTreePartition(CFMetaData metadata, DecoratedKey 
partitionKey, MemtableAllocator allocator)
     {
-        this.metadata = metadata;
-        this.partitionKey = partitionKey;
+        // TODO: is this a potential bug? partition columns may be a subset if 
we alter columns while it's in memtable
+        super(metadata, partitionKey, metadata.partitionColumns());
         this.allocator = allocator;
         this.ref = EMPTY;
     }
 
-    public boolean isEmpty()
-    {
-        return ref.deletionInfo.isLive() && BTree.isEmpty(ref.tree) && 
ref.staticRow == null;
-    }
-
-    public CFMetaData metadata()
-    {
-        return metadata;
-    }
-
-    public DecoratedKey partitionKey()
-    {
-        return partitionKey;
-    }
-
-    public DeletionTime partitionLevelDeletion()
-    {
-        return ref.deletionInfo.getPartitionDeletion();
-    }
-
-    public PartitionColumns columns()
-    {
-        // We don't really know which columns will be part of the update, so 
assume it's all of them
-        return metadata.partitionColumns();
-    }
-
-    public boolean hasRows()
-    {
-        return !BTree.isEmpty(ref.tree);
-    }
-
-    public EncodingStats stats()
-    {
-        return ref.stats;
-    }
-
-    public Row getRow(Clustering clustering)
-    {
-        Row row = searchIterator(ColumnFilter.selection(columns()), 
false).next(clustering);
-        // Note that for statics, this will never return null, this will 
return an empty row. However,
-        // it's more consistent for this method to return null if we don't 
really have a static row.
-        return row == null || (clustering == Clustering.STATIC_CLUSTERING && 
row.isEmpty()) ? null : row;
-    }
-
-    private Row staticRow(Holder current, ColumnFilter columns, boolean 
setActiveDeletionToRow)
-    {
-        DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
-        if (columns.fetchedColumns().statics.isEmpty() || 
(current.staticRow.isEmpty() && partitionDeletion.isLive()))
-            return Rows.EMPTY_STATIC_ROW;
-
-        Row row = current.staticRow.filter(columns, partitionDeletion, 
setActiveDeletionToRow, metadata);
-        return row == null ? Rows.EMPTY_STATIC_ROW : row;
-    }
-
-    public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter 
columns, final boolean reversed)
-    {
-        // TODO: we could optimize comparison for "NativeRow" à la #6755
-        final Holder current = ref;
-        return new SearchIterator<Clustering, Row>()
-        {
-            private final SearchIterator<Clustering, Row> rawIter = new 
BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed));
-            private final DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
-
-            public boolean hasNext()
-            {
-                return rawIter.hasNext();
-            }
-
-            public Row next(Clustering clustering)
-            {
-                if (clustering == Clustering.STATIC_CLUSTERING)
-                    return staticRow(current, columns, true);
-
-                Row row = rawIter.next(clustering);
-                RangeTombstone rt = 
current.deletionInfo.rangeCovering(clustering);
-
-                // A search iterator only return a row, so it doesn't allow to 
directly account for deletion that should apply to to row
-                // (the partition deletion or the deletion of a range 
tombstone that covers it). So if needs be, reuse the row deletion
-                // to carry the proper deletion on the row.
-                DeletionTime activeDeletion = partitionDeletion;
-                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
-                    activeDeletion = rt.deletionTime();
-
-                if (row == null)
-                    return activeDeletion.isLive() ? null : 
BTreeBackedRow.emptyDeletedRow(clustering, activeDeletion);
-
-                return row.filter(columns, activeDeletion, true, metadata);
-            }
-        };
-    }
-
-    public UnfilteredRowIterator unfilteredIterator()
+    protected Holder holder()
     {
-        return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, 
false);
+        return ref;
     }
 
-    public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, 
Slices slices, boolean reversed)
+    protected boolean canHaveShadowedData()
     {
-        if (slices.size() == 0)
-        {
-            Holder current = ref;
-            DeletionTime partitionDeletion = 
current.deletionInfo.getPartitionDeletion();
-            if (selection.fetchedColumns().statics.isEmpty() && 
partitionDeletion.isLive())
-                return UnfilteredRowIterators.emptyIterator(metadata, 
partitionKey, reversed);
-
-            return new AbstractUnfilteredRowIterator(metadata,
-                                                     partitionKey,
-                                                     partitionDeletion,
-                                                     
selection.fetchedColumns(),
-                                                     staticRow(current, 
selection, false),
-                                                     reversed,
-                                                     current.stats)
-            {
-                protected Unfiltered computeNext()
-                {
-                    return endOfData();
-                }
-            };
-        }
-
-        Holder current = ref;
-        Row staticRow = staticRow(current, selection, false);
-        return slices.size() == 1
-             ? sliceIterator(selection, slices.get(0), reversed, current, 
staticRow)
-             : new SlicesIterator(metadata, partitionKey, selection, slices, 
reversed, current, staticRow);
-    }
-
-    private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice 
slice, boolean reversed, Holder current, Row staticRow)
-    {
-        Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : 
slice.start();
-        Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end();
-        Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, 
start, true, end, true, desc(reversed));
-
-        return new RowAndDeletionMergeIterator(metadata,
-                                               partitionKey,
-                                               
current.deletionInfo.getPartitionDeletion(),
-                                               selection,
-                                               staticRow,
-                                               reversed,
-                                               current.stats,
-                                               rowIter,
-                                               
current.deletionInfo.rangeIterator(slice, reversed),
-                                               true);
-    }
-
-    public class SlicesIterator extends AbstractUnfilteredRowIterator
-    {
-        private final Holder current;
-        private final ColumnFilter selection;
-        private final Slices slices;
-
-        private int idx;
-        private Iterator<Unfiltered> currentSlice;
-
-        private SlicesIterator(CFMetaData metadata,
-                               DecoratedKey key,
-                               ColumnFilter selection,
-                               Slices slices,
-                               boolean isReversed,
-                               Holder holder,
-                               Row staticRow)
-        {
-            super(metadata, key, holder.deletionInfo.getPartitionDeletion(), 
selection.fetchedColumns(), staticRow, isReversed, holder.stats);
-            this.current = holder;
-            this.selection = selection;
-            this.slices = slices;
-        }
-
-        protected Unfiltered computeNext()
-        {
-            while (true)
-            {
-                if (currentSlice == null)
-                {
-                    if (idx >= slices.size())
-                        return endOfData();
-
-                    int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : 
idx;
-                    currentSlice = sliceIterator(selection, 
slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW);
-                    idx++;
-                }
-
-                if (currentSlice.hasNext())
-                    return currentSlice.next();
-
-                currentSlice = null;
-            }
-        }
+        return true;
     }
 
     /**
@@ -417,28 +224,6 @@ public class AtomicBTreePartition implements Partition
         return wasteTracker;
     }
 
-    private static final class Holder
-    {
-        final DeletionInfo deletionInfo;
-        // the btree of rows
-        final Object[] tree;
-        final Row staticRow;
-        final EncodingStats stats;
-
-        Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, 
EncodingStats stats)
-        {
-            this.tree = tree;
-            this.deletionInfo = deletionInfo;
-            this.staticRow = staticRow;
-            this.stats = stats;
-        }
-
-        Holder with(DeletionInfo info)
-        {
-            return new Holder(this.tree, info, this.staticRow, this.stats);
-        }
-    }
-
     // the function we provide to the btree utilities to perform any column 
replacements
     private static final class RowUpdater implements UpdateFunction<Row, Row>
     {
@@ -455,7 +240,6 @@ public class AtomicBTreePartition implements Partition
         final MemtableAllocator.DataReclaimer reclaimer;
         List<Row> inserted; // TODO: replace with walk of aborted BTree
 
-
         private RowUpdater(AtomicBTreePartition updating, MemtableAllocator 
allocator, OpOrder.Group writeOp, Updater indexer)
         {
             this.updating = updating;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
new file mode 100644
index 0000000..b122791
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.partitions;
+
+import java.io.IOException;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.btree.BTree;
+
+public class CachedBTreePartition extends ImmutableBTreePartition implements 
CachedPartition
+{
+    private final int createdAtInSec;
+
+    private final int cachedLiveRows;
+    private final int rowsWithNonExpiringCells;
+
+    private final int nonTombstoneCellCount;
+    private final int nonExpiringLiveCells;
+
+    private CachedBTreePartition(CFMetaData metadata,
+                                 DecoratedKey partitionKey,
+                                 PartitionColumns columns,
+                                 Holder holder,
+                                 int createdAtInSec,
+                                 int cachedLiveRows,
+                                 int rowsWithNonExpiringCells,
+                                 int nonTombstoneCellCount,
+                                 int nonExpiringLiveCells)
+    {
+        super(metadata, partitionKey, columns, holder);
+        this.createdAtInSec = createdAtInSec;
+        this.cachedLiveRows = cachedLiveRows;
+        this.rowsWithNonExpiringCells = rowsWithNonExpiringCells;
+        this.nonTombstoneCellCount = nonTombstoneCellCount;
+        this.nonExpiringLiveCells = nonExpiringLiveCells;
+    }
+
+    /**
+     * Creates a {@code CachedBTreePartition} holding all the data of 
the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and 
it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @param nowInSec the time of the creation in seconds. This is the time 
at which {@link #cachedLiveRows} applies.
+     * @return the created partition.
+     */
+    public static CachedBTreePartition create(UnfilteredRowIterator iterator, 
int nowInSec)
+    {
+        return create(iterator, 16, nowInSec);
+    }
+
+    /**
+     * Creates a {@code CachedBTreePartition} holding all the data of 
the provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and 
it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @param initialRowCapacity sizing hint (in rows) to use for the created 
partition. It should ideally
correspond to or be a good estimate of the number of rows in {@code 
iterator}.
+     * @param nowInSec the time of the creation in seconds. This is the time 
at which {@link #cachedLiveRows} applies.
+     * @return the created partition.
+     */
+    public static CachedBTreePartition create(UnfilteredRowIterator iterator, 
int initialRowCapacity, int nowInSec)
+    {
+        Holder holder = ImmutableBTreePartition.build(iterator, 
initialRowCapacity);
+
+        int cachedLiveRows = 0;
+        int rowsWithNonExpiringCells = 0;
+        int nonTombstoneCellCount = 0;
+        int nonExpiringLiveCells = 0;
+
+        for (Row row : BTree.<Row>iterable(holder.tree))
+        {
+            if (row.hasLiveData(nowInSec))
+                ++cachedLiveRows;
+
+            int nonExpiringLiveCellsThisRow = 0;
+            for (Cell cell : row.cells())
+            {
+                if (!cell.isTombstone())
+                {
+                    ++nonTombstoneCellCount;
+                    if (!cell.isExpiring())
+                        ++nonExpiringLiveCellsThisRow;
+                }
+            }
+
+            if (nonExpiringLiveCellsThisRow > 0)
+            {
+                ++rowsWithNonExpiringCells;
+                nonExpiringLiveCells += nonExpiringLiveCellsThisRow;
+            }
+        }
+
+        return new CachedBTreePartition(iterator.metadata(),
+                                        iterator.partitionKey(),
+                                        iterator.columns(),
+                                        holder,
+                                        nowInSec,
+                                        cachedLiveRows,
+                                        rowsWithNonExpiringCells,
+                                        nonTombstoneCellCount,
+                                        nonExpiringLiveCells);
+    }
+
+    /**
+     * The number of rows that were live at the time the partition was cached.
+     *
+     * See {@link ColumnFamilyStore#isFilterFullyCoveredBy} to see why we need 
this.
+     *
+     * @return the number of rows in this partition that were live at the time 
the
+     * partition was cached (this can be different from the number of live 
rows now
+     * due to expiring cells).
+     */
+    public int cachedLiveRows()
+    {
+        return cachedLiveRows;
+    }
+
+    /**
+     * The number of rows in this cached partition that have at least one 
non-expiring
+     * non-deleted cell.
+     *
+     * Note that this is generally not a very meaningful number, but this is 
used by
+     * {@link DataLimits#hasEnoughLiveData} as an optimization.
+     *
+     * @return the number of rows that have at least one non-expiring 
non-deleted cell.
+     */
+    public int rowsWithNonExpiringCells()
+    {
+        return rowsWithNonExpiringCells;
+    }
+
+    public int nonTombstoneCellCount()
+    {
+        return nonTombstoneCellCount;
+    }
+
+    public int nonExpiringLiveCells()
+    {
+        return nonExpiringLiveCells;
+    }
+
+    static class Serializer implements ISerializer<CachedPartition>
+    {
+        public void serialize(CachedPartition partition, DataOutputPlus out) 
throws IOException
+        {
+            int version = MessagingService.current_version;
+
+            assert partition instanceof CachedBTreePartition;
+            CachedBTreePartition p = (CachedBTreePartition)partition;
+
+            out.writeInt(p.createdAtInSec);
+            out.writeInt(p.cachedLiveRows);
+            out.writeInt(p.rowsWithNonExpiringCells);
+            out.writeInt(p.nonTombstoneCellCount);
+            out.writeInt(p.nonExpiringLiveCells);
+            CFMetaData.serializer.serialize(partition.metadata(), out, 
version);
+            try (UnfilteredRowIterator iter = p.unfilteredIterator())
+            {
+                UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
null, out, version, p.rowCount());
+            }
+        }
+
+        public CachedPartition deserialize(DataInputPlus in) throws IOException
+        {
+            int version = MessagingService.current_version;
+
+            // Note that it would be slightly simpler to just do
+            //   
CachedBTreePartition.create(UnfilteredRowIteratorSerializer.serializer.deserialize(...));
+            // However deserializing the header separately is not a lot harder 
and allows us to:
+            //   1) get the capacity of the partition so we can size it 
properly directly
+            //   2) saves the creation of a temporary iterator: rows are 
directly written to the partition, which
+            //      is slightly faster.
+
+            int createdAtInSec = in.readInt();
+            int cachedLiveRows = in.readInt();
+            int rowsWithNonExpiringCells = in.readInt();
+            int nonTombstoneCellCount = in.readInt();
+            int nonExpiringLiveCells = in.readInt();
+
+
+            CFMetaData metadata = CFMetaData.serializer.deserialize(in, 
version);
+            UnfilteredRowIteratorSerializer.Header header = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, 
in, version, SerializationHelper.Flag.LOCAL);
+            assert !header.isReversed && header.rowEstimate >= 0;
+
+            Holder holder;
+            try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, 
SerializationHelper.Flag.LOCAL, header))
+            {
+                holder = ImmutableBTreePartition.build(partition, 
header.rowEstimate);
+            }
+
+            return new CachedBTreePartition(metadata,
+                                                  header.key,
+                                                  header.sHeader.columns(),
+                                                  holder,
+                                                  createdAtInSec,
+                                                  cachedLiveRows,
+                                                  rowsWithNonExpiringCells,
+                                                  nonTombstoneCellCount,
+                                                  nonExpiringLiveCells);
+
+        }
+
+        public long serializedSize(CachedPartition partition)
+        {
+            int version = MessagingService.current_version;
+
+            assert partition instanceof CachedBTreePartition;
+            CachedBTreePartition p = (CachedBTreePartition)partition;
+
+            try (UnfilteredRowIterator iter = p.unfilteredIterator())
+            {
+                return TypeSizes.sizeof(p.createdAtInSec)
+                     + TypeSizes.sizeof(p.cachedLiveRows)
+                     + TypeSizes.sizeof(p.rowsWithNonExpiringCells)
+                     + TypeSizes.sizeof(p.nonTombstoneCellCount)
+                     + TypeSizes.sizeof(p.nonExpiringLiveCells)
+                     + 
CFMetaData.serializer.serializedSize(partition.metadata(), version)
+                     + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, 
MessagingService.current_version, p.rowCount());
+            }
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java 
b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
index dc482f3..33e6ecc 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedPartition.java
@@ -24,15 +24,13 @@ import org.apache.cassandra.io.ISerializer;
 /**
  * A partition stored in the partition cache.
  *
- * Note that in practice, the only implementation of this is {@link 
ArrayBackedPartition},
- * we keep this interface mainly 1) to make it clear what we need from 
partition in the cache
- * (that we don't otherwise) and 2) because {@code ArrayBackedPartition} is 
used for other
- * purpose (than caching) and hence using {@code CachedPartition} when we talk 
about caching is
- * clearer.
+ * Note that in practice, the only implementation of this is {@link 
CachedBTreePartition},
+ * we keep this interface mainly to make it clear what we need from a partition 
in the cache
+ * (that we don't otherwise need).
  */
 public interface CachedPartition extends Partition, IRowCacheEntry
 {
-    public static final ISerializer<CachedPartition> cacheSerializer = new 
ArrayBackedCachedPartition.Serializer();
+    public static final ISerializer<CachedPartition> cacheSerializer = new 
CachedBTreePartition.Serializer();
 
     /**
      * The number of {@code Row} objects in this cached partition.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java 
b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
index 3a57d1a..e0b568d 100644
--- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java
@@ -17,24 +17,19 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import java.util.*;
+import java.util.Iterator;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.PartitionColumns;
 import org.apache.cassandra.db.rows.*;
 
-public class FilteredPartition extends AbstractThreadUnsafePartition
+public class FilteredPartition extends ImmutableBTreePartition
 {
-    private final Row staticRow;
-
-    private FilteredPartition(CFMetaData metadata,
-                              DecoratedKey partitionKey,
-                              PartitionColumns columns,
-                              Row staticRow,
-                              List<Row> rows)
+    public FilteredPartition(RowIterator rows)
     {
-        super(metadata, partitionKey, columns, rows);
-        this.staticRow = staticRow;
+        super(rows.metadata(), rows.partitionKey(), rows.columns(), 
build(rows, DeletionInfo.LIVE, false, 16));
     }
 
     /**
@@ -45,43 +40,7 @@ public class FilteredPartition extends 
AbstractThreadUnsafePartition
      */
     public static FilteredPartition create(RowIterator iterator)
     {
-        CFMetaData metadata = iterator.metadata();
-        boolean reversed = iterator.isReverseOrder();
-
-        List<Row> rows = new ArrayList<>();
-
-        while (iterator.hasNext())
-        {
-            Unfiltered unfiltered = iterator.next();
-            if (unfiltered.isRow())
-                rows.add((Row)unfiltered);
-        }
-
-        if (reversed)
-            Collections.reverse(rows);
-
-        return new FilteredPartition(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows);
-    }
-
-    protected boolean canHaveShadowedData()
-    {
-        // We only create instances from RowIterator that don't have shadowed 
data (nor deletion info really)
-        return false;
-    }
-
-    public Row staticRow()
-    {
-        return staticRow;
-    }
-
-    public DeletionInfo deletionInfo()
-    {
-        return DeletionInfo.LIVE;
-    }
-
-    public EncodingStats stats()
-    {
-        return EncodingStats.NO_STATS;
+        return new FilteredPartition(iterator);
     }
 
     public RowIterator rowIterator()
@@ -106,7 +65,7 @@ public class FilteredPartition extends 
AbstractThreadUnsafePartition
 
             public DecoratedKey partitionKey()
             {
-                return key;
+                return partitionKey;
             }
 
             public Row staticRow()
@@ -114,6 +73,8 @@ public class FilteredPartition extends 
AbstractThreadUnsafePartition
                 return FilteredPartition.this.staticRow();
             }
 
+            public void close() {}
+
             public boolean hasNext()
             {
                 return iter.hasNext();
@@ -124,34 +85,10 @@ public class FilteredPartition extends 
AbstractThreadUnsafePartition
                 return iter.next();
             }
 
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void close()
+            public boolean isEmpty()
             {
+                return staticRow().isEmpty() && !hasRows();
             }
         };
     }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-
-        sb.append(String.format("[%s.%s] key=%s columns=%s",
-                    metadata.ksName,
-                    metadata.cfName,
-                    
metadata.getKeyValidator().getString(partitionKey().getKey()),
-                    columns));
-
-        if (staticRow() != Rows.EMPTY_STATIC_ROW)
-            sb.append("\n    ").append(staticRow().toString(metadata));
-
-        for (Row row : this)
-            sb.append("\n    ").append(row.toString(metadata));
-
-        return sb.toString();
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
new file mode 100644
index 0000000..a13e070
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java
@@ -0,0 +1,93 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionInfo;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.rows.*;
+
+public class ImmutableBTreePartition extends AbstractBTreePartition
+{
+
+    protected final Holder holder;
+
+    public ImmutableBTreePartition(CFMetaData metadata,
+                                      DecoratedKey partitionKey,
+                                      PartitionColumns columns,
+                                      Row staticRow,
+                                      Object[] tree,
+                                      DeletionInfo deletionInfo,
+                                      EncodingStats stats)
+    {
+        super(metadata, partitionKey, columns);
+        this.holder = new Holder(tree, deletionInfo, staticRow, stats);
+    }
+
+    protected ImmutableBTreePartition(CFMetaData metadata,
+                                      DecoratedKey partitionKey,
+                                      PartitionColumns columns,
+                                      Holder holder)
+    {
+        super(metadata, partitionKey, columns);
+        this.holder = holder;
+    }
+
+    /**
+     * Creates an {@code ImmutableBTreePartition} holding all the data of the 
provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and 
it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @return the created partition.
+     */
+    public static ImmutableBTreePartition create(UnfilteredRowIterator 
iterator)
+    {
+        return create(iterator, 16);
+    }
+
+    /**
+     * Creates an {@code ImmutableBTreePartition} holding all the data of the 
provided iterator.
+     *
+     * Warning: Note that this method does not close the provided iterator and 
it is
+     * up to the caller to do so.
+     *
+     * @param iterator the iterator to gather in memory.
+     * @param initialRowCapacity sizing hint (in rows) to use for the created 
partition. It should ideally
+     * correspond or be a good estimation of the number or rows in {@code 
iterator}.
+     * @return the created partition.
+     */
+    public static ImmutableBTreePartition create(UnfilteredRowIterator 
iterator, int initialRowCapacity)
+    {
+        return new ImmutableBTreePartition(iterator.metadata(), 
iterator.partitionKey(), iterator.columns(),
+                                        build(iterator, initialRowCapacity));
+    }
+
+    protected Holder holder()
+    {
+        return holder;
+    }
+
+    protected boolean canHaveShadowedData()
+    {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index e6d51e5..2f0a4ec 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.btree.BTree;
+import org.apache.cassandra.utils.btree.UpdateFunction;
 
 /**
  * Stores updates made on a partition.
@@ -58,7 +60,7 @@ import org.apache.cassandra.utils.Pair;
  * is also a few static helper constructor methods for special cases ({@code 
emptyUpdate()},
  * {@code fullPartitionDelete} and {@code singleRowUpdate}).
  */
-public class PartitionUpdate extends AbstractThreadUnsafePartition
+public class PartitionUpdate extends AbstractBTreePartition
 {
     protected static final Logger logger = 
LoggerFactory.getLogger(PartitionUpdate.class);
 
@@ -73,28 +75,37 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
     private boolean isBuilt;
     private boolean canReOpen = true;
 
-    private final MutableDeletionInfo deletionInfo;
-    private EncodingStats stats; // will be null if isn't built
-
-    private Row staticRow = Rows.EMPTY_STATIC_ROW;
+    private Holder holder;
+    private BTree.Builder<Row> rowBuilder;
+    private MutableDeletionInfo deletionInfo;
 
     private final boolean canHaveShadowedData;
 
     private PartitionUpdate(CFMetaData metadata,
                             DecoratedKey key,
                             PartitionColumns columns,
-                            Row staticRow,
-                            List<Row> rows,
                             MutableDeletionInfo deletionInfo,
-                            EncodingStats stats,
-                            boolean isBuilt,
+                            int initialRowCapacity,
+                            boolean canHaveShadowedData)
+    {
+        super(metadata, key, columns);
+        this.deletionInfo = deletionInfo;
+        this.holder = new Holder(BTree.empty(), deletionInfo, 
Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+        this.canHaveShadowedData = canHaveShadowedData;
+        rowBuilder = builder(initialRowCapacity);
+    }
+
+    private PartitionUpdate(CFMetaData metadata,
+                            DecoratedKey key,
+                            PartitionColumns columns,
+                            Holder holder,
+                            MutableDeletionInfo deletionInfo,
                             boolean canHaveShadowedData)
     {
-        super(metadata, key, columns, rows);
-        this.staticRow = staticRow;
+        super(metadata, key, columns);
+        this.holder = holder;
         this.deletionInfo = deletionInfo;
-        this.stats = stats;
-        this.isBuilt = isBuilt;
+        this.isBuilt = true;
         this.canHaveShadowedData = canHaveShadowedData;
     }
 
@@ -103,7 +114,7 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
                            PartitionColumns columns,
                            int initialRowCapacity)
     {
-        this(metadata, key, columns, Rows.EMPTY_STATIC_ROW, new 
ArrayList<>(initialRowCapacity), MutableDeletionInfo.live(), null, false, true);
+        this(metadata, key, columns, MutableDeletionInfo.live(), 
initialRowCapacity, true);
     }
 
     public PartitionUpdate(CFMetaData metadata,
@@ -127,7 +138,9 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      */
     public static PartitionUpdate emptyUpdate(CFMetaData metadata, 
DecoratedKey key)
     {
-        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 
Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), 
MutableDeletionInfo.live(), EncodingStats.NO_STATS, true, false);
+        MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
+        Holder holder = new Holder(BTree.empty(), deletionInfo, 
Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 
holder, deletionInfo, false);
     }
 
     /**
@@ -142,7 +155,9 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      */
     public static PartitionUpdate fullPartitionDelete(CFMetaData metadata, 
DecoratedKey key, long timestamp, int nowInSec)
     {
-        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 
Rows.EMPTY_STATIC_ROW, Collections.<Row>emptyList(), new 
MutableDeletionInfo(timestamp, nowInSec), EncodingStats.NO_STATS, true, false);
+        MutableDeletionInfo deletionInfo = new MutableDeletionInfo(timestamp, 
nowInSec);
+        Holder holder = new Holder(BTree.empty(), deletionInfo, 
Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+        return new PartitionUpdate(metadata, key, PartitionColumns.NONE, 
holder, deletionInfo, false);
     }
 
     /**
@@ -156,9 +171,17 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      */
     public static PartitionUpdate singleRowUpdate(CFMetaData metadata, 
DecoratedKey key, Row row)
     {
-        return row.isStatic()
-             ? new PartitionUpdate(metadata, key, new 
PartitionColumns(row.columns(), Columns.NONE), row, 
Collections.<Row>emptyList(), MutableDeletionInfo.live(), 
EncodingStats.NO_STATS, true, false)
-             : new PartitionUpdate(metadata, key, new 
PartitionColumns(Columns.NONE, row.columns()), Rows.EMPTY_STATIC_ROW, 
Collections.singletonList(row), MutableDeletionInfo.live(), 
EncodingStats.NO_STATS, true, false);
+        MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
+        if (row.isStatic())
+        {
+            Holder holder = new Holder(BTree.empty(), deletionInfo, row, 
EncodingStats.NO_STATS);
+            return new PartitionUpdate(metadata, key, new 
PartitionColumns(row.columns(), Columns.NONE), holder, deletionInfo, false);
+        }
+        else
+        {
+            Holder holder = new Holder(BTree.singleton(row), deletionInfo, 
Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS);
+            return new PartitionUpdate(metadata, key, new 
PartitionColumns(Columns.NONE, row.columns()), holder, deletionInfo, false);
+        }
     }
 
     /**
@@ -183,47 +206,16 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      */
     public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
     {
-        CFMetaData metadata = iterator.metadata();
-        boolean reversed = iterator.isReverseOrder();
-
-        List<Row> rows = new ArrayList<>();
-        MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), 
metadata.comparator, reversed);
-
-        while (iterator.hasNext())
-        {
-            Unfiltered unfiltered = iterator.next();
-            if (unfiltered.kind() == Unfiltered.Kind.ROW)
-                rows.add((Row)unfiltered);
-            else
-                deletionBuilder.add((RangeTombstoneMarker)unfiltered);
-        }
-
-        if (reversed)
-            Collections.reverse(rows);
-
-        return new PartitionUpdate(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), 
iterator.stats(), true, false);
+        Holder holder = build(iterator, 16);
+        MutableDeletionInfo deletionInfo = (MutableDeletionInfo) 
holder.deletionInfo;
+        return new PartitionUpdate(iterator.metadata(), 
iterator.partitionKey(), iterator.columns(), holder, deletionInfo, false);
     }
 
     public static PartitionUpdate fromIterator(RowIterator iterator)
     {
-        CFMetaData metadata = iterator.metadata();
-        boolean reversed = iterator.isReverseOrder();
-
-        List<Row> rows = new ArrayList<>();
-
-        EncodingStats.Collector collector = new EncodingStats.Collector();
-
-        while (iterator.hasNext())
-        {
-            Row row = iterator.next();
-            rows.add(row);
-            Rows.collectStats(row, collector);
-        }
-
-        if (reversed)
-            Collections.reverse(rows);
-
-        return new PartitionUpdate(metadata, iterator.partitionKey(), 
iterator.columns(), iterator.staticRow(), rows, MutableDeletionInfo.live(), 
collector.get(), true, false);
+        MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
+        Holder holder = build(iterator, deletionInfo, true, 16);
+        return new PartitionUpdate(iterator.metadata(), 
iterator.partitionKey(), iterator.columns(), holder, deletionInfo, false);
     }
 
     protected boolean canHaveShadowedData()
@@ -231,16 +223,6 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         return canHaveShadowedData;
     }
 
-    public Row staticRow()
-    {
-        return staticRow;
-    }
-
-    public DeletionInfo deletionInfo()
-    {
-        return deletionInfo;
-    }
-
     /**
      * Deserialize a partition update from a provided byte buffer.
      *
@@ -312,7 +294,7 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      *
      * @return a partition update that include (merge) all the updates from 
{@code updates}.
      */
-    public static PartitionUpdate merge(Collection<PartitionUpdate> updates)
+    public static PartitionUpdate merge(List<PartitionUpdate> updates)
     {
         assert !updates.isEmpty();
         final int size = updates.size();
@@ -320,72 +302,9 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         if (size == 1)
             return Iterables.getOnlyElement(updates);
 
-        // Used when merging row to decide of liveness
-        int nowInSec = FBUtilities.nowInSeconds();
-
-        PartitionColumns.Builder builder = PartitionColumns.builder();
-        DecoratedKey key = null;
-        CFMetaData metadata = null;
-        MutableDeletionInfo deletion = MutableDeletionInfo.live();
-        Row staticRow = Rows.EMPTY_STATIC_ROW;
-        List<Iterator<Row>> updateRowIterators = new ArrayList<>(size);
-        EncodingStats stats = EncodingStats.NO_STATS;
-
-        for (PartitionUpdate update : updates)
-        {
-            builder.addAll(update.columns());
-            deletion.add(update.deletionInfo());
-            if (!update.staticRow().isEmpty())
-                staticRow = staticRow == Rows.EMPTY_STATIC_ROW ? 
update.staticRow() : Rows.merge(staticRow, update.staticRow(), nowInSec);
-            updateRowIterators.add(update.iterator());
-            stats = stats.mergeWith(update.stats());
-
-            if (key == null)
-                key = update.partitionKey();
-            else
-                assert key.equals(update.partitionKey());
-
-            if (metadata == null)
-                metadata = update.metadata();
-            else
-                assert metadata.cfId.equals(update.metadata().cfId);
-        }
-
-        PartitionColumns columns = builder.build();
-
-        final Row.Merger merger = new Row.Merger(size, nowInSec, 
columns.regulars);
-
-        Iterator<Row> merged = MergeIterator.get(updateRowIterators, 
metadata.comparator, new MergeIterator.Reducer<Row, Row>()
-        {
-            @Override
-            public boolean trivialReduceIsTrivial()
-            {
-                return true;
-            }
-
-            public void reduce(int idx, Row current)
-            {
-                merger.add(idx, current);
-            }
-
-            protected Row getReduced()
-            {
-                // Note that while merger.getRow() can theoretically return 
null, it won't in this case because
-                // we don't pass an "activeDeletion".
-                return merger.merge(DeletionTime.LIVE);
-            }
-
-            @Override
-            protected void onKeyChange()
-            {
-                merger.clear();
-            }
-        });
-
-        List<Row> rows = new ArrayList<>();
-        Iterators.addAll(rows, merged);
-
-        return new PartitionUpdate(metadata, key, columns, staticRow, rows, 
deletion, stats, true, true);
+        int nowInSecs = FBUtilities.nowInSeconds();
+        List<UnfilteredRowIterator> asIterators = Lists.transform(updates, 
AbstractBTreePartition::unfilteredIterator);
+        return fromIterator(UnfilteredRowIterators.merge(asIterators, 
nowInSecs));
     }
 
     /**
@@ -404,17 +323,12 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      */
     public void updateAllTimestamp(long newTimestamp)
     {
-        // We know we won't be updating that update again after this call, and 
doing is post built is potentially
-        // slightly more efficient (things are more "compact"). So force a 
build if it hasn't happened yet.
-        maybeBuild();
-
+        Holder holder = holder();
         deletionInfo.updateAllTimestamp(newTimestamp - 1);
-
-        if (!staticRow.isEmpty())
-            staticRow = staticRow.updateAllTimestamp(newTimestamp);
-
-        for (int i = 0; i < rows.size(); i++)
-            rows.set(i, rows.get(i).updateAllTimestamp(newTimestamp));
+        Object[] tree = BTree.<Row>transformAndFilter(holder.tree, (x) -> 
x.updateAllTimestamp(newTimestamp));
+        Row staticRow = holder.staticRow.updateAllTimestamp(newTimestamp);
+        EncodingStats newStats = EncodingStats.Collector.collect(staticRow, 
BTree.<Row>iterator(tree), deletionInfo);
+        this.holder = new Holder(tree, deletionInfo, staticRow, newStats);
     }
 
     /**
@@ -427,7 +341,7 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
      */
     public int operationCount()
     {
-        return rows.size()
+        return rowCount()
              + deletionInfo.rangeCount()
              + (deletionInfo.getPartitionDeletion().isLive() ? 0 : 1);
     }
@@ -449,17 +363,15 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         return size;
     }
 
-    @Override
-    public int rowCount()
+    protected Holder holder()
     {
         maybeBuild();
-        return super.rowCount();
+        return holder;
     }
 
     public EncodingStats stats()
     {
-        maybeBuild();
-        return stats;
+        return holder().stats;
     }
 
     /**
@@ -480,6 +392,15 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         // called concurrently with sort() (which should be avoided in the 
first place, but
         // better safe than sorry).
         isBuilt = false;
+        if (rowBuilder == null)
+            rowBuilder = builder(16);
+    }
+
+    private BTree.Builder<Row> builder(int initialCapacity)
+    {
+        return BTree.<Row>builder(metadata.comparator, initialCapacity)
+                    .setQuickResolver((a, b) ->
+                                      Rows.merge(a, b, createdAtInSec));
     }
 
     /**
@@ -565,7 +486,7 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         canReOpen = false;
 
         List<CounterMark> l = new ArrayList<>();
-        for (Row row : rows)
+        for (Row row : this)
         {
             for (Cell cell : row.cells())
             {
@@ -616,28 +537,19 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         {
             // We test for == first because in most case it'll be true and 
that is faster
             assert columns().statics == row.columns() || 
columns().statics.contains(row.columns());
-            staticRow = staticRow.isEmpty()
+            Row staticRow = holder.staticRow.isEmpty()
                       ? row
-                      : Rows.merge(staticRow, row, createdAtInSec);
+                      : Rows.merge(holder.staticRow, row, createdAtInSec);
+            holder = new Holder(holder.tree, holder.deletionInfo, staticRow, 
holder.stats);
         }
         else
         {
             // We test for == first because in most case it'll be true and 
that is faster
             assert columns().regulars == row.columns() || 
columns().regulars.contains(row.columns());
-            rows.add(row);
+            rowBuilder.add(row);
         }
     }
 
-    /**
-     * The number of rows contained in this update.
-     *
-     * @return the number of rows contained in this update.
-     */
-    public int size()
-    {
-        return rows.size();
-    }
-
     private void maybeBuild()
     {
         if (isBuilt)
@@ -651,53 +563,17 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         if (isBuilt)
             return;
 
-        if (rows.size() <= 1)
-        {
-            finishBuild();
-            return;
-        }
+        Holder holder = this.holder;
+        Object[] cur = holder.tree;
+        Object[] add = rowBuilder.build();
+        Object[] merged = BTree.<Row>merge(cur, add, metadata.comparator,
+                                           UpdateFunction.Simple.of((a, b) -> 
Rows.merge(a, b, createdAtInSec)));
 
-        Comparator<Row> comparator = metadata.comparator.rowComparator();
-        // Sort the rows. Because the same row can have been added multiple 
times, we can still have duplicates after that
-        Collections.sort(rows, comparator);
+        assert deletionInfo == holder.deletionInfo;
+        EncodingStats newStats = 
EncodingStats.Collector.collect(holder.staticRow, BTree.<Row>iterator(merged), 
deletionInfo);
 
-        // Now find the duplicates and merge them together
-        int previous = 0; // The last element that was set
-        for (int current = 1; current < rows.size(); current++)
-        {
-            // There is really only 2 possible comparison: < 0 or == 0 since 
we've sorted already
-            Row previousRow = rows.get(previous);
-            Row currentRow = rows.get(current);
-            int cmp = comparator.compare(previousRow, currentRow);
-            if (cmp == 0)
-            {
-                // current and previous are the same row. Merge current into 
previous
-                // (and so previous + 1 will be "free").
-                rows.set(previous, Rows.merge(previousRow, currentRow, 
createdAtInSec));
-            }
-            else
-            {
-                // current != previous, so move current just after previous if 
needs be
-                ++previous;
-                if (previous != current)
-                    rows.set(previous, currentRow);
-            }
-        }
-
-        // previous is on the last value to keep
-        for (int j = rows.size() - 1; j > previous; j--)
-            rows.remove(j);
-
-        finishBuild();
-    }
-
-    private void finishBuild()
-    {
-        EncodingStats.Collector collector = new EncodingStats.Collector();
-        deletionInfo.collectStats(collector);
-        for (Row row : rows)
-            Rows.collectStats(row, collector);
-        stats = collector.get();
+        this.holder = new Holder(merged, holder.deletionInfo, 
holder.staticRow, newStats);
+        rowBuilder = null;
         isBuilt = true;
     }
 
@@ -716,7 +592,7 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
                 else
                 {
                     CFMetaData.serializer.serialize(update.metadata(), out, 
version);
-                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
null, out, version, update.rows.size());
+                    UnfilteredRowIteratorSerializer.serializer.serialize(iter, 
null, out, version, update.rowCount());
                 }
             }
         }
@@ -760,7 +636,8 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
             assert header.rowEstimate >= 0;
 
             MutableDeletionInfo.Builder deletionBuilder = 
MutableDeletionInfo.builder(header.partitionDeletion, metadata.comparator, 
false);
-            List<Row> rows = new ArrayList<>(header.rowEstimate);
+            BTree.Builder<Row> rows = BTree.builder(metadata.comparator, 
header.rowEstimate);
+            rows.auto(false);
 
             try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, 
flag, header))
             {
@@ -774,14 +651,12 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
                 }
             }
 
+            MutableDeletionInfo deletionInfo = deletionBuilder.build();
             return new PartitionUpdate(metadata,
                                        header.key,
                                        header.sHeader.columns(),
-                                       header.staticRow,
-                                       rows,
-                                       deletionBuilder.build(),
-                                       header.sHeader.stats(),
-                                       true,
+                                       new Holder(rows.build(), deletionInfo, 
header.staticRow, header.sHeader.stats()),
+                                       deletionInfo,
                                        false);
         }
 
@@ -802,7 +677,7 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
                     return LegacyLayout.serializedSizeAsLegacyPartition(iter, 
version);
 
                 return CFMetaData.serializer.serializedSize(update.metadata(), 
version)
-                     + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, 
update.rows.size());
+                     + 
UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, 
update.rowCount());
             }
         }
     }
@@ -851,8 +726,8 @@ public class PartitionUpdate extends 
AbstractThreadUnsafePartition
         {
             // This is a bit of a giant hack as this is the only place where 
we mutate a Row object. This makes it more efficient
             // for counters however and this won't be needed post-#6506 so 
that's probably fine.
-            assert row instanceof BTreeBackedRow;
-            ((BTreeBackedRow)row).setValue(column, path, value);
+            assert row instanceof BTreeRow;
+            ((BTreeRow)row).setValue(column, path, value);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/e51f83b6/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
deleted file mode 100644
index 548fb82..0000000
--- a/src/java/org/apache/cassandra/db/rows/BTreeBackedRow.java
+++ /dev/null
@@ -1,602 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.function.Predicate;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.marshal.AbstractType;
-import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.utils.AbstractIterator;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.btree.BTree;
-import org.apache.cassandra.utils.btree.BTreeSearchIterator;
-import org.apache.cassandra.utils.btree.UpdateFunction;
-
-/**
- * Immutable implementation of a Row object.
- */
-public class BTreeBackedRow extends AbstractRow
-{
-    private static final long EMPTY_SIZE = 
ObjectSizes.measure(emptyRow(Clustering.EMPTY));
-
-    private final Clustering clustering;
-    private final Columns columns;
-    private final LivenessInfo primaryKeyLivenessInfo;
-    private final DeletionTime deletion;
-
-    // The data for each columns present in this row in column sorted order.
-    private final Object[] btree;
-
-    // We need to filter the tombstones of a row on every read (twice in fact: 
first to remove purgeable tombstone, and then after reconciliation to remove
-    // all tombstone since we don't return them to the client) as well as on 
compaction. But it's likely that many rows won't have any tombstone at all, so
-    // we want to speed up that case by not having to iterate/copy the row in 
this case. We could keep a single boolean telling us if we have tombstones,
-    // but that doesn't work for expiring columns. So instead we keep the 
deletion time for the first thing in the row to be deleted. This allow at any 
given
-    // time to know if we have any deleted information or not. If we any 
"true" tombstone (i.e. not an expiring cell), this value will be forced to
-    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will 
the time at which the first expiring cell expires. If we have no tombstones and
-    // no expiring cells, this will be Integer.MAX_VALUE;
-    private final int minLocalDeletionTime;
-
-    private BTreeBackedRow(Clustering clustering, Columns columns, 
LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] btree, int 
minLocalDeletionTime)
-    {
-        this.clustering = clustering;
-        this.columns = columns;
-        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
-        this.deletion = deletion;
-        this.btree = btree;
-        this.minLocalDeletionTime = minLocalDeletionTime;
-    }
-
-    // Note that it's often easier/safer to use the 
sortedBuilder/unsortedBuilder or one of the static creation method below. Only 
directly useful in a small amount of cases.
-    public static BTreeBackedRow create(Clustering clustering, Columns 
columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, Object[] 
btree)
-    {
-        int minDeletionTime = 
Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
-        if (minDeletionTime != Integer.MIN_VALUE)
-        {
-            for (ColumnData cd : BTree.<ColumnData>iterable(btree))
-                minDeletionTime = Math.min(minDeletionTime, 
minDeletionTime(cd));
-        }
-
-        return new BTreeBackedRow(clustering, columns, primaryKeyLivenessInfo, 
deletion, btree, minDeletionTime);
-    }
-
-    public static BTreeBackedRow emptyRow(Clustering clustering)
-    {
-        return new BTreeBackedRow(clustering, Columns.NONE, 
LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.empty(), Integer.MAX_VALUE);
-    }
-
-    public static BTreeBackedRow singleCellRow(Clustering clustering, Cell 
cell)
-    {
-        if (cell.column().isSimple())
-            return new BTreeBackedRow(clustering, Columns.of(cell.column()), 
LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(cell), 
minDeletionTime(cell));
-
-        ComplexColumnData complexData = new ComplexColumnData(cell.column(), 
new Cell[]{ cell }, DeletionTime.LIVE);
-        return new BTreeBackedRow(clustering, Columns.of(cell.column()), 
LivenessInfo.EMPTY, DeletionTime.LIVE, BTree.singleton(complexData), 
minDeletionTime(cell));
-    }
-
-    public static BTreeBackedRow emptyDeletedRow(Clustering clustering, 
DeletionTime deletion)
-    {
-        assert !deletion.isLive();
-        return new BTreeBackedRow(clustering, Columns.NONE, 
LivenessInfo.EMPTY, deletion, BTree.empty(), Integer.MIN_VALUE);
-    }
-
-    public static BTreeBackedRow noCellLiveRow(Clustering clustering, 
LivenessInfo primaryKeyLivenessInfo)
-    {
-        assert !primaryKeyLivenessInfo.isEmpty();
-        return new BTreeBackedRow(clustering, Columns.NONE, 
primaryKeyLivenessInfo, DeletionTime.LIVE, BTree.empty(), 
minDeletionTime(primaryKeyLivenessInfo));
-    }
-
-    private static int minDeletionTime(Cell cell)
-    {
-        return cell.isTombstone() ? Integer.MIN_VALUE : 
cell.localDeletionTime();
-    }
-
-    private static int minDeletionTime(LivenessInfo info)
-    {
-        return info.isExpiring() ? info.localExpirationTime() : 
Integer.MAX_VALUE;
-    }
-
-    private static int minDeletionTime(DeletionTime dt)
-    {
-        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
-    }
-
-    private static int minDeletionTime(ComplexColumnData cd)
-    {
-        int min = minDeletionTime(cd.complexDeletion());
-        for (Cell cell : cd)
-        {
-            min = Math.min(min, minDeletionTime(cell));
-            if (min == Integer.MIN_VALUE)
-                break;
-        }
-        return min;
-    }
-
-    private static int minDeletionTime(ColumnData cd)
-    {
-        return cd.column().isSimple() ? minDeletionTime((Cell) cd) : 
minDeletionTime((ComplexColumnData)cd);
-    }
-
-    private static int minDeletionTime(Object[] btree, LivenessInfo info, 
DeletionTime rowDeletion)
-    {
-        int min = Math.min(minDeletionTime(info), 
minDeletionTime(rowDeletion));
-        for (ColumnData cd : BTree.<ColumnData>iterable(btree))
-        {
-            min = Math.min(min, minDeletionTime(cd));
-            if (min == Integer.MIN_VALUE)
-                break;
-        }
-        return min;
-    }
-
-    public Clustering clustering()
-    {
-        return clustering;
-    }
-
-    public Columns columns()
-    {
-        return columns;
-    }
-
-    public LivenessInfo primaryKeyLivenessInfo()
-    {
-        return primaryKeyLivenessInfo;
-    }
-
-    public boolean isEmpty()
-    {
-        return primaryKeyLivenessInfo().isEmpty()
-               && deletion().isLive()
-               && BTree.isEmpty(btree);
-    }
-
-    public DeletionTime deletion()
-    {
-        return deletion;
-    }
-
-    public Cell getCell(ColumnDefinition c)
-    {
-        assert !c.isComplex();
-        return (Cell) BTree.<Object>find(btree, 
ColumnDefinition.asymmetricColumnDataComparator, c);
-    }
-
-    public Cell getCell(ColumnDefinition c, CellPath path)
-    {
-        assert c.isComplex();
-        ComplexColumnData cd = getComplexColumnData(c);
-        if (cd == null)
-            return null;
-        return cd.getCell(path);
-    }
-
-    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
-    {
-        assert c.isComplex();
-        return (ComplexColumnData) BTree.<Object>find(btree, 
ColumnDefinition.asymmetricColumnDataComparator, c);
-    }
-
-    public Iterator<ColumnData> iterator()
-    {
-        return searchIterator();
-    }
-
-    public Iterable<Cell> cells()
-    {
-        return CellIterator::new;
-    }
-
-    public BTreeSearchIterator<ColumnDefinition, ColumnData> searchIterator()
-    {
-        return BTree.slice(btree, 
ColumnDefinition.asymmetricColumnDataComparator, BTree.Dir.ASC);
-    }
-
-    public Row filter(ColumnFilter filter, CFMetaData metadata)
-    {
-        return filter(filter, DeletionTime.LIVE, false, metadata);
-    }
-
-    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, 
boolean setActiveDeletionToRow, CFMetaData metadata)
-    {
-        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = 
metadata.getDroppedColumns();
-
-        if (filter.includesAllColumns() && (activeDeletion.isLive() || 
deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
-            return this;
-
-        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
-
-        LivenessInfo newInfo = primaryKeyLivenessInfo;
-        DeletionTime newDeletion = deletion;
-        if (mayHaveShadowed)
-        {
-            if (activeDeletion.deletes(newInfo.timestamp()))
-                newInfo = LivenessInfo.EMPTY;
-            // note that mayHaveShadowed means the activeDeletion shadows the 
row deletion. So if don't have setActiveDeletionToRow,
-            // the row deletion is shadowed and we shouldn't return it.
-            newDeletion = setActiveDeletionToRow ? activeDeletion : 
DeletionTime.LIVE;
-        }
-
-        Columns columns = filter.fetchedColumns().columns(isStatic());
-        Predicate<ColumnDefinition> inclusionTester = 
columns.inOrderInclusionTester();
-        return transformAndFilter(newInfo, newDeletion, (cd) -> {
-
-            ColumnDefinition column = cd.column();
-            if (!inclusionTester.test(column))
-                return null;
-
-            CFMetaData.DroppedColumn dropped = 
droppedColumns.get(column.name.bytes);
-            if (column.isComplex())
-                return ((ComplexColumnData)cd).filter(filter, mayHaveShadowed 
? activeDeletion : DeletionTime.LIVE, dropped);
-
-            Cell cell = (Cell)cd;
-            return (dropped == null || cell.timestamp() > dropped.droppedTime) 
&& !(mayHaveShadowed && activeDeletion.deletes(cell))
-                   ? cell : null;
-        });
-    }
-
-    public boolean hasComplexDeletion()
-    {
-        // We start by the end cause we know complex columns sort before 
simple ones
-        for (ColumnData cd : BTree.<ColumnData>iterable(btree, BTree.Dir.DESC))
-        {
-            if (cd.column().isSimple())
-                return false;
-
-            if (!((ComplexColumnData)cd).complexDeletion().isLive())
-                return true;
-        }
-        return false;
-    }
-
-    public Row markCounterLocalToBeCleared()
-    {
-        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> 
cd.column().cellValueType().isCounter()
-                                                                            ? 
cd.markCounterLocalToBeCleared()
-                                                                            : 
cd);
-    }
-
-    public boolean hasDeletion(int nowInSec)
-    {
-        return nowInSec >= minLocalDeletionTime;
-    }
-
-    /**
-     * Returns a copy of the row where all timestamps for live data have 
replaced by {@code newTimestamp} and
-     * all deletion timestamp by {@code newTimestamp - 1}.
-     *
-     * This exists for the Paxos path, see {@link 
PartitionUpdate#updateAllTimestamp} for additional details.
-     */
-    public Row updateAllTimestamp(long newTimestamp)
-    {
-        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? 
primaryKeyLivenessInfo : 
primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
-        DeletionTime newDeletion = deletion.isLive() ? deletion : new 
DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
-
-        return transformAndFilter(newInfo, newDeletion, (cd) -> 
cd.updateAllTimestamp(newTimestamp));
-    }
-
-    public Row purge(DeletionPurger purger, int nowInSec)
-    {
-        if (!hasDeletion(nowInSec))
-            return this;
-
-        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, 
nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
-        DeletionTime newDeletion = purger.shouldPurge(deletion) ? 
DeletionTime.LIVE : deletion;
-
-        return transformAndFilter(newInfo, newDeletion, (cd) -> 
cd.purge(purger, nowInSec));
-    }
-
-    private Row transformAndFilter(LivenessInfo info, DeletionTime deletion, 
Function<ColumnData, ColumnData> function)
-    {
-        Object[] transformed = BTree.transformAndFilter(btree, function);
-
-        if (btree == transformed && info == this.primaryKeyLivenessInfo && 
deletion == this.deletion)
-            return this;
-
-        if (info.isEmpty() && deletion.isLive() && BTree.isEmpty(transformed))
-            return null;
-
-        int minDeletionTime = minDeletionTime(transformed, info, deletion);
-        return new BTreeBackedRow(clustering, columns, info, deletion, 
transformed, minDeletionTime);
-    }
-
-    public int dataSize()
-    {
-        int dataSize = clustering.dataSize()
-                     + primaryKeyLivenessInfo.dataSize()
-                     + deletion.dataSize();
-
-        for (ColumnData cd : this)
-            dataSize += cd.dataSize();
-        return dataSize;
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        long heapSize = EMPTY_SIZE
-                      + clustering.unsharedHeapSizeExcludingData()
-                      + BTree.sizeOfStructureOnHeap(btree);
-
-        for (ColumnData cd : this)
-            heapSize += cd.unsharedHeapSizeExcludingData();
-        return heapSize;
-    }
-
-    public static Row.Builder sortedBuilder(Columns columns)
-    {
-        return new Builder(columns, true);
-    }
-
-    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
-    {
-        return new Builder(columns, false, nowInSec);
-    }
-
-    // This is only used by PartitionUpdate.CounterMark but other uses should 
be avoided as much as possible as it breaks our general
-    // assumption that Row objects are immutable. This method should go away 
post-#6506 in particular.
-    // This method is in particular not exposed by the Row API on purpose.
-    // This method also *assumes* that the cell we're setting already exists.
-    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer 
value)
-    {
-        ColumnData current = (ColumnData) BTree.<Object>find(btree, 
ColumnDefinition.asymmetricColumnDataComparator, column);
-        if (column.isSimple())
-            BTree.replaceInSitu(btree, ColumnData.comparator, current, ((Cell) 
current).withUpdatedValue(value));
-        else
-            ((ComplexColumnData) current).setValue(path, value);
-    }
-
-    public Iterable<Cell> cellsInLegacyOrder(CFMetaData metadata)
-    {
-        return () -> new CellInLegacyOrderIterator(metadata);
-    }
-
-    private class CellIterator extends AbstractIterator<Cell>
-    {
-        private Iterator<ColumnData> columnData = iterator();
-        private Iterator<Cell> complexCells;
-
-        protected Cell computeNext()
-        {
-            while (true)
-            {
-                if (complexCells != null)
-                {
-                    if (complexCells.hasNext())
-                        return complexCells.next();
-
-                    complexCells = null;
-                }
-
-                if (!columnData.hasNext())
-                    return endOfData();
-
-                ColumnData cd = columnData.next();
-                if (cd.column().isComplex())
-                    complexCells = ((ComplexColumnData)cd).iterator();
-                else
-                    return (Cell)cd;
-            }
-        }
-    }
-
-    private class CellInLegacyOrderIterator extends AbstractIterator<Cell>
-    {
-        private final AbstractType<?> comparator;
-        private final int firstComplexIdx;
-        private int simpleIdx;
-        private int complexIdx;
-        private Iterator<Cell> complexCells;
-        private final Object[] data;
-
-        private CellInLegacyOrderIterator(CFMetaData metadata)
-        {
-            this.comparator = 
metadata.getColumnDefinitionNameComparator(isStatic() ? 
ColumnDefinition.Kind.STATIC : ColumnDefinition.Kind.REGULAR);
-
-            // copy btree into array for simple separate iteration of simple 
and complex columns
-            this.data = new Object[BTree.size(btree)];
-            BTree.toArray(btree, data, 0);
-
-            int idx = Iterators.indexOf(Iterators.forArray(data), cd -> cd 
instanceof ComplexColumnData);
-            this.firstComplexIdx = idx < 0 ? data.length : idx;
-            this.complexIdx = firstComplexIdx;
-        }
-
-        protected Cell computeNext()
-        {
-            while (true)
-            {
-                if (complexCells != null)
-                {
-                    if (complexCells.hasNext())
-                        return complexCells.next();
-
-                    complexCells = null;
-                }
-
-                if (simpleIdx >= firstComplexIdx)
-                {
-                    if (complexIdx >= data.length)
-                        return endOfData();
-
-                    complexCells = 
((ComplexColumnData)data[complexIdx++]).iterator();
-                }
-                else
-                {
-                    if (complexIdx >= data.length)
-                        return (Cell)data[simpleIdx++];
-
-                    if (comparator.compare(((ColumnData) 
data[simpleIdx]).column().name.bytes, ((ColumnData) 
data[complexIdx]).column().name.bytes) < 0)
-                        return (Cell)data[simpleIdx++];
-                    else
-                        complexCells = 
((ComplexColumnData)data[complexIdx++]).iterator();
-                }
-            }
-        }
-    }
-
-    public static class Builder implements Row.Builder
-    {
-        // a simple marker class that will sort to the beginning of a run of 
complex cells to store the deletion time
-        private static class ComplexColumnDeletion extends BufferCell
-        {
-            public ComplexColumnDeletion(ColumnDefinition column, DeletionTime 
deletionTime)
-            {
-                super(column, deletionTime.markedForDeleteAt(), 0, 
deletionTime.localDeletionTime(), ByteBufferUtil.EMPTY_BYTE_BUFFER, 
CellPath.BOTTOM);
-            }
-        }
-
-        // converts a run of Cell with equal column into a ColumnData
-        private static class CellResolver implements BTree.Builder.Resolver
-        {
-            final int nowInSec;
-            private CellResolver(int nowInSec)
-            {
-                this.nowInSec = nowInSec;
-            }
-
-            public ColumnData resolve(Object[] cells, int lb, int ub)
-            {
-                Cell cell = (Cell) cells[lb];
-                ColumnDefinition column = cell.column;
-                if (cell.column.isSimple())
-                {
-                    assert lb + 1 == ub || nowInSec != Integer.MIN_VALUE;
-                    while (++lb < ub)
-                        cell = Cells.reconcile(cell, (Cell) cells[lb], 
nowInSec);
-                    return cell;
-                }
-
-                // TODO: relax this in the case our outer provider is sorted 
(want to delay until remaining changes are
-                // bedded in, as less important; galloping makes it pretty 
cheap anyway)
-                Arrays.sort(cells, lb, ub, (Comparator<Object>) 
column.cellComparator());
-                cell = (Cell) cells[lb];
-                DeletionTime deletion = DeletionTime.LIVE;
-                if (cell instanceof ComplexColumnDeletion)
-                {
-                    // TODO: do we need to be robust to multiple of these 
being provided?
-                    deletion = new DeletionTime(cell.timestamp(), 
cell.localDeletionTime());
-                    lb++;
-                }
-
-                List<Object> buildFrom = Arrays.asList(cells).subList(lb, ub);
-                Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
-                return new ComplexColumnData(column, btree, deletion);
-            }
-
-        };
-        protected final Columns columns;
-
-        protected Clustering clustering;
-        protected LivenessInfo primaryKeyLivenessInfo = LivenessInfo.EMPTY;
-        protected DeletionTime deletion = DeletionTime.LIVE;
-
-        private final boolean isSorted;
-        private final BTree.Builder<Cell> cells;
-        private final CellResolver resolver;
-        private boolean hasComplex = false;
-
-        // For complex column at index i of 'columns', we store at 
complexDeletions[i] its complex deletion.
-
-        protected Builder(Columns columns, boolean isSorted)
-        {
-            this(columns, isSorted, Integer.MIN_VALUE);
-        }
-
-        protected Builder(Columns columns, boolean isSorted, int nowInSecs)
-        {
-            this.columns = columns;
-            this.cells = BTree.builder(ColumnData.comparator);
-            resolver = new CellResolver(nowInSecs);
-            this.isSorted = isSorted;
-            this.cells.auto(false);
-        }
-
-        public boolean isSorted()
-        {
-            return isSorted;
-        }
-
-        public void newRow(Clustering clustering)
-        {
-            assert this.clustering == null; // Ensures we've properly called 
build() if we've use this builder before
-            this.clustering = clustering;
-        }
-
-        public Clustering clustering()
-        {
-            return clustering;
-        }
-
-        protected void reset()
-        {
-            this.clustering = null;
-            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
-            this.deletion = DeletionTime.LIVE;
-            this.cells.reuse();
-        }
-
-        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
-        {
-            this.primaryKeyLivenessInfo = info;
-        }
-
-        public void addRowDeletion(DeletionTime deletion)
-        {
-            this.deletion = deletion;
-        }
-
-        public void addCell(Cell cell)
-        {
-            assert cell.column().isStatic() == (clustering == 
Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = 
" + clustering;
-            cells.add(cell);
-            hasComplex |= cell.column.isComplex();
-        }
-
-        public void addComplexDeletion(ColumnDefinition column, DeletionTime 
complexDeletion)
-        {
-            cells.add(new ComplexColumnDeletion(column, complexDeletion));
-            hasComplex = true;
-        }
-
-        public Row build()
-        {
-            if (!isSorted)
-                cells.sort();
-            // we can avoid resolving if we're sorted and have no complex 
values
-            // (because we'll only have unique simple cells, which are already 
in their final condition)
-            if (!isSorted | hasComplex)
-                cells.resolve(resolver);
-            Object[] btree = cells.build();
-            int minDeletionTime = minDeletionTime(btree, 
primaryKeyLivenessInfo, deletion);
-            Row row = new BTreeBackedRow(clustering, columns, 
primaryKeyLivenessInfo, deletion, btree, minDeletionTime);
-            reset();
-            return row;
-        }
-
-    }
-}

Reply via email to