http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java 
b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
new file mode 100644
index 0000000..016b4fa
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/ArrayBackedRow.java
@@ -0,0 +1,927 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Predicate;
+
+import com.google.common.collect.AbstractIterator;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.ObjectSizes;
+
+/**
+ * Immutable implementation of a Row object.
+ */
+public class ArrayBackedRow extends AbstractRow
+{
+    private static final ColumnData[] NO_DATA = new ColumnData[0];
+
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
ArrayBackedRow(Clustering.EMPTY, Columns.NONE, LivenessInfo.EMPTY, 
DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE));
+
+    private final Clustering clustering;
+    private final Columns columns;
+    private final LivenessInfo primaryKeyLivenessInfo;
+    private final DeletionTime deletion;
+
+    // The data for each columns present in this row in column sorted order.
+    private final int size;
+    private final ColumnData[] data;
+
+    // We need to filter the tombstones of a row on every read (twice in fact: 
first to remove purgeable tombstone, and then after reconciliation to remove
+    // all tombstone since we don't return them to the client) as well as on 
compaction. But it's likely that many rows won't have any tombstone at all, so
+    // we want to speed up that case by not having to iterate/copy the row in 
this case. We could keep a single boolean telling us if we have tombstones,
+    // but that doesn't work for expiring columns. So instead we keep the 
deletion time for the first thing in the row to be deleted. This allow at any 
given
+    // time to know if we have any deleted information or not. If we any 
"true" tombstone (i.e. not an expiring cell), this value will be forced to
+    // Integer.MIN_VALUE, but if we don't and have expiring cells, this will 
the time at which the first expiring cell expires. If we have no tombstones and
+    // no expiring cells, this will be Integer.MAX_VALUE;
+    private final int minLocalDeletionTime;
+
+    private ArrayBackedRow(Clustering clustering, Columns columns, 
LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, 
ColumnData[] data, int minLocalDeletionTime)
+    {
+        this.clustering = clustering;
+        this.columns = columns;
+        this.primaryKeyLivenessInfo = primaryKeyLivenessInfo;
+        this.deletion = deletion;
+        this.size = size;
+        this.data = data;
+        this.minLocalDeletionTime = minLocalDeletionTime;
+    }
+
+    // Note that it's often easier/safer to use the 
sortedBuilder/unsortedBuilder or one of the static creation method below. Only 
directly useful in a small amount of cases.
+    public static ArrayBackedRow create(Clustering clustering, Columns 
columns, LivenessInfo primaryKeyLivenessInfo, DeletionTime deletion, int size, 
ColumnData[] data)
+    {
+        int minDeletionTime = 
Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+        if (minDeletionTime != Integer.MIN_VALUE)
+        {
+            for (int i = 0; i < size; i++)
+                minDeletionTime = Math.min(minDeletionTime, 
minDeletionTime(data[i]));
+        }
+
+        return new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, 
deletion, size, data, minDeletionTime);
+    }
+
+    public static ArrayBackedRow emptyRow(Clustering clustering)
+    {
+        return new ArrayBackedRow(clustering, Columns.NONE, 
LivenessInfo.EMPTY, DeletionTime.LIVE, 0, NO_DATA, Integer.MAX_VALUE);
+    }
+
+    public static ArrayBackedRow singleCellRow(Clustering clustering, Cell 
cell)
+    {
+        if (cell.column().isSimple())
+            return new ArrayBackedRow(clustering, Columns.of(cell.column()), 
LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ cell }, 
minDeletionTime(cell));
+
+        ComplexColumnData complexData = new ComplexColumnData(cell.column(), 
new Cell[]{ cell }, DeletionTime.LIVE);
+        return new ArrayBackedRow(clustering, Columns.of(cell.column()), 
LivenessInfo.EMPTY, DeletionTime.LIVE, 1, new ColumnData[]{ complexData }, 
minDeletionTime(cell));
+    }
+
+    public static ArrayBackedRow emptyDeletedRow(Clustering clustering, 
DeletionTime deletion)
+    {
+        assert !deletion.isLive();
+        return new ArrayBackedRow(clustering, Columns.NONE, 
LivenessInfo.EMPTY, deletion, 0, NO_DATA, Integer.MIN_VALUE);
+    }
+
+    public static ArrayBackedRow noCellLiveRow(Clustering clustering, 
LivenessInfo primaryKeyLivenessInfo)
+    {
+        assert !primaryKeyLivenessInfo.isEmpty();
+        return new ArrayBackedRow(clustering, Columns.NONE, 
primaryKeyLivenessInfo, DeletionTime.LIVE, 0, NO_DATA, 
minDeletionTime(primaryKeyLivenessInfo));
+    }
+
+    private static int minDeletionTime(Cell cell)
+    {
+        return cell.isTombstone() ? Integer.MIN_VALUE : 
cell.localDeletionTime();
+    }
+
+    private static int minDeletionTime(LivenessInfo info)
+    {
+        return info.isExpiring() ? info.localExpirationTime() : 
Integer.MAX_VALUE;
+    }
+
+    private static int minDeletionTime(DeletionTime dt)
+    {
+        return dt.isLive() ? Integer.MAX_VALUE : Integer.MIN_VALUE;
+    }
+
+    private static int minDeletionTime(ComplexColumnData cd)
+    {
+        int min = minDeletionTime(cd.complexDeletion());
+        for (Cell cell : cd)
+            min = Math.min(min, minDeletionTime(cell));
+        return min;
+    }
+
+    private static int minDeletionTime(ColumnData cd)
+    {
+        return cd.column().isSimple() ? minDeletionTime((Cell)cd) : 
minDeletionTime((ComplexColumnData)cd);
+    }
+
+    public Clustering clustering()
+    {
+        return clustering;
+    }
+
+    public Columns columns()
+    {
+        return columns;
+    }
+
+    public LivenessInfo primaryKeyLivenessInfo()
+    {
+        return primaryKeyLivenessInfo;
+    }
+
+    public DeletionTime deletion()
+    {
+        return deletion;
+    }
+
+    public Cell getCell(ColumnDefinition c)
+    {
+        assert !c.isComplex();
+        int idx = binarySearch(c);
+        return idx < 0 ? null : (Cell)data[idx];
+    }
+
+    public Cell getCell(ColumnDefinition c, CellPath path)
+    {
+        assert c.isComplex();
+        int idx = binarySearch(c);
+        if (idx < 0)
+            return null;
+
+        return ((ComplexColumnData)data[idx]).getCell(path);
+    }
+
+    public ComplexColumnData getComplexColumnData(ColumnDefinition c)
+    {
+        assert c.isComplex();
+        int idx = binarySearch(c);
+        return idx < 0 ? null : (ComplexColumnData)data[idx];
+    }
+
+    public Iterator<ColumnData> iterator()
+    {
+        return new ColumnDataIterator();
+    }
+
+    public Iterable<Cell> cells()
+    {
+        return CellIterator::new;
+    }
+
+    public SearchIterator<ColumnDefinition, ColumnData> searchIterator()
+    {
+        return new ColumnSearchIterator();
+    }
+
+    public Row filter(ColumnFilter filter, CFMetaData metadata)
+    {
+        return filter(filter, DeletionTime.LIVE, false, metadata);
+    }
+
+    public Row filter(ColumnFilter filter, DeletionTime activeDeletion, 
boolean setActiveDeletionToRow, CFMetaData metadata)
+    {
+        Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = 
metadata.getDroppedColumns();
+
+        if (filter.includesAllColumns() && (activeDeletion.isLive() || 
deletion.supersedes(activeDeletion)) && droppedColumns.isEmpty())
+            return this;
+
+        boolean mayHaveShadowed = activeDeletion.supersedes(deletion);
+
+        LivenessInfo newInfo = primaryKeyLivenessInfo;
+        DeletionTime newDeletion = deletion;
+        if (mayHaveShadowed)
+        {
+            if (activeDeletion.deletes(newInfo.timestamp()))
+                newInfo = LivenessInfo.EMPTY;
+            // note that mayHaveShadowed means the activeDeletion shadows the 
row deletion. So if don't have setActiveDeletionToRow,
+            // the row deletion is shadowed and we shouldn't return it.
+            newDeletion = setActiveDeletionToRow ? activeDeletion : 
DeletionTime.LIVE;
+        }
+
+        ColumnData[] newData = new ColumnData[size];
+        int newMinDeletionTime = Math.min(minDeletionTime(newInfo), 
minDeletionTime(newDeletion));
+        Columns columns = filter.fetchedColumns().columns(isStatic());
+        Predicate<ColumnDefinition> inclusionTester = 
columns.inOrderInclusionTester();
+        int newSize = 0;
+        for (int i = 0; i < size; i++)
+        {
+            ColumnData cd = data[i];
+            ColumnDefinition column = cd.column();
+            if (!inclusionTester.test(column))
+                continue;
+
+            CFMetaData.DroppedColumn dropped = 
droppedColumns.get(column.name.bytes);
+            if (column.isSimple())
+            {
+                Cell cell = (Cell)cd;
+                if ((dropped == null || cell.timestamp() > 
dropped.droppedTime) && !(mayHaveShadowed && activeDeletion.deletes(cell)))
+                {
+                    newData[newSize++] = cell;
+                    newMinDeletionTime = Math.min(newMinDeletionTime, 
minDeletionTime(cell));
+                }
+            }
+            else
+            {
+                ColumnData newCd = ((ComplexColumnData)cd).filter(filter, 
mayHaveShadowed ? activeDeletion : DeletionTime.LIVE, dropped);
+                if (newCd != null)
+                {
+                    newData[newSize++] = newCd;
+                    newMinDeletionTime = Math.min(newMinDeletionTime, 
minDeletionTime(newCd));
+                }
+            }
+        }
+
+        if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
+            return null;
+
+        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, 
newSize, newData, newMinDeletionTime);
+    }
+
+    public boolean hasComplexDeletion()
+    {
+        // We start by the end cause we know complex columns sort before 
simple ones
+        for (int i = size - 1; i >= 0; i--)
+        {
+            ColumnData cd = data[i];
+            if (cd.column().isSimple())
+                return false;
+
+            if (!((ComplexColumnData)cd).complexDeletion().isLive())
+                return true;
+        }
+        return false;
+    }
+
+    public Row markCounterLocalToBeCleared()
+    {
+        ColumnData[] newData = null;
+        for (int i = 0; i < size; i++)
+        {
+            ColumnData cd = data[i];
+            ColumnData newCd = cd.column().cellValueType().isCounter()
+                             ? cd.markCounterLocalToBeCleared()
+                             : cd;
+            if (newCd != cd)
+            {
+                if (newData == null)
+                    newData = Arrays.copyOf(data, size);
+                newData[i] = newCd;
+            }
+        }
+
+        return newData == null
+             ? this
+             : new ArrayBackedRow(clustering, columns, primaryKeyLivenessInfo, 
deletion, size, newData, minLocalDeletionTime);
+    }
+
+    public boolean hasDeletion(int nowInSec)
+    {
+        return nowInSec >= minLocalDeletionTime;
+    }
+
+    /**
+     * Returns a copy of the row where all timestamps for live data have 
replaced by {@code newTimestamp} and
+     * all deletion timestamp by {@code newTimestamp - 1}.
+     *
+     * This exists for the Paxos path, see {@link 
PartitionUpdate#updateAllTimestamp} for additional details.
+     */
+    public Row updateAllTimestamp(long newTimestamp)
+    {
+        LivenessInfo newInfo = primaryKeyLivenessInfo.isEmpty() ? 
primaryKeyLivenessInfo : 
primaryKeyLivenessInfo.withUpdatedTimestamp(newTimestamp);
+        DeletionTime newDeletion = deletion.isLive() ? deletion : new 
DeletionTime(newTimestamp - 1, deletion.localDeletionTime());
+
+        ColumnData[] newData = new ColumnData[size];
+        for (int i = 0; i < size; i++)
+            newData[i] = data[i].updateAllTimestamp(newTimestamp);
+
+        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, 
size, newData, minLocalDeletionTime);
+    }
+
+    public Row purge(DeletionPurger purger, int nowInSec)
+    {
+        if (!hasDeletion(nowInSec))
+            return this;
+
+        LivenessInfo newInfo = purger.shouldPurge(primaryKeyLivenessInfo, 
nowInSec) ? LivenessInfo.EMPTY : primaryKeyLivenessInfo;
+        DeletionTime newDeletion = purger.shouldPurge(deletion) ? 
DeletionTime.LIVE : deletion;
+
+        int newMinDeletionTime = Math.min(minDeletionTime(newInfo), 
minDeletionTime(newDeletion));
+        ColumnData[] newData = new ColumnData[size];
+        int newSize = 0;
+        for (int i = 0; i < size; i++)
+        {
+            ColumnData purged = data[i].purge(purger, nowInSec);
+            if (purged != null)
+            {
+                newData[newSize++] = purged;
+                newMinDeletionTime = Math.min(newMinDeletionTime, 
minDeletionTime(purged));
+            }
+        }
+
+        if (newSize == 0 && newInfo.isEmpty() && newDeletion.isLive())
+            return null;
+
+        return new ArrayBackedRow(clustering, columns, newInfo, newDeletion, 
newSize, newData, newMinDeletionTime);
+    }
+
+    public int dataSize()
+    {
+        int dataSize = clustering.dataSize()
+                     + primaryKeyLivenessInfo.dataSize()
+                     + deletion.dataSize();
+
+        for (int i = 0; i < size; i++)
+            dataSize += data[i].dataSize();
+        return dataSize;
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        long heapSize = EMPTY_SIZE
+                      + clustering.unsharedHeapSizeExcludingData()
+                      + ObjectSizes.sizeOfArray(data);
+
+        for (int i = 0; i < size; i++)
+            heapSize += data[i].unsharedHeapSizeExcludingData();
+        return heapSize;
+    }
+
+    public static Row.Builder sortedBuilder(Columns columns)
+    {
+        return new SortedBuilder(columns);
+    }
+
+    public static Row.Builder unsortedBuilder(Columns columns, int nowInSec)
+    {
+        return new UnsortedBuilder(columns, nowInSec);
+    }
+
+    // This is only used by PartitionUpdate.CounterMark but other uses should 
be avoided as much as possible as it breaks our general
+    // assumption that Row objects are immutable. This method should go away 
post-#6506 in particular.
+    // This method is in particular not exposed by the Row API on purpose.
+    // This method also *assumes* that the cell we're setting already exists.
+    public void setValue(ColumnDefinition column, CellPath path, ByteBuffer 
value)
+    {
+        int idx = binarySearch(column);
+        assert idx >= 0;
+        if (column.isSimple())
+            data[idx] = ((Cell)data[idx]).withUpdatedValue(value);
+        else
+            ((ComplexColumnData)data[idx]).setValue(path, value);
+    }
+
+    private int binarySearch(ColumnDefinition column)
+    {
+        return binarySearch(column, 0, size);
+    }
+
+    /**
+     * Simple binary search for a given column (in the data list).
+     *
+     * The return value has the exact same meaning that the one of 
Collections.binarySearch() but
+     * we don't use the later because we're searching for a 'ColumnDefinition' 
in an array of 'ColumnData'.
+     */
+    private int binarySearch(ColumnDefinition column, int fromIndex, int 
toIndex)
+    {
+        int low = fromIndex;
+        int mid = toIndex;
+        int high = mid - 1;
+        int result = -1;
+        while (low <= high)
+        {
+            mid = (low + high) >> 1;
+            if ((result = column.compareTo(data[mid].column())) > 0)
+                low = mid + 1;
+            else if (result == 0)
+                return mid;
+            else
+                high = mid - 1;
+        }
+        return -mid - (result < 0 ? 1 : 2);
+    }
+
+    private class ColumnDataIterator extends AbstractIterator<ColumnData>
+    {
+        private int i;
+
+        protected ColumnData computeNext()
+        {
+            return i < size ? data[i++] : endOfData();
+        }
+    }
+
+    private class CellIterator extends AbstractIterator<Cell>
+    {
+        private int i;
+        private Iterator<Cell> complexCells;
+
+        protected Cell computeNext()
+        {
+            while (true)
+            {
+                if (complexCells != null)
+                {
+                    if (complexCells.hasNext())
+                        return complexCells.next();
+
+                    complexCells = null;
+                }
+
+                if (i >= size)
+                    return endOfData();
+
+                ColumnData cd = data[i++];
+                if (cd.column().isComplex())
+                    complexCells = ((ComplexColumnData)cd).iterator();
+                else
+                    return (Cell)cd;
+            }
+        }
+    }
+
+    private class ColumnSearchIterator implements 
SearchIterator<ColumnDefinition, ColumnData>
+    {
+        // The index at which the next call to "next" should start looking from
+        private int searchFrom = 0;
+
+        public boolean hasNext()
+        {
+            return searchFrom < size;
+        }
+
+        public ColumnData next(ColumnDefinition column)
+        {
+            int idx = binarySearch(column, searchFrom, size);
+            if (idx < 0)
+            {
+                searchFrom = -idx - 1;
+                return null;
+            }
+            else
+            {
+                // We've found it. We'll start after it next time.
+                searchFrom = idx + 1;
+                return data[idx];
+            }
+        }
+    }
+
+    private static abstract class AbstractBuilder implements Row.Builder
+    {
+        protected final Columns columns;
+
+        protected Clustering clustering;
+        protected LivenessInfo primaryKeyLivenessInfo;
+        protected DeletionTime deletion;
+
+        protected List<Cell> cells = new ArrayList<>();
+
+        // For complex column at index i of 'columns', we store at 
complexDeletions[i] its complex deletion.
+        protected DeletionTime[] complexDeletions;
+        protected int columnsWithComplexDeletion;
+
+        protected AbstractBuilder(Columns columns)
+        {
+            this.columns = columns;
+            this.complexDeletions = new 
DeletionTime[columns.complexColumnCount()];
+        }
+
+        public void newRow(Clustering clustering)
+        {
+            assert cells.isEmpty(); // Ensures we've properly called build() 
if we've use this builder before
+            this.clustering = clustering;
+        }
+
+        public Clustering clustering()
+        {
+            return clustering;
+        }
+
+        protected void reset()
+        {
+            this.clustering = null;
+            this.primaryKeyLivenessInfo = LivenessInfo.EMPTY;
+            this.deletion = DeletionTime.LIVE;
+            this.cells.clear();
+            Arrays.fill(this.complexDeletions, null);
+            this.columnsWithComplexDeletion = 0;
+        }
+
+        public void addPrimaryKeyLivenessInfo(LivenessInfo info)
+        {
+            this.primaryKeyLivenessInfo = info;
+        }
+
+        public void addRowDeletion(DeletionTime deletion)
+        {
+            this.deletion = deletion;
+        }
+
+        public void addCell(Cell cell)
+        {
+            assert cell.column().isStatic() == (clustering == 
Clustering.STATIC_CLUSTERING) : "Column is " + cell.column() + ", clustering = 
" + clustering;
+            cells.add(cell);
+        }
+
+        public Row build()
+        {
+            Row row = buildInternal();
+            reset();
+            return row;
+        }
+
+        protected abstract Row buildInternal();
+
+        protected Row buildNoCells()
+        {
+            assert cells.isEmpty();
+            int minDeletionTime = 
Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+            if (columnsWithComplexDeletion == 0)
+                return new ArrayBackedRow(clustering, columns, 
primaryKeyLivenessInfo, deletion, 0, NO_DATA, minDeletionTime);
+
+            ColumnData[] data = new ColumnData[columnsWithComplexDeletion];
+            int size = 0;
+            for (int i = 0; i < complexDeletions.length; i++)
+            {
+                DeletionTime complexDeletion = complexDeletions[i];
+                if (complexDeletion != null)
+                {
+                    assert !complexDeletion.isLive();
+                    data[size++] = new 
ComplexColumnData(columns.getComplex(i), ComplexColumnData.NO_CELLS, 
complexDeletion);
+                    minDeletionTime = Integer.MIN_VALUE;
+                }
+            }
+            return new ArrayBackedRow(clustering, columns, 
primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
+        }
+    }
+
+    public static class SortedBuilder extends AbstractBuilder
+    {
+        private int columnCount;
+
+        private ColumnDefinition column;
+
+        // The index of the last column for which we've called setColumn if 
complex.
+        private int complexColumnIndex;
+
+        // For complex column at index i of 'columns', we store at 
complexColumnCellsCount[i] its number of added cells.
+        private final int[] complexColumnCellsCount;
+
+        protected SortedBuilder(Columns columns)
+        {
+            super(columns);
+            this.complexColumnCellsCount = new 
int[columns.complexColumnCount()];
+            reset();
+        }
+
+        @Override
+        protected void reset()
+        {
+            super.reset();
+            this.column = null;
+            this.columnCount = 0;
+            this.complexColumnIndex = -1;
+            Arrays.fill(this.complexColumnCellsCount, 0);
+        }
+
+        public boolean isSorted()
+        {
+            return true;
+        }
+
+        private void setColumn(ColumnDefinition column)
+        {
+            int cmp = this.column == null ? -1 : this.column.compareTo(column);
+            assert cmp <= 0 : "current = " + this.column + ", new = " + column;
+            if (cmp != 0)
+            {
+                this.column = column;
+                ++columnCount;
+                if (column.isComplex())
+                    complexColumnIndex = columns.complexIdx(column, 
complexColumnIndex + 1);
+            }
+        }
+
+        @Override
+        public void addCell(Cell cell)
+        {
+            setColumn(cell.column());
+            super.addCell(cell);
+            if (column.isComplex())
+                complexColumnCellsCount[complexColumnIndex] += 1;
+        }
+
+        @Override
+        public void addComplexDeletion(ColumnDefinition column, DeletionTime 
complexDeletion)
+        {
+            if (complexDeletion.isLive())
+                return;
+
+            setColumn(column);
+            assert complexDeletions[complexColumnIndex] == null;
+            complexDeletions[complexColumnIndex] = complexDeletion;
+            ++columnsWithComplexDeletion;
+        }
+
+        protected Row buildInternal()
+        {
+            if (cells.isEmpty())
+                return buildNoCells();
+
+            int minDeletionTime = 
Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+
+            ColumnData[] data = new ColumnData[columnCount];
+            int complexIdx = 0;
+            int i = 0;
+            int size = 0;
+            while (i < cells.size())
+            {
+                Cell cell = cells.get(i);
+                ColumnDefinition column = cell.column();
+                if (column.isSimple())
+                {
+                    data[size++] = cell;
+                    minDeletionTime = Math.min(minDeletionTime, 
minDeletionTime(cell));
+                    ++i;
+                }
+                else
+                {
+                    while (columns.getComplex(complexIdx).compareTo(column) < 
0)
+                    {
+                        if (complexDeletions[complexIdx] != null)
+                        {
+                            data[size++] = new 
ComplexColumnData(columns.getComplex(complexIdx), ComplexColumnData.NO_CELLS, 
complexDeletions[complexIdx]);
+                            minDeletionTime = Integer.MIN_VALUE;
+                        }
+                        ++complexIdx;
+                    }
+
+                    DeletionTime complexDeletion = 
complexDeletions[complexIdx];
+                    if (complexDeletion != null)
+                        minDeletionTime = Integer.MIN_VALUE;
+                    int cellCount = complexColumnCellsCount[complexIdx];
+                    Cell[] complexCells = new Cell[cellCount];
+                    for (int j = 0; j < cellCount; j++)
+                    {
+                        Cell complexCell = cells.get(i + j);
+                        complexCells[j] = complexCell;
+                        minDeletionTime = Math.min(minDeletionTime, 
minDeletionTime(complexCell));
+                    }
+                    i += cellCount;
+
+                    data[size++] = new ComplexColumnData(column, complexCells, 
complexDeletion == null ? DeletionTime.LIVE : complexDeletion);
+                    ++complexIdx;
+                }
+            }
+            for (int j = complexIdx; j < complexDeletions.length; j++)
+            {
+                if (complexDeletions[j] != null)
+                {
+                    data[size++] = new 
ComplexColumnData(columns.getComplex(j), ComplexColumnData.NO_CELLS, 
complexDeletions[j]);
+                    minDeletionTime = Integer.MIN_VALUE;
+                }
+            }
+            assert size == data.length;
+            return new ArrayBackedRow(clustering, columns, 
primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
+        }
+    }
+
+    private static class UnsortedBuilder extends AbstractBuilder
+    {
+        private final int nowInSec;
+
+        private UnsortedBuilder(Columns columns, int nowInSec)
+        {
+            super(columns);
+            this.nowInSec = nowInSec;
+            reset();
+        }
+
+        public boolean isSorted()
+        {
+            return false;
+        }
+
+        public void addComplexDeletion(ColumnDefinition column, DeletionTime 
complexDeletion)
+        {
+            assert column.isComplex();
+            assert column.isStatic() == (clustering == 
Clustering.STATIC_CLUSTERING);
+
+            if (complexDeletion.isLive())
+                return;
+
+            int complexColumnIndex = columns.complexIdx(column, 0);
+
+            DeletionTime previous = complexDeletions[complexColumnIndex];
+            if (previous == null || complexDeletion.supersedes(previous))
+            {
+                complexDeletions[complexColumnIndex] = complexDeletion;
+                if (previous == null)
+                    ++columnsWithComplexDeletion;
+            }
+        }
+
+        protected Row buildInternal()
+        {
+            // First, the easy cases
+            if (cells.isEmpty())
+                return buildNoCells();
+
+            // Cells have been added in an unsorted way, so sort them first
+            Collections.sort(cells, Cell.comparator);
+
+            // We now need to
+            //  1) merge equal cells together
+            //  2) group the cells for a given complex column together, and 
include their potential complex deletion time.
+            //     And this without forgetting that some complex columns may 
have a complex deletion but not cells.
+
+            int addedColumns = countAddedColumns();
+            ColumnData[] data = new ColumnData[addedColumns];
+
+            int nextComplexWithDeletion = findNextComplexWithDeletion(0);
+            ColumnDefinition previousColumn = null;
+
+            int minDeletionTime = 
Math.min(minDeletionTime(primaryKeyLivenessInfo), minDeletionTime(deletion));
+
+            int i = 0;
+            int size = 0;
+            while (i < cells.size())
+            {
+                Cell cell = cells.get(i++);
+                ColumnDefinition column = cell.column();
+                if (column.isSimple())
+                {
+                    // Either it's a cell for the same column than our 
previous cell and we merge them together, or it's a new column
+                    if (previousColumn != null && 
previousColumn.compareTo(column) == 0)
+                        data[size - 1] = Cells.reconcile((Cell)data[size - 1], 
cell, nowInSec);
+                    else
+                        data[size++] = cell;
+                }
+                else
+                {
+                    // First, collect the complex deletion time for the column 
we got the first complex column of. We'll
+                    // also find if there is columns that sorts before but had 
only a complex deletion and add them.
+                    DeletionTime complexDeletion = DeletionTime.LIVE;
+                    while (nextComplexWithDeletion >= 0)
+                    {
+                        int cmp = 
column.compareTo(columns.getComplex(nextComplexWithDeletion));
+                        if (cmp < 0)
+                        {
+                            // This is after the column we're gonna add cell 
for. We'll deal with it later
+                            break;
+                        }
+                        else if (cmp > 0)
+                        {
+                            // We have a column that only has a complex 
deletion and no column. Add its data first
+                            data[size++] = new 
ComplexColumnData(columns.getComplex(nextComplexWithDeletion), 
ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
+                            minDeletionTime = Integer.MIN_VALUE;
+                            nextComplexWithDeletion = 
findNextComplexWithDeletion(nextComplexWithDeletion + 1);
+                        }
+                        else // cmp == 0
+                        {
+                            // This is the column we'll about to add cell for. 
Record the deletion time and break to the cell addition
+                            complexDeletion = 
complexDeletions[nextComplexWithDeletion];
+                            minDeletionTime = Integer.MIN_VALUE;
+                            nextComplexWithDeletion = 
findNextComplexWithDeletion(nextComplexWithDeletion + 1);
+                            break;
+                        }
+                    }
+
+                    // Find how many cells the complex column has (cellCount) 
and the index of the next cell that doesn't belong to it (nextColumnIdx).
+                    int nextColumnIdx = i; // i is on cell following the 
current one
+                    int cellCount = 1; // We have at least the current cell
+                    Cell previousCell = cell;
+                    while (nextColumnIdx < cells.size())
+                    {
+                        Cell newCell = cells.get(nextColumnIdx);
+                        if (column.compareTo(newCell.column()) != 0)
+                            break;
+
+                        ++nextColumnIdx;
+                        if 
(column.cellPathComparator().compare(previousCell.path(), newCell.path()) != 0)
+                            ++cellCount;
+                        previousCell = newCell;
+                    }
+                    Cell[] columnCells = new Cell[cellCount];
+                    int complexSize = 0;
+                    columnCells[complexSize++] = cell;
+                    previousCell = cell;
+                    for (int j = i; j < nextColumnIdx; j++)
+                    {
+                        Cell newCell = cells.get(j);
+                        // Either it's a cell for the same path than our 
previous cell and we merge them together, or it's a new path
+                        if 
(column.cellPathComparator().compare(previousCell.path(), newCell.path()) == 0)
+                            columnCells[complexSize - 1] = 
Cells.reconcile(previousCell, newCell, nowInSec);
+                        else
+                            columnCells[complexSize++] = newCell;
+                        previousCell = newCell;
+                    }
+                    i = nextColumnIdx;
+
+                    data[size++] = new ComplexColumnData(column, columnCells, 
complexDeletion);
+                }
+                previousColumn = column;
+            }
+            // We may still have some complex columns with only a complex 
deletion
+            while (nextComplexWithDeletion >= 0)
+            {
+                data[size++] = new 
ComplexColumnData(columns.getComplex(nextComplexWithDeletion), 
ComplexColumnData.NO_CELLS, complexDeletions[nextComplexWithDeletion]);
+                nextComplexWithDeletion = 
findNextComplexWithDeletion(nextComplexWithDeletion + 1);
+                minDeletionTime = Integer.MIN_VALUE;
+            }
+            assert size == addedColumns;
+
+            // Reconciliation made it harder to compute minDeletionTime for 
cells in the loop above, so just do it now if we need to.
+            if (minDeletionTime != Integer.MIN_VALUE)
+            {
+                for (ColumnData cd : data)
+                    minDeletionTime = Math.min(minDeletionTime, 
minDeletionTime(cd));
+            }
+
+            return new ArrayBackedRow(clustering, columns, 
primaryKeyLivenessInfo, deletion, size, data, minDeletionTime);
+        }
+
+        private int findNextComplexWithDeletion(int from)
+        {
+            for (int i = from; i < complexDeletions.length; i++)
+            {
+                if (complexDeletions[i] != null)
+                    return i;
+            }
+            return -1;
+        }
+
+        // Should only be called once the cells have been sorted
+        private int countAddedColumns()
+        {
+            int columnCount = 0;
+            int nextComplexWithDeletion = findNextComplexWithDeletion(0);
+            ColumnDefinition previousColumn = null;
+            for (Cell cell : cells)
+            {
+                if (previousColumn != null && 
previousColumn.compareTo(cell.column()) == 0)
+                    continue;
+
+                ++columnCount;
+                previousColumn = cell.column();
+
+                // We know that simple columns sort before the complex ones, 
so don't bother with the column having complex deletion
+                // until we've reached the cells of complex columns.
+                if (!previousColumn.isComplex())
+                    continue;
+
+                while (nextComplexWithDeletion >= 0)
+                {
+                    // Check how the column we just counted compared to the 
next with complex deletion
+                    int cmp = 
previousColumn.compareTo(columns.getComplex(nextComplexWithDeletion));
+                    if (cmp < 0)
+                    {
+                        // it's before, we'll handle 
nextColumnWithComplexDeletion later
+                        break;
+                    }
+                    else if (cmp > 0)
+                    {
+                        // it's after. nextColumnWithComplexDeletion has no 
cell but we should count it
+                        ++columnCount;
+                        nextComplexWithDeletion = 
findNextComplexWithDeletion(nextComplexWithDeletion + 1);
+                    }
+                    else // cmp == 0
+                    {
+                        // it's the column we just counted. Ignore it and we 
know we're good with nextComplexWithDeletion for this loop
+                        nextComplexWithDeletion = 
findNextComplexWithDeletion(nextComplexWithDeletion + 1);
+                        break;
+                    }
+                }
+            }
+            // Anything remaining in complexDeletionColumns are complex 
columns with no cells but some complex deletion
+            while (nextComplexWithDeletion >= 0)
+            {
+                ++columnCount;
+                nextComplexWithDeletion = 
findNextComplexWithDeletion(nextComplexWithDeletion + 1);
+            }
+            return columnCount;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/BufferCell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java 
b/src/java/org/apache/cassandra/db/rows/BufferCell.java
new file mode 100644
index 0000000..c339092
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.marshal.ByteType;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
+
+public class BufferCell extends AbstractCell
+{
+    private static final long EMPTY_SIZE = ObjectSizes.measure(new 
BufferCell(ColumnDefinition.regularDef("", "", "", ByteType.instance), 0L, 0, 
0, ByteBufferUtil.EMPTY_BYTE_BUFFER, null));
+
+    private final ColumnDefinition column;
+
+    private final long timestamp;
+    private final int ttl;
+    private final int localDeletionTime;
+
+    private final ByteBuffer value;
+    private final CellPath path;
+
+    public BufferCell(ColumnDefinition column, long timestamp, int ttl, int 
localDeletionTime, ByteBuffer value, CellPath path)
+    {
+        assert column.isComplex() == (path != null);
+        this.column = column;
+        this.timestamp = timestamp;
+        this.ttl = ttl;
+        this.localDeletionTime = localDeletionTime;
+        this.value = value;
+        this.path = path;
+    }
+
+    public static BufferCell live(CFMetaData metadata, ColumnDefinition 
column, long timestamp, ByteBuffer value)
+    {
+        return live(metadata, column, timestamp, value, null);
+    }
+
+    public static BufferCell live(CFMetaData metadata, ColumnDefinition 
column, long timestamp, ByteBuffer value, CellPath path)
+    {
+        if (metadata.getDefaultTimeToLive() != NO_TTL)
+            return expiring(column, timestamp, 
metadata.getDefaultTimeToLive(), FBUtilities.nowInSeconds(), value, path);
+
+        return new BufferCell(column, timestamp, NO_TTL, NO_DELETION_TIME, 
value, path);
+    }
+
+    public static BufferCell expiring(ColumnDefinition column, long timestamp, 
int ttl, int nowInSec, ByteBuffer value)
+    {
+        return expiring(column, timestamp, ttl, nowInSec, value, null);
+    }
+
+    public static BufferCell expiring(ColumnDefinition column, long timestamp, 
int ttl, int nowInSec, ByteBuffer value, CellPath path)
+    {
+        assert ttl != NO_TTL;
+        return new BufferCell(column, timestamp, ttl, nowInSec + ttl, value, 
path);
+    }
+
+    public static BufferCell tombstone(ColumnDefinition column, long 
timestamp, int nowInSec)
+    {
+        return tombstone(column, timestamp, nowInSec, null);
+    }
+
+    public static BufferCell tombstone(ColumnDefinition column, long 
timestamp, int nowInSec, CellPath path)
+    {
+        return new BufferCell(column, timestamp, NO_TTL, nowInSec, 
ByteBufferUtil.EMPTY_BYTE_BUFFER, path);
+    }
+
+    public ColumnDefinition column()
+    {
+        return column;
+    }
+
+    public boolean isCounterCell()
+    {
+        return !isTombstone() && column.cellValueType().isCounter();
+    }
+
+    public boolean isLive(int nowInSec)
+    {
+        return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && 
nowInSec < localDeletionTime);
+    }
+
+    public boolean isTombstone()
+    {
+        return localDeletionTime != NO_DELETION_TIME && ttl == NO_TTL;
+    }
+
+    public boolean isExpiring()
+    {
+        return ttl != NO_TTL;
+    }
+
+    public long timestamp()
+    {
+        return timestamp;
+    }
+
+    public int ttl()
+    {
+        return ttl;
+    }
+
+    public int localDeletionTime()
+    {
+        return localDeletionTime;
+    }
+
+    public ByteBuffer value()
+    {
+        return value;
+    }
+
+    public CellPath path()
+    {
+        return path;
+    }
+
+    public Cell withUpdatedValue(ByteBuffer newValue)
+    {
+        return new BufferCell(column, timestamp, ttl, localDeletionTime, 
newValue, path);
+    }
+
+    public Cell copy(AbstractAllocator allocator)
+    {
+        if (!value.hasRemaining())
+            return this;
+
+        return new BufferCell(column, timestamp, ttl, localDeletionTime, 
allocator.clone(value), path == null ? null : path.copy(allocator));
+    }
+
+    public Cell markCounterLocalToBeCleared()
+    {
+        if (!isCounterCell())
+            return this;
+
+        ByteBuffer marked = 
CounterContext.instance().markLocalToBeCleared(value());
+        return marked == value() ? this : new BufferCell(column, timestamp, 
ttl, localDeletionTime, marked, path);
+    }
+
+    public Cell purge(DeletionPurger purger, int nowInSec)
+    {
+        if (!isLive(nowInSec))
+        {
+            if (purger.shouldPurge(timestamp, localDeletionTime))
+                return null;
+
+            // We slightly hijack purging to convert expired but not purgeable 
columns to tombstones. The reason we do that is
+            // that once a column has expired it is equivalent to a tombstone 
but actually using a tombstone is more compact since
+            // we don't keep the column value. The reason we do it here is 
that 1) it's somewhat related to dealing with tombstones
+            // so hopefully not too surprising and 2) we want to this and 
purging at the same places, so it's simpler/more efficient
+            // to do both here.
+            if (isExpiring())
+            {
+                // Note that as long as the expiring column and the tombstone 
put together live longer than GC grace seconds,
+                // we'll fulfil our responsibility to repair. See discussion at
+                // 
http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html
+                return BufferCell.tombstone(column, timestamp, 
localDeletionTime - ttl);
+            }
+        }
+        return this;
+    }
+
+    public Cell updateAllTimestamp(long newTimestamp)
+    {
+        return new BufferCell(column, isTombstone() ? newTimestamp - 1 : 
newTimestamp, ttl, localDeletionTime, value, path);
+    }
+
+    public int dataSize()
+    {
+        return TypeSizes.sizeof(timestamp)
+             + TypeSizes.sizeof(ttl)
+             + TypeSizes.sizeof(localDeletionTime)
+             + value.remaining()
+             + (path == null ? 0 : path.dataSize());
+    }
+
+    public long unsharedHeapSizeExcludingData()
+    {
+        return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value) + (path 
== null ? 0 : path.unsharedHeapSizeExcludingData());
+    }
+
+    /**
+     * The serialization format for cell is:
+     *     [ flags ][ timestamp ][ deletion time ][    ttl    ][ path size ][ 
path ][ value size ][ value ]
+     *     [   1b  ][ 8b (vint) ][   4b (vint)   ][ 4b (vint) ][ 4b (vint) ][  
arb ][  4b (vint) ][  arb  ]
+     *
+     * where not all field are always present (in fact, only the [ flags ] are 
guaranteed to be present). The fields have the following
+     * meaning:
+     *   - [ flags ] is the cell flags. It is a byte for which each bit 
represents a flag whose meaning is explained below (*_MASK constants)
+     *   - [ timestamp ] is the cell timestamp. Present unless the cell has 
the USE_TIMESTAMP_MASK.
+     *   - [ deletion time]: the local deletion time for the cell. Present if 
either the cell is deleted (IS_DELETED_MASK)
+     *       or it is expiring (IS_EXPIRING_MASK) but doesn't have the 
USE_ROW_TTL_MASK.
+     *   - [ ttl ]: the ttl for the cell. Present if the row is expiring 
(IS_EXPIRING_MASK) but doesn't have the
+     *       USE_ROW_TTL_MASK.
+     *   - [ value size ] is the size of the [ value ] field. It's present 
unless either the cell has the HAS_EMPTY_VALUE_MASK, or the value
+     *       for columns of this type have a fixed length.
+     *   - [ path size ] is the size of the [ path ] field. Present iff this 
is the cell of a complex column.
+     *   - [ value ]: the cell value, unless it has the HAS_EMPTY_VALUE_MASK.
+     *   - [ path ]: the cell path if the column this is a cell of is complex.
+     */
+    static class Serializer implements Cell.Serializer
+    {
+        private final static int PRESENCE_MASK               = 0x01; // Marks 
the actual presence of a cell. This is used only when serialized on-disk and
+                                                                     // 
on-wire (i.e. an actual ByteBufferBackedCell instance cannot have this flag 
set).
+        private final static int IS_DELETED_MASK             = 0x02; // 
Whether the cell is a tombstone or not.
+        private final static int IS_EXPIRING_MASK            = 0x04; // 
Whether the cell is expiring.
+        private final static int HAS_EMPTY_VALUE_MASK        = 0x08; // Wether 
the cell has an empty value. This will be the case for tombstone in particular.
+        private final static int USE_ROW_TIMESTAMP_MASK      = 0x10; // Wether 
the cell has the same timestamp than the row this is a cell of.
+        private final static int USE_ROW_TTL_MASK            = 0x20; // Wether 
the cell has the same ttl than the row this is a cell of.
+
+        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo 
rowLiveness, SerializationHeader header) throws IOException
+        {
+            if (cell == null)
+            {
+                out.writeByte((byte)0);
+                return;
+            }
+
+            boolean hasValue = cell.value().hasRemaining();
+            boolean isDeleted = cell.isTombstone();
+            boolean isExpiring = cell.isExpiring();
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && 
cell.timestamp() == rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && 
cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == 
rowLiveness.localExpirationTime();
+            int flags = PRESENCE_MASK;
+            if (!hasValue)
+                flags |= HAS_EMPTY_VALUE_MASK;
+
+            if (isDeleted)
+                flags |= IS_DELETED_MASK;
+            else if (isExpiring)
+                flags |= IS_EXPIRING_MASK;
+
+            if (useRowTimestamp)
+                flags |= USE_ROW_TIMESTAMP_MASK;
+            if (useRowTTL)
+                flags |= USE_ROW_TTL_MASK;
+
+            out.writeByte((byte)flags);
+
+            if (!useRowTimestamp)
+                out.writeVInt(header.encodeTimestamp(cell.timestamp()));
+
+            if ((isDeleted || isExpiring) && !useRowTTL)
+                
out.writeVInt(header.encodeDeletionTime(cell.localDeletionTime()));
+            if (isExpiring && !useRowTTL)
+                out.writeVInt(header.encodeTTL(cell.ttl()));
+
+            if (cell.column().isComplex())
+                cell.column().cellPathSerializer().serialize(cell.path(), out);
+
+            if (hasValue)
+                header.getType(cell.column()).writeValue(cell.value(), out);
+        }
+
+        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, 
ColumnDefinition column, SerializationHeader header, SerializationHelper 
helper) throws IOException
+        {
+            int flags = in.readUnsignedByte();
+            if ((flags & PRESENCE_MASK) == 0)
+                return null;
+
+            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+            long timestamp = useRowTimestamp ? rowLiveness.timestamp() : 
header.decodeTimestamp(in.readVInt());
+
+            int localDeletionTime = useRowTTL
+                                  ? rowLiveness.localExpirationTime()
+                                  : (isDeleted || isExpiring ? 
header.decodeDeletionTime((int)in.readVInt()) : NO_DELETION_TIME);
+
+            int ttl = useRowTTL
+                    ? rowLiveness.ttl()
+                    : (isExpiring ? header.decodeTTL((int)in.readVInt()) : 
NO_TTL);
+
+            CellPath path = column.isComplex()
+                          ? column.cellPathSerializer().deserialize(in)
+                          : null;
+
+            boolean isCounter = localDeletionTime == NO_DELETION_TIME && 
column.type.isCounter();
+
+            ByteBuffer value = ByteBufferUtil.EMPTY_BYTE_BUFFER;
+            if (hasValue)
+            {
+                if (helper.canSkipValue(column) || (path != null && 
helper.canSkipValue(path)))
+                {
+                    header.getType(column).skipValue(in);
+                }
+                else
+                {
+                    value = header.getType(column).readValue(in);
+                    if (isCounter)
+                        value = helper.maybeClearCounterValue(value);
+                }
+            }
+
+            return new BufferCell(column, timestamp, ttl, localDeletionTime, 
value, path);
+        }
+
+        public long serializedSize(Cell cell, LivenessInfo rowLiveness, 
SerializationHeader header)
+        {
+            long size = 1; // flags
+
+            if (cell == null)
+                return size;
+
+            boolean hasValue = cell.value().hasRemaining();
+            boolean isDeleted = cell.isTombstone();
+            boolean isExpiring = cell.isExpiring();
+            boolean useRowTimestamp = !rowLiveness.isEmpty() && 
cell.timestamp() == rowLiveness.timestamp();
+            boolean useRowTTL = isExpiring && rowLiveness.isExpiring() && 
cell.ttl() == rowLiveness.ttl() && cell.localDeletionTime() == 
rowLiveness.localExpirationTime();
+
+            if (!useRowTimestamp)
+                size += 
TypeSizes.sizeofVInt(header.encodeTimestamp(cell.timestamp()));
+
+            if ((isDeleted || isExpiring) && !useRowTTL)
+                size += 
TypeSizes.sizeofVInt(header.encodeDeletionTime(cell.localDeletionTime()));
+            if (isExpiring && !useRowTTL)
+                size += TypeSizes.sizeofVInt(header.encodeTTL(cell.ttl()));
+
+            if (cell.column().isComplex())
+                size += 
cell.column().cellPathSerializer().serializedSize(cell.path());
+
+            if (hasValue)
+                size += 
header.getType(cell.column()).writtenLength(cell.value());
+
+            return size;
+        }
+
+        // Returns if the skipped cell was an actual cell (i.e. it had its 
presence flag).
+        public boolean skip(DataInputPlus in, ColumnDefinition column, 
SerializationHeader header) throws IOException
+        {
+            int flags = in.readUnsignedByte();
+            if ((flags & PRESENCE_MASK) == 0)
+                return false;
+
+            boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
+            boolean isDeleted = (flags & IS_DELETED_MASK) != 0;
+            boolean isExpiring = (flags & IS_EXPIRING_MASK) != 0;
+            boolean useRowTimestamp = (flags & USE_ROW_TIMESTAMP_MASK) != 0;
+            boolean useRowTTL = (flags & USE_ROW_TTL_MASK) != 0;
+
+            if (!useRowTimestamp)
+                in.readVInt();
+
+            if (!useRowTTL && (isDeleted || isExpiring))
+                in.readVInt();
+
+            if (!useRowTTL && isExpiring)
+                in.readVInt();
+
+            if (column.isComplex())
+                column.cellPathSerializer().skip(in);
+
+            if (hasValue)
+                header.getType(column).skipValue(in);
+
+            return true;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Cell.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 80bf901..ccb9708 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -17,40 +17,41 @@
  */
 package org.apache.cassandra.db.rows;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.security.MessageDigest;
+import java.util.Comparator;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.Aliasable;
-import org.apache.cassandra.db.LivenessInfo;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
- * A cell holds a single "simple" value for a given column, as well as 
"liveness"
- * informations regarding that value.
+ * A cell is our atomic unit for a single value of a single column.
  * <p>
- * The is 2 kind of columns: simple ones and complex ones.
- * Simple columns have only a single associated cell, while complex ones,
- * the one corresponding to non-frozen collections and UDTs, are comprised
- * of multiple cells. For complex columns, the different cells are 
distinguished
- * by their cell path.
- * <p>
- * We can also distinguish different kind of cells based on the property of 
their
- * {@link #livenessInfo}:
- *  1) "Normal" cells: their liveness info has no ttl and no deletion time.
- *  2) Expiring cells: their liveness info has both a ttl and a deletion time 
(the latter
- *    deciding when the cell is actually expired).
- *  3) Tombstones/deleted cells: their liveness info has a deletion time but 
no ttl. Those
- *     cells don't really have a value but their {@link #value} method return 
an empty
- *     buffer by convention.
+ * A cell always holds at least a timestamp that gives us how the cell 
reconcile. We then
+ * have 3 main types of cells:
+ *   1) live regular cells: those will also have a value and, if for a complex 
column, a path.
+ *   2) expiring cells: on top of regular cells, those have a ttl and a local 
deletion time (when they are expired).
+ *   3) tombstone cells: those won't have value, but they have a local 
deletion time (when the tombstone was created).
  */
