This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8576e76  Minimize BTree iterator allocations
8576e76 is described below

commit 8576e769d13b3e887ea604074641fd4c42af5e8a
Author: Blake Eggleston <bdeggles...@gmail.com>
AuthorDate: Tue Oct 15 20:10:42 2019 -0700

    Minimize BTree iterator allocations
    
    Patch by Blake Eggleston; Reviewed by Benedict Elliott Smith for CASSANDRA-15389
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/ColumnIndex.java  |   6 +-
 src/java/org/apache/cassandra/db/Columns.java      |   9 +-
 src/java/org/apache/cassandra/db/Mutation.java     |   6 +-
 src/java/org/apache/cassandra/db/ReadResponse.java |   8 +-
 .../apache/cassandra/db/SerializationHeader.java   |   1 +
 .../cassandra/db/UnfilteredDeserializer.java       |   6 +-
 .../db/columniterator/AbstractSSTableIterator.java |   6 +-
 .../cassandra/db/commitlog/CommitLogReader.java    |   4 +-
 .../db/partitions/CachedBTreePartition.java        |   4 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |   7 +-
 .../partitions/UnfilteredPartitionIterators.java   |   2 +-
 .../org/apache/cassandra/db/rows/AbstractRow.java  |  18 +-
 .../org/apache/cassandra/db/rows/BTreeRow.java     | 103 +++++++----
 src/java/org/apache/cassandra/db/rows/Cell.java    |   2 +-
 .../org/apache/cassandra/db/rows/ColumnData.java   |   5 +
 .../cassandra/db/rows/ComplexColumnData.java       |  18 +-
 ...ationHelper.java => DeserializationHelper.java} |   6 +-
 src/java/org/apache/cassandra/db/rows/Row.java     |  23 ++-
 src/java/org/apache/cassandra/db/rows/Rows.java    |  72 ++++---
 .../cassandra/db/rows/SerializationHelper.java     | 134 +++-----------
 .../db/rows/UnfilteredRowIteratorSerializer.java   |  21 ++-
 .../cassandra/db/rows/UnfilteredSerializer.java    |  95 +++++-----
 .../db/streaming/CassandraStreamReader.java        |   4 +-
 .../io/sstable/SSTableIdentityIterator.java        |   4 +-
 .../io/sstable/SSTableSimpleIterator.java          |  12 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java    |   5 +-
 .../org/apache/cassandra/service/paxos/Commit.java |   2 +-
 .../{WrappedInt.java => BiLongAccumulator.java}    |  32 +---
 .../{WrappedInt.java => LongAccumulator.java}      |  32 +---
 .../org/apache/cassandra/utils/btree/BTree.java    | 206 ++++++++++++---------
 .../cassandra/utils/btree/BTreeSearchIterator.java |   4 +
 .../utils/btree/LeafBTreeSearchIterator.java       |  29 ++-
 .../db/commitlog/CommitLogStressTest.java          |   5 +-
 .../org/apache/cassandra/db/ReadCommandTest.java   |   3 +-
 .../org/apache/cassandra/db/RowIndexEntryTest.java |   7 +-
 .../cassandra/db/commitlog/CDCTestReplayer.java    |   4 +-
 .../db/commitlog/CommitLogTestReplayer.java        |   4 +-
 .../apache/cassandra/utils/btree/BTreeTest.java    |  55 +++++-
 39 files changed, 492 insertions(+), 473 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 96ce286..c1e5aeb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Minimize BTree iterator allocations (CASSANDRA-15389)
  * Add client request size server metrics (CASSANDRA-15704)
  * Add additional logging around FileUtils and compaction leftover cleanup 
(CASSANDRA-15705)
  * Mark system_views/system_virtual_schema as non-alterable keyspaces in cqlsh 
(CASSANDRA-15711)
diff --git a/src/java/org/apache/cassandra/db/ColumnIndex.java 
b/src/java/org/apache/cassandra/db/ColumnIndex.java
index 74ad264..e11f784 100644
--- a/src/java/org/apache/cassandra/db/ColumnIndex.java
+++ b/src/java/org/apache/cassandra/db/ColumnIndex.java
@@ -53,6 +53,7 @@ public class ColumnIndex
     public int columnIndexCount;
     private int[] indexOffsets;
 
+    private final SerializationHelper helper;
     private final SerializationHeader header;
     private final int version;
     private final SequentialWriter writer;
@@ -79,6 +80,7 @@ public class ColumnIndex
                         Collection<SSTableFlushObserver> observers,
                         ISerializer<IndexInfo> indexInfoSerializer)
     {
+        this.helper = new SerializationHelper(header);
         this.header = header;
         this.writer = writer;
         this.version = version.correspondingMessagingVersion();
@@ -126,7 +128,7 @@ public class ColumnIndex
         {
             Row staticRow = iterator.staticRow();
 
-            UnfilteredSerializer.serializer.serializeStaticRow(staticRow, 
header, writer, version);
+            UnfilteredSerializer.serializer.serializeStaticRow(staticRow, 
helper, writer, version);
             if (!observers.isEmpty())
                 observers.forEach((o) -> o.nextUnfilteredCluster(staticRow));
         }
@@ -248,7 +250,7 @@ public class ColumnIndex
             startPosition = pos;
         }
 
-        UnfilteredSerializer.serializer.serialize(unfiltered, header, writer, 
pos - previousRowStart, version);
+        UnfilteredSerializer.serializer.serialize(unfiltered, helper, writer, 
pos - previousRowStart, version);
 
         // notify observers about each new row
         if (!observers.isEmpty())
diff --git a/src/java/org/apache/cassandra/db/Columns.java 
b/src/java/org/apache/cassandra/db/Columns.java
index f56072e..fe13919 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -53,7 +53,7 @@ public class Columns extends 
AbstractCollection<ColumnMetadata> implements Colle
     public static final Serializer serializer = new Serializer();
     public static final Columns NONE = new Columns(BTree.empty(), 0);
 
-    private static final ColumnMetadata FIRST_COMPLEX_STATIC =
+    public static final ColumnMetadata FIRST_COMPLEX_STATIC =
         new ColumnMetadata("",
                            "",
                            
ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER, 
UTF8Type.instance),
@@ -61,7 +61,7 @@ public class Columns extends 
AbstractCollection<ColumnMetadata> implements Colle
                            ColumnMetadata.NO_POSITION,
                            ColumnMetadata.Kind.STATIC);
 
-    private static final ColumnMetadata FIRST_COMPLEX_REGULAR =
+    public static final ColumnMetadata FIRST_COMPLEX_REGULAR =
         new ColumnMetadata("",
                            "",
                            
ColumnIdentifier.getInterned(ByteBufferUtil.EMPTY_BYTE_BUFFER, 
UTF8Type.instance),
@@ -382,11 +382,10 @@ public class Columns extends 
AbstractCollection<ColumnMetadata> implements Colle
     /**
      * Apply a function to each column definition in forwards or reversed 
order.
      * @param function
-     * @param reversed
      */
-    public void apply(Consumer<ColumnMetadata> function, boolean reversed)
+    public void apply(Consumer<ColumnMetadata> function)
     {
-        BTree.apply(columns, function, reversed);
+        BTree.apply(columns, function);
     }
 
     @Override
diff --git a/src/java/org/apache/cassandra/db/Mutation.java 
b/src/java/org/apache/cassandra/db/Mutation.java
index 22c4ed8..3d27ef3 100644
--- a/src/java/org/apache/cassandra/db/Mutation.java
+++ b/src/java/org/apache/cassandra/db/Mutation.java
@@ -29,7 +29,7 @@ import org.apache.commons.lang3.StringUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -336,7 +336,7 @@ public class Mutation implements IMutation
                 PartitionUpdate.serializer.serialize(entry.getValue(), out, 
version);
         }
 
-        public Mutation deserialize(DataInputPlus in, int version, 
SerializationHelper.Flag flag) throws IOException
+        public Mutation deserialize(DataInputPlus in, int version, 
DeserializationHelper.Flag flag) throws IOException
         {
             int size = (int)in.readUnsignedVInt();
             assert size > 0;
@@ -359,7 +359,7 @@ public class Mutation implements IMutation
 
         public Mutation deserialize(DataInputPlus in, int version) throws 
IOException
         {
-            return deserialize(in, version, 
SerializationHelper.Flag.FROM_REMOTE);
+            return deserialize(in, version, 
DeserializationHelper.Flag.FROM_REMOTE);
         }
 
         public long serializedSize(Mutation mutation, int version)
diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java 
b/src/java/org/apache/cassandra/db/ReadResponse.java
index affbbbe..3f6481d 100644
--- a/src/java/org/apache/cassandra/db/ReadResponse.java
+++ b/src/java/org/apache/cassandra/db/ReadResponse.java
@@ -182,7 +182,7 @@ public abstract class ReadResponse
                   command.getRepairedDataDigest(),
                   command.isRepairedDataDigestConclusive(),
                   MessagingService.current_version,
-                  SerializationHelper.Flag.LOCAL);
+                  DeserializationHelper.Flag.LOCAL);
         }
 
         private static ByteBuffer build(UnfilteredPartitionIterator iter, 
ColumnFilter selection)
@@ -208,7 +208,7 @@ public abstract class ReadResponse
                                      boolean isRepairedDigestConclusive,
                                      int version)
         {
-            super(data, repairedDataDigest, isRepairedDigestConclusive, 
version, SerializationHelper.Flag.FROM_REMOTE);
+            super(data, repairedDataDigest, isRepairedDigestConclusive, 
version, DeserializationHelper.Flag.FROM_REMOTE);
         }
     }
 
@@ -220,13 +220,13 @@ public abstract class ReadResponse
         private final ByteBuffer repairedDataDigest;
         private final boolean isRepairedDigestConclusive;
         private final int dataSerializationVersion;
