http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 900b17a..afa731c 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -22,19 +22,16 @@ import java.io.IOException;
 import java.security.MessageDigest;
 import java.util.*;
 
-import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Lists;
-
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.MergeIterator;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.*;
 
 /**
  * Static methods to work with partition iterators.
@@ -53,33 +50,6 @@ public abstract class UnfilteredPartitionIterators
         public void close();
     }
 
-
-    public static UnfilteredPartitionIterator empty(final CFMetaData metadata)
-    {
-        return new AbstractUnfilteredPartitionIterator()
-        {
-            public boolean isForThrift()
-            {
-                return false;
-            }
-
-            public CFMetaData metadata()
-            {
-                return metadata;
-            }
-
-            public boolean hasNext()
-            {
-                return false;
-            }
-
-            public UnfilteredRowIterator next()
-            {
-                throw new NoSuchElementException();
-            }
-        };
-    }
-
     @SuppressWarnings("resource") // The created resources are returned right 
away
     public static UnfilteredRowIterator getOnlyElement(final 
UnfilteredPartitionIterator iter, SinglePartitionReadCommand<?> command)
     {
@@ -87,30 +57,24 @@ public abstract class UnfilteredPartitionIterators
         // want a RowIterator out of this method, so we return an empty one.
         UnfilteredRowIterator toReturn = iter.hasNext()
                               ? iter.next()
-                              : 
UnfilteredRowIterators.emptyIterator(command.metadata(),
-                                                                     
command.partitionKey(),
-                                                                     
command.clusteringIndexFilter().isReversed());
+                              : 
EmptyIterators.unfilteredRow(command.metadata(),
+                                                             
command.partitionKey(),
+                                                             
command.clusteringIndexFilter().isReversed());
 
         // Note that in general, we should wrap the result so that it's close 
method actually
         // close the whole UnfilteredPartitionIterator.
-        return new WrappingUnfilteredRowIterator(toReturn)
+        class Close extends Transformation
         {
-            public void close()
+            public void onPartitionClose()
             {
-                try
-                {
-                    super.close();
-                }
-                finally
-                {
-                    // asserting this only now because it bothers Serializer 
if hasNext() is called before
-                    // the previously returned iterator hasn't been fully 
consumed.
-                    assert !iter.hasNext();
-
-                    iter.close();
-                }
+                // asserting this only now because it bothers Serializer if 
hasNext() is called before
+                // the previously returned iterator hasn't been fully consumed.
+                boolean hadNext = iter.hasNext();
+                iter.close();
+                assert !hadNext;
             }
-        };
+        }
+        return Transformation.apply(toReturn, new Close());
     }
 
     public static PartitionIterator 
mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, 
MergeListener listener)
@@ -121,55 +85,7 @@ public abstract class UnfilteredPartitionIterators
 
     public static PartitionIterator filter(final UnfilteredPartitionIterator 
iterator, final int nowInSec)
     {
-        return new PartitionIterator()
-        {
-            private RowIterator next;
-
-            public boolean hasNext()
-            {
-                while (next == null && iterator.hasNext())
-                {
-                    @SuppressWarnings("resource") // closed either directly if 
empty, or, if assigned to next, by either
-                                                  // the caller of next() or 
close()
-                    UnfilteredRowIterator rowIterator = iterator.next();
-                    next = UnfilteredRowIterators.filter(rowIterator, 
nowInSec);
-                    if (!iterator.isForThrift() && next.isEmpty())
-                    {
-                        rowIterator.close();
-                        next = null;
-                    }
-                }
-                return next != null;
-            }
-
-            public RowIterator next()
-            {
-                if (next == null && !hasNext())
-                    throw new NoSuchElementException();
-
-                RowIterator toReturn = next;
-                next = null;
-                return toReturn;
-            }
-
-            public void remove()
-            {
-                throw new UnsupportedOperationException();
-            }
-
-            public void close()
-            {
-                try
-                {
-                    iterator.close();
-                }
-                finally
-                {
-                    if (next != null)
-                        next.close();
-                }
-            }
-        };
+        return FilteredPartitions.filter(iterator, nowInSec);
     }
 
     public static UnfilteredPartitionIterator merge(final List<? extends 
UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener 
listener)
@@ -204,7 +120,7 @@ public abstract class UnfilteredPartitionIterators
                 // Replace nulls by empty iterators
                 for (int i = 0; i < toMerge.size(); i++)
                     if (toMerge.get(i) == null)
-                        toMerge.set(i, 
UnfilteredRowIterators.emptyIterator(metadata, partitionKey, isReverseOrder));
+                        toMerge.set(i, EmptyIterators.unfilteredRow(metadata, 
partitionKey, isReverseOrder));
 
                 return UnfilteredRowIterators.merge(toMerge, nowInSec, 
rowListener);
             }
@@ -353,13 +269,14 @@ public abstract class UnfilteredPartitionIterators
      */
     public static UnfilteredPartitionIterator 