-public interface Cell extends Aliasable<Cell>
+public interface Cell extends ColumnData
 {
-    /**
-     * The column this cell belongs to.
-     *
-     * @return the column this cell belongs to.
-     */
-    public ColumnDefinition column();
+    public static final int NO_TTL = 0;
+    public static final int NO_DELETION_TIME = Integer.MAX_VALUE;
+
+    public final Comparator<Cell> comparator = (c1, c2) ->
+    {
+        int cmp = c1.column().compareTo(c2.column());
+        if (cmp != 0)
+            return cmp;
+
+        Comparator<CellPath> pathComparator = c1.column().cellPathComparator();
+        return pathComparator == null ? 0 : pathComparator.compare(c1.path(), 
c2.path());
+    };
+
+    public final Serializer serializer = new BufferCell.Serializer();
 
     /**
      * Whether the cell is a counter cell or not.
@@ -67,12 +68,26 @@ public interface Cell extends Aliasable<Cell>
     public ByteBuffer value();
 
     /**
-     * The liveness info of the cell, that is its timestamp and whether it is
-     * expiring, deleted or none of the above.
+     * The cell timestamp.
+     * <p>
+     * @return the cell timestamp.
+     */
+    public long timestamp();
+
+    /**
+     * The cell ttl.
+     *
+     * @return the cell ttl, or {@code NO_TTL} if the cell isn't an expiring 
one.
+     */
+    public int ttl();
+
+    /**
+     * The cell local deletion time.
      *
-     * @return the cell {@link LivenessInfo}.
+     * @return the cell local deletion time, or {@code NO_DELETION_TIME} if 
the cell is neither
+     * a tombstone nor an expiring one.
      */
-    public LivenessInfo livenessInfo();
+    public int localDeletionTime();
 
     /**
      * Whether the cell is a tombstone or not.
@@ -109,34 +124,27 @@ public interface Cell extends Aliasable<Cell>
      */
     public CellPath path();
 
-    /**
-     * Write the cell to the provided writer.
-     *
-     * @param writer the row writer to write the cell to.
-     */
-    public void writeTo(Row.Writer writer);
+    public Cell withUpdatedValue(ByteBuffer newValue);
 
-    /**
-     * Adds the cell to the provided digest.
-     *
-     * @param digest the {@code MessageDigest} to add the cell to.
-     */
-    public void digest(MessageDigest digest);
+    public Cell copy(AbstractAllocator allocator);
 
-    /**
-     * Validate the cell value.
-     *
-     * @throws MarshalException if the cell value is not a valid value for
-     * the column type this is a cell of.
-     */
-    public void validate();
+    @Override
+    // Overrides super type to provide a more precise return type.
+    public Cell markCounterLocalToBeCleared();
 
-    /**
-     * The size of the data hold by this cell.
-     *
-     * This is mainly used to verify if batches goes over a given size.
-     *
-     * @return the size used by the data of this cell.
-     */
-    public int dataSize();
+    @Override
+    // Overrides super type to provide a more precise return type.
+    public Cell purge(DeletionPurger purger, int nowInSec);
+
+    public interface Serializer
+    {
+        public void serialize(Cell cell, DataOutputPlus out, LivenessInfo 
rowLiveness, SerializationHeader header) throws IOException;
+
+        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, 
ColumnDefinition column, SerializationHeader header, SerializationHelper 
helper) throws IOException;
+
+        public long serializedSize(Cell cell, LivenessInfo rowLiveness, 
SerializationHeader header);
+
+        // Returns if the skipped cell was an actual cell (i.e. it had its 
presence flag).
+        public boolean skip(DataInputPlus in, ColumnDefinition column, 
SerializationHeader header) throws IOException;
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/CellData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/CellData.java 
b/src/java/org/apache/cassandra/db/rows/CellData.java
deleted file mode 100644
index 29eac01..0000000
--- a/src/java/org/apache/cassandra/db/rows/CellData.java
+++ /dev/null
@@ -1,275 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.utils.ObjectSizes;
-
-/**
- * Contains (non-counter) cell data for one or more rows.
- */
-class CellData
-{
-    private boolean isCounter;
-
-    private ByteBuffer[] values;
-    private final LivenessInfoArray livenessInfos;
-
-    CellData(int initialCellCapacity, boolean isCounter)
-    {
-        this.isCounter = isCounter;
-        this.values = new ByteBuffer[initialCellCapacity];
-        this.livenessInfos = new LivenessInfoArray(initialCellCapacity);
-    }
-
-    public void setCell(int idx, ByteBuffer value, LivenessInfo info)
-    {
-        ensureCapacity(idx);
-        values[idx] = value;
-        livenessInfos.set(idx, info);
-    }
-
-    public boolean hasCell(int idx)
-    {
-        return idx < values.length && values[idx] != null;
-    }
-
-    public ByteBuffer value(int idx)
-    {
-        return values[idx];
-    }
-
-    public void setValue(int idx, ByteBuffer value)
-    {
-        values[idx] = value;
-    }
-
-    private void ensureCapacity(int idxToSet)
-    {
-        int originalCapacity = values.length;
-        if (idxToSet < originalCapacity)
-            return;
-
-        int newCapacity = RowDataBlock.computeNewCapacity(originalCapacity, 
idxToSet);
-
-        values = Arrays.copyOf(values, newCapacity);
-        livenessInfos.resize(newCapacity);
-    }
-
-    // Swap cell i and j
-    public void swapCell(int i, int j)
-    {
-        ensureCapacity(Math.max(i, j));
-
-        ByteBuffer value = values[j];
-        values[j] = values[i];
-        values[i] = value;
-
-        livenessInfos.swap(i, j);
-    }
-
-    // Merge cell i into j
-    public void mergeCell(int i, int j, int nowInSec)
-    {
-        if (isCounter)
-            mergeCounterCell(this, i, this, j, this, j, nowInSec);
-        else
-            mergeRegularCell(this, i, this, j, this, j, nowInSec);
-    }
-
-    private static boolean handleNoCellCase(CellData d1, int i1, CellData d2, 
int i2, CellData merged, int iMerged)
-    {
-        if (!d1.hasCell(i1))
-        {
-            if (d2.hasCell(i2))
-                d2.moveCell(i2, merged, iMerged);
-            return true;
-        }
-        if (!d2.hasCell(i2))
-        {
-            d1.moveCell(i1, merged, iMerged);
-            return true;
-        }
-        return false;
-    }
-
-    public static void mergeRegularCell(CellData d1, int i1, CellData d2, int 
i2, CellData merged, int iMerged, int nowInSec)
-    {
-        if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged))
-            return;
-
-        Conflicts.Resolution res = 
Conflicts.resolveRegular(d1.livenessInfos.timestamp(i1),
-                                                            
d1.livenessInfos.isLive(i1, nowInSec),
-                                                            
d1.livenessInfos.localDeletionTime(i1),
-                                                            d1.values[i1],
-                                                            
d2.livenessInfos.timestamp(i2),
-                                                            
d2.livenessInfos.isLive(i2, nowInSec),
-                                                            
d2.livenessInfos.localDeletionTime(i2),
-                                                            d2.values[i2]);
-
-        assert res != Conflicts.Resolution.MERGE;
-        if (res == Conflicts.Resolution.LEFT_WINS)
-            d1.moveCell(i1, merged, iMerged);
-        else
-            d2.moveCell(i2, merged, iMerged);
-    }
-
-    public static void mergeCounterCell(CellData d1, int i1, CellData d2, int 
i2, CellData merged, int iMerged, int nowInSec)
-    {
-        if (handleNoCellCase(d1, i1, d2, i2, merged, iMerged))
-            return;
-
-        Conflicts.Resolution res = 
Conflicts.resolveCounter(d1.livenessInfos.timestamp(i1),
-                                                            
d1.livenessInfos.isLive(i1, nowInSec),
-                                                            d1.values[i1],
-                                                            
d2.livenessInfos.timestamp(i2),
-                                                            
d2.livenessInfos.isLive(i2, nowInSec),
-                                                            d2.values[i2]);
-
-        switch (res)
-        {
-            case LEFT_WINS:
-                d1.moveCell(i1, merged, iMerged);
-                break;
-            case RIGHT_WINS:
-                d2.moveCell(i2, merged, iMerged);
-                break;
-            default:
-                merged.values[iMerged] = 
Conflicts.mergeCounterValues(d1.values[i1], d2.values[i2]);
-                if (d1.livenessInfos.timestamp(i1) > 
d2.livenessInfos.timestamp(i2))
-                    merged.livenessInfos.set(iMerged, 
d1.livenessInfos.timestamp(i1), d1.livenessInfos.ttl(i1), 
d1.livenessInfos.localDeletionTime(i1));
-                else
-                    merged.livenessInfos.set(iMerged, 
d2.livenessInfos.timestamp(i2), d2.livenessInfos.ttl(i2), 
d2.livenessInfos.localDeletionTime(i2));
-                break;
-        }
-    }
-
-    // Move cell i into j
-    public void moveCell(int i, int j)
-    {
-        moveCell(i, this, j);
-    }
-
-    public void moveCell(int i, CellData target, int j)
-    {
-        if (!hasCell(i) || (target == this && i == j))
-            return;
-
-        target.ensureCapacity(j);
-
-        target.values[j] = values[i];
-        target.livenessInfos.set(j, livenessInfos.timestamp(i),
-                                    livenessInfos.ttl(i),
-                                    livenessInfos.localDeletionTime(i));
-    }
-
-    public int dataSize()
-    {
-        int size = livenessInfos.dataSize();
-        for (int i = 0; i < values.length; i++)
-            if (values[i] != null)
-                size += values[i].remaining();
-        return size;
-    }
-
-    public void clear()
-    {
-        Arrays.fill(values, null);
-        livenessInfos.clear();
-    }
-
-    public long unsharedHeapSizeExcludingData()
-    {
-        return ObjectSizes.sizeOnHeapExcludingData(values)
-             + livenessInfos.unsharedHeapSize();
-    }
-
-    @Override
-    public String toString()
-    {
-        StringBuilder sb = new StringBuilder();
-        sb.append("CellData(size=").append(values.length);
-        if (isCounter)
-            sb.append(", counter");
-        sb.append("){");
-        LivenessInfoArray.Cursor cursor = LivenessInfoArray.newCursor();
-        for (int i = 0; i < values.length; i++)
-        {
-            if (values[i] == null)
-            {
-                sb.append("[null]");
-                continue;
-            }
-            sb.append("[len(v)=").append(values[i].remaining());
-            sb.append(", info=").append(cursor.setTo(livenessInfos, i));
-            sb.append("]");
-        }
-        return sb.append("}").toString();
-    }
-
-    static class ReusableCell extends AbstractCell
-    {
-        private final LivenessInfoArray.Cursor cursor = 
LivenessInfoArray.newCursor();
-
-        private CellData data;
-        private ColumnDefinition column;
-        protected int idx;
-
-        ReusableCell setTo(CellData data, ColumnDefinition column, int idx)
-        {
-            if (!data.hasCell(idx))
-                return null;
-
-            this.data = data;
-            this.column = column;
-            this.idx = idx;
-
-            cursor.setTo(data.livenessInfos, idx);
-            return this;
-        }
-
-        public ColumnDefinition column()
-        {
-            return column;
-        }
-
-        public boolean isCounterCell()
-        {
-            return data.isCounter && !cursor.hasLocalDeletionTime();
-        }
-
-        public ByteBuffer value()
-        {
-            return data.value(idx);
-        }
-
-        public LivenessInfo livenessInfo()
-        {
-            return cursor;
-        }
-
-        public CellPath path()
-        {
-            return null;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/CellPath.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/CellPath.java 
b/src/java/org/apache/cassandra/db/rows/CellPath.java
index 40d525c..68e3c2b 100644
--- a/src/java/org/apache/cassandra/db/rows/CellPath.java
+++ b/src/java/org/apache/cassandra/db/rows/CellPath.java
@@ -17,13 +17,16 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.io.DataInput;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Objects;
 
 import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.memory.AbstractAllocator;
 
 /**
  * A path for a cell belonging to a complex column type (non-frozen collection 
or UDT).
@@ -40,7 +43,7 @@ public abstract class CellPath
     public static CellPath create(ByteBuffer value)
     {
         assert value != null;
-        return new SimpleCellPath(new ByteBuffer[]{ value });
+        return new CollectionCellPath(value);
     }
 
     public int dataSize()
@@ -57,6 +60,10 @@ public abstract class CellPath
             digest.update(get(i).duplicate());
     }
 
+    public abstract CellPath copy(AbstractAllocator allocator);
+
+    public abstract long unsharedHeapSizeExcludingData();
+
     @Override
     public final int hashCode()
     {
@@ -86,28 +93,41 @@ public abstract class CellPath
     public interface Serializer
     {
         public void serialize(CellPath path, DataOutputPlus out) throws 
IOException;
-        public CellPath deserialize(DataInput in) throws IOException;
+        public CellPath deserialize(DataInputPlus in) throws IOException;
         public long serializedSize(CellPath path);
-        public void skip(DataInput in) throws IOException;
+        public void skip(DataInputPlus in) throws IOException;
     }
 
-    static class SimpleCellPath extends CellPath
+    private static class CollectionCellPath extends CellPath
     {
-        protected final ByteBuffer[] values;
+        private static final long EMPTY_SIZE = ObjectSizes.measure(new 
CollectionCellPath(ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
-        public SimpleCellPath(ByteBuffer[] values)
+        protected final ByteBuffer value;
+
+        private CollectionCellPath(ByteBuffer value)
         {
-            this.values = values;
+            this.value = value;
         }
 
         public int size()
         {
-            return values.length;
+            return 1;
         }
 
         public ByteBuffer get(int i)
         {
-            return values[i];
+            assert i == 0;
+            return value;
+        }
+
+        public CellPath copy(AbstractAllocator allocator)
+        {
+            return new CollectionCellPath(allocator.clone(value));
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            return EMPTY_SIZE + ObjectSizes.sizeOnHeapExcludingData(value);
         }
     }
 
@@ -122,5 +142,15 @@ public abstract class CellPath
         {
             throw new UnsupportedOperationException();
         }
+
+        public CellPath copy(AbstractAllocator allocator)
+        {
+            return this;
+        }
+
+        public long unsharedHeapSizeExcludingData()
+        {
+            return 0;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Cells.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Cells.java 
b/src/java/org/apache/cassandra/db/rows/Cells.java
index 1e329e5..080d640 100644
--- a/src/java/org/apache/cassandra/db/rows/Cells.java
+++ b/src/java/org/apache/cassandra/db/rows/Cells.java
@@ -23,9 +23,8 @@ import java.util.Iterator;
 
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.db.index.SecondaryIndexManager;
-import org.apache.cassandra.utils.ByteBufferUtil;
 
 /**
  * Static methods to work on cells.
@@ -35,54 +34,17 @@ public abstract class Cells
     private Cells() {}
 
     /**
-     * Writes a tombstone cell to the provided writer.
+     * Collect statistics ont a given cell.
      *
-     * @param writer the {@code Row.Writer} to write the tombstone to.
-     * @param column the column for the tombstone.
-     * @param timestamp the timestamp for the tombstone.
-     * @param localDeletionTime the local deletion time (in seconds) for the 
tombstone.
+     * @param cell the cell for which to collect stats.
+     * @param collector the stats collector.
      */
-    public static void writeTombstone(Row.Writer writer, ColumnDefinition 
column, long timestamp, int localDeletionTime)
+    public static void collectStats(Cell cell, PartitionStatisticsCollector 
collector)
     {
-        writer.writeCell(column, false, ByteBufferUtil.EMPTY_BYTE_BUFFER, 
SimpleLivenessInfo.forDeletion(timestamp, localDeletionTime), null);
-    }
-
-    /**
-     * Computes the difference between a cell and the result of merging this
-     * cell to other cells.
-     * <p>
-     * This method is used when cells from multiple sources are merged and we 
want to
-     * find for a given source if it was up to date for that cell, and if not, 
what
-     * should be sent to the source to repair it.
-     *
-     * @param merged the cell that is the result of merging multiple source.
-     * @param cell the cell from one of the source that has been merged to yied
-     * {@code merged}.
-     * @return {@code null} if the source having {@code cell} is up-to-date 
for that
-     * cell, or a cell that applied to the source will "repair" said source 
otherwise.
-     */
-    public static Cell diff(Cell merged, Cell cell)
-    {
-        // Note that it's enough to check if merged is a counterCell. If it 
isn't and
-        // cell is one, it means that merged is a tombstone with a greater 
timestamp
-        // than cell, because that's the only case where reconciling a counter 
with
-        // a tombstone don't yield a counter. If that's the case, the normal 
path will
-        // return what it should.
-        if (merged.isCounterCell())
-        {
-            if (merged.livenessInfo().supersedes(cell.livenessInfo()))
-                return merged;
+        collector.update(cell);
 
-            // Reconciliation never returns something with a timestamp 
strictly lower than its operand. This
-            // means we're in the case where merged.timestamp() == 
cell.timestamp(). As 1) tombstones
-            // always win over counters (CASSANDRA-7346) and 2) merged is a 
counter, it follows that cell
-            // can't be a tombstone or merged would be one too.
-            assert !cell.isTombstone();
-
-            CounterContext.Relationship rel = 
CounterContext.instance().diff(merged.value(), cell.value());
-            return (rel == CounterContext.Relationship.GREATER_THAN || rel == 
CounterContext.Relationship.DISJOINT) ? merged : null;
-        }
-        return merged.livenessInfo().supersedes(cell.livenessInfo()) ? merged 
: null;
+        if (cell.isCounterCell())
+            
collector.updateHasLegacyCounterShards(CounterCells.hasLegacyShards(cell));
     }
 
     /**
@@ -106,7 +68,7 @@ public abstract class Cells
      * {@code writer}.
      * @param deletion the deletion time that applies to the cells being 
considered.
      * This deletion time may delete both {@code existing} or {@code update}.
-     * @param writer the row writer to which the result of the reconciliation 
is written.
+     * @param builder the row builder to which the result of the 
reconciliation is written.
      * @param nowInSec the current time in seconds (which plays a role during 
reconciliation
      * because deleted cells always have precedence on timestamp equality and 
deciding if a
      * cell is a live or not depends on the current time due to expiring 
cells).
@@ -121,12 +83,12 @@ public abstract class Cells
                                  Cell existing,
                                  Cell update,
                                  DeletionTime deletion,
-                                 Row.Writer writer,
+                                 Row.Builder builder,
                                  int nowInSec,
                                  SecondaryIndexManager.Updater indexUpdater)
     {
-        existing = existing == null || 
deletion.deletes(existing.livenessInfo()) ? null : existing;
-        update = update == null || deletion.deletes(update.livenessInfo()) ? 
null : update;
+        existing = existing == null || deletion.deletes(existing) ? null : 
existing;
+        update = update == null || deletion.deletes(update) ? null : update;
         if (existing == null || update == null)
         {
             if (update != null)
@@ -135,17 +97,17 @@ public abstract class Cells
                 // we'll need to fix that damn 2ndary index API to avoid that.
                 updatePKIndexes(clustering, update, nowInSec, indexUpdater);
                 indexUpdater.insert(clustering, update);
-                update.writeTo(writer);
+                builder.addCell(update);
             }
             else if (existing != null)
             {
-                existing.writeTo(writer);
+                builder.addCell(existing);
             }
             return Long.MAX_VALUE;
         }
 
         Cell reconciled = reconcile(existing, update, nowInSec);
-        reconciled.writeTo(writer);
+        builder.addCell(reconciled);
 
         // Note that this test rely on reconcile returning either 'existing' 
or 'update'. That's not true for counters but we don't index them
         if (reconciled == update)
@@ -153,13 +115,13 @@ public abstract class Cells
             updatePKIndexes(clustering, update, nowInSec, indexUpdater);
             indexUpdater.update(clustering, existing, reconciled);
         }
-        return Math.abs(existing.livenessInfo().timestamp() - 
update.livenessInfo().timestamp());
+        return Math.abs(existing.timestamp() - update.timestamp());
     }
 
     private static void updatePKIndexes(Clustering clustering, Cell cell, int 
nowInSec, SecondaryIndexManager.Updater indexUpdater)
     {
         if (indexUpdater != SecondaryIndexManager.nullUpdater && 
cell.isLive(nowInSec))
-            indexUpdater.maybeIndex(clustering, 
cell.livenessInfo().timestamp(), cell.livenessInfo().ttl(), DeletionTime.LIVE);
+            indexUpdater.maybeIndex(clustering, cell.timestamp(), cell.ttl(), 
DeletionTime.LIVE);
     }
 
     /**
@@ -190,10 +152,10 @@ public abstract class Cells
 
         if (c1.isCounterCell() || c2.isCounterCell())
         {
-            Conflicts.Resolution res = 
Conflicts.resolveCounter(c1.livenessInfo().timestamp(),
+            Conflicts.Resolution res = Conflicts.resolveCounter(c1.timestamp(),
                                                                 
c1.isLive(nowInSec),
                                                                 c1.value(),
-                                                                
c2.livenessInfo().timestamp(),
+                                                                c2.timestamp(),
                                                                 
c2.isLive(nowInSec),
                                                                 c2.value());
 
@@ -203,26 +165,26 @@ public abstract class Cells
                 case RIGHT_WINS: return c2;
                 default:
                     ByteBuffer merged = 
Conflicts.mergeCounterValues(c1.value(), c2.value());
-                    LivenessInfo mergedInfo = 
c1.livenessInfo().mergeWith(c2.livenessInfo());
+                    long timestamp = Math.max(c1.timestamp(), c2.timestamp());
 
                     // We save allocating a new cell object if it turns out 
that one cell was
                     // a complete superset of the other
-                    if (merged == c1.value() && mergedInfo == 
c1.livenessInfo())
+                    if (merged == c1.value() && timestamp == c1.timestamp())
                         return c1;
-                    else if (merged == c2.value() && mergedInfo == 
c2.livenessInfo())
+                    else if (merged == c2.value() && timestamp == 
c2.timestamp())
                         return c2;
                     else // merge clocks and timestamps.
-                        return create(c1.column(), true, merged, mergedInfo, 
null);
+                        return new BufferCell(c1.column(), timestamp, 
Cell.NO_TTL, Cell.NO_DELETION_TIME, merged, c1.path());
             }
         }
 
-        Conflicts.Resolution res = 
Conflicts.resolveRegular(c1.livenessInfo().timestamp(),
+        Conflicts.Resolution res = Conflicts.resolveRegular(c1.timestamp(),
                                                             
c1.isLive(nowInSec),
-                                                            
c1.livenessInfo().localDeletionTime(),
+                                                            
c1.localDeletionTime(),
                                                             c1.value(),
-                                                            
c2.livenessInfo().timestamp(),
+                                                            c2.timestamp(),
                                                             
c2.isLive(nowInSec),
-                                                            
c2.livenessInfo().localDeletionTime(),
+                                                            
c2.localDeletionTime(),
                                                             c2.value());
         assert res != Conflicts.Resolution.MERGE;
         return res == Conflicts.Resolution.LEFT_WINS ? c1 : c2;
@@ -251,7 +213,7 @@ public abstract class Cells
      * {@code existing} to {@code writer}.
      * @param deletion the deletion time that applies to the cells being 
considered.
      * This deletion time may delete cells in both {@code existing} and {@code 
update}.
-     * @param writer the row writer to which the result of the reconciliation 
is written.
+     * @param builder the row build to which the result of the reconciliation 
is written.
      * @param nowInSec the current time in seconds (which plays a role during 
reconciliation
      * because deleted cells always have precedence on timestamp equality and 
deciding if a
      * cell is a live or not depends on the current time due to expiring 
cells).
@@ -270,7 +232,7 @@ public abstract class Cells
                                         Iterator<Cell> existing,
                                         Iterator<Cell> update,
                                         DeletionTime deletion,
-                                        Row.Writer writer,
+                                        Row.Builder builder,
                                         int nowInSec,
                                         SecondaryIndexManager.Updater 
indexUpdater)
     {
@@ -285,17 +247,17 @@ public abstract class Cells
                      : comparator.compare(nextExisting.path(), 
nextUpdate.path()));
             if (cmp < 0)
             {
-                reconcile(clustering, nextExisting, null, deletion, writer, 
nowInSec, indexUpdater);
+                reconcile(clustering, nextExisting, null, deletion, builder, 
nowInSec, indexUpdater);
                 nextExisting = getNext(existing);
             }
             else if (cmp > 0)
             {
-                reconcile(clustering, null, nextUpdate, deletion, writer, 
nowInSec, indexUpdater);
+                reconcile(clustering, null, nextUpdate, deletion, builder, 
nowInSec, indexUpdater);
                 nextUpdate = getNext(update);
             }
             else
             {
-                timeDelta = Math.min(timeDelta, reconcile(clustering, 
nextExisting, nextUpdate, deletion, writer, nowInSec, indexUpdater));
+                timeDelta = Math.min(timeDelta, reconcile(clustering, 
nextExisting, nextUpdate, deletion, builder, nowInSec, indexUpdater));
                 nextExisting = getNext(existing);
                 nextUpdate = getNext(update);
             }
@@ -307,65 +269,4 @@ public abstract class Cells
     {
         return iterator == null || !iterator.hasNext() ? null : 
iterator.next();
     }
-
-    /**
-     * Creates a simple cell.
-     * <p>
-     * Note that in general cell objects are created by the container they are 
in and so this method should
-     * only be used in a handful of cases when we know it's the right thing to 
do.
-     *
-     * @param column the column for the cell to create.
-     * @param isCounter whether the create cell should be a counter one.
-     * @param value the value for the cell.
-     * @param info the liveness info for the cell.
-     * @param path the cell path for the cell.
-     * @return the newly allocated cell object.
-     */
-    public static Cell create(ColumnDefinition column, boolean isCounter, 
ByteBuffer value, LivenessInfo info, CellPath path)
-    {
-        return new SimpleCell(column, isCounter, value, info, path);
-    }
-
-    private static class SimpleCell extends AbstractCell
-    {
-        private final ColumnDefinition column;
-        private final boolean isCounter;
-        private final ByteBuffer value;
-        private final LivenessInfo info;
-        private final CellPath path;
-
-        private SimpleCell(ColumnDefinition column, boolean isCounter, 
ByteBuffer value, LivenessInfo info, CellPath path)
-        {
-            this.column = column;
-            this.isCounter = isCounter;
-            this.value = value;
-            this.info = info.takeAlias();
-            this.path = path;
-        }
-
-        public ColumnDefinition column()
-        {
-            return column;
-        }
-
-        public boolean isCounterCell()
-        {
-            return isCounter;
-        }
-
-        public ByteBuffer value()
-        {
-            return value;
-        }
-
-        public LivenessInfo livenessInfo()
-        {
-            return info;
-        }
-
-        public CellPath path()
-        {
-            return path;
-        }
-    }
 }

Reply via email to