-        private final SerializationHelper.Flag flag;
+        private final DeserializationHelper.Flag flag;
 
         protected DataResponse(ByteBuffer data,
                                ByteBuffer repairedDataDigest,
                                boolean isRepairedDigestConclusive,
                                int dataSerializationVersion,
-                               SerializationHelper.Flag flag)
+                               DeserializationHelper.Flag flag)
         {
             super();
             this.data = data;
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java 
b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 15ef268..1c22feb 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.*;
 
 import com.google.common.collect.ImmutableList;
+
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.TypeParser;
diff --git a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java 
b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
index 84ff691..f9ff1d7 100644
--- a/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
+++ b/src/java/org/apache/cassandra/db/UnfilteredDeserializer.java
@@ -34,7 +34,7 @@ public class UnfilteredDeserializer
 {
     protected final TableMetadata metadata;
     protected final DataInputPlus in;
-    protected final SerializationHelper helper;
+    protected final DeserializationHelper helper;
 
     private final ClusteringPrefix.Deserializer clusteringDeserializer;
     private final SerializationHeader header;
@@ -49,7 +49,7 @@ public class UnfilteredDeserializer
     private UnfilteredDeserializer(TableMetadata metadata,
                                    DataInputPlus in,
                                    SerializationHeader header,
-                                   SerializationHelper helper)
+                                   DeserializationHelper helper)
     {
         this.metadata = metadata;
         this.in = in;
@@ -62,7 +62,7 @@ public class UnfilteredDeserializer
     public static UnfilteredDeserializer create(TableMetadata metadata,
                                                 DataInputPlus in,
                                                 SerializationHeader header,
-                                                SerializationHelper helper)
+                                                DeserializationHelper helper)
     {
         return new UnfilteredDeserializer(metadata, in, header, helper);
     }
diff --git 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
index cfc7da2..c631f1c 100644
--- 
a/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
+++ 
b/src/java/org/apache/cassandra/db/columniterator/AbstractSSTableIterator.java
@@ -43,7 +43,7 @@ public abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
     protected final DecoratedKey key;
     protected final DeletionTime partitionLevelDeletion;
     protected final ColumnFilter columns;
-    protected final SerializationHelper helper;
+    protected final DeserializationHelper helper;
 
     protected final Row staticRow;
     protected final Reader reader;
@@ -70,7 +70,7 @@ public abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
         this.key = key;
         this.columns = columnFilter;
         this.slices = slices;
-        this.helper = new SerializationHelper(metadata, 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL, columnFilter);
+        this.helper = new DeserializationHelper(metadata, 
sstable.descriptor.version.correspondingMessagingVersion(), 
DeserializationHelper.Flag.LOCAL, columnFilter);
 
         if (indexEntry == null)
         {
@@ -159,7 +159,7 @@ public abstract class AbstractSSTableIterator implements 
UnfilteredRowIterator
 
     private static Row readStaticRow(SSTableReader sstable,
                                      FileDataInput file,
-                                     SerializationHelper helper,
+                                     DeserializationHelper helper,
                                      Columns statics) throws IOException
     {
         if (!sstable.header.hasStatic())
diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java 
b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
index c91841f..5123580 100644
--- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
+++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReader.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.Mutation;
 import 
org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadErrorReason;
 import 
org.apache.cassandra.db.commitlog.CommitLogReadHandler.CommitLogReadException;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.util.DataInputBuffer;
@@ -429,7 +429,7 @@ public class CommitLogReader
         {
             mutation = Mutation.serializer.deserialize(bufIn,
                                                        
desc.getMessagingVersion(),
-                                                       
SerializationHelper.Flag.LOCAL);
+                                                       
DeserializationHelper.Flag.LOCAL);
             // doublecheck that what we read is still] valid for the current 
schema
             for (PartitionUpdate upd : mutation.getPartitionUpdates())
                 upd.validate();
diff --git 
a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java 
b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
index 4d9c227..9a2b331 100644
--- a/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
+++ b/src/java/org/apache/cassandra/db/partitions/CachedBTreePartition.java
@@ -177,11 +177,11 @@ public class CachedBTreePartition extends 
ImmutableBTreePartition implements Cac
 
 
             TableMetadata metadata = 
Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
-            UnfilteredRowIteratorSerializer.Header header = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, 
in, version, SerializationHelper.Flag.LOCAL);
+            UnfilteredRowIteratorSerializer.Header header = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, 
in, version, DeserializationHelper.Flag.LOCAL);
             assert !header.isReversed && header.rowEstimate >= 0;
 
             Holder holder;
-            try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, 
SerializationHelper.Flag.LOCAL, header))
+            try (UnfilteredRowIterator partition = 
UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, 
DeserializationHelper.Flag.LOCAL, header))
             {
                 holder = ImmutableBTreePartition.build(partition, 
header.rowEstimate);
             }
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index ec01fa6..076c975 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.db.partitions;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
 import java.util.List;
 
 import com.google.common.collect.Iterables;
@@ -38,7 +36,6 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.btree.BTree;
 import org.apache.cassandra.utils.btree.UpdateFunction;
 
@@ -230,7 +227,7 @@ public class PartitionUpdate extends AbstractBTreePartition
         {
             return serializer.deserialize(new DataInputBuffer(bytes, true),
                                           version,
-                                          SerializationHelper.Flag.LOCAL);
+                                          DeserializationHelper.Flag.LOCAL);
         }
         catch (IOException e)
         {
@@ -636,7 +633,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             }
         }
 
-        public PartitionUpdate deserialize(DataInputPlus in, int version, 
SerializationHelper.Flag flag) throws IOException
+        public PartitionUpdate deserialize(DataInputPlus in, int version, 
DeserializationHelper.Flag flag) throws IOException
         {
             TableMetadata metadata = 
Schema.instance.getExistingTableMetadata(TableId.deserialize(in));
             UnfilteredRowIteratorSerializer.Header header = 
UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, 
in, version, flag);
diff --git 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 945bcb4..30b4d9e 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -316,7 +316,7 @@ public abstract class UnfilteredPartitionIterators
             out.writeBoolean(false);
         }
 