loggingIterator(UnfilteredPartitionIterator iterator, final String id, final 
boolean fullDetails)
     {
-        return new WrappingUnfilteredPartitionIterator(iterator)
+        class Logging extends Transformation<UnfilteredRowIterator>
         {
-            public UnfilteredRowIterator next()
+            public UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator partition)
             {
-                return UnfilteredRowIterators.loggingIterator(super.next(), 
id, fullDetails);
+                return UnfilteredRowIterators.loggingIterator(partition, id, 
fullDetails);
             }
-        };
+        }
+        return Transformation.apply(iterator, new Logging());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java 
b/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
deleted file mode 100644
index 4d4be70..0000000
--- a/src/java/org/apache/cassandra/db/partitions/WrappingPartitionIterator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.db.rows.RowIterator;
-
-public abstract class WrappingPartitionIterator implements PartitionIterator
-{
-    protected final PartitionIterator wrapped;
-
-    protected WrappingPartitionIterator(PartitionIterator wrapped)
-    {
-        this.wrapped = wrapped;
-    }
-
-    public boolean hasNext()
-    {
-        return wrapped.hasNext();
-    }
-
-    public RowIterator next()
-    {
-        return wrapped.next();
-    }
-
-    public void remove()
-    {
-        wrapped.remove();
-    }
-
-    public void close()
-    {
-        wrapped.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
 
b/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
deleted file mode 100644
index ebf3c28..0000000
--- 
a/src/java/org/apache/cassandra/db/partitions/WrappingUnfilteredPartitionIterator.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.partitions;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-
-/**
- * A utility class for writing partition iterators that filter/modify other
- * partition iterators.
- *
- * This work a little bit like Guava's AbstractIterator in that you only need
- * to implement the computeNext() method, though that method takes as argument
- * the UnfilteredRowIterator to filter from the wrapped partition iterator.
- */
-public abstract class WrappingUnfilteredPartitionIterator extends 
AbstractUnfilteredPartitionIterator
-{
-    protected final UnfilteredPartitionIterator wrapped;
-
-    private UnfilteredRowIterator next;
-
-    protected WrappingUnfilteredPartitionIterator(UnfilteredPartitionIterator 
wrapped)
-    {
-        this.wrapped = wrapped;
-    }
-
-    public boolean isForThrift()
-    {
-        return wrapped.isForThrift();
-    }
-
-    public CFMetaData metadata()
-    {
-        return wrapped.metadata();
-    }
-
-    public boolean hasNext()
-    {
-        prepareNext();
-        return next != null;
-    }
-
-    public UnfilteredRowIterator next()
-    {
-        prepareNext();
-        assert next != null;
-
-        UnfilteredRowIterator toReturn = next;
-        next = null;
-        return toReturn;
-    }
-
-    private void prepareNext()
-    {
-        while (next == null && wrapped.hasNext())
-        {
-            @SuppressWarnings("resource") // Closed on exception, right away 
if empty or ignored by computeNext, or if assigned to 'next',
-                                          // either by the caller to next(), 
or in close().
-            UnfilteredRowIterator wrappedNext = wrapped.next();
-            try
-            {
-                UnfilteredRowIterator maybeNext = computeNext(wrappedNext);
-
-                // As the wrappd iterator shouldn't return an empty iterator, 
if computeNext
-                // gave us back it's input we save the isEmpty check.
-                if (maybeNext != null && (isForThrift() || maybeNext == 
wrappedNext || !maybeNext.isEmpty()))
-                {
-                    next = maybeNext;
-                    return;
-                }
-                else
-                {
-                    wrappedNext.close();
-                }
-            }
-            catch (RuntimeException | Error e)
-            {
-                wrappedNext.close();
-                throw e;
-            }
-        }
-    }
-
-    /**
-     * Given the next UnfilteredRowIterator from the wrapped partition 
iterator, return
-     * the (potentially modified) UnfilteredRowIterator to return. Please note 
that the
-     * result will be skipped if it's either {@code null} of if it's empty.
-     *
-     * The default implementation return it's input unchanged to make it easier
-     * to write wrapping partition iterators that only change the close method.
-     */
-    protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter)
-    {
-        return iter;
-    }
-
-    @Override
-    public void close()
-    {
-        try
-        {
-            wrapped.close();
-        }
-        finally
-        {
-            if (next != null)
-                next.close();
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java
deleted file mode 100644
index a390bad..0000000
--- a/src/java/org/apache/cassandra/db/rows/AlteringUnfilteredRowIterator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import java.util.NoSuchElementException;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-
-/**
- * Class that makes it easier to write unfiltered iterators that filter or 
modify
- * the returned unfiltered.
- *
- * The methods you want to override are {@code computeNextStatic} and the 
{@code computeNext} methods.
- * All of these methods are allowed to return a {@code null} value with the 
meaning of ignoring
- * the entry.
- */
-public abstract class AlteringUnfilteredRowIterator extends 
WrappingUnfilteredRowIterator
-{
-    private Row staticRow;
-    private Unfiltered next;
-
-    protected AlteringUnfilteredRowIterator(UnfilteredRowIterator wrapped)
-    {
-        super(wrapped);
-    }
-
-    protected Row computeNextStatic(Row row)
-    {
-        return row;
-    }
-
-    protected Row computeNext(Row row)
-    {
-        return row;
-    }
-
-    protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
-    {
-        return marker;
-    }
-
-    public Row staticRow()
-    {
-        if (staticRow == null)
-        {
-            Row row = computeNextStatic(super.staticRow());
-            staticRow = row == null ? Rows.EMPTY_STATIC_ROW : row;
-        }
-        return staticRow;
-    }
-
-    public boolean hasNext()
-    {
-        while (next == null && super.hasNext())
-        {
-            Unfiltered unfiltered = super.next();
-            if (unfiltered.isRow())
-            {
-                Row row = computeNext((Row)unfiltered);
-                if (row != null && !row.isEmpty())
-                    next = row;
-            }
-            else
-            {
-                next = computeNext((RangeTombstoneMarker)unfiltered);
-            }
-        }
-        return next != null;
-    }
-
-    public Unfiltered next()
-    {
-        if (!hasNext())
-            throw new NoSuchElementException();
-
-        Unfiltered toReturn = next;
-        next = null;
-        return toReturn;
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
new file mode 100644
index 0000000..fb9e908
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/BaseRowIterator.java
@@ -0,0 +1,64 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.utils.CloseableIterator;
+
+/**
+ * A common interface for Row and Unfiltered, that permits sharing of the 
(majority) common
+ * methods and functionality
+ */
+public interface BaseRowIterator<U extends Unfiltered> extends 
CloseableIterator<U>
+{
+    /**
+     * The metadata for the table this iterator on.
+     */
+    public CFMetaData metadata();
+
+    /**
+     * Whether or not the rows returned by this iterator are in reversed
+     * clustering order.
+     */
+    public boolean isReverseOrder();
+
+    /**
+     * A subset of the columns for the (static and regular) rows returned by 
this iterator.
+     * Every row returned by this iterator must guarantee that it has only 
those columns.
+     */
+    public PartitionColumns columns();
+
+    /**
+     * The partition key of the partition this in an iterator over.
+     */
+    public DecoratedKey partitionKey();
+
+    /**
+     * The static part corresponding to this partition (this can be an empty
+     * row).
+     */
+    public Row staticRow();
+
+    /**
+     * Returns whether the provided iterator has no data.
+     */
+    public boolean isEmpty();
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/RowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterator.java 
b/src/java/org/apache/cassandra/db/rows/RowIterator.java
index 69994dd..f0b4499 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterator.java
@@ -34,39 +34,9 @@ import org.apache.cassandra.db.*;
  * reverse clustering order if isReverseOrder is true), and the Row objects 
returned
  * by next() are only valid until the next call to hasNext() or next().
  */
-public interface RowIterator extends Iterator<Row>, AutoCloseable
+public interface RowIterator extends BaseRowIterator<Row>
 {
     /**
-     * The metadata for the table this iterator on.
-     */
-    public CFMetaData metadata();
-
-    /**
-     * Whether or not the rows returned by this iterator are in reversed
-     * clustering order.
-     */
-    public boolean isReverseOrder();
-
-    /**
-     * A subset of the columns for the (static and regular) rows returned by 
this iterator.
-     * Every row returned by this iterator must guarantee that it has only 
those columns.
-     */
-    public PartitionColumns columns();
-
-    /**
-     * The partition key of the partition this in an iterator over.
-     */
-    public DecoratedKey partitionKey();
-
-    /**
-     * The static part corresponding to this partition (this can be an empty
-     * row).
-     */
-    public Row staticRow();
-
-    public void close();
-
-    /**
      * Returns whether the provided iterator has no data.
      */
     public default boolean isEmpty()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java 
b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index 30f5c50..551edb8 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -17,14 +17,13 @@
  */
 package org.apache.cassandra.db.rows;
 
-import java.util.*;
 import java.security.MessageDigest;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
@@ -49,54 +48,6 @@ public abstract class RowIterators
             iterator.next().digest(digest);
     }
 
-    public static RowIterator emptyIterator(CFMetaData cfm, DecoratedKey 
partitionKey, boolean isReverseOrder)
-    {
-        return iterator(cfm, partitionKey, isReverseOrder, 
Collections.emptyIterator());
-    }
-
-    public static RowIterator iterator(CFMetaData cfm, DecoratedKey 
partitionKey, boolean isReverseOrder, Iterator<Row> iterator)
-    {
-        return new RowIterator()
-        {
-            public CFMetaData metadata()
-            {
-                return cfm;
-            }
-
-            public boolean isReverseOrder()
-            {
-                return isReverseOrder;
-            }
-
-            public PartitionColumns columns()
-            {
-                return PartitionColumns.NONE;
-            }
-
-            public DecoratedKey partitionKey()
-            {
-                return partitionKey;
-            }
-
-            public Row staticRow()
-            {
-                return Rows.EMPTY_STATIC_ROW;
-            }
-
-            public void close() { }
-
-            public boolean hasNext()
-            {
-                return iterator.hasNext();
-            }
-
-            public Row next()
-            {
-                return iterator.next();
-            }
-        };
-    }
-
     /**
      * Wraps the provided iterator so it logs the returned rows for debugging 
purposes.
      * <p>
@@ -113,24 +64,23 @@ public abstract class RowIterators
                     
metadata.getKeyValidator().getString(iterator.partitionKey().getKey()),
                     iterator.isReverseOrder());
 
-        return new WrappingRowIterator(iterator)
+        class Log extends Transformation
         {
             @Override
-            public Row staticRow()
+            public Row applyToStatic(Row row)
             {
-                Row row = super.staticRow();
                 if (!row.isEmpty())
-                    logger.info("[{}] {}", id, row.toString(metadata()));
+                    logger.info("[{}] {}", id, row.toString(metadata));
                 return row;
             }
 
             @Override
-            public Row next()
+            public Row applyToRow(Row row)
             {
-                Row next = super.next();
-                logger.info("[{}] {}", id, next.toString(metadata()));
-                return next;
+                logger.info("[{}] {}", id, row.toString(metadata));
+                return row;
             }
-        };
+        }
+        return Transformation.apply(iterator, new Log());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
index 649fd8b..a969858 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterator.java
@@ -46,50 +46,20 @@ import org.apache.cassandra.db.*;
  * the returned objects for longer than the iteration, it must make a copy of
  * it explicitly.
  */
-public interface UnfilteredRowIterator extends Iterator<Unfiltered>, 
AutoCloseable
+public interface UnfilteredRowIterator extends BaseRowIterator<Unfiltered>
 {
     /**
-     * The metadata for the table this iterator on.
-     */
-    public CFMetaData metadata();
-
-    /**
-     * A subset of the columns for the (static and regular) rows returned by 
this iterator.
-     * Every row returned by this iterator must guarantee that it has only 
those columns.
-     */
-    public PartitionColumns columns();
-
-    /**
-     * Whether or not the atom returned by this iterator are in reversed
-     * clustering order.
-     */
-    public boolean isReverseOrder();
-
-    /**
-     * The partition key of the partition this in an iterator over.
-     */
-    public DecoratedKey partitionKey();
-
-    /**
      * The partition level deletion for the partition this iterate over.
      */
     public DeletionTime partitionLevelDeletion();
 
     /**
-     * The static part corresponding to this partition (this can be an empty
-     * row).
-     */
-    public Row staticRow();
-
-    /**
      * Return "statistics" about what is returned by this iterator. Those are 
used for
      * performance reasons (for delta-encoding for instance) and code should 
not
      * expect those to be exact.
      */
     public EncodingStats stats();
 
-    public void close();
-
     /**
      * Returns whether this iterator has no data (including no deletion data).
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
index 3a0558e..932ca4c 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java
@@ -202,7 +202,7 @@ public class UnfilteredRowIteratorSerializer
     public UnfilteredRowIterator deserialize(DataInputPlus in, int version, 
CFMetaData metadata, SerializationHelper.Flag flag, Header header) throws 
IOException
     {
         if (header.isEmpty)
-            return UnfilteredRowIterators.emptyIterator(metadata, header.key, 
header.isReversed);
+            return EmptyIterators.unfilteredRow(metadata, header.key, 
header.isReversed);
 
         final SerializationHelper helper = new SerializationHelper(metadata, 
version, flag);
         final SerializationHeader sHeader = header.sHeader;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index 22628e2..ea929d7 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -22,10 +22,12 @@ import java.security.MessageDigest;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import com.google.common.collect.AbstractIterator;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.transform.FilteredRows;
+import org.apache.cassandra.db.transform.MoreRows;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.serializers.MarshalException;
@@ -63,8 +65,7 @@ public abstract class UnfilteredRowIterators
      */
     public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec)
     {
-        return new FilteringIterator(iter, nowInSec);
-
+        return FilteredRows.filter(iter, nowInSec);
     }
 
     /**
@@ -90,72 +91,12 @@ public abstract class UnfilteredRowIterators
         return UnfilteredRowMergeIterator.create(iterators, nowInSec, 
mergeListener);
     }
 
-    public static UnfilteredRowIterator emptyIterator(final CFMetaData cfm, 
final DecoratedKey partitionKey, final boolean isReverseOrder)
-    {
-        return noRowsIterator(cfm, partitionKey, Rows.EMPTY_STATIC_ROW, 
DeletionTime.LIVE, isReverseOrder);
-    }
     /**
      * Returns an empty atom iterator for a given partition.
      */
     public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, 
final DecoratedKey partitionKey, final Row staticRow, final DeletionTime 
partitionDeletion, final boolean isReverseOrder)
     {
-        PartitionColumns columns = staticRow == Rows.EMPTY_STATIC_ROW ? 
PartitionColumns.NONE
-                                                                      : new 
PartitionColumns(Columns.from(staticRow.columns()), Columns.NONE);
-        return new UnfilteredRowIterator()
-        {
-            public CFMetaData metadata()
-            {
-                return cfm;
-            }
-
-            public boolean isReverseOrder()
-            {
-                return isReverseOrder;
-            }
-
-            public PartitionColumns columns()
-            {
-                return columns;
-            }
-
-            public DecoratedKey partitionKey()
-            {
-                return partitionKey;
-            }
-
-            public DeletionTime partitionLevelDeletion()
-            {
-                return partitionDeletion;
-            }
-
-            public Row staticRow()
-            {
-                return staticRow;
-            }
-
-            public EncodingStats stats()
-            {
-                return EncodingStats.NO_STATS;
-            }
-
-            public boolean hasNext()
-            {
-                return false;
-            }
-
-            public Unfiltered next()
-            {
-                throw new NoSuchElementException();
-            }
-
-            public void remove()
-            {
-            }
-
-            public void close()
-            {
-            }
-        };
+        return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, 
staticRow, partitionDeletion);
     }
 
     /**
@@ -201,65 +142,45 @@ public abstract class UnfilteredRowIterators
             && iter1.columns().equals(iter2.columns())
             && iter1.staticRow().equals(iter2.staticRow());
 
-        return new AbstractUnfilteredRowIterator(iter1.metadata(),
-                                                 iter1.partitionKey(),
-                                                 
iter1.partitionLevelDeletion(),
-                                                 iter1.columns(),
-                                                 iter1.staticRow(),
-                                                 iter1.isReverseOrder(),
-                                                 iter1.stats())
+        class Extend implements MoreRows<UnfilteredRowIterator>
         {
-            protected Unfiltered computeNext()
+            boolean returned = false;
+            public UnfilteredRowIterator moreContents()
             {
-                if (iter1.hasNext())
-                    return iter1.next();
-
-                return iter2.hasNext() ? iter2.next() : endOfData();
+                if (returned)
+                    return null;
+                returned = true;
+                return iter2;
             }
+        }
 
-            @Override
-            public void close()
-            {
-                try
-                {
-                    iter1.close();
-                }
-                finally
-                {
-                    iter2.close();
-                }
-            }
-        };
+        return MoreRows.extend(iter1, new Extend());
     }
 
     public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator 
iterator, final AbstractAllocator allocator)
     {
-        return new AlteringUnfilteredRowIterator(iterator)
+        class Cloner extends Transformation
         {
-            private Row.Builder regularBuilder;
+            private final Row.Builder builder = 
allocator.cloningBTreeRowBuilder();
 
-            @Override
-            protected Row computeNextStatic(Row row)
+            public Row applyToStatic(Row row)
             {
-                Row.Builder staticBuilder = allocator.cloningBTreeRowBuilder();
-                return Rows.copy(row, staticBuilder).build();
+                return Rows.copy(row, builder).build();
             }
 
             @Override
-            protected Row computeNext(Row row)
+            public Row applyToRow(Row row)
             {
-                if (regularBuilder == null)
-                    regularBuilder = allocator.cloningBTreeRowBuilder();
-
-                return Rows.copy(row, regularBuilder).build();
+                return Rows.copy(row, builder).build();
             }
 
             @Override
-            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker 
marker)
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker 
marker)
             {
                 return marker.copy(allocator);
             }
-        };
+        }
+        return Transformation.apply(iterator, new Cloner());
     }
 
     /**
@@ -277,24 +198,24 @@ public abstract class UnfilteredRowIterators
      */
     public static UnfilteredRowIterator withValidation(UnfilteredRowIterator 
iterator, final String filename)
     {
-        return new AlteringUnfilteredRowIterator(iterator)
+        class Validator extends Transformation
         {
             @Override
-            protected Row computeNextStatic(Row row)
+            public Row applyToStatic(Row row)
             {
                 validate(row);
                 return row;
             }
 
             @Override
-            protected Row computeNext(Row row)
+            public Row applyToRow(Row row)
             {
                 validate(row);
                 return row;
             }
 
             @Override
-            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker 
marker)
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker 
marker)
             {
                 validate(marker);
                 return marker;
@@ -311,7 +232,8 @@ public abstract class UnfilteredRowIterators
                     throw new CorruptSSTableException(me, filename);
                 }
             }
-        };
+        }
+        return Transformation.apply(iterator, new Validator());
     }
 
     /**
@@ -331,30 +253,31 @@ public abstract class UnfilteredRowIterators
                     iterator.isReverseOrder(),
                     iterator.partitionLevelDeletion().markedForDeleteAt());
 
-        return new AlteringUnfilteredRowIterator(iterator)
+        class Logger extends Transformation
         {
             @Override
-            protected Row computeNextStatic(Row row)
+            public Row applyToStatic(Row row)
             {
                 if (!row.isEmpty())
-                    logger.info("[{}] {}", id, row.toString(metadata(), 
fullDetails));
+                    logger.info("[{}] {}", id, row.toString(metadata, 
fullDetails));
                 return row;
             }
 
             @Override
-            protected Row computeNext(Row row)
+            public Row applyToRow(Row row)
             {
-                logger.info("[{}] {}", id, row.toString(metadata(), 
fullDetails));
+                logger.info("[{}] {}", id, row.toString(metadata, 
fullDetails));
                 return row;
             }
 
             @Override
-            protected RangeTombstoneMarker computeNext(RangeTombstoneMarker 
marker)
+            public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker 
marker)
             {
-                logger.info("[{}] {}", id, marker.toString(metadata()));
+                logger.info("[{}] {}", id, marker.toString(metadata));
                 return marker;
             }
-        };
+        }
+        return Transformation.apply(iterator, new Logger());
     }
 
     /**
@@ -577,66 +500,4 @@ public abstract class UnfilteredRowIterators
             }
         }
     }
-
-    private static class FilteringIterator extends AbstractIterator<Row> 
implements RowIterator
-    {
-        private final UnfilteredRowIterator iter;
-        private final int nowInSec;
-
-        public FilteringIterator(UnfilteredRowIterator iter, int nowInSec)
-        {
-            this.iter = iter;
-            this.nowInSec = nowInSec;
-        }
-
-        public CFMetaData metadata()
-        {
-            return iter.metadata();
-        }
-
-        public boolean isReverseOrder()
-        {
-            return iter.isReverseOrder();
-        }
-
-        public PartitionColumns columns()
-        {
-            return iter.columns();
-        }
-
-        public DecoratedKey partitionKey()
-        {
-            return iter.partitionKey();
-        }
-
-        public Row staticRow()
-        {
-            Row row = iter.staticRow();
-            if (row.isEmpty())
-                return Rows.EMPTY_STATIC_ROW;
-
-            row = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
-            return row == null ? Rows.EMPTY_STATIC_ROW : row;
-        }
-
-        protected Row computeNext()
-        {
-            while (iter.hasNext())
-            {
-                Unfiltered next = iter.next();
-                if (next.isRangeTombstoneMarker())
-                    continue;
-
-                Row row = ((Row)next).purge(DeletionPurger.PURGE_ALL, 
nowInSec);
-                if (row != null)
-                    return row;
-            }
-            return endOfData();
-        }
-
-        public void close()
-        {
-            iter.close();
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
deleted file mode 100644
index 8847a47..0000000
--- a/src/java/org/apache/cassandra/db/rows/WrappingRowIterator.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.cassandra.db.rows;
-
-import com.google.common.collect.UnmodifiableIterator;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-
-/**
- * Abstract class to make writing atom iterators that wrap another iterator
- * easier. By default, the wrapping iterator simply delegate every call to
- * the wrapped iterator so concrete implementations will override some of the
- * methods.
- */
-public abstract class WrappingRowIterator extends UnmodifiableIterator<Row>  
implements RowIterator
-{
-    protected final RowIterator wrapped;
-
-    protected WrappingRowIterator(RowIterator wrapped)
-    {
-        this.wrapped = wrapped;
-    }
-
-    public CFMetaData metadata()
-    {
-        return wrapped.metadata();
-    }
-
-    public boolean isReverseOrder()
-    {
-        return wrapped.isReverseOrder();
-    }
-
-    public PartitionColumns columns()
-    {
-        return wrapped.columns();
-    }
-
-    public DecoratedKey partitionKey()
-    {
-        return wrapped.partitionKey();
-    }
-
-    public Row staticRow()
-    {
-        return wrapped.staticRow();
-    }
-
-    public boolean hasNext()
-    {
-        return wrapped.hasNext();
-    }
-
-    public Row next()
-    {
-        return wrapped.next();
-    }
-
-    public void close()
-    {
-        wrapped.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java 
b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
index 84713eb..8b18554 100644
--- a/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
+++ b/src/java/org/apache/cassandra/db/rows/WrappingUnfilteredRowIterator.java
@@ -29,7 +29,7 @@ import org.apache.cassandra.db.*;
  * some of the methods.
  * <p>
  * Note that if most of what you want to do is modifying/filtering the returned
- * {@code Unfiltered}, {@link AlteringUnfilteredRowIterator} can be a simpler 
option.
+ * {@code Unfiltered}, {@link 
org.apache.cassandra.db.transform.Transformation.apply} can be a simpler option.
  */
 public abstract class WrappingUnfilteredRowIterator extends 
UnmodifiableIterator<Unfiltered>  implements UnfilteredRowIterator
 {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/BaseIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BaseIterator.java 
b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
new file mode 100644
index 0000000..9b95dfa
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/BaseIterator.java
@@ -0,0 +1,129 @@
+package org.apache.cassandra.db.transform;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import org.apache.cassandra.utils.CloseableIterator;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+abstract class BaseIterator<V, I extends CloseableIterator<? extends V>, O 
extends V> extends Stack implements AutoCloseable, Iterator<O>
+{
+    I input;
+    V next;
+    Stop stop; // applies at the end of the current next()
+
+    static class Stop
+    {
+        // TODO: consider moving "next" into here, so that a stop() when 
signalled outside of a function call (e.g. in attach)
+        // can take effect immediately; this doesn't seem to be necessary at 
the moment, but it might cause least surprise in future
+        boolean isSignalled;
+    }
+
+    // responsibility for initialising next lies with the subclass
+    BaseIterator(BaseIterator<? extends V, ? extends I, ?> copyFrom)
+    {
+        super(copyFrom);
+        this.input = copyFrom.input;
+        this.next = copyFrom.next;
+        this.stop = copyFrom.stop;
+    }
+
+    BaseIterator(I input)
+    {
+        this.input = input;
+        this.stop = new Stop();
+    }
+
+    /**
+     * run the corresponding runOnClose method for the first length 
transformations.
+     *
+     * used in hasMoreContents to close the methods preceding the MoreContents
+     */
+    protected abstract Throwable runOnClose(int length);
+
+    /**
+     * apply the relevant method from the transformation to the value.
+     *
+     * used in hasMoreContents to apply the functions that follow the 
MoreContents
+     */
+    protected abstract V applyOne(V value, Transformation transformation);
+
+    public final void close()
+    {
+        Throwable fail = runOnClose(length);
+        if (next instanceof AutoCloseable)
+        {
+            try { ((AutoCloseable) next).close(); }
+            catch (Throwable t) { fail = merge(fail, t); }
+        }
+        try { input.close(); }
+        catch (Throwable t) { fail = merge(fail, t); }
+        maybeFail(fail);
+    }
+
+    public final O next()
+    {
+        if (next == null && !hasNext())
+            throw new NoSuchElementException();
+
+        O next = (O) this.next;
+        this.next = null;
+        return next;
+    }
+
+    // may set next != null if the next contents are a transforming iterator 
that already has data to return,
+    // in which case we immediately have more contents to yield
+    protected final boolean hasMoreContents()
+    {
+        return moreContents.length > 0 && tryGetMoreContents();
+    }
+
+    @DontInline
+    private boolean tryGetMoreContents()
+    {
+        for (int i = 0 ; i < moreContents.length ; i++)
+        {
+            MoreContentsHolder holder = moreContents[i];
+            MoreContents provider = holder.moreContents;
+            I newContents = (I) provider.moreContents();
+            if (newContents == null)
+                continue;
+
+            input.close();
+            input = newContents;
+            Stack prefix = EMPTY;
+            if (newContents instanceof BaseIterator)
+            {
+                // we're refilling with transformed contents, so swap in its 
internals directly
+                // TODO: ensure that top-level data is consistent. i.e. 
staticRow, partitionlevelDeletion etc are same?
+                BaseIterator abstr = (BaseIterator) newContents;
+                prefix = abstr;
+                input = (I) abstr.input;
+                next = apply((V) abstr.next, holder.length); // must apply all 
remaining functions to the next, if any
+            }
+
+            // since we're truncating our transformation stack to only those 
occurring after the extend transformation
+            // we have to run any prior runOnClose methods
+            maybeFail(runOnClose(holder.length));
+            refill(prefix, holder, i);
+
+            if (next != null || input.hasNext())
+                return true;
+
+            i = -1;
+        }
+        return false;
+    }
+
+    // apply the functions [from..length)
+    private V apply(V next, int from)
+    {
+        while (next != null & from < length)
+            next = applyOne(next, stack[from++]);
+        return next;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/BasePartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BasePartitions.java 
b/src/java/org/apache/cassandra/db/transform/BasePartitions.java
new file mode 100644
index 0000000..e795760
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/BasePartitions.java
@@ -0,0 +1,100 @@
+package org.apache.cassandra.db.transform;
+
+import java.util.Collections;
+
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.utils.Throwables;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+public abstract class BasePartitions<R extends BaseRowIterator<?>, I extends 
BasePartitionIterator<? extends BaseRowIterator<?>>>
+extends BaseIterator<BaseRowIterator<?>, I, R>
+implements BasePartitionIterator<R>
+{
+
+    public BasePartitions(I input)
+    {
+        super(input);
+    }
+
+    BasePartitions(BasePartitions<?, ? extends I> copyFrom)
+    {
+        super(copyFrom);
+    }
+
+
+    // *********************************
+
+
+    protected BaseRowIterator<?> applyOne(BaseRowIterator<?> value, 
Transformation transformation)
+    {
+        return value == null ? null : transformation.applyToPartition(value);
+    }
+
+    void add(Transformation transformation)
+    {
+        transformation.attachTo(this);
+        super.add(transformation);
+        next = applyOne(next, transformation);
+    }
+
+    protected Throwable runOnClose(int length)
+    {
+        Throwable fail = null;
+        Transformation[] fs = stack;
+        for (int i = 0 ; i < length ; i++)
+        {
+            try
+            {
+                fs[i].onClose();
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
+            }
+        }
+        return fail;
+    }
+
+    public final boolean hasNext()
+    {
+        BaseRowIterator<?> next = null;
+        try
+        {
+
+            Stop stop = this.stop;
+            while (this.next == null)
+            {
+                Transformation[] fs = stack;
+                int len = length;
+
+                while (!stop.isSignalled && input.hasNext())
+                {
+                    next = input.next();
+                    for (int i = 0 ; next != null & i < len ; i++)
+                        next = fs[i].applyToPartition(next);
+
+                    if (next != null)
+                    {
+                        this.next = next;
+                        return true;
+                    }
+                }
+
+                if (stop.isSignalled || !hasMoreContents())
+                    return false;
+            }
+            return true;
+
+        }
+        catch (Throwable t)
+        {
+            if (next != null)
+                Throwables.close(t, Collections.singleton(next));
+            throw t;
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/BaseRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/BaseRows.java 
b/src/java/org/apache/cassandra/db/transform/BaseRows.java
new file mode 100644
index 0000000..78526e8
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/BaseRows.java
@@ -0,0 +1,139 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.rows.*;
+
+import static org.apache.cassandra.utils.Throwables.merge;
+
+public abstract class BaseRows<R extends Unfiltered, I extends 
BaseRowIterator<? extends Unfiltered>>
+extends BaseIterator<Unfiltered, I, R>
+implements BaseRowIterator<R>
+{
+
+    private Row staticRow;
+
+    public BaseRows(I input)
+    {
+        super(input);
+        staticRow = input.staticRow();
+    }
+
+    // swap parameter order to avoid casting errors
+    BaseRows(BaseRows<?, ? extends I> copyFrom)
+    {
+        super(copyFrom);
+        staticRow = copyFrom.staticRow;
+    }
+
+    public CFMetaData metadata()
+    {
+        return input.metadata();
+    }
+
+    public boolean isReverseOrder()
+    {
+        return input.isReverseOrder();
+    }
+
+    public PartitionColumns columns()
+    {
+        return input.columns();
+    }
+
+    public DecoratedKey partitionKey()
+    {
+        return input.partitionKey();
+    }
+
+    public Row staticRow()
+    {
+        return staticRow;
+    }
+
+
+    // **************************
+
+
+    @Override
+    protected Throwable runOnClose(int length)
+    {
+        Throwable fail = null;
+        Transformation[] fs = stack;
+        for (int i = 0 ; i < length ; i++)
+        {
+            try
+            {
+                fs[i].onPartitionClose();
+            }
+            catch (Throwable t)
+            {
+                fail = merge(fail, t);
+            }
+        }
+        return fail;
+    }
+
+    @Override
+    void add(Transformation transformation)
+    {
+        transformation.attachTo(this);
+        super.add(transformation);
+
+        // transform any existing data
+        staticRow = transformation.applyToStatic(staticRow);
+        next = applyOne(next, transformation);
+    }
+
+    @Override
+    protected Unfiltered applyOne(Unfiltered value, Transformation 
transformation)
+    {
+        return value == null
+               ? null
+               : value instanceof Row
+                 ? transformation.applyToRow((Row) value)
+                 : transformation.applyToMarker((RangeTombstoneMarker) value);
+    }
+
+    @Override
+    public final boolean hasNext()
+    {
+        Stop stop = this.stop;
+        while (this.next == null)
+        {
+            Transformation[] fs = stack;
+            int len = length;
+
+            while (!stop.isSignalled && input.hasNext())
+            {
+                Unfiltered next = input.next();
+
+                if (next.isRow())
+                {
+                    Row row = (Row) next;
+                    for (int i = 0 ; row != null && i < len ; i++)
+                        row = fs[i].applyToRow(row);
+                    next = row;
+                }
+                else
+                {
+                    RangeTombstoneMarker rtm = (RangeTombstoneMarker) next;
+                    for (int i = 0 ; rtm != null && i < len ; i++)
+                        rtm = fs[i].applyToMarker(rtm);
+                    next = rtm;
+                }
+
+                if (next != null)
+                {
+                    this.next = next;
+                    return true;
+                }
+            }
+
+            if (stop.isSignalled || !hasMoreContents())
+                return false;
+        }
+        return true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/Filter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java 
b/src/java/org/apache/cassandra/db/transform/Filter.java
new file mode 100644
index 0000000..3bf831f
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/Filter.java
@@ -0,0 +1,56 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.DeletionPurger;
+import org.apache.cassandra.db.rows.*;
+
+final class Filter extends Transformation
+{
+    private final boolean filterEmpty; // generally maps to !isForThrift, but 
also false for direct row filtration
+    private final int nowInSec;
+    public Filter(boolean filterEmpty, int nowInSec)
+    {
+        this.filterEmpty = filterEmpty;
+        this.nowInSec = nowInSec;
+    }
+
+    public RowIterator applyToPartition(BaseRowIterator iterator)
+    {
+        RowIterator filtered = iterator instanceof UnfilteredRows
+                               ? new FilteredRows(this, (UnfilteredRows) 
iterator)
+                               : new FilteredRows((UnfilteredRowIterator) 
iterator, this);
+
+        if (filterEmpty && closeIfEmpty(filtered))
+            return null;
+
+        return filtered;
+    }
+
+    public Row applyToStatic(Row row)
+    {
+        if (row.isEmpty())
+            return Rows.EMPTY_STATIC_ROW;
+
+        row = row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+        return row == null ? Rows.EMPTY_STATIC_ROW : row;
+    }
+
+    public Row applyToRow(Row row)
+    {
+        return row.purge(DeletionPurger.PURGE_ALL, nowInSec);
+    }
+
+    public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+    {
+        return null;
+    }
+
+    private static boolean closeIfEmpty(BaseRowIterator<?> iter)
+    {
+        if (iter.isEmpty())
+        {
+            iter.close();
+            return true;
+        }
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java 
b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
new file mode 100644
index 0000000..5a802dc
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+
+public final class FilteredPartitions extends BasePartitions<RowIterator, 
BasePartitionIterator<?>> implements PartitionIterator
+{
+    // wrap basic iterator for transformation
+    FilteredPartitions(PartitionIterator input)
+    {
+        super(input);
+    }
+
+    // wrap basic unfiltered iterator for transformation, applying filter as 
first transformation
+    FilteredPartitions(UnfilteredPartitionIterator input, Filter filter)
+    {
+        super(input);
+        add(filter);
+    }
+
+    // copy from an UnfilteredPartitions, applying a filter to convert it
+    FilteredPartitions(Filter filter, UnfilteredPartitions copyFrom)
+    {
+        super(copyFrom);
+        add(filter);
+    }
+
+    /**
+     * Filter any RangeTombstoneMarker from the iterator's iterators, 
transforming it into a PartitionIterator.
+     */
+    public static PartitionIterator filter(UnfilteredPartitionIterator 
iterator, int nowInSecs)
+    {
+        Filter filter = new Filter(!iterator.isForThrift(), nowInSecs);
+        if (iterator instanceof UnfilteredPartitions)
+            return new FilteredPartitions(filter, (UnfilteredPartitions) 
iterator);
+        return new FilteredPartitions(iterator, filter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/FilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java 
b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
new file mode 100644
index 0000000..b21b451
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> 
implements RowIterator
+{
+    FilteredRows(RowIterator input)
+    {
+        super(input);
+    }
+
+    FilteredRows(UnfilteredRowIterator input, Filter filter)
+    {
+        super(input);
+        add(filter);
+    }
+
+    FilteredRows(Filter filter, UnfilteredRows input)
+    {
+        super(input);
+        add(filter);
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return staticRow().isEmpty() && !hasNext();
+    }
+
+    /**
+     * Filter any RangeTombstoneMarker from the iterator, transforming it into 
a RowIterator.
+     */
+    public static RowIterator filter(UnfilteredRowIterator iterator, int 
nowInSecs)
+    {
+        return new Filter(false, nowInSecs).applyToPartition(iterator);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/MoreContents.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/MoreContents.java 
b/src/java/org/apache/cassandra/db/transform/MoreContents.java
new file mode 100644
index 0000000..7e392ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/MoreContents.java
@@ -0,0 +1,8 @@
+package org.apache.cassandra.db.transform;
+
+// a shared internal interface, that is hidden to provide type-safety to the 
user
+interface MoreContents<I>
+{
+    public abstract I moreContents();
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/MorePartitions.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/MorePartitions.java 
b/src/java/org/apache/cassandra/db/transform/MorePartitions.java
new file mode 100644
index 0000000..5cfcc4c
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/MorePartitions.java
@@ -0,0 +1,35 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.partitions.BasePartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+
+import static org.apache.cassandra.db.transform.Transformation.add;
+import static org.apache.cassandra.db.transform.Transformation.mutable;
+
+/**
+ * An interface for providing new partitions for a partitions iterator.
+ *
+ * The new contents are produced as a normal arbitrary PartitionIterator or 
UnfilteredPartitionIterator (as appropriate)
+ *
+ * The transforming iterator invokes this method when any current source is 
exhausted, then then inserts the
+ * new contents as the new source.
+ *
+ * If the new source is itself a product of any transformations, the two 
transforming iterators are merged
+ * so that control flow always occurs at the outermost point
+ */
+public interface MorePartitions<I extends BasePartitionIterator<?>> extends 
MoreContents<I>
+{
+
+    public static UnfilteredPartitionIterator 
extend(UnfilteredPartitionIterator iterator, MorePartitions<? super 
UnfilteredPartitionIterator> more)
+    {
+        return add(mutable(iterator), more);
+    }
+
+    public static PartitionIterator extend(PartitionIterator iterator, 
MorePartitions<? super PartitionIterator> more)
+    {
+        return add(mutable(iterator), more);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/MoreRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/MoreRows.java 
b/src/java/org/apache/cassandra/db/transform/MoreRows.java
new file mode 100644
index 0000000..f406a49
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/MoreRows.java
@@ -0,0 +1,36 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.rows.BaseRowIterator;
+import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+import static org.apache.cassandra.db.transform.Transformation.add;
+import static org.apache.cassandra.db.transform.Transformation.mutable;
+
+/**
+ * An interface for providing new row contents for a partition.
+ *
+ * The new contents are produced as a normal arbitrary RowIterator or 
UnfilteredRowIterator (as appropriate),
+ * with matching staticRow, partitionKey and partitionLevelDeletion.
+ *
+ * The transforming iterator invokes this method when any current source is 
exhausted, then then inserts the
+ * new contents as the new source.
+ *
+ * If the new source is itself a product of any transformations, the two 
transforming iterators are merged
+ * so that control flow always occurs at the outermost point
+ */
+public interface MoreRows<I extends BaseRowIterator<?>> extends MoreContents<I>
+{
+
+    public static UnfilteredRowIterator extend(UnfilteredRowIterator iterator, 
MoreRows<? super UnfilteredRowIterator> more)
+    {
+        return add(mutable(iterator), more);
+    }
+
+    public static RowIterator extend(RowIterator iterator, MoreRows<? super 
RowIterator> more)
+    {
+        return add(mutable(iterator), more);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/Stack.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Stack.java 
b/src/java/org/apache/cassandra/db/transform/Stack.java
new file mode 100644
index 0000000..aac1679
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/Stack.java
@@ -0,0 +1,81 @@
+package org.apache.cassandra.db.transform;
+
+import java.util.Arrays;
+
+class Stack
+{
+    static final Stack EMPTY = new Stack();
+
+    Transformation[] stack;
+    int length; // number of used stack entries
+    MoreContentsHolder[] moreContents; // stack of more contents providers (if 
any; usually zero or one)
+
+    // an internal placeholder for a MoreContents, storing the associated 
stack length at time it was applied
+    static class MoreContentsHolder
+    {
+        final MoreContents moreContents;
+        int length;
+        private MoreContentsHolder(MoreContents moreContents, int length)
+        {
+            this.moreContents = moreContents;
+            this.length = length;
+        }
+    }
+
+    Stack()
+    {
+        stack = new Transformation[0];
+        moreContents = new MoreContentsHolder[0];
+    }
+
+    Stack(Stack copy)
+    {
+        stack = copy.stack;
+        length = copy.length;
+        moreContents = copy.moreContents;
+    }
+
+    void add(Transformation add)
+    {
+        if (length == stack.length)
+            stack = resize(stack);
+        stack[length++] = add;
+    }
+
+    void add(MoreContents more)
+    {
+        this.moreContents = Arrays.copyOf(moreContents, moreContents.length + 
1);
+        this.moreContents[moreContents.length - 1] = new 
MoreContentsHolder(more, length);
+    }
+
+    private static <E> E[] resize(E[] array)
+    {
+        int newLen = array.length == 0 ? 5 : array.length * 2;
+        return Arrays.copyOf(array, newLen);
+    }
+
+    // reinitialise the transformations after a moreContents applies
+    void refill(Stack prefix, MoreContentsHolder holder, int index)
+    {
+        // drop the transformations that were present when the MoreContents 
was attached,
+        // and prefix any transformations in the new contents (if it's a 
transformer)
+        moreContents = splice(prefix.moreContents, prefix.moreContents.length, 
moreContents, index, moreContents.length);
+        stack = splice(prefix.stack, prefix.length, stack, holder.length, 
length);
+        length += prefix.length - holder.length;
+        holder.length = prefix.length;
+    }
+
+    private static <E> E[] splice(E[] prefix, int prefixCount, E[] keep, int 
keepFrom, int keepTo)
+    {
+        int keepCount = keepTo - keepFrom;
+        int newCount = prefixCount + keepCount;
+        if (newCount > keep.length)
+            keep = Arrays.copyOf(keep, newCount);
+        if (keepFrom != prefixCount)
+            System.arraycopy(keep, keepFrom, keep, prefixCount, keepCount);
+        if (prefixCount != 0)
+            System.arraycopy(prefix, 0, keep, 0, prefixCount);
+        return keep;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java 
b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
new file mode 100644
index 0000000..f3afdc0
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/StoppingTransformation.java
@@ -0,0 +1,60 @@
+package org.apache.cassandra.db.transform;
+
+import net.nicoulaj.compilecommand.annotations.DontInline;
+import org.apache.cassandra.db.rows.BaseRowIterator;
+
+// A Transformation that can stop an iterator earlier than its natural 
exhaustion
+public abstract class StoppingTransformation<I extends BaseRowIterator<?>> 
extends Transformation<I>
+{
+    private BaseIterator.Stop stop;
+    private BaseIterator.Stop stopInPartition;
+
+    /**
+     * If invoked by a subclass, any partitions iterator this transformation 
has been applied to will terminate
+     * after any currently-processing item is returned, as will any 
row/unfiltered iterator
+     */
+    @DontInline
+    protected void stop()
+    {
+        if (stop != null)
+            stop.isSignalled = true;
+        stopInPartition();
+    }
+
+    /**
+     * If invoked by a subclass, any rows/unfiltered iterator this 
transformation has been applied to will terminate
+     * after any currently-processing item is returned
+     */
+    @DontInline
+    protected void stopInPartition()
+    {
+        if (stopInPartition != null)
+            stopInPartition.isSignalled = true;
+    }
+
+    @Override
+    protected void attachTo(BasePartitions partitions)
+    {
+        assert this.stop == null;
+        this.stop = partitions.stop;
+    }
+
+    @Override
+    protected void attachTo(BaseRows rows)
+    {
+        assert this.stopInPartition == null;
+        this.stopInPartition = rows.stop;
+    }
+
+    @Override
+    protected void onClose()
+    {
+        stop = null;
+    }
+
+    @Override
+    protected void onPartitionClose()
+    {
+        stopInPartition = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/Transformation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java 
b/src/java/org/apache/cassandra/db/transform/Transformation.java
new file mode 100644
index 0000000..29e2e15
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/Transformation.java
@@ -0,0 +1,145 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.*;
+
+/**
+ * We have a single common superclass for all Transformations to make 
implementation efficient.
+ * we have a shared stack for all transformations, and can share the same 
transformation across partition and row
+ * iterators, reducing garbage. Internal code is also simplified by always 
having a basic no-op implementation to invoke.
+ *
+ * Only the necessary methods need be overridden. Early termination is 
provided by invoking the method's stop or stopInPartition
+ * methods, rather than having their own abstract method to invoke, as this is 
both more efficient and simpler to reason about.
+ */
+public abstract class Transformation<I extends BaseRowIterator<?>>
+{
+    // internal methods for StoppableTransformation only
+    void attachTo(BasePartitions partitions) { }
+    void attachTo(BaseRows rows) { }
+
+    /**
+     * Run on the close of any (logical) partitions iterator this function was 
applied to
+     *
+     * We stipulate logical, because if applied to a transformed iterator the 
lifetime of the iterator
+     * object may be longer than the lifetime of the "logical" iterator it was 
applied to; if the iterator
+     * is refilled with MoreContents, for instance, the iterator may outlive 
this function
+     */
+    protected void onClose() { }
+
+    /**
+     * Run on the close of any (logical) rows iterator this function was 
applied to
+     *
+     * We stipulate logical, because if applied to a transformed iterator the 
lifetime of the iterator
+     * object may be longer than the lifetime of the "logical" iterator it was 
applied to; if the iterator
+     * is refilled with MoreContents, for instance, the iterator may outlive 
this function
+     */
+    protected void onPartitionClose() { }
+
+    /**
+     * Applied to any rows iterator (partition) we encounter in a partitions 
iterator
+     */
+    protected I applyToPartition(I partition)
+    {
+        return partition;
+    }
+
+    /**
+     * Applied to any row we encounter in a rows iterator
+     */
+    protected Row applyToRow(Row row)
+    {
+        return row;
+    }
+
+    /**
+     * Applied to any RTM we encounter in a rows/unfiltered iterator
+     */
+    protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
+    {
+        return marker;
+    }
+
+    /**
+     * Applied to the static row of any rows iterator.
+     *
+     * NOTE that this is only applied to the first iterator in any sequence of 
iterators filled by a MoreContents;
+     * the static data for such iterators is all expected to be equal
+     */
+    protected Row applyToStatic(Row row)
+    {
+        return row;
+    }
+
+    /**
+     * Applied to the partition-level deletion of any rows iterator.
+     *
+     * NOTE that this is only applied to the first iterator in any sequence of 
iterators filled by a MoreContents;
+     * the static data for such iterators is all expected to be equal
+     */
+    protected DeletionTime applyToDeletion(DeletionTime deletionTime)
+    {
+        return deletionTime;
+    }
+
+
+    //******************************************************
+    //          Static Application Methods
+    //******************************************************
+
+
+    public static UnfilteredPartitionIterator 
apply(UnfilteredPartitionIterator iterator, Transformation<? super 
UnfilteredRowIterator> transformation)
+    {
+        return add(mutable(iterator), transformation);
+    }
+    public static PartitionIterator apply(PartitionIterator iterator, 
Transformation<? super RowIterator> transformation)
+    {
+        return add(mutable(iterator), transformation);
+    }
+    public static UnfilteredRowIterator apply(UnfilteredRowIterator iterator, 
Transformation<?> transformation)
+    {
+        return add(mutable(iterator), transformation);
+    }
+    public static RowIterator apply(RowIterator iterator, Transformation<?> 
transformation)
+    {
+        return add(mutable(iterator), transformation);
+    }
+
+    static UnfilteredPartitions mutable(UnfilteredPartitionIterator iterator)
+    {
+        return iterator instanceof UnfilteredPartitions
+               ? (UnfilteredPartitions) iterator
+               : new UnfilteredPartitions(iterator);
+    }
+    static FilteredPartitions mutable(PartitionIterator iterator)
+    {
+        return iterator instanceof FilteredPartitions
+               ? (FilteredPartitions) iterator
+               : new FilteredPartitions(iterator);
+    }
+    static UnfilteredRows mutable(UnfilteredRowIterator iterator)
+    {
+        return iterator instanceof UnfilteredRows
+               ? (UnfilteredRows) iterator
+               : new UnfilteredRows(iterator);
+    }
+    static FilteredRows mutable(RowIterator iterator)
+    {
+        return iterator instanceof FilteredRows
+               ? (FilteredRows) iterator
+               : new FilteredRows(iterator);
+    }
+
+    static <E extends BaseIterator> E add(E to, Transformation add)
+    {
+        to.add(add);
+        return to;
+    }
+    static <E extends BaseIterator> E add(E to, MoreContents add)
+    {
+        to.add(add);
+        return to;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java 
b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
new file mode 100644
index 0000000..4e40545
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredPartitions.java
@@ -0,0 +1,27 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+final class UnfilteredPartitions extends BasePartitions<UnfilteredRowIterator, 
UnfilteredPartitionIterator> implements UnfilteredPartitionIterator
+{
+    final boolean isForThrift;
+
+    // wrap an iterator for transformation
+    public UnfilteredPartitions(UnfilteredPartitionIterator input)
+    {
+        super(input);
+        this.isForThrift = input.isForThrift();
+    }
+
+    public boolean isForThrift()
+    {
+        return isForThrift;
+    }
+
+    public CFMetaData metadata()
+    {
+        return input.metadata();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java 
b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
new file mode 100644
index 0000000..98640ae
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java
@@ -0,0 +1,40 @@
+package org.apache.cassandra.db.transform;
+
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+
+final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> 
implements UnfilteredRowIterator
+{
+    private DeletionTime partitionLevelDeletion;
+
+    public UnfilteredRows(UnfilteredRowIterator input)
+    {
+        super(input);
+        partitionLevelDeletion = input.partitionLevelDeletion();
+    }
+
+    @Override
+    void add(Transformation add)
+    {
+        super.add(add);
+        partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion);
+    }
+
+    public DeletionTime partitionLevelDeletion()
+    {
+        return partitionLevelDeletion;
+    }
+
+    public EncodingStats stats()
+    {
+        return input.stats();
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+        return staticRow().isEmpty() && partitionLevelDeletion().isLive() && 
!hasNext();
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java 
b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
index d6ae8e2..e66f0a3 100644
--- a/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
+++ b/src/java/org/apache/cassandra/index/SecondaryIndexBuilder.java
@@ -71,14 +71,7 @@ public class SecondaryIndexBuilder extends 
CompactionInfo.Holder
         }
         finally
         {
-            try
-            {
-                iter.close();
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
+            iter.close();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
 
b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
index f1751f5..46701bc 100644
--- 
a/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
+++ 
b/src/java/org/apache/cassandra/index/internal/composites/CompositesSearcher.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.index.internal.CassandraIndex;
 import org.apache.cassandra.index.internal.CassandraIndexSearcher;
 import org.apache.cassandra.index.internal.IndexEntry;
@@ -203,18 +204,13 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
             });
         }
 
-        return new AlteringUnfilteredRowIterator(dataIter)
+        ClusteringComparator comparator = dataIter.metadata().comparator;
+        class Transform extends Transformation
         {
             private int entriesIdx;
 
-            public void close()
-            {
-                deleteAllEntries(staleEntries, writeOp, nowInSec);
-                super.close();
-            }
-
             @Override
-            protected Row computeNext(Row row)
+            public Row applyToRow(Row row)
             {
                 IndexEntry entry = findEntry(row.clustering());
                 if (!index.isStale(row, indexValue, nowInSec))
@@ -234,7 +230,7 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
                     // next entry, the one at 'entriesIdx'. However, we can 
have stale entries, entries
                     // that have no corresponding row in the base table 
typically because of a range
                     // tombstone or partition level deletion. Delete such 
stale entries.
-                    int cmp = 
metadata().comparator.compare(entry.indexedEntryClustering, clustering);
+                    int cmp = comparator.compare(entry.indexedEntryClustering, 
clustering);
                     assert cmp <= 0; // this would means entries are not in 
clustering order, which shouldn't happen
                     if (cmp == 0)
                         return entry;
@@ -244,6 +240,14 @@ public class CompositesSearcher extends 
CassandraIndexSearcher
                 // entries correspond to the rows we've queried, so we 
shouldn't have a row that has no corresponding entry.
                 throw new AssertionError();
             }
-        };
+
+            @Override
+            public void onClose()
+            {
+                deleteAllEntries(staleEntries, writeOp, nowInSec);
+            }
+        }
+
+        return Transformation.apply(dataIter, new Transform());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java 
b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
index bbc56cc..6f395f8 100644
--- a/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
+++ b/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.io.sstable;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
@@ -70,12 +69,10 @@ public class ReducingKeyIterator implements 
CloseableIterator<DecoratedKey>
         }
     }
 
-    public void close() throws IOException
+    public void close()
     {
         if (mi != null)
-        {
             mi.close();
-        }
     }
 
     public long getTotalBytes()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/60949747/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java 
b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index e02c919..b6077e0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -143,11 +144,11 @@ public class BigTableWriter extends SSTableWriter
 
         long startPosition = beforeAppend(key);
 
-        try (StatsCollector withStats = new StatsCollector(iterator, 
metadataCollector))
+        try (UnfilteredRowIterator collecting = Transformation.apply(iterator, 
new StatsCollector(metadataCollector)))
         {
-            ColumnIndex index = ColumnIndex.writeAndBuildIndex(withStats, 
dataFile, header, descriptor.version);
+            ColumnIndex index = ColumnIndex.writeAndBuildIndex(collecting, 
dataFile, header, descriptor.version);
 
-            RowIndexEntry entry = RowIndexEntry.create(startPosition, 
iterator.partitionLevelDeletion(), index);
+            RowIndexEntry entry = RowIndexEntry.create(startPosition, 
collecting.partitionLevelDeletion(), index);
 
             long endPosition = dataFile.position();
             long rowSize = endPosition - startPosition;
@@ -171,20 +172,18 @@ public class BigTableWriter extends SSTableWriter
         }
     }
 
-    private static class StatsCollector extends AlteringUnfilteredRowIterator
+    private static class StatsCollector extends Transformation
     {
         private final MetadataCollector collector;
         private int cellCount;
 
-        StatsCollector(UnfilteredRowIterator iter, MetadataCollector collector)
+        StatsCollector(MetadataCollector collector)
         {
-            super(iter);
             this.collector = collector;
-            collector.update(iter.partitionLevelDeletion());
         }
 
         @Override
-        protected Row computeNextStatic(Row row)
+        public Row applyToStatic(Row row)
         {
             if (!row.isEmpty())
                 cellCount += Rows.collectStats(row, collector);
@@ -192,7 +191,7 @@ public class BigTableWriter extends SSTableWriter
         }
 
         @Override
-        protected Row computeNext(Row row)
+        public Row applyToRow(Row row)
         {
             collector.updateClusteringValues(row.clustering());
             cellCount += Rows.collectStats(row, collector);
@@ -200,7 +199,7 @@ public class BigTableWriter extends SSTableWriter
         }
 
         @Override
-        protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker)
+        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
         {
             collector.updateClusteringValues(marker.clustering());
             if (marker.isBoundary())
@@ -217,10 +216,16 @@ public class BigTableWriter extends SSTableWriter
         }
 
         @Override
-        public void close()
+        public void onPartitionClose()
         {
             collector.addCellPerPartitionCount(cellCount);
-            super.close();
+        }
+
+        @Override
+        public DeletionTime applyToDeletion(DeletionTime deletionTime)
+        {
+            collector.update(deletionTime);
+            return deletionTime;
         }
     }
 

Reply via email to