This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new d946c22b8a CEP-45: Fix ALLOW FILTERING queries for mutation tracking
d946c22b8a is described below

commit d946c22b8a5e3ac7720df4355dfdfbe1eade023f
Author: Blake Eggleston <[email protected]>
AuthorDate: Wed Apr 30 10:52:30 2025 -0700

    CEP-45: Fix ALLOW FILTERING queries for mutation tracking
    
    Patch by Blake Eggleston; Reviewed by Aleksey Yeschenko for CASSANDRA-20555
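    
    For context: an ALLOW FILTERING query evaluates a row filter against each
    candidate row and keeps only rows that satisfy every expression
    (intersection semantics). A reduced, self-contained sketch of that
    contract, with hypothetical Row/Expression stand-ins for the internal
    RowFilter classes:
    
        import java.util.List;
        import java.util.function.Predicate;
    
        final class RowFilterSketch<ROW>
        {
            private final List<Predicate<ROW>> expressions; // e.g. "age > 30" AND "city = 'Paris'"
    
            RowFilterSketch(List<Predicate<ROW>> expressions)
            {
                this.expressions = expressions;
            }
    
            // a row survives only if every expression is satisfied
            boolean isSatisfiedBy(ROW row)
            {
                return expressions.stream().allMatch(e -> e.test(row));
            }
        }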
---
 src/java/org/apache/cassandra/db/ReadCommand.java  |  31 +-
 .../cassandra/db/SinglePartitionReadCommand.java   |  17 +
 .../org/apache/cassandra/db/filter/RowFilter.java  | 141 +++--
 .../db/partitions/PartitionIterators.java          |  49 +-
 .../reads/tracked/AbstractPartialTrackedRead.java  | 228 ++++++--
 .../reads/tracked/ExtendingCompletedRead.java      | 186 ++++++
 .../reads/tracked/FilteredFollowupRead.java        | 177 ++++++
 .../reads/tracked/PartialTrackedRangeRead.java     | 638 +++++++++++++++------
 .../service/reads/tracked/PartialTrackedRead.java  |  25 +-
 .../tracked/PartialTrackedSinglePartitionRead.java | 110 ++--
 .../service/reads/tracked/TrackedDataResponse.java |  87 ++-
 .../reads/tracked/TrackedLocalReadCoordinator.java |  18 +-
 .../service/reads/tracked/TrackedLocalReads.java   |   5 +-
 .../service/reads/tracked/TrackedRead.java         |  35 +-
 test/conf/logback-dtest.xml                        |   8 +-
 .../test/ReadRepairRangeQueriesTest.java           | 123 +++-
 .../test/ReadRepairSliceQueriesTest.java           |   3 -
 .../tracking/MutationTrackingPendingReadTest.java  |   2 +-
 .../MutationTrackingReadReconciliationTest.java    |  36 +-
 19 files changed, 1531 insertions(+), 388 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 453cfcf95b..7920a41d94 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -476,6 +476,24 @@ public abstract class ReadCommand extends AbstractReadQuery
         }
     }
 
+    public RowFilter rowFilter(Index.Searcher searcher)
+    {
+        // If we've used a 2ndary index, we know the result already satisfies the primary expression used, so
+        // no point in checking it again.
+        return (null == searcher) ? rowFilter() : indexQueryPlan.postIndexQueryFilter();
+    }
+
+    private UnfilteredPartitionIterator withRowFilter(UnfilteredPartitionIterator iterator, Index.Searcher searcher)
+    {
+        /*
+         * TODO: We currently do filtering by the rowFilter here because it's convenient. However,
+         * we'll probably want to optimize by pushing it down a layer (like for dropped columns), as it
+         * would be more efficient (the sooner we discard stuff we know we don't care about, the less
+         * useless processing we do on it).
+         */
+        return rowFilter(searcher).filter(iterator, nowInSec());
+    }
+
     private UnfilteredPartitionIterator completeRead(UnfilteredPartitionIterator iterator, ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos)
     {
         COMMAND.set(this);
@@ -488,18 +506,7 @@ public abstract class ReadCommand extends AbstractReadQuery
             iterator = maybeRecordPurgeableTombstones(iterator, cfs);
             iterator = RTBoundValidator.validate(withoutPurgeableTombstones(iterator, cfs, executionController), Stage.PURGED, false);
             iterator = withMetricsRecording(iterator, cfs.metric, startTimeNanos);
-
-            // If we've used a 2ndary index, we know the result already satisfy the primary expression used, so
-            // no point in checking it again.
-            RowFilter filter = (null == searcher) ? rowFilter() : indexQueryPlan.postIndexQueryFilter();
-
-            /*
-             * TODO: We'll currently do filtering by the rowFilter here because it's convenient. However,
-             * we'll probably want to optimize by pushing it down the layer (like for dropped columns) as it
-             * would be more efficient (the sooner we discard stuff we know we don't care, the less useless
-             * processing we do on it).
-             */
-            iterator = filter.filter(iterator, nowInSec());
+            iterator = withRowFilter(iterator, searcher);
 
             // apply the limits/row counter; this transformation is stopping and would close the iterator as soon
             // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included.
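
The hunk above only extracts the filter selection and application into reusable
methods. A reduced sketch of the dispatch it encodes (FilterSource and its
method names are illustrative, not the actual internal API): when an index
searcher already handled the primary expression, only the residual post-index
filter still needs checking; otherwise the full row filter applies.

    import java.util.function.Predicate;

    interface FilterSource<T>
    {
        Predicate<T> fullFilter();      // the complete row filter
        Predicate<T> postIndexFilter(); // residual expressions after an index pre-selects rows

        default Predicate<T> filterFor(Object searcher)
        {
            // mirrors rowFilter(Index.Searcher) above
            return searcher == null ? fullFilter() : postIndexFilter();
        }
    }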
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 018be1dcae..1a4d12fa98 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -162,6 +162,23 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePar
                                               dataRange);
     }
 
+    public static SinglePartitionReadCommand fromRangeRead(DecoratedKey key, PartitionRangeReadCommand command, DataLimits limits)
+    {
+        return create(command.serializedAtEpoch(),
+                      command.isDigestQuery(),
+                      command.digestVersion(),
+                      command.acceptsTransient(),
+                      command.metadata(),
+                      command.nowInSec(),
+                      command.columnFilter(),
+                      command.rowFilter(),
+                      limits,
+                      key,
+                      command.clusteringIndexFilter(key),
+                      command.indexQueryPlan(),
+                      command.isTrackingWarnings());
+    }
+
     /**
      * Creates a new read command on a single partition.
      *
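
fromRangeRead() above derives a single-partition command from a range command
by copying every parameter and substituting only the key, its clustering
filter, and the limits. A reduced sketch of that copy-with-overrides pattern
(RangeQuery/PartitionQuery are hypothetical stand-ins):

    record RangeQuery(long nowInSec, int limit, String filter) {}

    record PartitionQuery(Object key, long nowInSec, int limit, String filter)
    {
        // keep everything from the range query except the key and the limit
        static PartitionQuery fromRange(Object key, RangeQuery range, int newLimit)
        {
            return new PartitionQuery(key, range.nowInSec(), newLimit, range.filter());
        }
    }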
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4f6b7bd1d5..3bd07dc943 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -51,6 +51,7 @@ import org.apache.cassandra.db.marshal.LongType;
 import org.apache.cassandra.db.marshal.MapType;
 import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.rows.BaseRowIterator;
 import org.apache.cassandra.db.rows.Cell;
@@ -58,6 +59,7 @@ import org.apache.cassandra.db.rows.CellPath;
 import org.apache.cassandra.db.rows.ComplexColumnData;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.db.rows.RowIterator;
+import org.apache.cassandra.db.rows.Unfiltered;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.exceptions.InvalidRequestException;
@@ -200,73 +202,118 @@ public class RowFilter implements Iterable<RowFilter.Expression>
         return false;
     }
 
-    /**
-     * Note that the application of this transformation does not yet take {@link #isStrict()} into account. This means
-     * that even when strict filtering is not safe, expressions will be applied as intersections rather than unions.
-     * The filter will always be evaluated strictly in conjunction with replica filtering protection at the
-     * coordinator, however, even after CASSANDRA-19007 is addressed.
-     *
-     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19007">CASSANDRA-19007</a>
-     */
-    protected Transformation<BaseRowIterator<?>> filter(TableMetadata metadata, long nowInSec)
+    public static class RowFilterTransformation extends Transformation<BaseRowIterator<?>>
     {
-        List<Expression> partitionLevelExpressions = new ArrayList<>();
-        List<Expression> rowLevelExpressions = new ArrayList<>();
-        for (Expression e: expressions)
+        private final TableMetadata metadata;
+        private final long nowInSec;
+        private final List<Expression> partitionLevelExpressions = new ArrayList<>();
+        private final List<Expression> rowLevelExpressions = new ArrayList<>();
+        private DecoratedKey pk;
+
+        private RowFilterTransformation(RowFilter filter, TableMetadata metadata, long nowInSec)
         {
-            if (e.column.isStatic() || e.column.isPartitionKey())
-                partitionLevelExpressions.add(e);
-            else
-                rowLevelExpressions.add(e);
+            this.metadata = metadata;
+            this.nowInSec = nowInSec;
+            for (Expression e: filter.expressions)
+            {
+                if (e.column.isStatic() || e.column.isPartitionKey())
+                    partitionLevelExpressions.add(e);
+                else
+                    rowLevelExpressions.add(e);
+            }
         }
 
-        long numberOfRegularColumnExpressions = rowLevelExpressions.size();
-        final boolean filterNonStaticColumns = numberOfRegularColumnExpressions > 0;
-
-        return new Transformation<>()
+        public int potentialMatches(PartitionUpdate update)
         {
-            DecoratedKey pk;
-
-            @Override
-            protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+            try (UnfilteredRowIterator partition = update.unfilteredIterator())
             {
-                pk = partition.partitionKey();
-
-                // Short-circuit all partitions that won't match based on static and partition keys
+                int matches = 0;
                 for (Expression e : partitionLevelExpressions)
-                    if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec))
+                {
+                    if (e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec))
                     {
-                        partition.close();
-                        return null;
+                        matches++;
+                        break;
                     }
+                }
 
-                BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
-                                              ? Transformation.apply((UnfilteredRowIterator) partition, this)
-                                              : Transformation.apply((RowIterator) partition, this);
+                while (partition.hasNext())
+                {
+                    Unfiltered unfiltered = partition.next();
+
+                    if (unfiltered instanceof Row)
+                    {
+                        Row row = (Row) unfiltered;
+
+                        for (Expression e : rowLevelExpressions)
+                        {
+                            if (e.isSatisfiedBy(metadata, partition.partitionKey(), row, nowInSec))
+                            {
+                                matches++;
+                                break;
+                            }
+                        }
+                    }
+                }
+                return matches;
+            }
+        }
 
-                if (filterNonStaticColumns && !iterator.hasNext())
+        @Override
+        protected BaseRowIterator<?> applyToPartition(BaseRowIterator<?> partition)
+        {
+            pk = partition.partitionKey();
+
+            // Short-circuit all partitions that won't match based on static and partition keys
+            for (Expression e : partitionLevelExpressions)
+            {
+                if (!e.isSatisfiedBy(metadata, partition.partitionKey(), partition.staticRow(), nowInSec))
                 {
-                    iterator.close();
+                    partition.close();
                     return null;
                 }
-
-                return iterator;
             }
 
-            @Override
-            public Row applyToRow(Row row)
+            BaseRowIterator<?> iterator = partition instanceof UnfilteredRowIterator
+                                          ? Transformation.apply((UnfilteredRowIterator) partition, this)
+                                          : Transformation.apply((RowIterator) partition, this);
+
+            boolean filterNonStaticColumns = !rowLevelExpressions.isEmpty();
+            if (filterNonStaticColumns && !iterator.hasNext())
             {
-                Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness());
-                if (purged == null)
+                iterator.close();
+                return null;
+            }
+
+            return iterator;
+        }
+
+        @Override
+        public Row applyToRow(Row row)
+        {
+            Row purged = row.purge(DeletionPurger.PURGE_ALL, nowInSec, metadata.enforceStrictLiveness());
+            if (purged == null)
+                return null;
+
+            for (Expression e : rowLevelExpressions)
+                if (!e.isSatisfiedBy(metadata, pk, purged, nowInSec))
                     return null;
 
-                for (Expression e : rowLevelExpressions)
-                    if (!e.isSatisfiedBy(metadata, pk, purged, nowInSec))
-                        return null;
+            return row;
+        }
+    }
 
-                return row;
-            }
-        };
+    /**
+     * Note that the application of this transformation does not yet take {@link #isStrict()} into account. This means
+     * that even when strict filtering is not safe, expressions will be applied as intersections rather than unions.
+     * The filter will always be evaluated strictly in conjunction with replica filtering protection at the
+     * coordinator, however, even after CASSANDRA-19007 is addressed.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-19007">CASSANDRA-19007</a>
+     */
+    public RowFilterTransformation filter(TableMetadata metadata, long nowInSec)
+    {
+        return new RowFilterTransformation(this, metadata, nowInSec);
     }
 
     /**
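
The refactor above splits expressions once, in the constructor, into
partition-level (partition key and static columns) and row-level (regular
columns) groups so whole partitions can be short-circuited before any row is
examined. A minimal sketch of that split, with generic predicates standing in
for RowFilter.Expression:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    final class SplitFilter<P, R>
    {
        final List<Predicate<P>> partitionLevel = new ArrayList<>(); // key / static columns
        final List<Predicate<R>> rowLevel = new ArrayList<>();       // regular columns

        // a partition failing any partition-level expression is skipped
        // without iterating its rows
        boolean partitionMayMatch(P partition)
        {
            return partitionLevel.stream().allMatch(e -> e.test(partition));
        }

        boolean rowMatches(R row)
        {
            return rowLevel.stream().allMatch(e -> e.test(row));
        }
    }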
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index 9231914201..00e16e9d4d 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -32,6 +32,7 @@ import org.apache.cassandra.utils.AbstractIterator;
 
 import org.apache.cassandra.db.SinglePartitionReadQuery;
 import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.utils.MergeIterator;
 
 public abstract class PartitionIterators
 {
@@ -99,6 +100,52 @@ public abstract class PartitionIterators
         }
     }
 
+    /**
+     * Merges multiple partition iterators with the requirement that there are no keys in common between any
+     * of the iterators.
+     */
+    public static PartitionIterator mergeNonOverlapping(List<PartitionIterator> iterators)
+    {
+        MergeIterator.Reducer<RowIterator, RowIterator> reducer = new MergeIterator.Reducer<>()
+        {
+            RowIterator current;
+
+            @Override
+            protected void onKeyChange()
+            {
+                current = null;
+            }
+
+            @Override
+            public void reduce(int idx, RowIterator partition)
+            {
+                if (current != null)
+                {
+                    throw new IllegalStateException("Multiple partitions received for " + current.partitionKey());
+                }
+                current = partition;
+            }
+
+            @Override
+            protected RowIterator getReduced()
+            {
+                return current;
+            }
+        };
+
+        MergeIterator<RowIterator, RowIterator> mergeIterator = MergeIterator.get(iterators, rowIteratorComparator, reducer);
+
+        return new AbstractPartitionIterator()
+        {
+            @Override
+            protected RowIterator computeNext()
+            {
+                return mergeIterator.hasNext() ? mergeIterator.next() : endOfData();
+            }
+        };
+    }
+    private static final Comparator<RowIterator> rowIteratorComparator = Comparator.comparing(BaseRowIterator::partitionKey);
+
     /**
      * Consumes all rows in the next partition of the provided partition iterator.
      */
@@ -224,5 +271,5 @@ public abstract class PartitionIterators
                 }
             };
         }
-    };
+    }
 }
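
mergeNonOverlapping() above relies on the inputs being sorted and key-disjoint,
and its reducer throws if two iterators ever produce the same partition key. An
eager behavioral model of that contract (the real code merges lazily via
MergeIterator; this sketch only shows the disjointness check):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Iterator;
    import java.util.List;

    final class DisjointMerge
    {
        static <T> List<T> merge(List<Iterator<T>> sources, Comparator<T> cmp)
        {
            List<T> out = new ArrayList<>();
            for (Iterator<T> it : sources)
                it.forEachRemaining(out::add);
            out.sort(cmp);
            // adjacent duplicates mean two sources shared a key: contract violation
            for (int i = 1; i < out.size(); i++)
                if (cmp.compare(out.get(i - 1), out.get(i)) == 0)
                    throw new IllegalStateException("Multiple partitions received for " + out.get(i));
            return out;
        }
    }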
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java
index 0cbb3be38d..2819068a43 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/AbstractPartialTrackedRead.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.service.reads.tracked;
 
-import java.util.List;
-
 import com.google.common.base.Preconditions;
 
 import org.slf4j.Logger;
@@ -30,34 +28,176 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.transform.RTBoundValidator;
-import org.apache.cassandra.index.Index;
 
-import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
 
 public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
 {
     private static final Logger logger = LoggerFactory.getLogger(AbstractPartialTrackedRead.class);
 
-    private enum State
+    protected interface Augmentable
+    {
+        State augment(PartitionUpdate update);
+    }
+
+    protected static abstract class State
+    {
+        protected static final State CLOSED = new State()
+        {
+            @Override
+            String name()
+            {
+                return "closed";
+            }
+
+            @Override
+            boolean isClosed()
+            {
+                return true;
+            }
+        };
+
+        abstract String name();
+
+        boolean isInitialized()
+        {
+            return false;
+        }
+
+        Initialized asInitialized()
+        {
+            throw new IllegalStateException("State is " + name() + ", not " + Initialized.NAME);
+        }
+
+        boolean isPrepared()
+        {
+            return false;
+        }
+
+        Prepared asPrepared()
+        {
+            throw new IllegalStateException("State is " + name() + ", not " + Prepared.NAME);
+        }
+
+        boolean isCompleted()
+        {
+            return false;
+        }
+
+        Completed asCompleted()
+        {
+            throw new IllegalStateException("State is " + name() + ", not " + Completed.NAME);
+        }
+
+        boolean isAugmentable()
+        {
+            return isPrepared() || isInitialized();
+        }
+
+        Augmentable asAugmentable()
+        {
+            if (isPrepared()) return asPrepared();
+            throw new IllegalStateException("State is " + name() + ", not augmentable");
+        }
+
+        boolean isClosed()
+        {
+            return false;
+        }
+
+        void close()
+        {
+        }
+    }
+
+    // TODO (expected): this is a redundant state, never exposed
+    protected final class Initialized extends State
+    {
+        static final String NAME = "initialized";
+
+        @Override
+        String name()
+        {
+            return NAME;
+        }
+
+        @Override
+        boolean isInitialized()
+        {
+            return true;
+        }
+
+        @Override
+        Initialized asInitialized()
+        {
+            return this;
+        }
+
+        Prepared prepare(UnfilteredPartitionIterator initialData)
+        {
+            return prepareInternal(initialData);
+        }
+    }
+
+    protected abstract Prepared prepareInternal(UnfilteredPartitionIterator initialData);
+
+    protected abstract class Prepared extends State implements Augmentable
     {
-        INITIALIZED,
-        PREPARED,
-        READING,
-        FINISHED
+        private static final String NAME = "prepared";
+
+        @Override
+        String name()
+        {
+            return NAME;
+        }
+
+        @Override
+        boolean isPrepared()
+        {
+            return true;
+        }
+
+        @Override
+        Prepared asPrepared()
+        {
+            return this;
+        }
+
+        abstract Completed complete();
+
+    }
+
+    protected abstract class Completed extends State
+    {
+        private static final String NAME = "completed";
+
+        @Override
+        String name()
+        {
+            return NAME;
+        }
+
+        protected abstract UnfilteredPartitionIterator iterator();
+        protected abstract CompletedRead createResult(UnfilteredPartitionIterator iterator);
+
+        protected CompletedRead getResult()
+        {
+            UnfilteredPartitionIterator result = command().completeTrackedRead(iterator(), AbstractPartialTrackedRead.this);
+            // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
+            // ends equal, and there are no dangling RT bound in any partition.
+            result = RTBoundValidator.validate(result, RTBoundValidator.Stage.PROCESSED, true);
+            return createResult(result);
+        }
     }
 
     final ReadExecutionController executionController;
-    final Index.Searcher searcher;
     final ColumnFamilyStore cfs;
     final long startTimeNanos;
-    volatile State state = State.INITIALIZED;
+    private State state = new Initialized();
 
-    public AbstractPartialTrackedRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos)
+    public AbstractPartialTrackedRead(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos)
     {
         this.executionController = executionController;
-        this.searcher = searcher;
         this.cfs = cfs;
         this.startTimeNanos = startTimeNanos;
     }
@@ -68,12 +208,6 @@ public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
         return executionController;
     }
 
-    @Override
-    public Index.Searcher searcher()
-    {
-        return searcher;
-    }
-
     @Override
     public ColumnFamilyStore cfs()
     {
@@ -86,69 +220,47 @@ public abstract class AbstractPartialTrackedRead implements PartialTrackedRead
         return startTimeNanos;
     }
 
-    abstract void freezeInitialData();
-
-    abstract UnfilteredPartitionIterator initialData();
-
-    abstract UnfilteredPartitionIterator augmentedData();
-
-    abstract void augmentResponse(PartitionUpdate update);
+    protected synchronized State state()
+    {
+        return state;
+    }
 
     /**
      * Implementors need to call this before returning this from createInProgressRead
+     * TODO (expected): this is a redundant transition from a redundant state (INITIALIZED)
      */
-    synchronized void prepare()
+    synchronized void prepare(UnfilteredPartitionIterator initialData)
     {
         logger.trace("Preparing read {}", this);
-        Preconditions.checkState(state == State.INITIALIZED);
-        freezeInitialData();
-        state = State.PREPARED;
+        state = state.asInitialized().prepare(initialData);
     }
 
     @Override
-    public void augment(Mutation mutation)
+    public synchronized void augment(Mutation mutation)
     {
-        Preconditions.checkState(state == State.PREPARED);
         PartitionUpdate update = mutation.getPartitionUpdate(command().metadata());
         if (update != null)
-            augmentResponse(update);
+            state = state.asAugmentable().augment(update);
     }
 
-    private UnfilteredPartitionIterator complete(UnfilteredPartitionIterator iterator)
-    {
-        return command().completeTrackedRead(iterator, this);
-    }
-
-    abstract CompletedRead createResult(UnfilteredPartitionIterator iterator);
-
     @Override
     public synchronized CompletedRead complete()
     {
-        Preconditions.checkState(state == State.PREPARED);
-        state = State.READING;
-
-        UnfilteredPartitionIterator initial = initialData();
-        UnfilteredPartitionIterator augmented = augmentedData();
-
-        UnfilteredPartitionIterator result = augmented != null ?
-                                             UnfilteredPartitionIterators.merge(List.of(initial, augmented), NOOP) :
-                                             initial;
-
-        result = complete(result);
-        // validate that the sequence of RT markers is correct: open is followed by close, deletion times for both
-        // ends equal, and there are no dangling RT bound in any partition.
-        result = RTBoundValidator.validate(result, RTBoundValidator.Stage.PROCESSED, true);
-        return createResult(complete(result));
+        Preconditions.checkState(state.isPrepared());
+        Completed completed = state.asPrepared().complete();
+        state = completed;
+        return completed.getResult();
     }
 
     @Override
     public synchronized void close()
     {
-        if (state == State.FINISHED)
+        if (state.isClosed())
             return;
 
         logger.trace("Closing read {}", this);
+        state.close();
         executionController.close();
-        state = State.FINISHED;
+        state = State.CLOSED;
     }
 }
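
The State enum plus ad-hoc flags is replaced above by a class-per-state machine:
each transition is a method on the current state that returns its successor,
and the asPrepared()/asCompleted() downcast helpers turn misuse into a clear
IllegalStateException. The skeleton of that pattern, reduced to two states:

    abstract class State
    {
        Prepared asPrepared() { throw new IllegalStateException("not prepared"); }
        Completed asCompleted() { throw new IllegalStateException("not completed"); }
    }

    final class Prepared extends State
    {
        @Override Prepared asPrepared() { return this; }
        Completed complete() { return new Completed(); }
    }

    final class Completed extends State
    {
        @Override Completed asCompleted() { return this; }
    }

    final class Machine
    {
        private State state = new Prepared();

        // throws if complete() is called twice, replacing explicit Preconditions checks
        synchronized void complete()
        {
            state = state.asPrepared().complete();
        }
    }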
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java
new file mode 100644
index 0000000000..f5fa4dc25a
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/tracked/ExtendingCompletedRead.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.tracked;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
+import org.apache.cassandra.db.transform.Transformation;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+class ExtendingCompletedRead implements PartialTrackedRead.CompletedRead
+{
+    private static final Logger logger = LoggerFactory.getLogger(ExtendingCompletedRead.class);
+
+    final PartitionRangeReadCommand command;
+    final UnfilteredPartitionIterator iterator;
+    // merged end-result counter
+    final DataLimits.Counter mergedResultCounter;
+
+    private final boolean partitionsFetched;
+    private final boolean initialIteratorExhausted;
+    protected final AbstractBounds<PartitionPosition> followUpBounds;
+
+    public ExtendingCompletedRead(PartitionRangeReadCommand command,
+                                  UnfilteredPartitionIterator iterator,
+                                  boolean partitionsFetched,
+                                  boolean initialIteratorExhausted,
+                                  AbstractBounds<PartitionPosition> followUpBounds)
+    {
+        this.command = command;
+        this.iterator = iterator;
+        mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+                                                          true,
+                                                          command.selectsFullPartition(),
+                                                          command.metadata().enforceStrictLiveness());
+        this.partitionsFetched = partitionsFetched;
+        this.initialIteratorExhausted = initialIteratorExhausted;
+        this.followUpBounds = followUpBounds;
+    }
+
+    @Override
+    public TrackedDataResponse response()
+    {
+        PartitionIterator filtered = UnfilteredPartitionIterators.filter(iterator, command.nowInSec());
+        PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
+        PartitionIterator result = Transformation.apply(counted, new EmptyPartitionsDiscarder());
+        return TrackedDataResponse.create(result, command.columnFilter());
+    }
+
+    static boolean followUpReadRequired(ReadCommand command, DataLimits.Counter mergedResultCounter, boolean initialIteratorExhausted, boolean partitionsFetched)
+    {
+        // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
+        if (mergedResultCounter.isDone())
+            return false;
+
+        // we do not apply short read protection when we have no limits at all
+        if (command.limits().isUnlimited())
+            return false;
+
+        /*
+         * If this is a single partition read command or an (indexed) partition range read command with
+         * a partition key specified, then we can't and shouldn't try to fetch more partitions.
+         */
+        if (command.isLimitedToOnePartition())
+            return false;
+
+        /*
+         * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
+         *
+         * We can only take this shortcut if there is no per-partition limit set. Otherwise it's possible to hit false
+         * positives due to some rows being unaccounted for in certain scenarios (see CASSANDRA-13911).
+         */
+        if (initialIteratorExhausted && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
+            return false;
+
+        /*
+         * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
+         * There is no point asking the replica for more rows - it has no more in the requested range.
+         */
+        if (!partitionsFetched)
+            return false;
+
+        return true;
+    }
+
+    protected boolean followUpRequired()
+    {
+        return followUpReadRequired(command, mergedResultCounter, initialIteratorExhausted, partitionsFetched);
+    }
+
+    static int toQuery(ReadCommand command, DataLimits.Counter mergedResultCounter)
+    {
+        /*
+         * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+         * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+         * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+         * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+         */
+         */
+        return command.limits().count() != DataLimits.NO_LIMIT
+               ? command.limits().count() - mergedResultCounter.rowsCounted()
+               : command.limits().perPartitionCount();
+    }
+
+    @Override
+    public Future<TrackedDataResponse> followupRead(TrackedDataResponse initialResponse, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+    {
+        if (!followUpRequired())
+            return null;
+
+
+        /*
+         * We are going to fetch one partition at a time for thrift and potentially more for CQL.
+         * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
+         * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
+         * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
+         */
+        int toQuery = toQuery(command, mergedResultCounter);
+
+        ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
+        Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
+        logger.info("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
+
+        return makeFollowupRead(initialResponse, toQuery, consistencyLevel, expiresAtNanos);
+    }
+
+    protected Future<TrackedDataResponse> makeFollowupRead(TrackedDataResponse initialResponse, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+    {
+        TrackedRead.Range followUpRead = PartialTrackedRangeRead.makeFollowUpRead(command, followUpBounds, toQuery, consistencyLevel, expiresAtNanos);
+        followUpRead.start(expiresAtNanos);
+        AsyncPromise<TrackedDataResponse> combinedRead = new AsyncPromise<>();
+        followUpRead.future().addCallback((result, failure) -> {
+            if (failure != null)
+            {
+                combinedRead.tryFailure(failure);
+                return;
+            }
+
+            try
+            {
+                combinedRead.trySuccess(TrackedDataResponse.merge(initialResponse, result));
+            }
+            catch (Throwable t)
+            {
+                combinedRead.tryFailure(t);
+            }
+        });
+
+        return combinedRead;
+    }
+
+    @Override
+    public void close()
+    {
+        iterator.close();
+    }
+}
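
The row budget computed by toQuery() above is worth spelling out: with a total
row limit, the follow-up asks for exactly the rows still missing; with only a
per-partition limit, it asks for one partition's worth. A standalone sketch of
the same arithmetic (NO_LIMIT mirrors DataLimits.NO_LIMIT, assumed here to be
Integer.MAX_VALUE):

    final class FollowUpBudget
    {
        static final int NO_LIMIT = Integer.MAX_VALUE;

        static int rowsToQuery(int totalLimit, int perPartitionLimit, int rowsCounted)
        {
            return totalLimit != NO_LIMIT
                   ? totalLimit - rowsCounted   // rows still missing from the total limit
                   : perPartitionLimit;         // otherwise one partition's worth
        }
    }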
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java b/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java
new file mode 100644
index 0000000000..121ce1a581
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/reads/tracked/FilteredFollowupRead.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.tracked;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.PeekingIterator;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.db.PartitionRangeReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.FollowUpReadInfo;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+import org.apache.cassandra.utils.concurrent.FutureCombiner;
+
+import static org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.followUpReadRequired;
+import static org.apache.cassandra.service.reads.tracked.ExtendingCompletedRead.toQuery;
+import static org.apache.cassandra.service.reads.tracked.PartialTrackedRangeRead.makeFollowUpRead;
+
+class FilteredFollowupRead extends AsyncPromise<TrackedDataResponse>
+{
+    private final TrackedDataResponse initialResponse;
+    private final int toQuery;
+    private final ConsistencyLevel consistencyLevel;
+    private final long expiresAtNanos;
+    private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo;
+    private final PartitionRangeReadCommand command;
+    private final AbstractBounds<PartitionPosition> followUpBounds;
+    private final DecoratedKey finalKey;
+
+    public FilteredFollowupRead(TrackedDataResponse initialResponse,
+                                int toQuery,
+                                ConsistencyLevel consistencyLevel,
+                                long expiresAtNanos,
+                                SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo,
+                                PartitionRangeReadCommand command,
+                                AbstractBounds<PartitionPosition> followUpBounds,
+                                DecoratedKey finalKey)
+    {
+        this.initialResponse = initialResponse;
+        this.toQuery = toQuery;
+        this.consistencyLevel = consistencyLevel;
+        this.expiresAtNanos = expiresAtNanos;
+        this.followUpReadInfo = followUpReadInfo;
+        this.command = command;
+        this.followUpBounds = followUpBounds;
+        this.finalKey = finalKey;
+    }
+
+    private boolean interleavesWithOriginal(DecoratedKey key)
+    {
+        if (finalKey == null)
+            return false;
+        return key.compareTo(finalKey) < 0;
+    }
+
+    public void start()
+    {
+        ClusterMetadata metadata = ClusterMetadata.current();
+        List<Future<TrackedDataResponse>> futures = new ArrayList<>();
+
+        int remaining = toQuery;
+        PeekingIterator<DecoratedKey> followUpKeys = Iterators.peekingIterator(followUpReadInfo.keySet().iterator());
+        // query all keys that interleave with the range of keys from the original range read
+        while (followUpKeys.hasNext() && (remaining > 0 || interleavesWithOriginal(followUpKeys.peek())))
+        {
+            DecoratedKey key = followUpKeys.next();
+            FollowUpReadInfo info = followUpReadInfo.get(key);
+            remaining -= info.potentialMatches;
+            SinglePartitionReadCommand cmd = SinglePartitionReadCommand.fromRangeRead(key, command, command.limits().forShortReadRetry(toQuery));
+            TrackedRead.Partition read = TrackedRead.Partition.create(metadata, cmd, consistencyLevel);
+            read.start(expiresAtNanos);
+            futures.add(read.future());
+        }
+
+        SortedMap<DecoratedKey, FollowUpReadInfo> nextKeys = followUpKeys.hasNext() ? followUpReadInfo.tailMap(followUpKeys.next()) : Collections.emptySortedMap();
+
+        AtomicReference<PartialTrackedRead> partialRead;
+        if (remaining > 0)
+        {
+            partialRead = new AtomicReference<>();
+            TrackedRead.Range rangeRead = makeFollowUpRead(command, followUpBounds, remaining, consistencyLevel, expiresAtNanos);
+            rangeRead.startLocal(expiresAtNanos, partialRead::set);
+            futures.add(rangeRead.future());
+        }
+        else
+        {
+            partialRead = null;
+        }
+
+        FutureCombiner.allOf(futures).addCallback((responses, error) -> {
+            if (error != null)
+            {
+                tryFailure(error);
+                return;
+            }
+
+            try
+            {
+                List<TrackedDataResponse> allResponses = new ArrayList<>(responses);
+                allResponses.add(initialResponse);
+                TrackedDataResponse merged = TrackedDataResponse.merge(allResponses);
+                DataLimits.Counter mergedResultCounter = command.limits().newCounter(command.nowInSec(),
+                                                                                     true,
+                                                                                     command.selectsFullPartition(),
+                                                                                     command.metadata().enforceStrictLiveness());
+
+                boolean partitionsFetched;
+                boolean initialIteratorExhausted;
+                TrackedDataResponse response;
+                try (PartitionIterator iterator = merged.makeIteratorUnlimited(command))
+                {
+                    partitionsFetched = iterator.hasNext();
+                    response = TrackedDataResponse.create(mergedResultCounter.applyTo(iterator), command.columnFilter());
+                    initialIteratorExhausted = !iterator.hasNext();
+                }
+
+                // although we check for interleaved keys in the initial read, we always query for them in the follow up, so
+                // we just use normal short read protection checks here
+                if (followUpReadRequired(command, mergedResultCounter, initialIteratorExhausted, partitionsFetched))
+                {
+                    AbstractBounds<PartitionPosition> nextBounds = this.followUpBounds;
+                    if (partialRead != null)
+                    {
+                        PartialTrackedRangeRead followUpRangeRead = (PartialTrackedRangeRead) partialRead.get();
+                        nextBounds = followUpRangeRead.followUpBounds();
+                    }
+                    FilteredFollowupRead followUp = new FilteredFollowupRead(response, toQuery(command, mergedResultCounter), consistencyLevel, expiresAtNanos, nextKeys, command, nextBounds, null);
+                    followUp.start();
+                    followUp.addCallback((result, failure) -> {
+                        if (failure != null)
+                            tryFailure(failure);
+                        else
+                            trySuccess(result);
+                    });
+                }
+                else
+                {
+                    trySuccess(response);
+                }
+            }
+            catch (Throwable t)
+            {
+                tryFailure(t);
+            }
+
+        });
+    }
+}
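
The draining loop in start() above has one subtlety: keys that sort below the
last key already returned by the initial range read ("interleaving" keys) must
be re-queried even after the row budget is spent, or the merged result would
have gaps. A reduced sketch of that loop using the same Guava PeekingIterator
idiom, with integer keys standing in for DecoratedKey:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import com.google.common.collect.Iterators;
    import com.google.common.collect.PeekingIterator;

    final class InterleaveDrain
    {
        static List<Integer> drain(Iterator<Integer> keys, int budget, Integer finalKey)
        {
            List<Integer> toRead = new ArrayList<>();
            PeekingIterator<Integer> it = Iterators.peekingIterator(keys);
            // keep draining while budget remains OR the next key interleaves
            while (it.hasNext() && (budget > 0 || (finalKey != null && it.peek() < finalKey)))
            {
                toRead.add(it.next());
                budget--; // stands in for "remaining -= info.potentialMatches"
            }
            return toRead;
        }
    }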
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java
index 4f50442950..1a6c91db4f 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRangeRead.java
@@ -18,7 +18,9 @@
 
 package org.apache.cassandra.service.reads.tracked;
 
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
@@ -36,15 +38,13 @@ import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.db.partitions.AbstractBTreePartition;
 import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.SimpleBTreePartition;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
-import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.ExcludingBounds;
@@ -54,39 +54,45 @@ import org.apache.cassandra.index.transactions.UpdateTransaction;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.schema.TableMetadata;
-import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Future;
 
-public class PartialTrackedRangeRead extends AbstractPartialTrackedRead
+public abstract class PartialTrackedRangeRead extends AbstractPartialTrackedRead
 {
     private static final Logger logger = LoggerFactory.getLogger(PartialTrackedRangeRead.class);
 
-    private final PartitionRangeReadCommand command;
-    private final SortedMap<DecoratedKey, SimpleBTreePartition> data = new TreeMap<>();
-    private final UnfilteredPartitionIterator initialData;
-    private final boolean enforceStrictLiveness;
+    protected static class FollowUpReadInfo
+    {
+        int potentialMatches = 0;
+    }
 
-    // short read support
-    private DecoratedKey lastPartitionKey; // key of the last observed partition
-    private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
-    private boolean initialIteratorExhausted;
-    private boolean wasAugmented;
-    AbstractBounds<PartitionPosition> followUpBounds;
+    protected final PartitionRangeReadCommand command;
 
-    public PartialTrackedRangeRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command, UnfilteredPartitionIterator initialData)
+    private PartialTrackedRangeRead(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command)
     {
-        super(executionController, searcher, cfs, startTimeNanos);
+        super(executionController, cfs, startTimeNanos);
         this.command = command;
-        this.initialData = initialData;
-        this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
     }
 
     public static PartialTrackedRangeRead create(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command, UnfilteredPartitionIterator initialData)
     {
-        PartialTrackedRangeRead read = new PartialTrackedRangeRead(executionController, searcher, cfs, startTimeNanos, command, initialData);
+        RowFilter rowFilter = command.rowFilter();
+        PartialTrackedRangeRead read;
+        if (searcher != null)
+        {
+            throw new UnsupportedOperationException("TODO: CASSANDRA-20374");
+        }
+        else if (!rowFilter.isEmpty())
+        {
+            read = new PartialTrackedRangeRead.Filtered(executionController, cfs, startTimeNanos, command);
+        }
+        else
+        {
+            read = new PartialTrackedRangeRead.Simple(executionController, cfs, startTimeNanos, command);
+        }
+
         try
         {
-            read.prepare();
+            read.prepare(initialData);
             return read;
         }
         catch (Throwable e)
@@ -102,6 +108,227 @@ public class PartialTrackedRangeRead extends AbstractPartialTrackedRead
         return command;
     }
 
+    protected static class ShortReadSupport
+    {
+        final DecoratedKey lastPartitionKey; // key of the last observed partition
+        final boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+        final boolean initialIteratorExhausted;
+        final AbstractBounds<PartitionPosition> followUpBounds;
+        boolean wasAugmented;
+
+        ShortReadSupport(Builder builder, boolean initialIteratorExhausted, AbstractBounds<PartitionPosition> followUpBounds)
+        {
+            this.lastPartitionKey = builder.lastPartitionKey;
+            this.partitionsFetched = builder.partitionsFetched;
+            this.initialIteratorExhausted = initialIteratorExhausted;
+            this.followUpBounds = followUpBounds;
+            this.wasAugmented = false;
+        }
+
+        protected static class Builder
+        {
+            final ReadCommand command;
+            final DataLimits.Counter counter;
+            DecoratedKey lastPartitionKey; // key of the last observed partition
+            boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
+
+            protected Builder(ReadCommand command)
+            {
+                this.command = command;
+                counter = command.limits().newCounter(command.nowInSec(),
+                                                      false,
+                                                      command.selectsFullPartition(),
+                                                      command.metadata().enforceStrictLiveness());
+            }
+
+            ShortReadSupport build()
+            {
+                boolean initialIteratorExhausted = command.limits().isExhausted(counter);
+                AbstractBounds<PartitionPosition> followUpBounds = null;
+                if (partitionsFetched)
+                {
+                    AbstractBounds<PartitionPosition> bounds = command.dataRange().keyRange();
+                    followUpBounds = bounds.inclusiveRight()
+                                     ? new Range<>(lastPartitionKey, bounds.right)
+                                     : new ExcludingBounds<>(lastPartitionKey, bounds.right);
+                    Preconditions.checkState(!followUpBounds.contains(lastPartitionKey));
+                }
+                return new ShortReadSupport(this, initialIteratorExhausted, followUpBounds);
+            }
+        }
+    }
+
+    private abstract class Materializer extends Transformation<UnfilteredRowIterator>
+    {
+        final SortedMap<DecoratedKey, SimpleBTreePartition> data = new TreeMap<>();
+        final ShortReadSupport.Builder shortReadSupport;
+
+        private Materializer(ReadCommand command)
+        {
+            this.shortReadSupport = new ShortReadSupport.Builder(command);
+        }
+
+        abstract UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator);
+
+        abstract RangePrepared createRangePrepared();
+
+        RangePrepared materialize(UnfilteredPartitionIterator inputIterator)
+        {
+            try
+            {
+                UnfilteredPartitionIterator materialized = Transformation.apply(inputIterator, new Transformation<UnfilteredRowIterator>()
+                {
+                    @Override
+                    protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+                    {
+                        SimpleBTreePartition materialized = data.computeIfAbsent(partition.partitionKey(), key -> new SimpleBTreePartition(key, partition.metadata(), UpdateTransaction.NO_OP));
+                        materialized.update(PartitionUpdate.fromIterator(partition, command.columnFilter()));
+                        shortReadSupport.lastPartitionKey = partition.partitionKey();
+                        shortReadSupport.partitionsFetched = true;
+                        return queryPartition(materialized);
+                    }
+                });
+
+                UnfilteredPartitionIterator filtered = filter(materialized);
+
+                try (UnfilteredPartitionIterator iterator = shortReadSupport.counter.applyTo(filtered))
+                {
+                    consume(iterator);
+                }
+
+                return createRangePrepared();
+            }
+            finally
+            {
+                inputIterator.close();
+            }
+        }
+
+        @Override
+        protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+        {
+            SimpleBTreePartition materialized = data.computeIfAbsent(partition.partitionKey(), key -> new SimpleBTreePartition(key, partition.metadata(), UpdateTransaction.NO_OP));
+            materialized.update(PartitionUpdate.fromIterator(partition, command.columnFilter()));
+            shortReadSupport.lastPartitionKey = partition.partitionKey();
+            shortReadSupport.partitionsFetched = true;
+            return queryPartition(materialized);
+        }
+    }
+
+    protected abstract class RangePrepared extends Prepared
+    {
+        protected final SortedMap<DecoratedKey, SimpleBTreePartition> data;
+        protected final ShortReadSupport shortReadSupport;
+        protected boolean wasAugmented;
+
+        public RangePrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport)
+        {
+            this.data = data;
+            this.shortReadSupport = shortReadSupport;
+        }
+
+        protected boolean canAcceptUpdate(PartitionUpdate update)
+        {
+            return shortReadSupport.initialIteratorExhausted || !shortReadSupport.followUpBounds.contains(update.partitionKey());
+        }
+
+        private SimpleBTreePartition augmentResponseInternal(PartitionUpdate update)
+        {
+            SimpleBTreePartition partition = data.computeIfAbsent(update.partitionKey(), key -> new SimpleBTreePartition(key, update.metadata(), UpdateTransaction.NO_OP));
+            partition.update(update);
+            return partition;
+        }
+
+        @Override
+        public State augment(PartitionUpdate update)
+        {
+            // if the input iterator reached the row limit, then we can't apply any augmenting mutations that are past
+            // the last materialized key. Since we wouldn't have materialized the local data for that key, applying an
+            // update would cause us to return incomplete data for it.
+            if (canAcceptUpdate(update))
+            {
+                logger.trace("Augmented partition {} for read {}", update.partitionKey(), PartialTrackedRangeRead.this);
+                augmentResponseInternal(update);
+            }
+            else
+            {
+                logger.trace("Ignoring unacceptable update from key {} on read {}", update.partitionKey(), PartialTrackedRangeRead.this);
+            }
+            wasAugmented = true;
+            return this;
+        }
+    }
+
+    protected abstract class RangeCompleted extends Completed
+    {
+        protected final SortedMap<DecoratedKey, SimpleBTreePartition> data;
+        protected final ShortReadSupport shortReadSupport;
+        protected final boolean wasAugmented;
+
+        public RangeCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented)
+        {
+            this.data = data;
+            this.shortReadSupport = shortReadSupport;
+            this.wasAugmented = wasAugmented;
+        }
+
+        @Override
+        protected UnfilteredPartitionIterator iterator()
+        {
+            Iterator<SimpleBTreePartition> iterator = data.values().iterator();
+            return new AbstractUnfilteredPartitionIterator()
+            {
+                @Override
+                public TableMetadata metadata()
+                {
+                    return command.metadata();
+                }
+
+                @Override
+                public boolean hasNext()
+                {
+                    return iterator.hasNext();
+                }
+
+                @Override
+                public UnfilteredRowIterator next()
+                {
+                    return queryPartition(iterator.next());
+                }
+            };
+        }
+
+        protected abstract CompletedRead extendRead(UnfilteredPartitionIterator iterator);
+
+        @Override
+        protected CompletedRead createResult(UnfilteredPartitionIterator iterator)
+        {
+            if (wasAugmented)
+                return extendRead(iterator);
+            return CompletedRead.simple(iterator, command);
+        }
+
+        AbstractBounds<PartitionPosition> followUpBounds()
+        {
+            return shortReadSupport.followUpBounds;
+        }
+    }
+
+    abstract Materializer createMaterializer();
+
+    @Override
+    protected Prepared prepareInternal(UnfilteredPartitionIterator initialData)
+    {
+        // memtable contents are frozen at read completion time, when the iterator is evaluated, not at the beginning
+        // of the read, when references to memtables and sstables are collected. Because of this, replica coordinated
+        // reads can cause read monotonicity to be broken by returning data that hasn't been replicated to at least
+        // CL other nodes via reconciliation. To prevent this, the contents of the initial iterator are materialized
+        // onto heap at partition granularity until the limits of the read are reached.
+
+        Materializer materializer = createMaterializer();
+        return materializer.materialize(initialData);
+    }
+
     UnfilteredRowIterator queryPartition(AbstractBTreePartition partition)
     {
         return partition.unfilteredIterator(command.columnFilter(),
@@ -121,227 +348,264 @@ public class PartialTrackedRangeRead extends AbstractPartialTrackedRead
         }
     }
 
-    @Override
-    void freezeInitialData()
+    public AbstractBounds<PartitionPosition> followUpBounds()
     {
-        // memtable contents are frozen at read completion time, when the iterator is evaluated, not at the beginning
-        // of the read, when references to memtables and sstables are collected. Because of this, replica coordinated
-        // reads can cause read monotonicity to be broken by returning data that hasn't been replicated to at least
-        // CL other nodes via reconciliation. To prevent this, the contents of the initial iterator are materialized
-        // onto heap at partition granularity until the limits of the read are reached.
+        RangeCompleted completed = (RangeCompleted) state().asCompleted();
+        return completed.followUpBounds();
+    }
+
+    protected static TrackedRead.Range makeFollowUpRead(PartitionRangeReadCommand command, AbstractBounds<PartitionPosition> followUpBounds, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+    {
+        DataLimits newLimits = command.limits().forShortReadRetry(toQuery);
+
+        DataRange newDataRange = command.dataRange().forSubRange(followUpBounds);
+
+        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
+        PartitionRangeReadCommand followUpCmd = command.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
+        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forRangeRead(keyspace,
+                                                                         followUpCmd.indexQueryPlan(),
+                                                                         consistencyLevel,
+                                                                         followUpCmd.dataRange().keyRange(),
+                                                                         1);
 
-        UnfilteredPartitionIterator materializer = new AbstractUnfilteredPartitionIterator()
+        TrackedRead.Range read = TrackedRead.Range.create(followUpCmd, replicaPlan);
+        logger.trace("Short read detected, starting followup read {}", read);
+        return read;
+    }
+
+    static class Simple extends PartialTrackedRangeRead
+    {
+        private class SimplePrepared extends RangePrepared
         {
-            @Override
-            public TableMetadata metadata()
+            public SimplePrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport)
             {
-                return initialData.metadata();
+                super(data, shortReadSupport);
             }
 
             @Override
-            public boolean hasNext()
+            Completed complete()
             {
-                return initialData.hasNext();
+                return new SimpleCompleted(data, shortReadSupport, wasAugmented);
             }
+        }
 
-            @Override
-            public UnfilteredRowIterator next()
+        protected class SimpleCompleted extends RangeCompleted
+        {
+            public SimpleCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented)
             {
-                try (UnfilteredRowIterator rowIterator = initialData.next())
-                {
-                    SimpleBTreePartition partition = augmentResponseInternal(PartitionUpdate.fromIterator(rowIterator, command.columnFilter()));
-                    lastPartitionKey = partition.partitionKey();
-                    partitionsFetched = true;
-                    return queryPartition(partition);
-                }
+                super(data, shortReadSupport, wasAugmented);
             }
 
             @Override
-            public void close()
+            protected CompletedRead extendRead(UnfilteredPartitionIterator iterator)
             {
-                super.close();
-                initialData.close();
+                return new ExtendingCompletedRead(command, iterator, shortReadSupport.partitionsFetched, shortReadSupport.initialIteratorExhausted, shortReadSupport.followUpBounds);
             }
-        };
-
-        // unmerged per-source counter
-        final DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(),
-                                                                                   false,
-                                                                                   command.selectsFullPartition(),
-                                                                                   enforceStrictLiveness);
-        try (UnfilteredPartitionIterator iterator = singleResultCounter.applyTo(materializer))
+        }
+
+        public Simple(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command)
+        {
+            super(executionController, cfs, startTimeNanos, command);
+        }
+
+        @Override
+        Materializer createMaterializer()
         {
-            consume(iterator);
+            return new Materializer(command)
+            {
+                @Override
+                UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator)
+                {
+                    Preconditions.checkState(command().rowFilter().isEmpty());
+                    return iterator;
+                }
+
+                @Override
+                RangePrepared createRangePrepared()
+                {
+                    return new SimplePrepared(data, shortReadSupport.build());
+                }
+            };
         }
-        initialIteratorExhausted = command.limits().isExhausted(singleResultCounter);
-        if (partitionsFetched)
+
+        @Override
+        public Index.Searcher searcher()
         {
-            AbstractBounds<PartitionPosition> bounds = command.dataRange().keyRange();
-            followUpBounds = bounds.inclusiveRight()
-                             ? new Range<>(lastPartitionKey, bounds.right)
-                             : new ExcludingBounds<>(lastPartitionKey, bounds.right);
-            Preconditions.checkState(!followUpBounds.contains(lastPartitionKey));
+            return null;
         }
-        wasAugmented = false;
     }
 
-    @Override
-    UnfilteredPartitionIterator initialData()
+
+    /**
+     * Since ALLOW FILTERING reads can cover a lot of partitions without returning much data, we don't want to eagerly
+     * materialize partitions onto the heap and keep them there. So this filters out non-matching partitions during the
+     * freezeInitialData phase. However, if reconciliation receives a mutation that applies to a previously discarded
+     * partition AND the contents of that mutation match the row filter, we also need to retry the read against that
+     * partition so we don't return incomplete data. This class handles both jobs.
+     */
+    static class Filtered extends PartialTrackedRangeRead
     {
-        Iterator<SimpleBTreePartition> iterator = data.values().iterator();
-        return new AbstractUnfilteredPartitionIterator()
+
+        protected class FilteredPrepared extends RangePrepared
         {
-            @Override
-            public TableMetadata metadata()
+            private final Set<DecoratedKey> filteredKeys;
+            private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo = new TreeMap<>();
+            private final RowFilter.RowFilterTransformation filter;
+            public FilteredPrepared(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, Set<DecoratedKey> filteredKeys, RowFilter.RowFilterTransformation filter)
             {
-                return command.metadata();
+                super(data, shortReadSupport);
+                this.filteredKeys = filteredKeys;
+                this.filter = filter;
             }
 
             @Override
-            public boolean hasNext()
+            protected boolean canAcceptUpdate(PartitionUpdate update)
             {
-                return iterator.hasNext();
+                DecoratedKey key = update.partitionKey();
+                if (filteredKeys.contains(key))
+                {
+                    int matches = filter.potentialMatches(update);
+                    if (matches > 0)
+                    {
+                        FollowUpReadInfo info = followUpReadInfo.computeIfAbsent(key, k -> new FollowUpReadInfo());
+                        info.potentialMatches += matches;
+                    }
+                    logger.trace("Not applying update for previously filtered partition: {}", update.partitionKey());
+                    return false;
+                }
+                return super.canAcceptUpdate(update);
             }
 
             @Override
-            public UnfilteredRowIterator next()
+            Completed complete()
             {
-                return queryPartition(iterator.next());
+                return new FilteredCompleted(data, shortReadSupport, wasAugmented, followUpReadInfo);
             }
-        };
-    }
-
-    @Override
-    UnfilteredPartitionIterator augmentedData()
-    {
-        return null;
-    }
-
-    private SimpleBTreePartition augmentResponseInternal(PartitionUpdate update)
-    {
-        SimpleBTreePartition partition = data.computeIfAbsent(update.partitionKey(), key -> new SimpleBTreePartition(key, update.metadata(), UpdateTransaction.NO_OP));
-        partition.update(update);
-        return partition;
-    }
-
-    @Override
-    void augmentResponse(PartitionUpdate update)
-    {
-        // if the input iterator reached the row limit, then we can't apply any augmenting mutations that are past
-        // the last materialized key. Since we wouldn't have materialized the local data for that key, applying an
-        // update would cause us to return incomplete data for it.
-        if (initialIteratorExhausted || !followUpBounds.contains(update.partitionKey()))
-            augmentResponseInternal(update);
-        wasAugmented = true;
-    }
+        }
 
-    private class ExtendingCompletedRead implements CompletedRead
-    {
+        protected class FilteredMaterializer extends Materializer
+        {
+            private final Set<DecoratedKey> filteredKeys = new HashSet<>();
+            private final RowFilter.RowFilterTransformation filter;
+            public FilteredMaterializer(ReadCommand command)
+            {
+                super(command);
+                filter = command.rowFilter().filter(command().metadata(), command().nowInSec());
+            }
 
-        final UnfilteredPartitionIterator iterator;
-        // merged end-result counter
-        final DataLimits.Counter mergedResultCounter = command.limits().newCounter(command.nowInSec(),
-                                                                                   true,
-                                                                                   command.selectsFullPartition(),
-                                                                                   enforceStrictLiveness);
+            @Override
+            UnfilteredPartitionIterator filter(UnfilteredPartitionIterator iterator)
+            {
+                return Transformation.apply(iterator, new Transformation<>()
+                {
+                    @Override
+                    protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition)
+                    {
+                        if (Transformation.apply(partition, filter).isEmpty())
+                        {
+                            DecoratedKey key = partition.partitionKey();
+                            data.remove(key);
+                            filteredKeys.add(key);
+                            partition.close();
+                            return null;
+                        }
+                        return partition;
+                    }
+                });
+            }
 
-        public ExtendingCompletedRead(UnfilteredPartitionIterator iterator)
-        {
-            this.iterator = iterator;
+            @Override
+            RangePrepared createRangePrepared()
+            {
+                return new FilteredPrepared(data, shortReadSupport.build(), filteredKeys, filter);
+            }
         }
 
-        @Override
-        public PartitionIterator iterator()
+        static class FilteredCompletedRead extends ExtendingCompletedRead
         {
-            PartitionIterator filtered = UnfilteredPartitionIterators.filter(iterator, command.nowInSec());
-            PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter);
-            return Transformation.apply(counted, new EmptyPartitionsDiscarder());
-        }
+            private final DecoratedKey lastMatchingKey;
+            private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo;
+            public FilteredCompletedRead(PartitionRangeReadCommand command, UnfilteredPartitionIterator iterator, ShortReadSupport shortReadSupport, DecoratedKey lastMatchingKey, SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo)
+            {
+                super(command, iterator, shortReadSupport.partitionsFetched, shortReadSupport.initialIteratorExhausted, shortReadSupport.followUpBounds);
+                this.lastMatchingKey = lastMatchingKey;
+                this.followUpReadInfo = followUpReadInfo;
+            }
 
-        @Override
-        public TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long expiresAtNanos)
-        {
-            // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit
-            if (mergedResultCounter.isDone())
-                return null;
+            /**
+             * Even if we reached the limit during materialization, filtered keys that sort before the last
+             * matching key, or interleave with the matches, still need to be read.
+             * @return true if previously filtered keys require a follow-up read
+             */
+            private boolean hasInterleavedFollowupKeys()
+            {
+                if (followUpReadInfo.isEmpty())
+                    return false;
 
-            // we do not apply short read protection when we have no limits at all
-            if (command.limits().isUnlimited())
-                return null;
+                if (lastMatchingKey == null)  // nothing was materialized, so any follow-up keys must be read
+                    return true;
 
-            /*
-             * If this is a single partition read command or an (indexed) partition range read command with
-             * a partition key specified, then we can't and shouldn't try fetch more partitions.
-             */
-            if (command.isLimitedToOnePartition())
-                return null;
-
-            /*
-             * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more.
-             *
-             * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false
-             * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911).
-             */
-            if (initialIteratorExhausted && command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
-                return null;
+                return followUpReadInfo.firstKey().compareTo(lastMatchingKey) < 0;
+            }
 
-            /*
-             * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator.
-             * There is no point to ask the replica for more rows - it has no more in the requested range.
-             */
-            if (!partitionsFetched)
-                return null;
-            partitionsFetched = false;
-
-            /*
-             * We are going to fetch one partition at a time for thrift and potentially more for CQL.
-             * The row limit will either be set to the per partition limit - if the command has no total row limit set, or
-             * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions,
-             * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones.
-             */
-            int toQuery = command.limits().count() != DataLimits.NO_LIMIT
-                          ? command.limits().count() - mergedResultCounter.rowsCounted()
-                          : command.limits().perPartitionCount();
+            @Override
+            protected boolean followUpRequired()
+            {
+                return hasInterleavedFollowupKeys() || super.followUpRequired();
+            }
 
-            ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
-            Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
-            logger.info("Requesting {} extra rows from {} for short read protection", toQuery, FBUtilities.getBroadcastAddressAndPort());
+            @Override
+            protected Future<TrackedDataResponse> makeFollowupRead(TrackedDataResponse initialResponse, int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+            {
+                if (followUpReadInfo.isEmpty())
+                    return super.makeFollowupRead(initialResponse, toQuery, consistencyLevel, expiresAtNanos);
+
+                FilteredFollowupRead followupRead = new FilteredFollowupRead(initialResponse,
+                                                                             toQuery,
+                                                                             consistencyLevel,
+                                                                             expiresAtNanos,
+                                                                             followUpReadInfo,
+                                                                             command,
+                                                                             followUpBounds,
+                                                                             lastMatchingKey);
+
+                followupRead.start();
+                return followupRead;
+            }
 
-            return makeFollowupRead(toQuery, consistencyLevel, expiresAtNanos);
         }
 
-        private TrackedRead<?, ?> makeFollowupRead(int toQuery, ConsistencyLevel consistencyLevel, long expiresAtNanos)
+        private class FilteredCompleted extends RangeCompleted
         {
-            DataLimits newLimits = command.limits().forShortReadRetry(toQuery);
-
-            DataRange newDataRange = command.dataRange().forSubRange(followUpBounds);
+            private final SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo;
+            public FilteredCompleted(SortedMap<DecoratedKey, SimpleBTreePartition> data, ShortReadSupport shortReadSupport, boolean wasAugmented, SortedMap<DecoratedKey, FollowUpReadInfo> followUpReadInfo)
+            {
+                super(data, shortReadSupport, wasAugmented);
+                this.followUpReadInfo = followUpReadInfo;
+            }
 
-            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-            PartitionRangeReadCommand followUpCmd = command.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
-            ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forRangeRead(keyspace,
-                                                                             followUpCmd.indexQueryPlan(),
-                                                                             consistencyLevel,
-                                                                             followUpCmd.dataRange().keyRange(),
-                                                                             1);
+            @Override
+            protected CompletedRead extendRead(UnfilteredPartitionIterator iterator)
+            {
+                return new FilteredCompletedRead(command, iterator, shortReadSupport, data.isEmpty() ? null : data.lastKey(), followUpReadInfo);
+            }
+        }
 
-            TrackedRead.Range read = TrackedRead.Range.create(followUpCmd, replicaPlan);
-            logger.trace("Short read detected, starting followup read {}", read);
-            read.start(expiresAtNanos);
-            return read;
+        public Filtered(ReadExecutionController executionController, ColumnFamilyStore cfs, long startTimeNanos, PartitionRangeReadCommand command)
+        {
+            super(executionController, cfs, startTimeNanos, command);
         }
 
         @Override
-        public void close()
+        Materializer createMaterializer()
         {
-            iterator.close();
+            return new FilteredMaterializer(command);
         }
-    }
 
-    @Override
-    CompletedRead createResult(UnfilteredPartitionIterator iterator)
-    {
-        if (wasAugmented)
-            return new ExtendingCompletedRead(iterator);
-        return CompletedRead.simple(iterator, command().nowInSec());
+        @Override
+        public Index.Searcher searcher()
+        {
+            return null;
+        }
     }
 }
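
[Editor's note] For reviewers following the short-read protection path above, a minimal sketch (not part of the patch) of the follow-up bounds arithmetic that makeFollowUpRead() relies on. It assumes Cassandra's existing AbstractBounds, Range, and ExcludingBounds types and mirrors the bounds construction that moved into ShortReadSupport: the retry range begins exclusively after the last materialized key, preserving the right-bound semantics of the original query.

    import org.apache.cassandra.db.PartitionPosition;
    import org.apache.cassandra.dht.AbstractBounds;
    import org.apache.cassandra.dht.ExcludingBounds;
    import org.apache.cassandra.dht.Range;

    final class FollowUpBoundsSketch
    {
        // Range is left-exclusive/right-inclusive; ExcludingBounds excludes both ends,
        // so lastPartitionKey is never re-fetched by the follow-up read.
        static AbstractBounds<PartitionPosition> followUpBounds(AbstractBounds<PartitionPosition> queried,
                                                                PartitionPosition lastPartitionKey)
        {
            return queried.inclusiveRight()
                   ? new Range<>(lastPartitionKey, queried.right)
                   : new ExcludingBounds<>(lastPartitionKey, queried.right);
        }
    }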
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
index daf222239c..99e509ee7f 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedRead.java
@@ -25,33 +25,46 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.index.Index;
+import org.apache.cassandra.utils.concurrent.Future;
 
 public interface PartialTrackedRead
 {
     interface CompletedRead extends AutoCloseable
     {
-        PartitionIterator iterator();
-        TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long expiresAtNanos);
+        TrackedDataResponse response(); // must be called from the read stage
+        Future<TrackedDataResponse> followupRead(TrackedDataResponse initialResponse, ConsistencyLevel consistencyLevel, long expiresAtNanos);
 
         @Override
         void close();
 
-        static CompletedRead simple(UnfilteredPartitionIterator partition, long nowInSec)
+        static TrackedDataResponse createResponse(UnfilteredPartitionIterator partition, ReadCommand command)
+        {
+            PartitionIterator iterator = UnfilteredPartitionIterators.filter(partition, command.nowInSec());
+            DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(),
+                                                                     false,
+                                                                     command.selectsFullPartition(),
+                                                                     command.metadata().enforceStrictLiveness()).onlyCount();
+            return TrackedDataResponse.create(counter.applyTo(iterator),
+                                              command.columnFilter());
+        }
+
+        static CompletedRead simple(UnfilteredPartitionIterator partition, ReadCommand command)
         {
             return new CompletedRead()
             {
                 @Override
-                public PartitionIterator iterator()
+                public TrackedDataResponse response()
                 {
-                    return UnfilteredPartitionIterators.filter(partition, nowInSec);
+                    return createResponse(partition, command);
                 }
 
                 @Override
-                public TrackedRead<?, ?> followupRead(ConsistencyLevel consistencyLevel, long expiresAtNanos)
+                public Future<TrackedDataResponse> followupRead(TrackedDataResponse initialRead, ConsistencyLevel consistencyLevel, long expiresAtNanos)
                 {
                     return null;
                 }
                 {
                     return null;
                 }
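
[Editor's note] A usage sketch of the new CompletedRead contract (hedged; `read`, `promise`, and the surrounding variables are assumed context, mirroring TrackedLocalReadCoordinator later in this patch): response() is produced first on the read stage, and followupRead() either returns null or a Future that resolves to a response which already incorporates the initial data.

    try (PartialTrackedRead.CompletedRead completed = read.complete())
    {
        TrackedDataResponse response = completed.response(); // read stage only
        Future<TrackedDataResponse> followUp = completed.followupRead(response, consistencyLevel, expiresAtNanos);
        if (followUp == null)
            promise.trySuccess(response); // no short read detected, respond immediately
        else
            followUp.addCallback((merged, error) -> {
                if (error != null) promise.tryFailure(error);
                else promise.trySuccess(merged); // merged already includes the initial data
            });
    }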
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
index 6aa64a2985..e15dba8a5c 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/PartialTrackedSinglePartitionRead.java
@@ -18,6 +18,8 @@
 
 package org.apache.cassandra.service.reads.tracked;
 
+import java.util.List;
+
 import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -29,29 +31,31 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.partitions.SimpleBTreePartition;
 import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.index.Index;
 import org.apache.cassandra.index.transactions.UpdateTransaction;
 
+import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP;
+
 public class PartialTrackedSinglePartitionRead extends AbstractPartialTrackedRead
 {
+    private final Index.Searcher searcher;
     private final SinglePartitionReadCommand command;
-    private final UnfilteredPartitionIterator initialData;
-    private SimpleBTreePartition augmentedData;
 
-    public PartialTrackedSinglePartitionRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command, UnfilteredPartitionIterator initialData)
+    public PartialTrackedSinglePartitionRead(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command)
     {
-        super(executionController, searcher, cfs, startTimeNanos);
+        super(executionController, cfs, startTimeNanos);
+        this.searcher = searcher;
         this.command = command;
-        this.initialData = initialData;
     }
 
     public static PartialTrackedSinglePartitionRead create(ReadExecutionController executionController, Index.Searcher searcher, ColumnFamilyStore cfs, long startTimeNanos, SinglePartitionReadCommand command, UnfilteredPartitionIterator initialData)
     {
-        PartialTrackedSinglePartitionRead read = new PartialTrackedSinglePartitionRead(executionController, searcher, cfs, startTimeNanos, command, initialData);
+        PartialTrackedSinglePartitionRead read = new PartialTrackedSinglePartitionRead(executionController, searcher, cfs, startTimeNanos, command);
         try
         {
-            read.prepare();
+            read.prepare(initialData);
             return read;
         }
         catch (Throwable e)
@@ -61,48 +65,88 @@ public class PartialTrackedSinglePartitionRead extends AbstractPartialTrackedRea
         }
     }
 
-    @Override
-    void freezeInitialData()
+    private class SinglePartitionPrepared extends Prepared
     {
-        // the iterators from queryStorage grabs sstable references and a
-        // snapshot of the memtable partition so we don't need to do anything here
-    }
+        private final UnfilteredPartitionIterator initialData;
+        private SimpleBTreePartition augmentedData;
 
-    @Override
-    public ReadCommand command()
-    {
-        return command;
+        private SinglePartitionPrepared(UnfilteredPartitionIterator initialData)
+        {
+            this.initialData = initialData;
+        }
+
+        @Override
+        public State augment(PartitionUpdate update)
+        {
+            Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey()));
+            if (augmentedData == null)
+                augmentedData = new SimpleBTreePartition(command.partitionKey(), command.metadata(), UpdateTransaction.NO_OP);
+
+            augmentedData.update(update);
+            return this;
+        }
+
+        @Override
+        Completed complete()
+        {
+            return new SinglePartitionCompleted(initialData, augmentedData);
+        }
     }
 
-    @Override
-    UnfilteredPartitionIterator initialData()
+    private class SinglePartitionCompleted extends Completed
     {
-        return initialData;
+        private final UnfilteredPartitionIterator initialData;
+        private final SimpleBTreePartition augmentedData;
+
+        public SinglePartitionCompleted(UnfilteredPartitionIterator initialData, SimpleBTreePartition augmentedData)
+        {
+            this.initialData = initialData;
+            this.augmentedData = augmentedData;
+        }
+
+        private UnfilteredPartitionIterator augmentedIterator()
+        {
+            if (augmentedData == null)
+                return null;
+            Slices slices = command.clusteringIndexFilter().getSlices(command.metadata());
+            UnfilteredRowIterator augmentedPartition = augmentedData.unfilteredIterator(command.columnFilter(), slices, command.clusteringIndexFilter().isReversed());
+            return new SingletonUnfilteredPartitionIterator(augmentedPartition);
+        }
+
+        @Override
+        protected UnfilteredPartitionIterator iterator()
+        {
+            UnfilteredPartitionIterator augmentedIterator = augmentedIterator();
+            if (augmentedIterator == null)
+                return initialData;
+
+            return UnfilteredPartitionIterators.merge(List.of(initialData, augmentedIterator), NOOP);
+        }
+
+        @Override
+        protected CompletedRead createResult(UnfilteredPartitionIterator iterator)
+        {
+            return CompletedRead.simple(iterator, command);
+        }
     }
 
     @Override
-    UnfilteredPartitionIterator augmentedData()
+    protected Prepared prepareInternal(UnfilteredPartitionIterator initialData)
     {
-        if (augmentedData == null)
-            return null;
-        Slices slices = command.clusteringIndexFilter().getSlices(command.metadata());
-        UnfilteredRowIterator augmented = augmentedData.unfilteredIterator(command.columnFilter(), slices, command.clusteringIndexFilter().isReversed());
-        return new SingletonUnfilteredPartitionIterator(augmented);
+        return new SinglePartitionPrepared(initialData);
     }
 
     @Override
-    void augmentResponse(PartitionUpdate update)
+    public Index.Searcher searcher()
     {
-        Preconditions.checkArgument(update.partitionKey().equals(command.partitionKey()));
-        if (augmentedData == null)
-            augmentedData = new SimpleBTreePartition(command.partitionKey(), command.metadata(), UpdateTransaction.NO_OP);
-
-        augmentedData.update(update);
+        return searcher;
     }
 
+    // TODO: delete (almost?) everything below
+
     @Override
-    CompletedRead createResult(UnfilteredPartitionIterator iterator)
+    public ReadCommand command()
     {
-        return CompletedRead.simple(iterator, command().nowInSec());
+        return command;
     }
 }
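
[Editor's note] A compact sketch of the data flow in SinglePartitionCompleted above (illustrative only; `key`, `metadata`, and `reconciledUpdates` are hypothetical stand-ins): reconciled mutations accumulate into one heap-resident partition, which is then merged with the frozen local iterator.

    // Mirrors SinglePartitionPrepared.augment and SinglePartitionCompleted.iterator above.
    SimpleBTreePartition augmented = new SimpleBTreePartition(key, metadata, UpdateTransaction.NO_OP);
    for (PartitionUpdate update : reconciledUpdates)
        augmented.update(update);

    UnfilteredPartitionIterator merged =
        UnfilteredPartitionIterators.merge(List.of(initialData,
                                                   new SingletonUnfilteredPartitionIterator(augmented.unfilteredIterator())),
                                           UnfilteredPartitionIterators.MergeListener.NOOP);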
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
index b55095bf02..01fad19fb5 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedDataResponse.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.service.reads.tracked;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.io.IVersionedSerializer;
@@ -33,18 +34,63 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
 
 public class TrackedDataResponse
 {
     private final int serializationVersion;
-    private final ByteBuffer data;
+    private final List<ByteBuffer> data;
 
     public TrackedDataResponse(int serializationVersion, ByteBuffer data)
     {
+        this(serializationVersion, Collections.singletonList(data));
+    }
+
+    private TrackedDataResponse(int serializationVersion, List<ByteBuffer> data)
+    {
+        Preconditions.checkArgument(!data.isEmpty());
         this.serializationVersion = serializationVersion;
         this.data = data;
     }
 
+    public TrackedDataResponse merge(TrackedDataResponse that)
+    {
+        return merge(this, that);
+    }
+
+    public static TrackedDataResponse merge(TrackedDataResponse l, TrackedDataResponse r)
+    {
+        Preconditions.checkArgument(l.serializationVersion == r.serializationVersion);
+        List<ByteBuffer> newData = new ArrayList<>(l.data.size() + r.data.size());
+        newData.addAll(l.data);
+        newData.addAll(r.data);
+        return new TrackedDataResponse(l.serializationVersion, newData);
+    }
+
+    public static TrackedDataResponse merge(List<TrackedDataResponse> responses)
+    {
+        Preconditions.checkArgument(!responses.isEmpty());
+
+        int version = responses.get(0).serializationVersion;
+        int size = responses.get(0).data.size();
+
+        for (int i=1,mi=responses.size(); i<mi; i++)
+        {
+            Preconditions.checkState(responses.get(i).serializationVersion == version);
+            size += responses.get(i).data.size();
+        }
+
+        List<ByteBuffer> newData = new ArrayList<>(size);
+        for (int i=0,mi=responses.size(); i<mi; i++)
+            newData.addAll(responses.get(i).data);
+
+        return new TrackedDataResponse(version, newData);
+    }
+
     public static TrackedDataResponse create(PartitionIterator iter, ColumnFilter selection)
     {
         try (DataOutputBuffer buffer = new DataOutputBuffer())
@@ -59,7 +105,7 @@ public class TrackedDataResponse
         }
     }
 
-    public PartitionIterator makeIterator(ReadCommand command)
+    private static PartitionIterator makeIterator(int serializationVersion, ByteBuffer data, ReadCommand command)
     {
         try (DataInputBuffer in = new DataInputBuffer(data, true))
         {
@@ -72,28 +118,55 @@ public class TrackedDataResponse
         }
     }
 
-    public static final IVersionedSerializer<TrackedDataResponse> serializer = new IVersionedSerializer<TrackedDataResponse>()
+    public PartitionIterator makeIteratorUnlimited(ReadCommand command)
+    {
+        if (data.size() == 1)
+            return makeIterator(serializationVersion, data.get(0), command);
+
+        List<PartitionIterator> iterators = new ArrayList<>(data.size());
+        for (ByteBuffer buffer : data)
+            iterators.add(makeIterator(serializationVersion, buffer, command));
+        return PartitionIterators.mergeNonOverlapping(iterators);
+    }
+
+    public PartitionIterator makeIterator(ReadCommand command)
+    {
+        DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(),
+                                                                 true,
+                                                                 command.selectsFullPartition(),
+                                                                 command.metadata().enforceStrictLiveness());
+        return counter.applyTo(makeIteratorUnlimited(command));
+    }
+
+    public static final IVersionedSerializer<TrackedDataResponse> serializer = new IVersionedSerializer<>()
     {
         @Override
         public void serialize(TrackedDataResponse response, DataOutputPlus out, int version) throws IOException
         {
             out.writeInt(response.serializationVersion);
-            ByteBufferUtil.writeWithVIntLength(response.data, out);
+            out.writeInt(response.data.size());
+            for (ByteBuffer buffer : response.data)
+                ByteBufferUtil.writeWithVIntLength(buffer, out);
         }
 
         @Override
         public TrackedDataResponse deserialize(DataInputPlus in, int version) throws IOException
         {
             int serializationVersion = in.readInt();
-            ByteBuffer data = ByteBufferUtil.readWithVIntLength(in);
+            int size = in.readInt();
+            List<ByteBuffer> data = new ArrayList<>(size);
+            for (int i = 0; i < size; i++)
+                data.add(ByteBufferUtil.readWithVIntLength(in));
             return new TrackedDataResponse(serializationVersion, data);
         }
 
         @Override
         public long serializedSize(TrackedDataResponse response, int version)
         {
-            return TypeSizes.sizeof(response.serializationVersion)
-                   + ByteBufferUtil.serializedSizeWithVIntLength(response.data);
+            long size = TypeSizes.sizeof(response.serializationVersion) + TypeSizes.sizeof(response.data.size());
+            for (ByteBuffer buffer : response.data)
+                size += ByteBufferUtil.serializedSizeWithVIntLength(buffer);
+            return size;
         }
     };
 }
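
[Editor's note] A hedged usage sketch of the new multi-buffer response (`initial` and `followUp` are assumed to come from the same read): merging concatenates the serialized per-read buffers, with the wire format now a version int, a buffer count, and VInt-length-prefixed buffers, and the command's limits are only re-applied when the merged result is deserialized.

    // merge() asserts matching serialization versions before concatenating buffers.
    TrackedDataResponse combined = TrackedDataResponse.merge(List.of(initial, followUp));
    try (PartitionIterator rows = combined.makeIterator(command)) // merged-result counter applied here
    {
        while (rows.hasNext())
            rows.next().close(); // consume; real callers forward rows to the client
    }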
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
index 6732adca4b..c62531240a 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReadCoordinator.java
@@ -27,8 +27,6 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.filter.ColumnFilter;
-import org.apache.cassandra.db.partitions.PartitionIterator;
-import org.apache.cassandra.db.partitions.PartitionIterators;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
@@ -45,6 +43,8 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.AbstractFuture;
 import org.apache.cassandra.utils.concurrent.Accumulator;
 import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,6 +52,7 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
 public class TrackedLocalReadCoordinator
 {
@@ -512,7 +513,7 @@ public class TrackedLocalReadCoordinator
         });
     }
 
-    public void startLocalRead(TrackedRead.Id readId, ReadCommand command, ReplicaPlan.AbstractForRead<?, ?> replicaPlan, int[] summaryNodes, long expiresAtNanos)
+    public void startLocalRead(TrackedRead.Id readId, ReadCommand command, ReplicaPlan.AbstractForRead<?, ?> replicaPlan, int[] summaryNodes, long expiresAtNanos, Consumer<PartialTrackedRead> partialReadConsumer)
     {
         synchronized (this)
         {
@@ -528,6 +529,8 @@ public class TrackedLocalReadCoordinator
         try
         {
             read = command.beginTrackedRead(controller);
+            if (partialReadConsumer != null)
+                partialReadConsumer.accept(read);
             // Create another summary once initial data has been read fully. We do this to catch
             // any mutations that may have arrived during initial read execution.
             secondarySummary = command.createMutationSummary(true);
@@ -555,20 +558,17 @@ public class TrackedLocalReadCoordinator
     {
         try (PartialTrackedRead.CompletedRead completedRead = read.complete())
         {
-            TrackedDataResponse response = TrackedDataResponse.create(completedRead.iterator(), selection);
-            TrackedRead<?, ?> followUp = completedRead.followupRead(consistencyLevel, expiresAtNanos);
+            TrackedDataResponse response = completedRead.response();
+            Future<TrackedDataResponse> followUp = completedRead.followupRead(response, consistencyLevel, expiresAtNanos);
 
             if (followUp != null)
             {
-                ReadCommand command = read.command();
-                followUp.future().addCallback((iterator, error) -> {
+                followUp.addCallback((newResponse, error) -> {
                     if (error != null)
                     {
                         promise.tryFailure(error);
                         return;
                     }
-                    PartitionIterator previous = response.makeIterator(command);
-                    TrackedDataResponse newResponse = TrackedDataResponse.create(PartitionIterators.concat(List.of(previous, iterator)), selection);
                     promise.trySuccess(newResponse);
                 });
             }
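
[Editor's note] A design observation on the hunk above, not part of the patch: because followupRead() now receives the initial response and the follow-up machinery merges responses internally via TrackedDataResponse.merge, the coordinator callback reduces to plain forwarding:

    followUp.addCallback((newResponse, error) -> {
        if (error != null)
            promise.tryFailure(error);       // propagate the follow-up failure
        else
            promise.trySuccess(newResponse); // already includes the initial data
    });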
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
index 5c2417e6b5..d356ce2efa 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedLocalReads.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -89,7 +90,7 @@ public class TrackedLocalReads implements Shutdownable
         getOrCreate(summary.readId()).receiveSummary(from, summary.summary());
     }
 
-    public TrackedLocalReadCoordinator beginRead(TrackedRead.Id readId, ClusterMetadata metadata, ReadCommand command, ConsistencyLevel consistencyLevel, int[] summaryNodes, long expiresAtNanos)
+    public TrackedLocalReadCoordinator beginRead(TrackedRead.Id readId, ClusterMetadata metadata, ReadCommand command, ConsistencyLevel consistencyLevel, int[] summaryNodes, long expiresAtNanos, Consumer<PartialTrackedRead> partialReadConsumer)
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
@@ -116,7 +117,7 @@ public class TrackedLocalReads implements Shutdownable
         // TODO: confirm all summaryNodes are present in the replica plan
 
         TrackedLocalReadCoordinator coordinator = getOrCreate(readId);
-        coordinator.startLocalRead(readId, command, replicaPlan, summaryNodes, expiresAtNanos);
+        coordinator.startLocalRead(readId, command, replicaPlan, summaryNodes, expiresAtNanos, partialReadConsumer);
         return coordinator;
     }
 
diff --git a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
index 7356ae785a..a4c4b61230 100644
--- a/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
+++ b/src/java/org/apache/cassandra/service/reads/tracked/TrackedRead.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,7 +133,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
         }
     }
 
-    private final AsyncPromise<PartitionIterator> future = new AsyncPromise<>();
+    private final AsyncPromise<TrackedDataResponse> future = new AsyncPromise<>();
 
     private final Id readId = Id.nextId();
     private final ReadCommand command;
@@ -171,6 +172,11 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
 
     protected abstract Verb verb();
 
+    public boolean intersects(DecoratedKey key)
+    {
+        return command.dataRange().contains(key);
+    }
+
     public static class Partition extends TrackedRead<EndpointsForToken, ReplicaPlan.ForTokenRead>
     {
         private Partition(SinglePartitionReadCommand command, ReplicaPlan.AbstractForRead<EndpointsForToken, ReplicaPlan.ForTokenRead> replicaPlan, ConsistencyLevel consistencyLevel)
@@ -230,7 +236,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
         return hostids;
     }
 
-    public void start(long expiresAt)
+    private void start(long expiresAt, Consumer<PartialTrackedRead> partialReadConsumer)
     {
         // TODO: skip local coordination if this node knows its recovering 
from an outage
         // TODO: read speculation
@@ -253,8 +259,9 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
 
         if (dataNode == localReplica)
         {
+            logger.trace("Locally coordinating {}", readId);
             Stage.READ.submit(() -> {
-                TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryHostIds, expiresAt);
+                TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, ClusterMetadata.current(), command, consistencyLevel, summaryHostIds, expiresAt, partialReadConsumer);
                 coordinator.addCallback((response, error) -> {
                     if (error != null)
                     {
@@ -269,6 +276,8 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
         }
         else
         {
+            logger.trace("Sending data request for {} to {}", readId, 
dataNode.endpoint());
+            Preconditions.checkArgument(partialReadConsumer == null, "Cannot 
supply read consumer for nonlocal reads");
             DataRequest dataRequest = new DataRequest(readId, command, 
consistencyLevel, summaryHostIds);
             Message<DataRequest> dataMessage = Message.outWithFlag(verb(), 
dataRequest, MessageFlag.CALL_BACK_ON_FAILURE);
             MessagingService.instance().sendWithCallback(dataMessage, 
dataNode.endpoint(), this);
@@ -286,15 +295,27 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
         {
             if (localReplica == replica)
             {
+                logger.trace("Locally processing summary request for {}", 
readId);
                 Stage.READ.submit(() -> 
summaryRequest.executeLocally(summaryMessage, ClusterMetadata.current()));
             }
             else
             {
+                logger.trace("Sending summary request for {} to {}", readId, 
replica.endpoint());
                 MessagingService.instance().send(summaryMessage, 
replica.endpoint());
             }
         }
     }
 
+    public void start(long expiresAt)
+    {
+        start(expiresAt, null);
+    }
+
+    public void startLocal(long expiresAt, Consumer<PartialTrackedRead> partialReadConsumer)
+    {
+        start(expiresAt, partialReadConsumer);
+    }
+
     public void start(Dispatcher.RequestTime requestTime)
     {
         start(requestTime.computeDeadline(verb().expiresAfterNanos()));
@@ -302,7 +323,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
 
     private void onResponse(TrackedDataResponse response)
     {
-        future.trySuccess(response.makeIterator(command));
+        future.trySuccess(response);
     }
 
     @Override
@@ -323,7 +344,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
         return true;
     }
 
-    public Future<PartitionIterator> future()
+    public Future<TrackedDataResponse> future()
     {
         return future;
     }
@@ -332,7 +353,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
     {
         try
         {
-            return future.get(command.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+            return future.get(command.getTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS).makeIterator(command);
         }
         catch (InterruptedException e)
         {
@@ -410,7 +431,7 @@ public abstract class TrackedRead<E extends Endpoints<E>, P extends ReplicaPlan.
         @Override
         public void executeLocally(Message<? extends Request> message, ClusterMetadata metadata)
         {
-            TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, metadata, command, consistencyLevel, summaryNodes, message.expiresAtNanos());
+            TrackedLocalReadCoordinator coordinator = MutationTrackingService.instance.localReads().beginRead(readId, metadata, command, consistencyLevel, summaryNodes, message.expiresAtNanos(), null);
             coordinator.addCallback((response, error) -> {
                 if (error != null)
                 {
diff --git a/test/conf/logback-dtest.xml b/test/conf/logback-dtest.xml
index 48d9859b67..71371d15c9 100644
--- a/test/conf/logback-dtest.xml
+++ b/test/conf/logback-dtest.xml
@@ -45,9 +45,9 @@
     <encoder>
      <pattern>%-5level [%thread] ${instance_id} %date{ISO8601} %F:%L - %msg%n</pattern>
     </encoder>
-    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
-      <level>DEBUG</level>
-    </filter>
+<!--    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">-->
+<!--      <level>DEBUG</level>-->
+<!--    </filter>-->
   </appender>
 
   <logger name="io.netty.*" level="WARN"/>
@@ -58,4 +58,6 @@
     <appender-ref ref="INSTANCESTDERR" />
     <appender-ref ref="INSTANCESTDOUT" />
   </root>
+  <logger name="org.apache.cassandra.replication" level="TRACE"/>
+  <logger name="org.apache.cassandra.service.reads.tracked" level="TRACE"/>
 </configuration>
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
index b2af1f938e..3ad81cdc9e 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairRangeQueriesTest.java
@@ -18,9 +18,15 @@
 
 package org.apache.cassandra.distributed.test;
 
+import java.util.Arrays;
+import java.util.Comparator;
+
+import org.junit.Assume;
 import org.junit.Test;
 
-import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.distributed.shared.AssertUtils.row;
 
@@ -156,7 +162,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester
     @Test
     public void testRangeQueryWithFilterOnSelectedColumnOnSkinnyTable()
     {
-        MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW FILTERING (CASSANDRA-20555)");
         tester("WHERE a=2 ALLOW FILTERING")
         .createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)")
         .mutate("INSERT INTO %s (k, a, b) VALUES (1, 2, 3)",
@@ -183,7 +188,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester
     @Test
     public void testRangeQueryWithFilterOnSelectedColumnOnWideTable()
     {
-        MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW FILTERING (CASSANDRA-20555)");
         tester("WHERE a=1 ALLOW FILTERING")
         .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))")
         .mutate("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)",
@@ -215,8 +219,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester
     @Test
     public void testRangeQueryWithFilterOnUnselectedColumnOnSkinnyTable()
     {
-        MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "uses ALLOW FILTERING (CASSANDRA-20555)");
-
         tester("WHERE b=3 ALLOW FILTERING")
         .createTable("CREATE TABLE %s (k int PRIMARY KEY, a int, b int)")
         .mutate("INSERT INTO %s (k, a, b) VALUES (1, 2, 3)",
@@ -243,8 +245,6 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester
     @Test
     public void testRangeQueryWithFilterOnUnselectedColumnOnWideTable()
     {
-        if (coordinator == 2)
-            MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "Depends on ALLOW FILTERING");
         tester("WHERE b=2 ALLOW FILTERING")
         .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))")
         .mutate("INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)",
@@ -269,4 +269,113 @@ public class ReadRepairRangeQueriesTest extends ReadRepairQueryTester
                   rows(row(2, 1, 1, 1)),
                   rows());
     }
+
+    /**
+     * Test range queries using filtering on a selected column on a table with clustering columns.
+     */
+    @Test
+    public void testRangeQueryWithFilterOnSelectedColumnConflictingUpdates()
+    {
+        Assume.assumeTrue(replicationType.isTracked());  // flaky for untracked replication
+        tester("WHERE a=1 ALLOW FILTERING")
+        .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))")
+        .mutate(1, "INSERT INTO %s (k, c, a, b) VALUES (1, 1, 1, 1)",
+                "INSERT INTO %s (k, c, a, b) VALUES (1, 2, 2, 2)",
+                "INSERT INTO %s (k, c, a, b) VALUES (2, 1, 2, 1)",
+                "INSERT INTO %s (k, c, a, b) VALUES (2, 2, 2, 2)")
+        .mutate(2, "INSERT INTO %s (k, c, a, b) VALUES (2, 1, 1, 1)")
+        .mutate(1)  // updates the last coordinator for mutation tracking
+        .queryColumns("k, c, a", 2, 2,
+                      rows(row(1, 1, 1), row(2, 1, 1)),
+                      rows(row(1, 1, 1, 1), row(2, 1, 1, 1)),
+                      rows(row(1, 1, 1, null), row(2, 1, 1, 1)))
+        .tearDown(2,
+                  rows(row(1, 1, 1, 1), row(1, 2, 2, 2), row(2, 1, 1, 1), row(2, 2, 2, 2)),
+                  rows(row(1, 1, 1, 1), row(2, 1, 1, 1)));
+    }
+
+    /**
+     * Helper class for creating rows in token-sorted order.
+     */
+    private static class TokenSortedKey
+    {
+        final int key;
+
+        public TokenSortedKey(int key)
+        {
+            this.key = key;
+        }
+
+        Token token()
+        {
+            return Murmur3Partitioner.instance.getToken(ByteBufferUtil.bytes(key));
+        }
+
+        public Object[] row(Object... values)
+        {
+            Object[] row = new Object[values.length + 1];
+            row[0] = key;
+            System.arraycopy(values, 0, row, 1, values.length);
+            return row;
+        }
+
+        public static TokenSortedKey[] create(int numKeys)
+        {
+            TokenSortedKey[] keys = new TokenSortedKey[numKeys];
+            for (int i = 0; i < numKeys; i++)
+                keys[i] = new TokenSortedKey(i);
+            Arrays.sort(keys, Comparator.comparing(TokenSortedKey::token));
+            return keys;
+        }
+
+    }
+
+    /**
+     * Tests the allow filtering case where the local coordinator filters out a partition that hashes to a token
+     * between two valid matches, and another replica reports a mutation that causes it to be re-added to the result.
+     */
+    @Test
+    public void testFilteredInterleavedShortRead()
+    {
+        Assume.assumeTrue(replicationType.isTracked());  // flaky for untracked replication
+        TokenSortedKey[] keys = TokenSortedKey.create(4);
+
+        tester("WHERE a=1 ALLOW FILTERING")
+        .createTable("CREATE TABLE %s (k int, c int, a int, PRIMARY KEY(k, c))")
+        .mutate(1, "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 1, 1)",
+                "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 2, 2)",
+                "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 2)",
+                "INSERT INTO %s (k, c, a) VALUES (" + keys[2].key + ", 1, 1)")
+        .mutate(2, "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 1)") // make keys[1] match the filter
+        .mutate(1)  // updates the last coordinator for mutation tracking
+        .queryColumns("k, c, a", 3, 0,
+                      rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)),
+                      rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)),
+                      rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)))
+        .tearDown(1,
+                  rows(keys[0].row(1, 1), keys[0].row(2, 2), keys[1].row(1, 1), keys[2].row(1, 1)),
+                  rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)));
+    }
+
+    @Test
+    public void testNonFilteredInterleavedShortRead()
+    {
+        Assume.assumeTrue(replicationType.isTracked());  // flaky for untracked replication
+        TokenSortedKey[] keys = TokenSortedKey.create(4);
+
+        tester("WHERE a=1 ALLOW FILTERING")
+        .createTable("CREATE TABLE %s (k int, c int, a int, PRIMARY KEY(k, c))")
+        .mutate(1, "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 1, 1)",
+                "INSERT INTO %s (k, c, a) VALUES (" + keys[0].key + ", 2, 2)",
+                "INSERT INTO %s (k, c, a) VALUES (" + keys[2].key + ", 1, 1)")
+        .mutate(2, "INSERT INTO %s (k, c, a) VALUES (" + keys[1].key + ", 1, 1)") // make keys[1] match the filter
+        .mutate(1)  // updates the last coordinator for mutation tracking
+        .queryColumns("k, c, a", 3, 0,
+                      rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)),
+                      rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)),
+                      rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)))
+        .tearDown(1,
+                  rows(keys[0].row(1, 1), keys[0].row(2, 2), keys[1].row(1, 1), keys[2].row(1, 1)),
+                  rows(keys[0].row(1, 1), keys[1].row(1, 1), keys[2].row(1, 1)));
+    }
 }
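
Context for the TokenSortedKey helper added above: Murmur3 tokens do not follow
the numeric order of the int keys, so the tests create their partition keys
token-sorted to get a deterministic range-scan order. A minimal usage sketch
(not part of the patch, reusing the names introduced above):

    // Keys come back ordered by token, which is the order a range scan
    // returns partitions -- generally not 0, 1, 2, 3.
    TokenSortedKey[] keys = TokenSortedKey.create(4);
    for (TokenSortedKey k : keys)
        System.out.println(k.key + " -> " + k.token());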
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
index 58e9ecca11..62f3702d50 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairSliceQueriesTest.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.distributed.test;
 
 import org.junit.Test;
 
-import org.apache.cassandra.distributed.test.tracking.MutationTrackingUtils;
-
 import static org.apache.cassandra.distributed.shared.AssertUtils.row;
 
 /**
@@ -64,7 +62,6 @@ public class ReadRepairSliceQueriesTest extends ReadRepairQueryTester
     @Test
     public void testSliceQueryWithFilter()
     {
-        MutationTrackingUtils.fixmeSkipIfTracked(replicationType, "Depends on ALLOW FILTERING");
         tester("WHERE k=0 AND a>10 AND a<40 ALLOW FILTERING")
         .createTable("CREATE TABLE %s (k int, c int, a int, b int, PRIMARY KEY(k, c))")
         .mutate("INSERT INTO %s (k, c, a, b) VALUES (0, 1, 10, 100)",
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
index 7e17c9c557..7d859ce110 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingPendingReadTest.java
@@ -176,7 +176,7 @@ public class MutationTrackingPendingReadTest
 
                 // check that the returned data contains the unapplied mutation
                 try (PartialTrackedRead.CompletedRead completedRead = read.complete();
-                     PartitionIterator partitions = completedRead.iterator())
+                     PartitionIterator partitions = completedRead.response().makeIterator(command))
                 {
                     Assert.assertTrue(partitions.hasNext());
                     try (RowIterator rowIterator = partitions.next())
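
The hunk above follows an API change: a CompletedRead no longer exposes an
iterator directly; the test now builds one from its response. A minimal
consumption sketch under that assumption, mirroring the try-with-resources
pattern used in the test:

    // Drain every row of a completed tracked read; each iterator is closed
    // by try-with-resources, as in the test above.
    try (PartialTrackedRead.CompletedRead completedRead = read.complete();
         PartitionIterator partitions = completedRead.response().makeIterator(command))
    {
        while (partitions.hasNext())
        {
            try (RowIterator rows = partitions.next())
            {
                while (rows.hasNext())
                    rows.next(); // consume the row
            }
        }
    }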
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
index 306d6e5092..37f6c6a46f 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingReadReconciliationTest.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.distributed.test.tracking;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.replication.CoordinatorLogId;
 import org.apache.cassandra.replication.MutationSummary;
 import org.junit.Assert;
@@ -77,6 +79,31 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl
         awaitNodeLiveness(from, InetAddressAndPort.getByAddress(node.broadcastAddress()), true);
     }
 
+    // TODO (expected): remove after speculation is implemented
+    private static Object[][] queryWithRetries(ICoordinator coordinator, String query) throws InterruptedException
+    {
+        int attempt = 0;
+        for (;;)
+        {
+            attempt++;
+            try
+            {
+                return coordinator.execute(query, ConsistencyLevel.QUORUM);
+            }
+            catch (Throwable t)
+            {
+                if (attempt < 10 && t.getClass().getSimpleName().equals(ReadTimeoutException.class.getSimpleName()))
+                {
+                    Thread.sleep(2000);
+                }
+                else
+                {
+                    throw t;
+                }
+            }
+        }
+    }
+
     /**
      * Test a read reconciliation where the coordinator doesn't have a read response it needs to apply
      * additional mutations to
@@ -146,9 +173,8 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl
             awaitNodeAlive(cluster.get(1), cluster.get(3));
             awaitNodeDead(cluster.get(1), cluster.get(2));
 
-
             Assert.assertEquals(0, numLogReconciliations(cluster.get(1)));
-            Object[][] result = cluster.coordinator(1).execute(format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+            Object[][] result = queryWithRetries(cluster.coordinator(1), format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName));
             Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1)), result);
 
             // check that node3 has the new ids
@@ -225,7 +251,7 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl
 
 
             Assert.assertEquals(0, numLogReconciliations(cluster.get(1)));
-            Object[][] result = cluster.coordinator(3).execute(format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+            Object[][] result = queryWithRetries(cluster.coordinator(3), format("SELECT * FROM %s.%s WHERE k=1", keyspaceName, tableName));
             Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1)), result);
 
             // check that node3 has the new ids
@@ -302,7 +328,7 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl
 
             // No reconciliation has happened yet
             Assert.assertEquals(0, numLogReconciliations(cluster.get(1)));
-            Object[][] result = cluster.coordinator(1).execute(format("SELECT * FROM %s.%s", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+            Object[][] result = queryWithRetries(cluster.coordinator(1), format("SELECT * FROM %s.%s", keyspaceName, tableName));
             Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1), row(2, 2, 2)), result);
 
             // Coordinator sends its missing mutations to 3 on read
@@ -377,7 +403,7 @@ public class MutationTrackingReadReconciliationTest extends TestBaseImpl
 
 
             Assert.assertEquals(0, numLogReconciliations(cluster.get(3)));
-            Object[][] result = cluster.coordinator(3).execute(format("SELECT * FROM %s.%s", keyspaceName, tableName), ConsistencyLevel.QUORUM);
+            Object[][] result = queryWithRetries(cluster.coordinator(3), format("SELECT * FROM %s.%s", keyspaceName, tableName));
             Assert.assertEquals(row(row(1, 0, 0), row(1, 1, 1), row(2, 2, 2)), result);
 
             // Coordinator sends its missing mutations to 3 on read

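A note on the retry helper added above: it matches the timeout by class name
rather than with instanceof, presumably because each in-JVM dtest node runs
under its own classloader, so a ReadTimeoutException thrown inside an instance
is not assignment-compatible with the test's copy of the class. A minimal
sketch of that check (hypothetical helper, same assumption):

    // Classloader-safe exception matching for in-JVM dtests: compare names,
    // not classes, since the instance's exception class comes from another loader.
    private static boolean isReadTimeout(Throwable t)
    {
        return "ReadTimeoutException".equals(t.getClass().getSimpleName());
    }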

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