-        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, 
final int version, final TableMetadata metadata, final ColumnFilter selection, 
final SerializationHelper.Flag flag) throws IOException
+        public UnfilteredPartitionIterator deserialize(final DataInputPlus in, 
final int version, final TableMetadata metadata, final ColumnFilter selection, 
final DeserializationHelper.Flag flag) throws IOException
         {
             // Skip now unused isForThrift boolean
             in.readBoolean();
diff --git a/src/java/org/apache/cassandra/db/rows/AbstractRow.java 
b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
index 957ffd4..fc90e34 100644
--- a/src/java/org/apache/cassandra/db/rows/AbstractRow.java
+++ b/src/java/org/apache/cassandra/db/rows/AbstractRow.java
@@ -24,10 +24,11 @@ import java.util.stream.StreamSupport;
 
 import com.google.common.collect.Iterables;
 
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.marshal.CollectionType;
 import org.apache.cassandra.db.marshal.UserType;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.serializers.MarshalException;
 
 /**
@@ -66,8 +67,7 @@ public abstract class AbstractRow implements Row
         deletion().digest(digest);
         primaryKeyLivenessInfo().digest(digest);
 
-        for (ColumnData cd : this)
-            cd.digest(digest);
+        apply(ColumnData::digest, digest);
     }
 
     public void validateData(TableMetadata metadata)
@@ -93,15 +93,7 @@ public abstract class AbstractRow implements Row
         if (deletion().time().localDeletionTime() < 0)
             throw new MarshalException("A local deletion time should not be 
negative in '" + metadata + "'");
 
-        for (ColumnData cd : this)
-            try
-            {
-                cd.validate();
-            }
-            catch (Exception e)
-            {
-                throw new MarshalException("data for '" + 
cd.column.debugString() + "', " + cd + " in '" + metadata + "' didn't 
validate", e);
-            }
+        apply(cd -> cd.validate());
     }
 
     public boolean hasInvalidDeletions()
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index dc3219a..6689c77 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -19,13 +19,14 @@ package org.apache.cassandra.db.rows;
 
 import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Collections2;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
@@ -53,6 +54,11 @@ public class BTreeRow extends AbstractRow
     // The data for each columns present in this row in column sorted order.
     private final Object[] btree;
 
+    private static final ColumnData FIRST_COMPLEX_STATIC = new 
ComplexColumnData(Columns.FIRST_COMPLEX_STATIC, new Object[0], new 
DeletionTime(0, 0));
+    private static final ColumnData FIRST_COMPLEX_REGULAR = new 
ComplexColumnData(Columns.FIRST_COMPLEX_REGULAR, new Object[0], new 
DeletionTime(0, 0));
+    private static final Comparator<ColumnData> COLUMN_COMPARATOR = (cd1, cd2) 
-> cd1.column.compareTo(cd2.column);
+
+
     // We need to filter the tombstones of a row on every read (twice in fact: 
first to remove purgeable tombstone, and then after reconciliation to remove
     // all tombstone since we don't return them to the client) as well as on 
compaction. But it's likely that many rows won't have any tombstone at all, so
     // we want to speed up that case by not having to iterate/copy the row in 
this case. We could keep a single boolean telling us if we have tombstones,
@@ -90,8 +96,8 @@ public class BTreeRow extends AbstractRow
         int minDeletionTime = 
Math.min(minDeletionTime(primaryKeyLivenessInfo), 
minDeletionTime(deletion.time()));
         if (minDeletionTime != Integer.MIN_VALUE)
         {
-            for (ColumnData cd : BTree.<ColumnData>iterable(btree))
-                minDeletionTime = Math.min(minDeletionTime, 
minDeletionTime(cd));
+            long result = BTree.<ColumnData>accumulate(btree, (cd, l) -> 
Math.min(l, minDeletionTime(cd)) , minDeletionTime);
+            minDeletionTime = Ints.checkedCast(result);
         }
 
         return create(clustering, primaryKeyLivenessInfo, deletion, btree, 
minDeletionTime);
@@ -168,23 +174,49 @@ public class BTreeRow extends AbstractRow
         return cd.column().isSimple() ? minDeletionTime((Cell) cd) : 
minDeletionTime((ComplexColumnData)cd);
     }
 
-    public void apply(Consumer<ColumnData> function, boolean reversed)
+    public void apply(Consumer<ColumnData> function)
+    {
+        BTree.apply(btree, function);
+    }
+
+    public <A> void apply(BiConsumer<A, ColumnData> function, A arg)
+    {
+        BTree.apply(btree, function, arg);
+    }
+
+    public long accumulate(LongAccumulator<ColumnData> accumulator, long 
initialValue)
     {
-        BTree.apply(btree, function, reversed);
+        return BTree.accumulate(btree, accumulator, initialValue);
     }
 
-    public void apply(Consumer<ColumnData> funtion, 
com.google.common.base.Predicate<ColumnData> stopCondition, boolean reversed)
+    public long accumulate(LongAccumulator<ColumnData> accumulator, 
Comparator<ColumnData> comparator, ColumnData from, long initialValue)
     {
-        BTree.apply(btree, funtion, stopCondition, reversed);
+        return BTree.accumulate(btree, accumulator, comparator, from, 
initialValue);
+    }
+
+    public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A 
arg, long initialValue)
+    {
+        return BTree.accumulate(btree, accumulator, arg, initialValue);
+    }
+
+    public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A 
arg, Comparator<ColumnData> comparator, ColumnData from, long initialValue)
+    {
+        return BTree.accumulate(btree, accumulator, arg, comparator, from, 
initialValue);
     }
 
     private static int minDeletionTime(Object[] btree, LivenessInfo info, 
DeletionTime rowDeletion)
     {
-        //we have to wrap this for the lambda
-        final WrappedInt min = new WrappedInt(Math.min(minDeletionTime(info), 
minDeletionTime(rowDeletion)));
+        long min = Math.min(minDeletionTime(info), 
minDeletionTime(rowDeletion));
+
+        min = BTree.<ColumnData>accumulate(btree, (cd, l) -> {
+            int m = Math.min((int) l, minDeletionTime(cd));
+            return m != Integer.MIN_VALUE ? m : Long.MAX_VALUE;
+        }, min);
 
-        BTree.<ColumnData>apply(btree, cd -> min.set( Math.min(min.get(), 
minDeletionTime(cd)) ), cd -> min.get() == Integer.MIN_VALUE, false);
-        return min.get();
+        if (min == Long.MAX_VALUE)
+            return Integer.MIN_VALUE;
+
+        return Ints.checkedCast(min);
     }
 
     public Clustering clustering()
@@ -334,33 +366,19 @@ public class BTreeRow extends AbstractRow
 
     public boolean hasComplex()
     {
-        // We start by the end cause we know complex columns sort after the 
simple ones
-        ColumnData cd = Iterables.getFirst(BTree.<ColumnData>iterable(btree, 
BTree.Dir.DESC), null);
-        return cd != null && cd.column.isComplex();
+        if (BTree.isEmpty(btree))
+            return false;
+
+        int size = BTree.size(btree);
+        ColumnData last = BTree.findByIndex(btree, size - 1);
+        return last.column.isComplex();
     }
 
     public boolean hasComplexDeletion()
     {
-        final WrappedBoolean result = new WrappedBoolean(false);
-
-        // We start by the end cause we know complex columns sort before 
simple ones
-        apply(c -> {}, cd -> {
-            if (cd.column.isSimple())
-            {
-                result.set(false);
-                return true;
-            }
-
-            if (!((ComplexColumnData) cd).complexDeletion().isLive())
-            {
-                result.set(true);
-                return true;
-            }
-
-            return false;
-        }, true);
-
-        return result.get();
+        long result = accumulate((cd, v) -> ((ComplexColumnData) 
cd).complexDeletion().isLive() ? 0 : Long.MAX_VALUE,
+                                 COLUMN_COMPARATOR, isStatic() ? 
FIRST_COMPLEX_STATIC : FIRST_COMPLEX_REGULAR, 0L);
+        return result == Long.MAX_VALUE;
     }
 
     public Row markCounterLocalToBeCleared()
@@ -375,6 +393,15 @@ public class BTreeRow extends AbstractRow
         return nowInSec >= minLocalDeletionTime;
     }
 
+    public boolean hasInvalidDeletions()
+    {
+        if (primaryKeyLivenessInfo().isExpiring() && 
(primaryKeyLivenessInfo().ttl() < 0 || 
primaryKeyLivenessInfo().localExpirationTime() < 0))
+            return true;
+        if (!deletion().time().validate())
+            return true;
+        return accumulate((cd, v) -> cd.hasInvalidDeletions() ? Long.MAX_VALUE 
: v, 0) != 0;
+    }
+
     /**
      * Returns a copy of the row where all timestamps for live data have 
replaced by {@code newTimestamp} and
      * all deletion timestamp by {@code newTimestamp - 1}.
@@ -440,9 +467,7 @@ public class BTreeRow extends AbstractRow
                      + primaryKeyLivenessInfo.dataSize()
                      + deletion.dataSize();
 
-        for (ColumnData cd : this)
-            dataSize += cd.dataSize();
-        return dataSize;
+        return Ints.checkedCast(accumulate((cd, v) -> v + cd.dataSize(), 
dataSize));
     }
 
     public long unsharedHeapSizeExcludingData()
@@ -451,9 +476,7 @@ public class BTreeRow extends AbstractRow
                       + clustering.unsharedHeapSizeExcludingData()
                       + BTree.sizeOfStructureOnHeap(btree);
 
-        for (ColumnData cd : this)
-            heapSize += cd.unsharedHeapSizeExcludingData();
-        return heapSize;
+        return accumulate((cd, v) -> v + cd.unsharedHeapSizeExcludingData(), 
heapSize);
     }
 
     public static Row.Builder sortedBuilder()
diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java 
b/src/java/org/apache/cassandra/db/rows/Cell.java
index 300bbce..959676a 100644
--- a/src/java/org/apache/cassandra/db/rows/Cell.java
+++ b/src/java/org/apache/cassandra/db/rows/Cell.java
@@ -214,7 +214,7 @@ public abstract class Cell extends ColumnData
                 header.getType(column).writeValue(cell.value(), out);
         }
 
-        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, 
ColumnMetadata column, SerializationHeader header, SerializationHelper helper) 
throws IOException
+        public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, 
ColumnMetadata column, SerializationHeader header, DeserializationHelper 
helper) throws IOException
         {
             int flags = in.readUnsignedByte();
             boolean hasValue = (flags & HAS_EMPTY_VALUE_MASK) == 0;
diff --git a/src/java/org/apache/cassandra/db/rows/ColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ColumnData.java
index e5f5550..36aad97 100644
--- a/src/java/org/apache/cassandra/db/rows/ColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ColumnData.java
@@ -78,6 +78,11 @@ public abstract class ColumnData
      */
     public abstract void digest(Digest digest);
 
+    public static void digest(Digest digest, ColumnData cd)
+    {
+        cd.digest(digest);
+    }
+
     /**
      * Returns a copy of the data where all timestamps for live data have 
replaced by {@code newTimestamp} and
      * all deletion timestamp by {@code newTimestamp - 1}.
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index 832167f..5b03504 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -30,8 +30,11 @@ import org.apache.cassandra.db.LivenessInfo;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.marshal.ByteType;
 import org.apache.cassandra.db.marshal.SetType;
+import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.DroppedColumn;
+import org.apache.cassandra.utils.BiLongAccumulator;
+import org.apache.cassandra.utils.LongAccumulator;
 import org.apache.cassandra.utils.ObjectSizes;
 import org.apache.cassandra.utils.btree.BTree;
 
@@ -60,11 +63,6 @@ public class ComplexColumnData extends ColumnData implements 
Iterable<Cell>
         this.complexDeletion = complexDeletion;
     }
 
-    public boolean hasCells()
-    {
-        return !BTree.isEmpty(cells);
-    }
-
     public int cellsCount()
     {
         return BTree.size(cells);
@@ -106,6 +104,16 @@ public class ComplexColumnData extends ColumnData 
implements Iterable<Cell>
         return BTree.iterator(cells, BTree.Dir.DESC);
     }
 
+    public long accumulate(LongAccumulator<Cell> accumulator, long 
initialValue)
+    {
+        return BTree.accumulate(cells, accumulator, initialValue);
+    }
+
+    public <A> long accumulate(BiLongAccumulator<A, Cell> accumulator, A arg, 
long initialValue)
+    {
+        return BTree.accumulate(cells, accumulator, arg, initialValue);
+    }
+
     public int dataSize()
     {
         int size = complexDeletion.dataSize();
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java 
b/src/java/org/apache/cassandra/db/rows/DeserializationHelper.java
similarity index 95%
copy from src/java/org/apache/cassandra/db/rows/SerializationHelper.java
copy to src/java/org/apache/cassandra/db/rows/DeserializationHelper.java
index db23cb8..386e6ef 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/DeserializationHelper.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.context.CounterContext;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.schema.DroppedColumn;
 
-public class SerializationHelper
+public class DeserializationHelper
 {
     /**
      * Flag affecting deserialization behavior (this only affect counters in 
practice).
@@ -56,7 +56,7 @@ public class SerializationHelper
     private DroppedColumn currentDroppedComplex;
 
 
-    public SerializationHelper(TableMetadata metadata, int version, Flag flag, 
ColumnFilter columnsToFetch)
+    public DeserializationHelper(TableMetadata metadata, int version, Flag 
flag, ColumnFilter columnsToFetch)
     {
         this.flag = flag;
         this.version = version;
@@ -65,7 +65,7 @@ public class SerializationHelper
         this.hasDroppedColumns = droppedColumns.size() > 0;
     }
 
-    public SerializationHelper(TableMetadata metadata, int version, Flag flag)
+    public DeserializationHelper(TableMetadata metadata, int version, Flag 
flag)
     {
         this(metadata, version, flag, null);
     }
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java 
b/src/java/org/apache/cassandra/db/rows/Row.java
index 2f752b8..ee93da4 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -18,15 +18,16 @@
 package org.apache.cassandra.db.rows;
 
 import java.util.*;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
-import com.google.common.base.Predicate;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.utils.BiLongAccumulator;
+import org.apache.cassandra.utils.LongAccumulator;
 import org.apache.cassandra.utils.MergeIterator;
 import org.apache.cassandra.utils.SearchIterator;
 import org.apache.cassandra.utils.btree.BTree;
@@ -282,12 +283,24 @@ public interface Row extends Unfiltered, 
Iterable<ColumnData>
     /**
      * Apply a function to every column in a row
      */
-    public void apply(Consumer<ColumnData> function, boolean reverse);
+    public void apply(Consumer<ColumnData> function);
+
+    /**
+     * Apply a function to every column in a row
+     */
+    public <A> void apply(BiConsumer<A, ColumnData> function, A arg);
 
     /**
-     * Apply a funtion to every column in a row until a stop condition is 
reached
+     * Apply an accumulation function to every column in a row
      */
-    public void apply(Consumer<ColumnData> function, Predicate<ColumnData> 
stopCondition, boolean reverse);
+
+    public long accumulate(LongAccumulator<ColumnData> accumulator, long 
initialValue);
+
+    public long accumulate(LongAccumulator<ColumnData> accumulator, 
Comparator<ColumnData> comparator, ColumnData from, long initialValue);
+
+    public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A 
arg, long initialValue);
+
+    public <A> long accumulate(BiLongAccumulator<A, ColumnData> accumulator, A 
arg, Comparator<ColumnData> comparator, ColumnData from, long initialValue);
 
     /**
      * A row deletion/tombstone.
diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java 
b/src/java/org/apache/cassandra/db/rows/Rows.java
index d62d3b5..58284ac 100644
--- a/src/java/org/apache/cassandra/db/rows/Rows.java
+++ b/src/java/org/apache/cassandra/db/rows/Rows.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
 import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.utils.WrappedInt;
 
 /**
  * Static utilities to work on Row objects.
@@ -75,6 +74,46 @@ public abstract class Rows
         return new SimpleBuilders.RowBuilder(metadata, clusteringValues);
     }
 
+    private static class StatsAccumulation
+    {
+        private static final long COLUMN_INCR = 1L << 32;
+        private static final long CELL_INCR = 1L;
+
+        private static long accumulateOnCell(PartitionStatisticsCollector 
collector, Cell cell, long l)
+        {
+            Cells.collectStats(cell, collector);
+            return l + CELL_INCR;
+        }
+
+        private static long 
accumulateOnColumnData(PartitionStatisticsCollector collector, ColumnData cd, 
long l)
+        {
+            if (cd.column().isSimple())
+            {
+                l = accumulateOnCell(collector, (Cell) cd, l) + COLUMN_INCR;
+            }
+            else
+            {
+                ComplexColumnData complexData = (ComplexColumnData)cd;
+                collector.update(complexData.complexDeletion());
+                int startingCells = unpackCellCount(l);
+                l = 
complexData.accumulate(StatsAccumulation::accumulateOnCell, collector, l);
+                if (unpackCellCount(l) > startingCells)
+                    l += COLUMN_INCR;
+            }
+            return l;
+        }
+
+        private static int unpackCellCount(long v)
+        {
+            return (int) (v & 0xFFFFFFFFL);
+        }
+
+        private static int unpackColumnCount(long v)
+        {
+            return (int) (v >>> 32);
+        }
+    }
+
     /**
      * Collect statistics on a given row.
      *
@@ -89,35 +128,10 @@ public abstract class Rows
         collector.update(row.primaryKeyLivenessInfo());
         collector.update(row.deletion().time());
 
-        //we have to wrap these for the lambda
-        final WrappedInt columnCount = new WrappedInt(0);
-        final WrappedInt cellCount = new WrappedInt(0);
-
-        row.apply(cd -> {
-            if (cd.column().isSimple())
-            {
-                columnCount.increment();
-                cellCount.increment();
-                Cells.collectStats((Cell) cd, collector);
-            }
-            else
-            {
-                ComplexColumnData complexData = (ComplexColumnData)cd;
-                collector.update(complexData.complexDeletion());
-                if (complexData.hasCells())
-                {
-                    columnCount.increment();
-                    for (Cell cell : complexData)
-                    {
-                        cellCount.increment();
-                        Cells.collectStats(cell, collector);
-                    }
-                }
-            }
-        }, false);
+        long result = 
row.accumulate(StatsAccumulation::accumulateOnColumnData, collector, 0);
 
-        collector.updateColumnSetPerRow(columnCount.get());
-        return cellCount.get();
+        
collector.updateColumnSetPerRow(StatsAccumulation.unpackColumnCount(result));
+        return StatsAccumulation.unpackCellCount(result);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java 
b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
index db23cb8..dca4240 100644
--- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
+++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java
@@ -15,135 +15,43 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.cassandra.db.rows;
 
-import java.nio.ByteBuffer;
-import java.util.*;
+package org.apache.cassandra.db.rows;
 
+import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.schema.ColumnMetadata;
-import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.context.CounterContext;
-import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.schema.DroppedColumn;
+import org.apache.cassandra.utils.SearchIterator;
+import org.apache.cassandra.utils.btree.BTreeSearchIterator;
 
 public class SerializationHelper
 {
-    /**
-     * Flag affecting deserialization behavior (this only affect counters in 
practice).
-     *  - LOCAL: for deserialization of local data (Expired columns are
-     *      converted to tombstones (to gain disk space)).
-     *  - FROM_REMOTE: for deserialization of data received from remote hosts
-     *      (Expired columns are converted to tombstone and counters have
-     *      their delta cleared)
-     *  - PRESERVE_SIZE: used when no transformation must be performed, i.e,
-     *      when we must ensure that deserializing and reserializing the
-     *      result yield the exact same bytes. Streaming uses this.
-     */
-    public enum Flag
-    {
-        LOCAL, FROM_REMOTE, PRESERVE_SIZE
-    }
-
-    private final Flag flag;
-    public final int version;
-
-    private final ColumnFilter columnsToFetch;
-    private ColumnFilter.Tester tester;
-
-    private final boolean hasDroppedColumns;
-    private final Map<ByteBuffer, DroppedColumn> droppedColumns;
-    private DroppedColumn currentDroppedComplex;
-
-
-    public SerializationHelper(TableMetadata metadata, int version, Flag flag, 
ColumnFilter columnsToFetch)
-    {
-        this.flag = flag;
-        this.version = version;
-        this.columnsToFetch = columnsToFetch;
-        this.droppedColumns = metadata.droppedColumns;
-        this.hasDroppedColumns = droppedColumns.size() > 0;
-    }
-
-    public SerializationHelper(TableMetadata metadata, int version, Flag flag)
-    {
-        this(metadata, version, flag, null);
-    }
+    public final SerializationHeader header;
+    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics = null;
+    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars = 
null;
 
-    public boolean includes(ColumnMetadata column)
+    public SerializationHelper(SerializationHeader header)
     {
-        return columnsToFetch == null || columnsToFetch.fetches(column);
+        this.header = header;
     }
 
-    public boolean includes(Cell cell, LivenessInfo rowLiveness)
+    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> statics()
     {
-        if (columnsToFetch == null)
-            return true;
-
-        // During queries, some columns are included even though they are not 
queried by the user because
-        // we always need to distinguish between having a row (with 
potentially only null values) and not
-        // having a row at all (see #CASSANDRA-7085 for background). In the 
case where the column is not
-        // actually requested by the user however (canSkipValue), we can skip 
the full cell if the cell
-        // timestamp is lower than the row one, because in that case, the row 
timestamp is enough proof
-        // of the liveness of the row. Otherwise, we'll only be able to skip 
the values of those cells.
-        ColumnMetadata column = cell.column();
-        if (column.isComplex())
-        {
-            if (!includes(cell.path()))
-                return false;
-
-            return !canSkipValue(cell.path()) || cell.timestamp() >= 
rowLiveness.timestamp();
-        }
-        else
-        {
-            return columnsToFetch.fetchedColumnIsQueried(column) || 
cell.timestamp() >= rowLiveness.timestamp();
-        }
-    }
-
-    public boolean includes(CellPath path)
-    {
-        return path == null || tester == null || tester.fetches(path);
-    }
-
-    public boolean canSkipValue(ColumnMetadata column)
-    {
-        return columnsToFetch != null && 
!columnsToFetch.fetchedColumnIsQueried(column);
-    }
-
-    public boolean canSkipValue(CellPath path)
-    {
-        return path != null && tester != null && 
!tester.fetchedCellIsQueried(path);
-    }
-
-    public void startOfComplexColumn(ColumnMetadata column)
-    {
-        this.tester = columnsToFetch == null ? null : 
columnsToFetch.newTester(column);
-        this.currentDroppedComplex = droppedColumns.get(column.name.bytes);
-    }
-
-    public void endOfComplexColumn()
-    {
-        this.tester = null;
-    }
-
-    public boolean isDropped(Cell cell, boolean isComplex)
-    {
-        if (!hasDroppedColumns)
-            return false;
-
-        DroppedColumn dropped = isComplex ? currentDroppedComplex : 
droppedColumns.get(cell.column().name.bytes);
-        return dropped != null && cell.timestamp() <= dropped.droppedTime;
+        if (statics == null)
+            statics = header.columns().statics.iterator();
+        return statics;
     }
 
-    public boolean isDroppedComplexDeletion(DeletionTime complexDeletion)
+    private BTreeSearchIterator<ColumnMetadata, ColumnMetadata> regulars()
     {
-        return currentDroppedComplex != null && 
complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime;
+        if (regulars == null)
+            regulars = header.columns().regulars.iterator();
+        return regulars;
     }
 
-    public ByteBuffer maybeClearCounterValue(ByteBuffer value)
+    public SearchIterator<ColumnMetadata, ColumnMetadata> iterator(boolean 
isStatic)
     {
-        return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && 
CounterContext.instance().shouldClearLocal(value))
-             ? CounterContext.instance().clearAllLocal(value)
-             : value;
+        BTreeSearchIterator<ColumnMetadata, ColumnMetadata> iterator = 
isStatic ? statics() : regulars();
+        iterator.rewind();
+        return iterator;
     }
 }
diff --git 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 7cac5e6..df67754 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -123,18 +123,19 @@ public class UnfilteredRowIteratorSerializer
         out.writeByte((byte)flags);
 
         SerializationHeader.serializer.serializeForMessaging(header, 
selection, out, hasStatic);
+        SerializationHelper helper = new SerializationHelper(header);
 
         if (!partitionDeletion.isLive())
             header.writeDeletionTime(partitionDeletion, out);
 
         if (hasStatic)
-            UnfilteredSerializer.serializer.serialize(staticRow, header, out, 
version);
+            UnfilteredSerializer.serializer.serialize(staticRow, helper, out, 
version);
 
         if (rowEstimate >= 0)
             out.writeUnsignedVInt(rowEstimate);
 
         while (iterator.hasNext())
-            UnfilteredSerializer.serializer.serialize(iterator.next(), header, 
out, version);
+            UnfilteredSerializer.serializer.serialize(iterator.next(), helper, 
out, version);
         UnfilteredSerializer.serializer.writeEndOfPartition(out);
     }
 
@@ -147,6 +148,8 @@ public class UnfilteredRowIteratorSerializer
                                                              
iterator.columns(),
                                                              iterator.stats());
 
+        SerializationHelper helper = new SerializationHelper(header);
+
         assert rowEstimate >= 0;
 
         long size = 
ByteBufferUtil.serializedSizeWithVIntLength(iterator.partitionKey().getKey())
@@ -165,19 +168,19 @@ public class UnfilteredRowIteratorSerializer
             size += header.deletionTimeSerializedSize(partitionDeletion);
 
         if (hasStatic)
-            size += UnfilteredSerializer.serializer.serializedSize(staticRow, 
header, version);
+            size += UnfilteredSerializer.serializer.serializedSize(staticRow, 
helper, version);
 
         if (rowEstimate >= 0)
             size += TypeSizes.sizeofUnsignedVInt(rowEstimate);
 
         while (iterator.hasNext())
-            size += 
UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, 
version);
+            size += 
UnfilteredSerializer.serializer.serializedSize(iterator.next(), helper, 
version);
         size += UnfilteredSerializer.serializer.serializedSizeEndOfPartition();
 
         return size;
     }
 
-    public Header deserializeHeader(TableMetadata metadata, ColumnFilter 
selection, DataInputPlus in, int version, SerializationHelper.Flag flag) throws 
IOException
+    public Header deserializeHeader(TableMetadata metadata, ColumnFilter 
selection, DataInputPlus in, int version, DeserializationHelper.Flag flag) 
throws IOException
     {
         DecoratedKey key = 
metadata.partitioner.decorateKey(ByteBufferUtil.readWithVIntLength(in));
         int flags = in.readUnsignedByte();
@@ -198,18 +201,18 @@ public class UnfilteredRowIteratorSerializer
 
         Row staticRow = Rows.EMPTY_STATIC_ROW;
         if (hasStatic)
-            staticRow = 
UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new 
SerializationHelper(metadata, version, flag));
+            staticRow = 
UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new 
DeserializationHelper(metadata, version, flag));
 
         int rowEstimate = hasRowEstimate ? (int)in.readUnsignedVInt() : -1;
         return new Header(header, key, isReversed, false, partitionDeletion, 
staticRow, rowEstimate);
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
TableMetadata metadata, SerializationHelper.Flag flag, Header header) throws 
IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
TableMetadata metadata, DeserializationHelper.Flag flag, Header header) throws 
IOException
     {
         if (header.isEmpty)
             return EmptyIterators.unfilteredRow(metadata, header.key, 
header.isReversed);
 
-        final SerializationHelper helper = new SerializationHelper(metadata, 
version, flag);
+        final DeserializationHelper helper = new 
DeserializationHelper(metadata, version, flag);
         final SerializationHeader sHeader = header.sHeader;
         return new AbstractUnfilteredRowIterator(metadata, header.key, 
header.partitionDeletion, sHeader.columns(), header.staticRow, 
header.isReversed, sHeader.stats())
         {
@@ -230,7 +233,7 @@ public class UnfilteredRowIteratorSerializer
         };
     }
 
-    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
TableMetadata metadata, ColumnFilter selection, SerializationHelper.Flag flag) 
throws IOException
+    public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
TableMetadata metadata, ColumnFilter selection, DeserializationHelper.Flag 
flag) throws IOException
     {
         return deserialize(in, version, metadata, flag, 
deserializeHeader(metadata, selection, in, version, flag));
     }
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 7b48652..a5fad14 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,8 +19,6 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 
-import com.google.common.collect.Collections2;
-
 import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.db.*;
@@ -118,40 +116,41 @@ public class UnfilteredSerializer
     @Deprecated
     private final static int HAS_SHADOWABLE_DELETION = 0x02; // Whether the 
row deletion is shadowable. If there is no extended flag (or no row deletion), 
the deletion is assumed not shadowable.
 
-    public void serialize(Unfiltered unfiltered, SerializationHeader header, 
DataOutputPlus out, int version)
+    public void serialize(Unfiltered unfiltered, SerializationHelper helper, 
DataOutputPlus out, int version)
     throws IOException
     {
-        assert !header.isForSSTable();
-        serialize(unfiltered, header, out, 0, version);
+        assert !helper.header.isForSSTable();
+        serialize(unfiltered, helper, out, 0, version);
     }
 
-    public void serialize(Unfiltered unfiltered, SerializationHeader header, 
DataOutputPlus out, long previousUnfilteredSize, int version)
+    public void serialize(Unfiltered unfiltered, SerializationHelper helper, 
DataOutputPlus out, long previousUnfilteredSize, int version)
     throws IOException
     {
         if (unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER)
         {
-            serialize((RangeTombstoneMarker) unfiltered, header, out, 
previousUnfilteredSize, version);
+            serialize((RangeTombstoneMarker) unfiltered, helper, out, 
previousUnfilteredSize, version);
         }
         else
         {
-            serialize((Row) unfiltered, header, out, previousUnfilteredSize, 
version);
+            serialize((Row) unfiltered, helper, out, previousUnfilteredSize, 
version);
         }
     }
 
-    public void serializeStaticRow(Row row, SerializationHeader header, 
DataOutputPlus out, int version)
+    public void serializeStaticRow(Row row, SerializationHelper helper, 
DataOutputPlus out, int version)
     throws IOException
     {
         assert row.isStatic();
-        serialize(row, header, out, 0, version);
+        serialize(row, helper, out, 0, version);
     }
 
-    private void serialize(Row row, SerializationHeader header, DataOutputPlus 
out, long previousUnfilteredSize, int version)
+    private void serialize(Row row, SerializationHelper helper, DataOutputPlus 
out, long previousUnfilteredSize, int version)
     throws IOException
     {
         int flags = 0;
         int extendedFlags = 0;
 
         boolean isStatic = row.isStatic();
+        SerializationHeader header = helper.header;
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         Row.Deletion deletion = row.deletion();
@@ -191,7 +190,7 @@ public class UnfilteredSerializer
         {
             try (DataOutputBuffer dob = DataOutputBuffer.scratchBuffer.get())
             {
-                serializeRowBody(row, flags, header, dob);
+                serializeRowBody(row, flags, helper, dob);
 
                 out.writeUnsignedVInt(dob.position() + 
TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize));
                 // We write the size of the previous unfiltered to make 
reverse queries more efficient (and simpler).
@@ -202,16 +201,17 @@ public class UnfilteredSerializer
         }
         else
         {
-            serializeRowBody(row, flags, header, out);
+            serializeRowBody(row, flags, helper, out);
         }
     }
 
     @Inline
-    private void serializeRowBody(Row row, int flags, SerializationHeader 
header, DataOutputPlus out)
+    private void serializeRowBody(Row row, int flags, SerializationHelper 
helper, DataOutputPlus out)
     throws IOException
     {
         boolean isStatic = row.isStatic();
 
+        SerializationHeader header = helper.header;
         Columns headerColumns = header.columns(isStatic);
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         Row.Deletion deletion = row.deletion();
@@ -229,7 +229,7 @@ public class UnfilteredSerializer
         if ((flags & HAS_ALL_COLUMNS) == 0)
             Columns.serializer.serializeSubset(row.columns(), headerColumns, 
out);
 
-        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
headerColumns.iterator();
+        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
helper.iterator(isStatic);
 
         try
         {
@@ -253,7 +253,7 @@ public class UnfilteredSerializer
                 {
                     throw new WrappedException(e);
                 }
-            }, false);
+            });
         }
         catch (WrappedException e)
         {
@@ -275,9 +275,10 @@ public class UnfilteredSerializer
             Cell.serializer.serialize(cell, column, out, rowLiveness, header);
     }
 
-    private void serialize(RangeTombstoneMarker marker, SerializationHeader 
header, DataOutputPlus out, long previousUnfilteredSize, int version)
+    private void serialize(RangeTombstoneMarker marker, SerializationHelper 
helper, DataOutputPlus out, long previousUnfilteredSize, int version)
     throws IOException
     {
+        SerializationHeader header = helper.header;
         out.writeByte((byte)IS_MARKER);
         ClusteringBoundOrBoundary.serializer.serialize(marker.clustering(), 
out, version, header.clusteringTypes());
 
@@ -299,20 +300,20 @@ public class UnfilteredSerializer
         }
     }
 
-    public long serializedSize(Unfiltered unfiltered, SerializationHeader 
header, int version)
+    public long serializedSize(Unfiltered unfiltered, SerializationHelper 
helper, int version)
     {
-        assert !header.isForSSTable();
-        return serializedSize(unfiltered, header, 0, version);
+        assert !helper.header.isForSSTable();
+        return serializedSize(unfiltered, helper, 0, version);
     }
 
-    public long serializedSize(Unfiltered unfiltered, SerializationHeader 
header, long previousUnfilteredSize,int version)
+    public long serializedSize(Unfiltered unfiltered, SerializationHelper 
helper, long previousUnfilteredSize,int version)
     {
         return unfiltered.kind() == Unfiltered.Kind.RANGE_TOMBSTONE_MARKER
-             ? serializedSize((RangeTombstoneMarker) unfiltered, header, 
previousUnfilteredSize, version)
-             : serializedSize((Row) unfiltered, header, 
previousUnfilteredSize, version);
+             ? serializedSize((RangeTombstoneMarker) unfiltered, helper, 
previousUnfilteredSize, version)
+             : serializedSize((Row) unfiltered, helper, 
previousUnfilteredSize, version);
     }
 
-    private long serializedSize(Row row, SerializationHeader header, long 
previousUnfilteredSize, int version)
+    private long serializedSize(Row row, SerializationHelper helper, long 
previousUnfilteredSize, int version)
     {
         long size = 1; // flags
 
@@ -320,15 +321,16 @@ public class UnfilteredSerializer
             size += 1; // extended flags
 
         if (!row.isStatic())
-            size += Clustering.serializer.serializedSize(row.clustering(), 
version, header.clusteringTypes());
+            size += Clustering.serializer.serializedSize(row.clustering(), 
version, helper.header.clusteringTypes());
 
-        return size + serializedRowBodySize(row, header, 
previousUnfilteredSize, version);
+        return size + serializedRowBodySize(row, helper, 
previousUnfilteredSize, version);
     }
 
-    private long serializedRowBodySize(Row row, SerializationHeader header, 
long previousUnfilteredSize, int version)
+    private long serializedRowBodySize(Row row, SerializationHelper helper, 
long previousUnfilteredSize, int version)
     {
         long size = 0;
 
+        SerializationHeader header = helper.header;
         if (header.isForSSTable())
             size += TypeSizes.sizeofUnsignedVInt(previousUnfilteredSize);
 
@@ -352,19 +354,16 @@ public class UnfilteredSerializer
         if (!hasAllColumns)
             size += Columns.serializer.serializedSubsetSize(row.columns(), 
header.columns(isStatic));
 
-        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
headerColumns.iterator();
-        for (ColumnData data : row)
-        {
+        SearchIterator<ColumnMetadata, ColumnMetadata> si = 
helper.iterator(isStatic);
+        return row.accumulate((data, v) -> {
             ColumnMetadata column = si.next(data.column());
             assert column != null;
 
             if (data.column.isSimple())
-                size += Cell.serializer.serializedSize((Cell) data, column, 
pkLiveness, header);
+                return v + Cell.serializer.serializedSize((Cell) data, column, 
pkLiveness, header);
             else
-                size += sizeOfComplexColumn((ComplexColumnData) data, column, 
hasComplexDeletion, pkLiveness, header);
-        }
-
-        return size;
+                return v + sizeOfComplexColumn((ComplexColumnData) data, 
column, hasComplexDeletion, pkLiveness, header);
+        }, size);
     }
 
     private long sizeOfComplexColumn(ComplexColumnData data, ColumnMetadata 
column, boolean hasComplexDeletion, LivenessInfo rowLiveness, 
SerializationHeader header)
@@ -381,12 +380,12 @@ public class UnfilteredSerializer
         return size;
     }
 
-    private long serializedSize(RangeTombstoneMarker marker, 
SerializationHeader header, long previousUnfilteredSize, int version)
+    private long serializedSize(RangeTombstoneMarker marker, 
SerializationHelper helper, long previousUnfilteredSize, int version)
     {
-        assert !header.isForSSTable();
+        assert !helper.header.isForSSTable();
         return 1 // flags
-             + 
ClusteringBoundOrBoundary.serializer.serializedSize(marker.clustering(), 
version, header.clusteringTypes())
-             + serializedMarkerBodySize(marker, header, 
previousUnfilteredSize, version);
+             + 
ClusteringBoundOrBoundary.serializer.serializedSize(marker.clustering(), 
version, helper.header.clusteringTypes())
+             + serializedMarkerBodySize(marker, helper.header, 
previousUnfilteredSize, version);
     }
 
     private long serializedMarkerBodySize(RangeTombstoneMarker marker, 
SerializationHeader header, long previousUnfilteredSize, int version)
@@ -428,7 +427,7 @@ public class UnfilteredSerializer
      * @return the deserialized {@link Unfiltered} or {@code null} if we've 
read the end of a partition. This method is
      * guaranteed to never return empty rows.
      */
-    public Unfiltered deserialize(DataInputPlus in, SerializationHeader 
header, SerializationHelper helper, Row.Builder builder)
+    public Unfiltered deserialize(DataInputPlus in, SerializationHeader 
header, DeserializationHelper helper, Row.Builder builder)
     throws IOException
     {
         while (true)
@@ -453,7 +452,7 @@ public class UnfilteredSerializer
      * But as {@link UnfilteredRowIterator} should not return empty
      * rows, this mean consumer of this method should make sure to skip said 
empty rows.
      */
-    private Unfiltered deserializeOne(DataInputPlus in, SerializationHeader 
header, SerializationHelper helper, Row.Builder builder)
+    private Unfiltered deserializeOne(DataInputPlus in, SerializationHeader 
header, DeserializationHelper helper, Row.Builder builder)
     throws IOException
     {
         // It wouldn't be wrong per-se to use an unsorted builder, but it 
would be inefficient so make sure we don't do it by mistake
@@ -481,7 +480,7 @@ public class UnfilteredSerializer
         }
     }
 
-    public Unfiltered deserializeTombstonesOnly(FileDataInput in, 
SerializationHeader header, SerializationHelper helper)
+    public Unfiltered deserializeTombstonesOnly(FileDataInput in, 
SerializationHeader header, DeserializationHelper helper)
     throws IOException
     {
         while (true)
@@ -533,7 +532,7 @@ public class UnfilteredSerializer
         }
     }
 
-    public Row deserializeStaticRow(DataInputPlus in, SerializationHeader 
header, SerializationHelper helper)
+    public Row deserializeStaticRow(DataInputPlus in, SerializationHeader 
header, DeserializationHelper helper)
     throws IOException
     {
         int flags = in.readUnsignedByte();
@@ -561,7 +560,7 @@ public class UnfilteredSerializer
 
     public Row deserializeRowBody(DataInputPlus in,
                                   SerializationHeader header,
-                                  SerializationHelper helper,
+                                  DeserializationHelper helper,
                                   int flags,
                                   int extendedFlags,
                                   Row.Builder builder)
@@ -614,7 +613,7 @@ public class UnfilteredSerializer
                     {
                         throw new WrappedException(e);
                     }
-                }, false);
+                });
             }
             catch (WrappedException e)
             {
@@ -636,7 +635,7 @@ public class UnfilteredSerializer
         }
     }
 
-    private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper, Row.Builder builder, 
LivenessInfo rowLiveness)
+    private void readSimpleColumn(ColumnMetadata column, DataInputPlus in, 
SerializationHeader header, DeserializationHelper helper, Row.Builder builder, 
LivenessInfo rowLiveness)
     throws IOException
     {
         if (helper.includes(column))
@@ -651,7 +650,7 @@ public class UnfilteredSerializer
         }
     }
 
-    private void readComplexColumn(ColumnMetadata column, DataInputPlus in, 
SerializationHeader header, SerializationHelper helper, boolean 
hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
+    private void readComplexColumn(ColumnMetadata column, DataInputPlus in, 
SerializationHeader header, DeserializationHelper helper, boolean 
hasComplexDeletion, Row.Builder builder, LivenessInfo rowLiveness)
     throws IOException
     {
         if (helper.includes(column))
@@ -686,7 +685,7 @@ public class UnfilteredSerializer
         in.skipBytesFully(rowSize);
     }
 
-    public void skipStaticRow(DataInputPlus in, SerializationHeader header, 
SerializationHelper helper) throws IOException
+    public void skipStaticRow(DataInputPlus in, SerializationHeader header, 
DeserializationHelper helper) throws IOException
     {
         int flags = in.readUnsignedByte();
         assert !isEndOfPartition(flags) && kind(flags) == Unfiltered.Kind.ROW 
&& isExtended(flags) : "Flags is " + flags;
diff --git 
a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java 
b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
index 190f136..686d874 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamReader.java
@@ -181,7 +181,7 @@ public class CassandraStreamReader implements IStreamReader
         private final TableMetadata metadata;
         private final DataInputPlus in;
         private final SerializationHeader header;
-        private final SerializationHelper helper;
+        private final DeserializationHelper helper;
 
         private DecoratedKey key;
         private DeletionTime partitionLevelDeletion;
@@ -193,7 +193,7 @@ public class CassandraStreamReader implements IStreamReader
         {
             this.metadata = metadata;
             this.in = in;
-            this.helper = new SerializationHelper(metadata, 
version.correspondingMessagingVersion(), 
SerializationHelper.Flag.PRESERVE_SIZE);
+            this.helper = new DeserializationHelper(metadata, 
version.correspondingMessagingVersion(), 
DeserializationHelper.Flag.PRESERVE_SIZE);
             this.header = header;
         }
 
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
index 1846aa5..76e12c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableIdentityIterator.java
@@ -57,7 +57,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
             DeletionTime partitionLevelDeletion = 
DeletionTime.serializer.deserialize(file);
             if (!partitionLevelDeletion.validate())
                 UnfilteredValidation.handleInvalid(sstable.metadata(), key, 
sstable, "partitionLevelDeletion="+partitionLevelDeletion.toString());
-            SerializationHelper helper = new 
SerializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL);
+            DeserializationHelper helper = new 
DeserializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
DeserializationHelper.Flag.LOCAL);
             SSTableSimpleIterator iterator = 
SSTableSimpleIterator.create(sstable.metadata(), file, sstable.header, helper, 
partitionLevelDeletion);
             return new SSTableIdentityIterator(sstable, key, 
partitionLevelDeletion, file.getPath(), iterator);
         }
@@ -76,7 +76,7 @@ public class SSTableIdentityIterator implements 
Comparable<SSTableIdentityIterat
             dfile.seek(indexEntry.position);
             ByteBufferUtil.skipShortLength(dfile); // Skip partition key
             DeletionTime partitionLevelDeletion = 
DeletionTime.serializer.deserialize(dfile);
-            SerializationHelper helper = new 
SerializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
SerializationHelper.Flag.LOCAL);
+            DeserializationHelper helper = new 
DeserializationHelper(sstable.metadata(), 
sstable.descriptor.version.correspondingMessagingVersion(), 
DeserializationHelper.Flag.LOCAL);
             SSTableSimpleIterator iterator = tombstoneOnly
                     ? 
SSTableSimpleIterator.createTombstoneOnly(sstable.metadata(), dfile, 
sstable.header, helper, partitionLevelDeletion)
                     : SSTableSimpleIterator.create(sstable.metadata(), dfile, 
sstable.header, helper, partitionLevelDeletion);
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
index c3c7472..fd1b6a0 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleIterator.java
@@ -38,21 +38,21 @@ public abstract class SSTableSimpleIterator extends 
AbstractIterator<Unfiltered>
 {
     final TableMetadata metadata;
     protected final DataInputPlus in;
-    protected final SerializationHelper helper;
+    protected final DeserializationHelper helper;
 
-    private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, 
SerializationHelper helper)
+    private SSTableSimpleIterator(TableMetadata metadata, DataInputPlus in, 
DeserializationHelper helper)
     {
         this.metadata = metadata;
         this.in = in;
         this.helper = helper;
     }
 
-    public static SSTableSimpleIterator create(TableMetadata metadata, 
DataInputPlus in, SerializationHeader header, SerializationHelper helper, 
DeletionTime partitionDeletion)
+    public static SSTableSimpleIterator create(TableMetadata metadata, 
DataInputPlus in, SerializationHeader header, DeserializationHelper helper, 
DeletionTime partitionDeletion)
     {
         return new CurrentFormatIterator(metadata, in, header, helper);
     }
 
-    public static SSTableSimpleIterator createTombstoneOnly(TableMetadata 
metadata, DataInputPlus in, SerializationHeader header, SerializationHelper 
helper, DeletionTime partitionDeletion)
+    public static SSTableSimpleIterator createTombstoneOnly(TableMetadata 
metadata, DataInputPlus in, SerializationHeader header, DeserializationHelper 
helper, DeletionTime partitionDeletion)
     {
         return new CurrentFormatTombstoneIterator(metadata, in, header, 
helper);
     }
@@ -65,7 +65,7 @@ public abstract class SSTableSimpleIterator extends 
AbstractIterator<Unfiltered>
 
         private final Row.Builder builder;
 
-        private CurrentFormatIterator(TableMetadata metadata, DataInputPlus 
in, SerializationHeader header, SerializationHelper helper)
+        private CurrentFormatIterator(TableMetadata metadata, DataInputPlus 
in, SerializationHeader header, DeserializationHelper helper)
         {
             super(metadata, in, helper);
             this.header = header;
@@ -95,7 +95,7 @@ public abstract class SSTableSimpleIterator extends 
AbstractIterator<Unfiltered>
     {
         private final SerializationHeader header;
 
-        private CurrentFormatTombstoneIterator(TableMetadata metadata, 
DataInputPlus in, SerializationHeader header, SerializationHelper helper)
+        private CurrentFormatTombstoneIterator(TableMetadata metadata, 
DataInputPlus in, SerializationHeader header, DeserializationHelper helper)
         {
             super(metadata, in, helper);
             this.header = header;
diff --git 
a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java 
b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 369be12..7ac2ebc 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -31,6 +31,7 @@ import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.rows.UnfilteredSerializer;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.schema.TableMetadataRef;
@@ -56,6 +57,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
 
     // Used to compute the row serialized size
     private final SerializationHeader header;
+    private final SerializationHelper helper;
 
     private final BlockingQueue<Buffer> writeQueue = new 
SynchronousQueue<Buffer>();
     private final DiskWriter diskWriter = new DiskWriter();
@@ -65,6 +67,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
         super(directory, metadata, columns);
         this.bufferSize = bufferSizeInMB * 1024L * 1024L;
         this.header = new SerializationHeader(true, metadata.get(), columns, 
EncodingStats.NO_STATS);
+        this.helper = new SerializationHelper(this.header);
         diskWriter.start();
     }
 
@@ -90,7 +93,7 @@ class SSTableSimpleUnsortedWriter extends 
AbstractSSTableSimpleWriter
         // improve that. In particular, what we count is closer to the 
serialized value, but it's debatable that it's the right thing
         // to count since it will take a lot more space in memory and the 
bufferSize if first and foremost used to avoid OOM when
         // using this writer.
-        currentSize += UnfilteredSerializer.serializer.serializedSize(row, 
header, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
+        currentSize += UnfilteredSerializer.serializer.serializedSize(row, 
helper, 0, formatType.info.getLatestVersion().correspondingMessagingVersion());
     }
 
     private void maybeSync() throws SyncException
diff --git a/src/java/org/apache/cassandra/service/paxos/Commit.java 
b/src/java/org/apache/cassandra/service/paxos/Commit.java
index 422eaa8..ed0eb9b 100644
--- a/src/java/org/apache/cassandra/service/paxos/Commit.java
+++ b/src/java/org/apache/cassandra/service/paxos/Commit.java
@@ -117,7 +117,7 @@ public class Commit
         public Commit deserialize(DataInputPlus in, int version) throws 
IOException
         {
             UUID ballot = UUIDSerializer.serializer.deserialize(in, version);
-            PartitionUpdate update = 
PartitionUpdate.serializer.deserialize(in, version, 
SerializationHelper.Flag.LOCAL);
+            PartitionUpdate update = 
PartitionUpdate.serializer.deserialize(in, version, 
DeserializationHelper.Flag.LOCAL);
             return new Commit(ballot, update);
         }
 
diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java 
b/src/java/org/apache/cassandra/utils/BiLongAccumulator.java
similarity index 67%
copy from src/java/org/apache/cassandra/utils/WrappedInt.java
copy to src/java/org/apache/cassandra/utils/BiLongAccumulator.java
index a106575..2c3d6b5 100644
--- a/src/java/org/apache/cassandra/utils/WrappedInt.java
+++ b/src/java/org/apache/cassandra/utils/BiLongAccumulator.java
@@ -18,35 +18,7 @@
 
 package org.apache.cassandra.utils;
 
-/**
- * Simple wrapper for native int type
- */
-public class WrappedInt
+public interface BiLongAccumulator<T, A>
 {
-    private int value;
-
-    public WrappedInt(int initial)
-    {
-        this.value = initial;
-    }
-
-    public int get()
-    {
-        return value;
-    }
-
-    public void set(int value)
-    {
-        this.value = value;
-    }
-
-    public void increment()
-    {
-        ++value;
-    }
-
-    public void decrement()
-    {
-        --value;
-    }
+    long apply(T obj, A arguemnt, long v);
 }
diff --git a/src/java/org/apache/cassandra/utils/WrappedInt.java 
b/src/java/org/apache/cassandra/utils/LongAccumulator.java
similarity index 67%
rename from src/java/org/apache/cassandra/utils/WrappedInt.java
rename to src/java/org/apache/cassandra/utils/LongAccumulator.java
index a106575..fe3c195 100644
--- a/src/java/org/apache/cassandra/utils/WrappedInt.java
+++ b/src/java/org/apache/cassandra/utils/LongAccumulator.java
@@ -18,35 +18,7 @@
 
 package org.apache.cassandra.utils;
 
-/**
- * Simple wrapper for native int type
- */
-public class WrappedInt
+public interface LongAccumulator<T>
 {
-    private int value;
-
-    public WrappedInt(int initial)
-    {
-        this.value = initial;
-    }
-
-    public int get()
-    {
-        return value;
-    }
-
-    public void set(int value)
-    {
-        this.value = value;
-    }
-
-    public void increment()
-    {
-        ++value;
-    }
-
-    public void decrement()
-    {
-        --value;
-    }
+    long apply(T obj, long v);
 }
diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java 
b/src/java/org/apache/cassandra/utils/btree/BTree.java
index 97e935e..6c546ac 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTree.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTree.java
@@ -19,14 +19,18 @@
 package org.apache.cassandra.utils.btree;
 
 import java.util.*;
+import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Ordering;
 
+import org.apache.cassandra.utils.BiLongAccumulator;
+import org.apache.cassandra.utils.LongAccumulator;
 import org.apache.cassandra.utils.ObjectSizes;
 
 import static com.google.common.collect.Iterables.concat;
@@ -691,7 +695,7 @@ public class BTree
     }
 
     // returns true if the provided node is a leaf, false if it is a branch
-    static boolean isLeaf(Object[] node)
+    public static boolean isLeaf(Object[] node)
     {
         return (node.length & 1) == 1;
     }
@@ -1285,36 +1289,46 @@ public class BTree
         return compare(cmp, previous, max) < 0;
     }
 
-    /**
-     * Simple method to walk the btree forwards or reversed and apply a 
function to each element
-     *
-     * Public method
-     *
-     */
-    public static <V> void apply(Object[] btree, Consumer<V> function, boolean 
reversed)
+    private static <V, A> void applyValue(V value, BiConsumer<A, V> function, 
A argument)
     {
-        if (reversed)
-            applyReverse(btree, function, null);
-        else
-            applyForwards(btree, function, null);
+        function.accept(argument, value);
+    }
+
+    public static <V, A> void applyLeaf(Object[] btree, BiConsumer<A, V> 
function, A argument)
+    {
+        Preconditions.checkArgument(isLeaf(btree));
+        int limit = getLeafKeyEnd(btree);
+        for (int i=0; i<limit; i++)
+            applyValue((V) btree[i], function, argument);
     }
 
     /**
-     * Simple method to walk the btree forwards or reversed and apply a 
function till a stop condition is reached
+     * Simple method to walk the btree forwards and apply a function to each 
value, passing the supplied argument along
      *
-     * Public method
+     * Public method
      *
+     * @param btree the btree to walk
+     * @param function callback invoked as function.accept(argument, value) for each value
      */
-    public static <V> void apply(Object[] btree, Consumer<V> function, 
Predicate<V> stopCondition, boolean reversed)
+    public static <V, A> void apply(Object[] btree, BiConsumer<A, V> function, 
A argument)
     {
-        if (reversed)
-            applyReverse(btree, function, stopCondition);
-        else
-            applyForwards(btree, function, stopCondition);
-    }
+        if (isLeaf(btree))
+        {
+            applyLeaf(btree, function, argument);
+            return;
+        }
 
+        int childOffset = getChildStart(btree);
+        int limit = btree.length - 1 - childOffset;
+        for (int i = 0 ; i < limit ; i++)
+        {
 
+            apply((Object[]) btree[childOffset + i], function, argument);
 
+            if (i < childOffset)
+                applyValue((V) btree[i], function, argument);
+        }
+    }
 
     /**
      * Simple method to walk the btree forwards and apply a function till a 
stop condition is reached
@@ -1323,89 +1337,115 @@ public class BTree
      *
      * @param btree
      * @param function
-     * @param stopCondition
      */
-    private static <V> boolean applyForwards(Object[] btree, Consumer<V> 
function, Predicate<V> stopCondition)
+    public static <V> void apply(Object[] btree, Consumer<V> function)
     {
-        boolean isLeaf = isLeaf(btree);
-        int childOffset = isLeaf ? Integer.MAX_VALUE : getChildStart(btree);
-        int limit = isLeaf ? getLeafKeyEnd(btree) : btree.length - 1;
-        for (int i = 0 ; i < limit ; i++)
-        {
-            // we want to visit in iteration order, so we visit our key nodes 
inbetween our children
-            int idx = isLeaf ? i : (i / 2) + (i % 2 == 0 ? childOffset : 0);
-            Object current = btree[idx];
-            if (idx < childOffset)
-            {
-                V castedCurrent = (V) current;
-                if (stopCondition != null && 
stopCondition.apply(castedCurrent))
-                    return true;
+        BTree.<V, Consumer<V>>apply(btree, Consumer::accept, function);
+    }
 
-                function.accept(castedCurrent);
-            }
-            else
-            {
-                if (applyForwards((Object[]) current, function, stopCondition))
-                    return true;
-            }
+    private static <V> int find(Object[] btree, V from, Comparator<V> 
comparator)
+    {
+        // find the start index in iteration order
+        Preconditions.checkNotNull(comparator);
+        int keyEnd = getKeyEnd(btree);
+        return Arrays.binarySearch((V[]) btree, 0, keyEnd, from, comparator);
+    }
+
+    private static boolean isStopSentinel(long v)
+    {
+        return v == Long.MAX_VALUE;
+    }
+
+    private static <V, A> long accumulateLeaf(Object[] btree, 
BiLongAccumulator<A, V> accumulator, A arg, Comparator<V> comparator, V from, 
long initialValue)
+    {
+        Preconditions.checkArgument(isLeaf(btree));
+        long value = initialValue;
+        int limit = getLeafKeyEnd(btree);
+
+        int startIdx = 0;
+        if (from != null)
+        {
+            int i = find(btree, from, comparator);
+            boolean isExact = i >= 0;
+            startIdx = isExact ? i : (-1 - i);
         }
 
-        return false;
+        for (int i = startIdx; i < limit; i++)
+        {
+            value = accumulator.apply(arg, (V) btree[i], value);
+
+            if (isStopSentinel(value))
+                break;
+        }
+        return value;
     }
 
     /**
-     * Simple method to walk the btree in reverse and apply a function till a 
stop condition is reached
+     * Walk the btree and accumulate a long value using the supplied 
accumulator function. Iteration will stop if the
+     * accumulator function returns the sentinel value Long.MAX_VALUE; no 
other return value stops the walk
      *
-     * Private method
-     *
-     * @param btree
-     * @param function
-     * @param stopCondition
+     * If the optional from argument is not null, iteration will start from 
that value (or from the value at its insertion
+     * point if an exact match isn't found)
      */
-    private static <V> boolean applyReverse(Object[] btree, Consumer<V> 
function, Predicate<V> stopCondition)
+    public static <V, A> long accumulate(Object[] btree, BiLongAccumulator<A, 
V> accumulator, A arg, Comparator<V> comparator, V from, long initialValue)
     {
-        boolean isLeaf = isLeaf(btree);
-        int childOffset = isLeaf ? 0 : getChildStart(btree);
-        int limit = isLeaf ? getLeafKeyEnd(btree)  : btree.length - 1;
-        for (int i = limit - 1, visited = 0; i >= 0 ; i--, visited++)
+        if (isLeaf(btree))
+            return accumulateLeaf(btree, accumulator, arg, comparator, from, 
initialValue);
+
+        long value = initialValue;
+        int childOffset = getChildStart(btree);
+
+        int startChild = 0;
+        if (from != null)
         {
-            int idx = i;
+            int i = find(btree, from, comparator);
+            boolean isExact = i >= 0;
 
-            // we want to visit in reverse iteration order, so we visit our 
children nodes inbetween our keys
-            if (!isLeaf)
-            {
-                int typeOffset = visited / 2;
+            startChild = isExact ? i + 1 : -1 - i;
 
-                if (i % 2 == 0)
-                {
-                    // This is a child branch. Since children are in the 
second half of the array, we must
-                    // adjust for the key's we've visited along the way
-                    idx += typeOffset;
-                }
-                else
-                {
-                    // This is a key. Since the keys are in the first half of 
the array and we are iterating
-                    // in reverse we subtract the childOffset and adjust for 
children we've walked so far
-                    idx = i - childOffset + typeOffset;
-                }
+            if (isExact)
+            {
+                value = accumulator.apply(arg, (V) btree[i], value);
+                if (isStopSentinel(value))
+                    return value;
+                from = null;
             }
+        }
 
-            Object current = btree[idx];
-            if (isLeaf || idx < childOffset)
-            {
-                V castedCurrent = (V) current;
-                if (stopCondition != null && 
stopCondition.apply(castedCurrent))
-                    return true;
+        int limit = btree.length - 1 - childOffset;
+        for (int i=startChild; i<limit; i++)
+        {
+            value = accumulate((Object[]) btree[childOffset + i], accumulator, 
arg, comparator, from, value);
 
-                function.accept(castedCurrent);
-            }
-            else
+            if (isStopSentinel(value))
+                break;
+
+            if (i < childOffset)
             {
-                if (applyReverse((Object[]) current, function, stopCondition))
-                    return true;
+                value = accumulator.apply(arg, (V) btree[i], value);
+                // stop if a sentinel stop value was returned
+                if (isStopSentinel(value))
+                    break;
             }
+
+            if (from != null)
+                from = null;
         }
+        return value;
+    }
+
+    public static <V> long accumulate(Object[] btree, LongAccumulator<V> 
accumulator, Comparator<V> comparator, V from, long initialValue)
+    {
+        return accumulate(btree, LongAccumulator::apply, accumulator, 
comparator, from, initialValue);
+    }
 
-        return false;
+    public static <V> long accumulate(Object[] btree, LongAccumulator<V> 
accumulator, long initialValue)
+    {
+        return accumulate(btree, accumulator, null, null, initialValue);
+    }
+
+    public static <V, A> long accumulate(Object[] btree, BiLongAccumulator<A, 
V> accumulator, A arg, long initialValue)
+    {
+        return accumulate(btree, accumulator, arg, null, null, initialValue);
     }
 }
diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java 
b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
index 2fcece6..2ad7f40 100644
--- a/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/BTreeSearchIterator.java
@@ -25,4 +25,8 @@ import org.apache.cassandra.utils.IndexedSearchIterator;
 
 public interface BTreeSearchIterator<K, V> extends IndexedSearchIterator<K, 
V>, Iterator<V>
 {
+    /**
+     * Reset this Iterator to its starting position
+     */
+    public void rewind();
 }
diff --git 
a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java 
b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
index a6197f8..29aed4b 100644
--- a/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
+++ b/src/java/org/apache/cassandra/utils/btree/LeafBTreeSearchIterator.java
@@ -45,8 +45,13 @@ public class LeafBTreeSearchIterator<K, V> implements 
BTreeSearchIterator<K, V>
         this.comparator = comparator;
         this.lowerBound = lowerBound;
         this.upperBound = upperBound;
-        this.nextPos = forwards ? lowerBound : upperBound;
-        this.hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+        rewind();
+    }
+
+    public void rewind()
+    {
+        nextPos = forwards ? lowerBound : upperBound;
+        hasNext = nextPos >= lowerBound && nextPos <= upperBound;
     }
 
     public V next()
@@ -73,12 +78,30 @@ public class LeafBTreeSearchIterator<K, V> implements 
BTreeSearchIterator<K, V>
         return Arrays.binarySearch(keys, lb, ub + 1, key, comparator);
     }
 
+    private void updateHasNext()
+    {
+        hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+    }
+
     public V next(K key)
     {
         if (!hasNext)
             return null;
         V result = null;
 
+        // first check the current position in case of sequential access
+        if (comparator.compare(key, keys[nextPos]) == 0)
+        {
+            hasCurrent = true;
+            result = (V) keys[nextPos];
+            nextPos += forwards ? 1 : -1;
+        }
+        updateHasNext();
+
+        if (result != null || !hasNext)
+            return result;
+
+        // otherwise search against the remaining values
         int find = searchNext(key);
         if (find >= 0)
         {
@@ -91,7 +114,7 @@ public class LeafBTreeSearchIterator<K, V> implements 
BTreeSearchIterator<K, V>
             nextPos = (forwards ? -1 : -2) - find;
             hasCurrent = false;
         }
-        hasNext = nextPos >= lowerBound && nextPos <= upperBound;
+        updateHasNext();
         return result;
     }
 
diff --git 
a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java 
b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
index e2c6e33..7fadac4 100644
--- a/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
+++ b/test/long/org/apache/cassandra/db/commitlog/CommitLogStressTest.java
@@ -35,15 +35,12 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
 
 import io.netty.util.concurrent.FastThreadLocalThread;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.UpdateBuilder;
-import org.apache.cassandra.config.Config.CommitLogSync;
 import org.apache.cassandra.config.*;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.marshal.UTF8Type;
@@ -451,7 +448,7 @@ public abstract class CommitLogStressTest
             {
                 mutation = Mutation.serializer.deserialize(bufIn,
                                                            
desc.getMessagingVersion(),
-                                                           
SerializationHelper.Flag.LOCAL);
+                                                           
DeserializationHelper.Flag.LOCAL);
             }
             catch (IOException e)
             {
diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java 
b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
index 8709775..e0215b7 100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java
@@ -47,6 +47,7 @@ import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.dht.Range;
@@ -368,7 +369,7 @@ public class ReadCommandTest
                                                                                
                 MessagingService.current_version,
                                                                                
                 cfs.metadata(),
                                                                                
                 columnFilter,
-                                                                               
                 SerializationHelper.Flag.LOCAL));
+                                                                               
                 DeserializationHelper.Flag.LOCAL));
             }
         }
 
diff --git a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java 
b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
index c52fd3d..392a1a0 100644
--- a/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
+++ b/test/unit/org/apache/cassandra/db/RowIndexEntryTest.java
@@ -34,6 +34,7 @@ import org.junit.Test;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.cache.IMeasurableMemory;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.rows.SerializationHelper;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.CQLTester;
@@ -279,6 +280,7 @@ public class RowIndexEntryTest extends CQLTester
         {
             private final UnfilteredRowIterator iterator;
             private final SequentialWriter writer;
+            private final SerializationHelper helper;
             private final SerializationHeader header;
             private final int version;
 
@@ -306,6 +308,7 @@ public class RowIndexEntryTest extends CQLTester
             {
                 this.iterator = iterator;
                 this.writer = writer;
+                this.helper = new SerializationHelper(header);
                 this.header = header;
                 this.version = version;
                 this.observers = observers == null ? Collections.emptyList() : 
observers;
@@ -317,7 +320,7 @@ public class RowIndexEntryTest extends CQLTester
                 
ByteBufferUtil.writeWithShortLength(iterator.partitionKey().getKey(), writer);
                 
DeletionTime.serializer.serialize(iterator.partitionLevelDeletion(), writer);
                 if (header.hasStatic())
-                    
UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), 
header, writer, version);
+                    
UnfilteredSerializer.serializer.serializeStaticRow(iterator.staticRow(), 
helper, writer, version);
             }
 
             public ColumnIndex build() throws IOException
@@ -358,7 +361,7 @@ public class RowIndexEntryTest extends CQLTester
                     startPosition = pos;
                 }
 
-                UnfilteredSerializer.serializer.serialize(unfiltered, header, 
writer, pos - previousRowStart, version);
+                UnfilteredSerializer.serializer.serialize(unfiltered, helper, 
writer, pos - previousRowStart, version);
 
                 // notify observers about each new row
                 if (!observers.isEmpty())
diff --git a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java 
b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
index 18bc6e0..fa3295a 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CDCTestReplayer.java
@@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 
@@ -62,7 +62,7 @@ public class CDCTestReplayer extends CommitLogReplayer
             Mutation mutation;
             try
             {
-                mutation = Mutation.serializer.deserialize(bufIn, 
desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+                mutation = Mutation.serializer.deserialize(bufIn, 
desc.getMessagingVersion(), DeserializationHelper.Flag.LOCAL);
                 if (mutation.trackedByCDC())
                     sawCDCMutation = true;
             }
diff --git 
a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java 
b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
index 9a22b04..5b87d68 100644
--- a/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
+++ b/test/unit/org/apache/cassandra/db/commitlog/CommitLogTestReplayer.java
@@ -26,7 +26,7 @@ import org.junit.Assert;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.rows.SerializationHelper;
+import org.apache.cassandra.db.rows.DeserializationHelper;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.RebufferingInputStream;
 
@@ -65,7 +65,7 @@ public class CommitLogTestReplayer extends CommitLogReplayer
             Mutation mutation;
             try
             {
-                mutation = Mutation.serializer.deserialize(bufIn, 
desc.getMessagingVersion(), SerializationHelper.Flag.LOCAL);
+                mutation = Mutation.serializer.deserialize(bufIn, 
desc.getMessagingVersion(), DeserializationHelper.Flag.LOCAL);
                 Assert.assertTrue(processor.apply(mutation));
             }
             catch (IOException e)
diff --git a/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java 
b/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java
index 7cc1291..e60fb64 100644
--- a/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/btree/BTreeTest.java
@@ -88,14 +88,20 @@ public class BTreeTest
         }
     };
 
-    private static List<Integer> seq(int count)
+    private static List<Integer> seq(int count, int interval)
     {
         List<Integer> r = new ArrayList<>();
         for (int i = 0 ; i < count ; i++)
-            r.add(i);
+            if (i % interval == 0)
+                r.add(i);
         return r;
     }
 
+    private static List<Integer> seq(int count)
+    {
+        return seq(count, 1);
+    }
+
     private static List<Integer> rand(int count)
     {
         Random rand = ThreadLocalRandom.current();
@@ -133,27 +139,60 @@ public class BTreeTest
     }
 
     @Test
-    public void testApplyForwards()
+    public void testApply()
     {
         List<Integer> input = seq(71);
         Object[] btree = BTree.build(input, noOp);
 
         final List<Integer> result = new ArrayList<>();
-        BTree.<Integer>apply(btree, i -> result.add(i), false);
+        BTree.<Integer>apply(btree, i -> result.add(i));
 
         org.junit.Assert.assertArrayEquals(input.toArray(),result.toArray());
     }
 
     @Test
-    public void testApplyReverse()
+    public void inOrderAccumulation()
     {
         List<Integer> input = seq(71);
         Object[] btree = BTree.build(input, noOp);
+        long result = BTree.<Integer>accumulate(btree, (o, l) -> {
+            Assert.assertEquals((long) o, l + 1);
+            return o;
+        }, -1);
+        Assert.assertEquals(result, 70);
+    }
 
-        final List<Integer> result = new ArrayList<>();
-        BTree.<Integer>apply(btree, i -> result.add(i), true);
+    @Test
+    public void accumulateFrom()
+    {
+        int limit = 100;
+        for (int interval=1; interval<=5; interval++)
+        {
+            List<Integer> input = seq(limit, interval);
+            Object[] btree = BTree.build(input, noOp);
+            for (int start=0; start<=limit; start+=interval)
+            {
+                int thisInterval = interval;
+                String errMsg = String.format("interval=%s, start=%s", 
interval, start);
+                long result = BTree.accumulate(btree, (o, l) -> {
+                    Assert.assertEquals(errMsg, (long) o, l + thisInterval);
+                    return o;
+                }, Comparator.naturalOrder(), start, start - thisInterval);
+                Assert.assertEquals(errMsg, result, 
(limit-1)/interval*interval);
+            }
+        }
+    }
 
-        
org.junit.Assert.assertArrayEquals(Lists.reverse(input).toArray(),result.toArray());
+    /**
+     * accumulate function should not be called if we ask it to start past the end of the btree
+     */
+    @Test
+    public void accumulateFromEnd()
+    {
+        List<Integer> input = seq(100);
+        Object[] btree = BTree.build(input, noOp);
+        long result = BTree.accumulate(btree, (o, l) -> 1, Integer::compareTo, 
101, 0L);
+        Assert.assertEquals(0, result);
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org

Reply via email to