This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 292650d968c30757a027905d12c7370c6342dbe3
Merge: 3e393f2 2ef1f1c
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Jul 30 14:10:48 2020 +0100

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 build.xml                                          |   3 +-
 conf/cassandra.yaml                                |  20 +
 doc/source/operating/metrics.rst                   | 144 +++----
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  20 +
 .../config/ReplicaFilteringProtectionOptions.java  |  28 ++
 .../db/partitions/PartitionIterators.java          |  47 +++
 .../org/apache/cassandra/metrics/TableMetrics.java |  21 +-
 .../apache/cassandra/service/StorageService.java   |  28 +-
 .../cassandra/service/StorageServiceMBean.java     |  12 +
 .../cassandra/service/reads/DataResolver.java      |  57 ++-
 .../service/reads/ReplicaFilteringProtection.java  | 444 ++++++++++++---------
 .../reads/ShortReadPartitionsProtection.java       |  10 +-
 .../service/reads/ShortReadProtection.java         |  17 +-
 .../service/reads/repair/NoopReadRepair.java       |   3 +-
 .../org/apache/cassandra/transport/Message.java    |   2 +-
 .../cassandra/utils/concurrent/Accumulator.java    |   9 +-
 .../cassandra/distributed/impl/Coordinator.java    |  10 +
 .../apache/cassandra/distributed/impl/RowUtil.java |   7 +-
 .../test/ReplicaFilteringProtectionTest.java       | 244 +++++++++++
 .../utils/concurrent/AccumulatorTest.java          |  34 +-
 22 files changed, 839 insertions(+), 324 deletions(-)

diff --cc CHANGES.txt
index 4a9ae14,9dbbd1c..e74f330
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,11 +1,14 @@@
 -3.11.8
 +4.0-beta2
 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
 + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973)
 + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766)
 + * Verify sstable components on startup (CASSANDRA-15945)
 +Merged from 3.11:
 + * stop_paranoid disk failure policy is ignored on CorruptSSTableException 
after node is up (CASSANDRA-15191)
   * Frozen RawTuple is not annotated with frozen in the toString method 
(CASSANDRA-15857)
  Merged from 3.0:
+  * Operational improvements and hardening for replica filtering protection 
(CASSANDRA-15907)
 - * stop_paranoid disk failure policy is ignored on CorruptSSTableException 
after node is up (CASSANDRA-15191)
 - * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
   * Fix empty/null json string representation (CASSANDRA-15896)
 - * 3.x fails to start if commit log has range tombstones from a column which 
is also deleted (CASSANDRA-15970)
  Merged from 2.2:
   * Fix CQL parsing of collections when the column type is reversed 
(CASSANDRA-15814)
  
diff --cc build.xml
index f0948ae,50d278e..d861920
--- a/build.xml
+++ b/build.xml
@@@ -556,15 -412,19 +556,14 @@@
            <dependency groupId="com.fasterxml.jackson.core" 
artifactId="jackson-annotations" version="2.9.10"/>
            <dependency groupId="com.googlecode.json-simple" 
artifactId="json-simple" version="1.1"/>
            <dependency groupId="com.boundary" artifactId="high-scale-lib" 
version="1.0.6"/>
 -          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="0.3.0"/>
 +          <dependency groupId="com.github.jbellis" artifactId="jamm" 
version="${jamm.version}"/>
  
 -          <dependency groupId="com.thinkaurelius.thrift" 
artifactId="thrift-server" version="0.3.7">
 -            <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/>
 -            <exclusion groupId="junit" artifactId="junit"/>
 -          </dependency>
            <dependency groupId="org.yaml" artifactId="snakeyaml" 
version="1.11"/>
 -          <dependency groupId="org.apache.thrift" artifactId="libthrift" 
version="0.9.2">
 -               <exclusion groupId="commons-logging" 
artifactId="commons-logging"/>
 -          </dependency>
 -          <dependency groupId="junit" artifactId="junit" version="4.6" />
 +          <dependency groupId="junit" artifactId="junit" version="4.12" />
            <dependency groupId="org.mockito" artifactId="mockito-core" 
version="3.2.4" />
 +          <dependency groupId="org.quicktheories" artifactId="quicktheories" 
version="0.25" />
 +          <dependency groupId="com.google.code.java-allocation-instrumenter" 
artifactId="java-allocation-instrumenter" 
version="${allocation-instrumenter.version}" />
-           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.3" />
- 
+           <dependency groupId="org.apache.cassandra" artifactId="dtest-api" 
version="0.0.4" />
            <dependency groupId="org.apache.rat" artifactId="apache-rat" 
version="0.10">
               <exclusion groupId="commons-lang" artifactId="commons-lang"/>
            </dependency>
diff --cc doc/source/operating/metrics.rst
index fc37440,4bd0c08..0053cbc
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@@ -79,76 -75,65 +79,80 @@@ Reported name format
      **all** tables and keyspaces on the node.
  
  
- ======================================= ============== ===========
- Name                                    Type           Description
- ======================================= ============== ===========
- MemtableOnHeapSize                      Gauge<Long>    Total amount of data 
stored in the memtable that resides **on**-heap, including column related 
overhead and partitions overwritten.
- MemtableOffHeapSize                     Gauge<Long>    Total amount of data 
stored in the memtable that resides **off**-heap, including column related 
overhead and partitions overwritten.
- MemtableLiveDataSize                    Gauge<Long>    Total amount of live 
data stored in the memtable, excluding any data structure overhead.
- AllMemtablesOnHeapSize                  Gauge<Long>    Total amount of data 
stored in the memtables (2i and pending flush memtables included) that resides 
**on**-heap.
- AllMemtablesOffHeapSize                 Gauge<Long>    Total amount of data 
stored in the memtables (2i and pending flush memtables included) that resides 
**off**-heap.
- AllMemtablesLiveDataSize                Gauge<Long>    Total amount of live 
data stored in the memtables (2i and pending flush memtables included) that 
resides off-heap, excluding any data structure overhead.
- MemtableColumnsCount                    Gauge<Long>    Total number of 
columns present in the memtable.
- MemtableSwitchCount                     Counter        Number of times flush 
has resulted in the memtable being switched out.
- CompressionRatio                        Gauge<Double>  Current compression 
ratio for all SSTables.
- EstimatedPartitionSizeHistogram         Gauge<long[]>  Histogram of estimated 
partition size (in bytes).
- EstimatedPartitionCount                 Gauge<Long>    Approximate number of 
keys in table.
- EstimatedColumnCountHistogram           Gauge<long[]>  Histogram of estimated 
number of columns.
- SSTablesPerReadHistogram                Histogram      Histogram of the 
number of sstable data files accessed per single partition read. SSTables 
skipped due to Bloom Filters, min-max key or partition index lookup are not 
taken into acoount.
- ReadLatency                             Latency        Local read latency for 
this table.
- RangeLatency                            Latency        Local range scan 
latency for this table.
- WriteLatency                            Latency        Local write latency 
for this table.
- CoordinatorReadLatency                  Timer          Coordinator read 
latency for this table.
- CoordinatorWriteLatency                 Timer          Coordinator write 
latency for this table.
- CoordinatorScanLatency                  Timer          Coordinator range scan 
latency for this table.
- PendingFlushes                          Counter        Estimated number of 
flush tasks pending for this table.
- BytesFlushed                            Counter        Total number of bytes 
flushed since server [re]start.
- CompactionBytesWritten                  Counter        Total number of bytes 
written by compaction since server [re]start.
- PendingCompactions                      Gauge<Integer> Estimate of number of 
pending compactions for this table.
- LiveSSTableCount                        Gauge<Integer> Number of SSTables on 
disk for this table.
- LiveDiskSpaceUsed                       Counter        Disk space used by 
SSTables belonging to this table (in bytes).
- TotalDiskSpaceUsed                      Counter        Total disk space used 
by SSTables belonging to this table, including obsolete ones waiting to be GC'd.
- MinPartitionSize                        Gauge<Long>    Size of the smallest 
compacted partition (in bytes).
- MaxPartitionSize                        Gauge<Long>    Size of the largest 
compacted partition (in bytes).
- MeanPartitionSize                       Gauge<Long>    Size of the average 
compacted partition (in bytes).
- BloomFilterFalsePositives               Gauge<Long>    Number of false 
positives on table's bloom filter.
- BloomFilterFalseRatio                   Gauge<Double>  False positive ratio 
of table's bloom filter.
- BloomFilterDiskSpaceUsed                Gauge<Long>    Disk space used by 
bloom filter (in bytes).
- BloomFilterOffHeapMemoryUsed            Gauge<Long>    Off-heap memory used 
by bloom filter.
- IndexSummaryOffHeapMemoryUsed           Gauge<Long>    Off-heap memory used 
by index summary.
- CompressionMetadataOffHeapMemoryUsed    Gauge<Long>    Off-heap memory used 
by compression meta data.
- KeyCacheHitRate                         Gauge<Double>  Key cache hit rate for 
this table.
- TombstoneScannedHistogram               Histogram      Histogram of 
tombstones scanned in queries on this table.
- LiveScannedHistogram                    Histogram      Histogram of live 
cells scanned in queries on this table.
- ColUpdateTimeDeltaHistogram             Histogram      Histogram of column 
update time delta on this table.
- ViewLockAcquireTime                     Timer          Time taken acquiring a 
partition lock for materialized view updates on this table.
- ViewReadTime                            Timer          Time taken during the 
local read of a materialized view update.
- TrueSnapshotsSize                       Gauge<Long>    Disk space used by 
snapshots of this table including all SSTable components.
- RowCacheHitOutOfRange                   Counter        Number of table row 
cache hits that do not satisfy the query filter, thus went to disk.
- RowCacheHit                             Counter        Number of table row 
cache hits.
- RowCacheMiss                            Counter        Number of table row 
cache misses.
- CasPrepare                              Latency        Latency of paxos 
prepare round.
- CasPropose                              Latency        Latency of paxos 
propose round.
- CasCommit                               Latency        Latency of paxos 
commit round.
- PercentRepaired                         Gauge<Double>  Percent of table data 
that is repaired on disk.
- BytesRepaired                           Gauge<Long>    Size of table data 
repaired on disk
- BytesUnrepaired                         Gauge<Long>    Size of table data 
unrepaired on disk
- BytesPendingRepair                      Gauge<Long>    Size of table data 
isolated for an ongoing incremental repair
- SpeculativeRetries                      Counter        Number of times 
speculative retries were sent for this table.
- SpeculativeFailedRetries                Counter        Number of speculative 
retries that failed to prevent a timeout
- SpeculativeInsufficientReplicas         Counter        Number of speculative 
retries that couldn't be attempted due to lack of replicas
- SpeculativeSampleLatencyNanos           Gauge<Long>    Number of nanoseconds 
to wait before speculation is attempted. Value may be statically configured or 
updated periodically based on coordinator latency.
- WaitingOnFreeMemtableSpace              Histogram      Histogram of time 
spent waiting for free memtable space, either on- or off-heap.
- DroppedMutations                        Counter        Number of dropped 
mutations on this table.
- AnticompactionTime                      Timer          Time spent 
anticompacting before a consistent repair.
- ValidationTime                          Timer          Time spent doing 
validation compaction during repair.
- SyncTime                                Timer          Time spent doing 
streaming during repair.
- BytesValidated                          Histogram      Histogram over the 
amount of bytes read during validation.
- PartitionsValidated                     Histogram      Histogram over the 
number of partitions read during validation.
- BytesAnticompacted                      Counter        How many bytes we 
anticompacted.
- BytesMutatedAnticompaction              Counter        How many bytes we 
avoided anticompacting because the sstable was fully contained in the repaired 
range.
- MutatedAnticompactionGauge              Gauge<Double>  Ratio of bytes mutated 
vs total bytes repaired.
- ======================================= ============== ===========
+ =============================================== ============== ===========
+ Name                                            Type           Description
+ =============================================== ============== ===========
+ MemtableOnHeapSize                              Gauge<Long>    Total amount 
of data stored in the memtable that resides **on**-heap, including column 
related overhead and partitions overwritten.
+ MemtableOffHeapSize                             Gauge<Long>    Total amount 
of data stored in the memtable that resides **off**-heap, including column 
related overhead and partitions overwritten.
+ MemtableLiveDataSize                            Gauge<Long>    Total amount 
of live data stored in the memtable, excluding any data structure overhead.
+ AllMemtablesOnHeapSize                          Gauge<Long>    Total amount 
of data stored in the memtables (2i and pending flush memtables included) that 
resides **on**-heap.
+ AllMemtablesOffHeapSize                         Gauge<Long>    Total amount 
of data stored in the memtables (2i and pending flush memtables included) that 
resides **off**-heap.
+ AllMemtablesLiveDataSize                        Gauge<Long>    Total amount 
of live data stored in the memtables (2i and pending flush memtables included) 
that resides off-heap, excluding any data structure overhead.
+ MemtableColumnsCount                            Gauge<Long>    Total number 
of columns present in the memtable.
+ MemtableSwitchCount                             Counter        Number of 
times flush has resulted in the memtable being switched out.
+ CompressionRatio                                Gauge<Double>  Current 
compression ratio for all SSTables.
+ EstimatedPartitionSizeHistogram                 Gauge<long[]>  Histogram of 
estimated partition size (in bytes).
+ EstimatedPartitionCount                         Gauge<Long>    Approximate 
number of keys in table.
+ EstimatedColumnCountHistogram                   Gauge<long[]>  Histogram of 
estimated number of columns.
+ SSTablesPerReadHistogram                        Histogram      Histogram of 
the number of sstable data files accessed per single partition read. SSTables 
skipped due to Bloom Filters, min-max key or partition index lookup are not 
taken into account.
+ ReadLatency                                     Latency        Local read 
latency for this table.
+ RangeLatency                                    Latency        Local range 
scan latency for this table.
+ WriteLatency                                    Latency        Local write 
latency for this table.
+ CoordinatorReadLatency                          Timer          Coordinator 
read latency for this table.
++CoordinatorWriteLatency                         Timer          Coordinator 
write latency for this table.
+ CoordinatorScanLatency                          Timer          Coordinator 
range scan latency for this table.
+ PendingFlushes                                  Counter        Estimated 
number of flush tasks pending for this table.
+ BytesFlushed                                    Counter        Total number 
of bytes flushed since server [re]start.
+ CompactionBytesWritten                          Counter        Total number 
of bytes written by compaction since server [re]start.
+ PendingCompactions                              Gauge<Integer> Estimate of 
number of pending compactions for this table.
+ LiveSSTableCount                                Gauge<Integer> Number of 
SSTables on disk for this table.
+ LiveDiskSpaceUsed                               Counter        Disk space 
used by SSTables belonging to this table (in bytes).
+ TotalDiskSpaceUsed                              Counter        Total disk 
space used by SSTables belonging to this table, including obsolete ones waiting 
to be GC'd.
+ MinPartitionSize                                Gauge<Long>    Size of the 
smallest compacted partition (in bytes).
+ MaxPartitionSize                                Gauge<Long>    Size of the 
largest compacted partition (in bytes).
+ MeanPartitionSize                               Gauge<Long>    Size of the 
average compacted partition (in bytes).
+ BloomFilterFalsePositives                       Gauge<Long>    Number of 
false positives on table's bloom filter.
+ BloomFilterFalseRatio                           Gauge<Double>  False positive 
ratio of table's bloom filter.
+ BloomFilterDiskSpaceUsed                        Gauge<Long>    Disk space 
used by bloom filter (in bytes).
+ BloomFilterOffHeapMemoryUsed                    Gauge<Long>    Off-heap 
memory used by bloom filter.
+ IndexSummaryOffHeapMemoryUsed                   Gauge<Long>    Off-heap 
memory used by index summary.
+ CompressionMetadataOffHeapMemoryUsed            Gauge<Long>    Off-heap 
memory used by compression meta data.
+ KeyCacheHitRate                                 Gauge<Double>  Key cache hit 
rate for this table.
+ TombstoneScannedHistogram                       Histogram      Histogram of 
tombstones scanned in queries on this table.
+ LiveScannedHistogram                            Histogram      Histogram of 
live cells scanned in queries on this table.
+ ColUpdateTimeDeltaHistogram                     Histogram      Histogram of 
column update time delta on this table.
+ ViewLockAcquireTime                             Timer          Time taken 
acquiring a partition lock for materialized view updates on this table.
+ ViewReadTime                                    Timer          Time taken 
during the local read of a materialized view update.
+ TrueSnapshotsSize                               Gauge<Long>    Disk space 
used by snapshots of this table including all SSTable components.
+ RowCacheHitOutOfRange                           Counter        Number of 
table row cache hits that do not satisfy the query filter, thus went to disk.
+ RowCacheHit                                     Counter        Number of 
table row cache hits.
+ RowCacheMiss                                    Counter        Number of 
table row cache misses.
+ CasPrepare                                      Latency        Latency of 
paxos prepare round.
+ CasPropose                                      Latency        Latency of 
paxos propose round.
+ CasCommit                                       Latency        Latency of 
paxos commit round.
+ PercentRepaired                                 Gauge<Double>  Percent of 
table data that is repaired on disk.
++BytesRepaired                                   Gauge<Long>    Size of table 
data repaired on disk
++BytesUnrepaired                                 Gauge<Long>    Size of table 
data unrepaired on disk
++BytesPendingRepair                              Gauge<Long>    Size of table 
data isolated for an ongoing incremental repair
+ SpeculativeRetries                              Counter        Number of 
times speculative retries were sent for this table.
++SpeculativeFailedRetries                        Counter        Number of 
speculative retries that failed to prevent a timeout
++SpeculativeInsufficientReplicas                 Counter        Number of 
speculative retries that couldn't be attempted due to lack of replicas
++SpeculativeSampleLatencyNanos                   Gauge<Long>    Number of 
nanoseconds to wait before speculation is attempted. Value may be statically 
configured or updated periodically based on coordinator latency.
+ WaitingOnFreeMemtableSpace                      Histogram      Histogram of 
time spent waiting for free memtable space, either on- or off-heap.
+ DroppedMutations                                Counter        Number of 
dropped mutations on this table.
++AnticompactionTime                              Timer          Time spent 
anticompacting before a consistent repair.
++ValidationTime                                  Timer          Time spent 
doing validation compaction during repair.
++SyncTime                                        Timer          Time spent 
doing streaming during repair.
++BytesValidated                                  Histogram      Histogram over 
the amount of bytes read during validation.
++PartitionsValidated                             Histogram      Histogram over 
the number of partitions read during validation.
++BytesAnticompacted                              Counter        How many bytes 
we anticompacted.
++BytesMutatedAnticompaction                      Counter        How many bytes 
we avoided anticompacting because the sstable was fully contained in the 
repaired range.
++MutatedAnticompactionGauge                      Gauge<Double>  Ratio of bytes 
mutated vs total bytes repaired.
+ ReadRepairRequests                              Meter          Throughput for 
mutations generated by read-repair.
+ ShortReadProtectionRequests                     Meter          Throughput for 
requests to get extra rows during short read protection.
+ ReplicaFilteringProtectionRequests              Meter          Throughput for 
row completion requests during replica filtering protection.
+ ReplicaFilteringProtectionRowsCachedPerQuery    Histogram      Histogram of 
the number of rows cached per query when replica filtering protection is 
engaged.
+ =============================================== ============== ===========
  
  Keyspace Metrics
  ^^^^^^^^^^^^^^^^
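
Note: the new ReplicaFilteringProtection* table metrics documented above follow the
standard reported-name format, so they should be readable over JMX like any other
table metric. A minimal client sketch, not part of this commit: the object-name
pattern follows the "Reported name format" section of metrics.rst, while "ks"/"tbl",
the port, and the histogram attribute names are assumptions based on the usual
metrics-core JMX exposure.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class RfpHistogramProbe
    {
        public static void main(String[] args) throws Exception
        {
            // 7199 is the conventional Cassandra JMX port; adjust for your deployment.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // "ks" and "tbl" are placeholders for a real keyspace and table.
                ObjectName histogram = new ObjectName(
                    "org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=tbl," +
                    "name=ReplicaFilteringProtectionRowsCachedPerQuery");

                // Max and 99thPercentile are the interesting values when sizing the
                // replica_filtering_protection cached-rows thresholds.
                System.out.println("max rows cached per query: " + mbs.getAttribute(histogram, "Max"));
                System.out.println("p99 rows cached per query: " + mbs.getAttribute(histogram, "99thPercentile"));
            }
        }
    }
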
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index bfb261d,5521bf7..24550b0
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -299,38 -201,20 +299,47 @@@ public class TableMetric
          }
      });
  
 +    public static final Gauge<Long> globalBytesRepaired = 
Metrics.register(globalFactory.createMetricName("BytesRepaired"),
 +                                                                           
new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesUnrepaired = 
Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
 +                                                                             
new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(s -> !s.isRepaired() && 
!s.isPendingRepair()).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesPendingRepair = 
Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
 +                                                                              
  new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return 
totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
 +        }
 +    });
 +
      public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
-     public final Meter replicaSideFilteringProtectionRequests;
+     
+     public final Meter replicaFilteringProtectionRequests;
+     
+     /**
+      * This histogram records the maximum number of rows {@link 
org.apache.cassandra.service.reads.ReplicaFilteringProtection}
+      * caches at a point in time per query. With no replica divergence, this 
is equivalent to the maximum number of
+      * cached rows in a single partition during a query. It can be helpful 
when choosing appropriate values for the
+      * replica_filtering_protection thresholds in cassandra.yaml. 
+      */
+     public final Histogram rfpRowsCachedPerQuery;
  
 -    public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
 +    public final EnumMap<SamplerType, Sampler<?>> samplers;
      /**
       * stores metrics that will be rolled into a single global metric
       */
@@@ -942,23 -710,8 +951,24 @@@
  
          readRepairRequests = createTableMeter("ReadRepairRequests");
          shortReadProtectionRequests = 
createTableMeter("ShortReadProtectionRequests");
-         replicaSideFilteringProtectionRequests = 
createTableMeter("ReplicaSideFilteringProtectionRequests");
+         replicaFilteringProtectionRequests = 
createTableMeter("ReplicaFilteringProtectionRequests");
+         rfpRowsCachedPerQuery = 
createHistogram("ReplicaFilteringProtectionRowsCachedPerQuery", true);
 +
 +        confirmedRepairedInconsistencies = 
createTableMeter("RepairedDataInconsistenciesConfirmed", 
cfs.keyspace.metric.confirmedRepairedInconsistencies);
 +        unconfirmedRepairedInconsistencies = 
createTableMeter("RepairedDataInconsistenciesUnconfirmed", 
cfs.keyspace.metric.unconfirmedRepairedInconsistencies);
 +
 +        repairedDataTrackingOverreadRows = 
createTableHistogram("RepairedDataTrackingOverreadRows", 
cfs.keyspace.metric.repairedDataTrackingOverreadRows, false);
 +        repairedDataTrackingOverreadTime = 
createTableTimer("RepairedDataTrackingOverreadTime", 
cfs.keyspace.metric.repairedDataTrackingOverreadTime);
 +
 +        unleveledSSTables = createTableGauge("UnleveledSSTables", 
cfs::getUnleveledSSTables, () -> {
 +            // global gauge
 +            int cnt = 0;
 +            for (Metric cfGauge : allTableMetrics.get("UnleveledSSTables"))
 +            {
 +                cnt += ((Gauge<? extends Number>) 
cfGauge).getValue().intValue();
 +            }
 +            return cnt;
 +        });
      }
  
      public void updateSSTableIterated(int count)
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index a1b3b82,240a15e..0d10418
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -5397,19 -5265,31 +5398,42 @@@ public class StorageService extends Not
      public void setTombstoneFailureThreshold(int threshold)
      {
          DatabaseDescriptor.setTombstoneFailureThreshold(threshold);
+         logger.info("updated tombstone_failure_threshold to {}", threshold);
+     }
+ 
+     public int getCachedReplicaRowsWarnThreshold()
+     {
+         return DatabaseDescriptor.getCachedReplicaRowsWarnThreshold();
+     }
+ 
+     public void setCachedReplicaRowsWarnThreshold(int threshold)
+     {
+         DatabaseDescriptor.setCachedReplicaRowsWarnThreshold(threshold);
+         logger.info("updated 
replica_filtering_protection.cached_rows_warn_threshold to {}", threshold);
+     }
+ 
+     public int getCachedReplicaRowsFailThreshold()
+     {
+         return DatabaseDescriptor.getCachedReplicaRowsFailThreshold();
+     }
+ 
+     public void setCachedReplicaRowsFailThreshold(int threshold)
+     {
+         DatabaseDescriptor.setCachedReplicaRowsFailThreshold(threshold);
+         logger.info("updated 
replica_filtering_protection.cached_rows_fail_threshold to {}", threshold);
      }
  
 +    public int getColumnIndexCacheSize()
 +    {
 +        return DatabaseDescriptor.getColumnIndexCacheSizeInKB();
 +    }
 +
 +    public void setColumnIndexCacheSize(int cacheSizeInKB)
 +    {
 +        DatabaseDescriptor.setColumnIndexCacheSize(cacheSizeInKB);
 +        logger.info("Updated column_index_cache_size_in_kb to {}", 
cacheSizeInKB);
 +    }
 +
      public int getBatchSizeFailureThreshold()
      {
          return DatabaseDescriptor.getBatchSizeFailThresholdInKB();
@@@ -5418,147 -5298,15 +5442,147 @@@
      public void setBatchSizeFailureThreshold(int threshold)
      {
          DatabaseDescriptor.setBatchSizeFailThresholdInKB(threshold);
-         logger.info("Updated batch_size_fail_threshold_in_kb to {}", 
threshold);
+         logger.info("updated batch_size_fail_threshold_in_kb to {}", 
threshold);
      }
  
 +    public int getBatchSizeWarnThreshold()
 +    {
 +        return DatabaseDescriptor.getBatchSizeWarnThresholdInKB();
 +    }
 +
 +    public void setBatchSizeWarnThreshold(int threshold)
 +    {
 +        DatabaseDescriptor.setBatchSizeWarnThresholdInKB(threshold);
 +        logger.info("Updated batch_size_warn_threshold_in_kb to {}", 
threshold);
 +    }
 +
 +    public int getInitialRangeTombstoneListAllocationSize()
 +    {
 +        return 
DatabaseDescriptor.getInitialRangeTombstoneListAllocationSize();
 +    }
 +
 +    public void setInitialRangeTombstoneListAllocationSize(int size)
 +    {
 +        if (size < 0 || size > 1024)
 +        {
 +            throw new IllegalStateException("Not updating 
initial_range_tombstone_allocation_size as it must be in the range [0, 1024] 
inclusive");
 +        }
 +        int originalSize = 
DatabaseDescriptor.getInitialRangeTombstoneListAllocationSize();
 +        DatabaseDescriptor.setInitialRangeTombstoneListAllocationSize(size);
 +        logger.info("Updated initial_range_tombstone_allocation_size from {} 
to {}", originalSize, size);
 +    }
 +
 +    public double getRangeTombstoneResizeListGrowthFactor()
 +    {
 +        return DatabaseDescriptor.getRangeTombstoneListGrowthFactor();
 +    }
 +
 +    public void setRangeTombstoneListResizeGrowthFactor(double growthFactor) 
throws IllegalStateException
 +    {
 +        if (growthFactor < 1.2 || growthFactor > 5)
 +        {
 +            throw new IllegalStateException("Not updating 
range_tombstone_resize_factor as growth factor must be in the range [1.2, 5.0] 
inclusive");
 +        }
 +        else
 +        {
 +            double originalGrowthFactor = 
DatabaseDescriptor.getRangeTombstoneListGrowthFactor();
 +            
DatabaseDescriptor.setRangeTombstoneListGrowthFactor(growthFactor);
 +            logger.info("Updated range_tombstone_resize_factor from {} to 
{}", originalGrowthFactor, growthFactor);
 +        }
 +    }
 +
      public void setHintedHandoffThrottleInKB(int throttleInKB)
      {
          DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
-         logger.info("Updated hinted_handoff_throttle_in_kb to {}", 
throttleInKB);
+         logger.info("updated hinted_handoff_throttle_in_kb to {}", 
throttleInKB);
      }
  
 +    @Override
 +    public void clearConnectionHistory()
 +    {
 +        daemon.clearConnectionHistory();
 +        logger.info("Cleared connection history");
 +    }
 +    public void disableAuditLog()
 +    {
 +        AuditLogManager.instance.disableAuditLog();
 +        logger.info("Auditlog is disabled");
 +    }
 +
 +    public void enableAuditLog(String loggerName, String includedKeyspaces, 
String excludedKeyspaces, String includedCategories, String excludedCategories,
 +                               String includedUsers, String excludedUsers) 
throws ConfigurationException, IllegalStateException
 +    {
 +        enableAuditLog(loggerName, Collections.emptyMap(), includedKeyspaces, 
excludedKeyspaces, includedCategories, excludedCategories, includedUsers, 
excludedUsers);
 +    }
 +
 +    public void enableAuditLog(String loggerName, Map<String, String> 
parameters, String includedKeyspaces, String excludedKeyspaces, String 
includedCategories, String excludedCategories,
 +                               String includedUsers, String excludedUsers) 
throws ConfigurationException, IllegalStateException
 +    {
 +        loggerName = loggerName != null ? loggerName : 
DatabaseDescriptor.getAuditLoggingOptions().logger.class_name;
 +
 +        Preconditions.checkNotNull(loggerName, "cassandra.yaml did not have 
logger in audit_logging_option and not set as parameter");
 +        
Preconditions.checkState(FBUtilities.isAuditLoggerClassExists(loggerName), 
"Unable to find AuditLogger class: "+loggerName);
 +
 +        AuditLogOptions auditLogOptions = new AuditLogOptions();
 +        auditLogOptions.enabled = true;
 +        auditLogOptions.logger = new ParameterizedClass(loggerName, 
parameters);
 +        auditLogOptions.included_keyspaces = includedKeyspaces != null ? 
includedKeyspaces : 
DatabaseDescriptor.getAuditLoggingOptions().included_keyspaces;
 +        auditLogOptions.excluded_keyspaces = excludedKeyspaces != null ? 
excludedKeyspaces : 
DatabaseDescriptor.getAuditLoggingOptions().excluded_keyspaces;
 +        auditLogOptions.included_categories = includedCategories != null ? 
includedCategories : 
DatabaseDescriptor.getAuditLoggingOptions().included_categories;
 +        auditLogOptions.excluded_categories = excludedCategories != null ? 
excludedCategories : 
DatabaseDescriptor.getAuditLoggingOptions().excluded_categories;
 +        auditLogOptions.included_users = includedUsers != null ? 
includedUsers : DatabaseDescriptor.getAuditLoggingOptions().included_users;
 +        auditLogOptions.excluded_users = excludedUsers != null ? 
excludedUsers : DatabaseDescriptor.getAuditLoggingOptions().excluded_users;
 +
 +        AuditLogManager.instance.enable(auditLogOptions);
 +
 +        logger.info("AuditLog is enabled with logger: [{}], 
included_keyspaces: [{}], excluded_keyspaces: [{}], " +
 +                    "included_categories: [{}], excluded_categories: [{}], 
included_users: [{}], "
 +                    + "excluded_users: [{}], archive_command: [{}]", 
auditLogOptions.logger, auditLogOptions.included_keyspaces, 
auditLogOptions.excluded_keyspaces,
 +                    auditLogOptions.included_categories, 
auditLogOptions.excluded_categories, auditLogOptions.included_users, 
auditLogOptions.excluded_users,
 +                    auditLogOptions.archive_command);
 +
 +    }
 +
 +    public boolean isAuditLogEnabled()
 +    {
 +        return AuditLogManager.instance.isEnabled();
 +    }
 +
 +    public String getCorruptedTombstoneStrategy()
 +    {
 +        return DatabaseDescriptor.getCorruptedTombstoneStrategy().toString();
 +    }
 +
 +    public void setCorruptedTombstoneStrategy(String strategy)
 +    {
 +        
DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.valueOf(strategy));
 +        logger.info("Setting corrupted tombstone strategy to {}", strategy);
 +    }
 +
 +    @Override
 +    public long getNativeTransportMaxConcurrentRequestsInBytes()
 +    {
 +        return Server.EndpointPayloadTracker.getGlobalLimit();
 +    }
 +
 +    @Override
 +    public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit)
 +    {
 +        Server.EndpointPayloadTracker.setGlobalLimit(newLimit);
 +    }
 +
 +    @Override
 +    public long getNativeTransportMaxConcurrentRequestsInBytesPerIp()
 +    {
 +        return Server.EndpointPayloadTracker.getEndpointLimit();
 +    }
 +
 +    @Override
 +    public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long 
newLimit)
 +    {
 +        Server.EndpointPayloadTracker.setEndpointLimit(newLimit);
 +    }
 +
      @VisibleForTesting
      public void shutdownServer()
      {
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 9664769,50d5c2c..3c6bbf9
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -697,11 -678,18 +697,23 @@@ public interface StorageServiceMBean ex
      /** Sets the threshold for abandoning queries with many tombstones */
      public void setTombstoneFailureThreshold(int tombstoneDebugThreshold);
  
+     /** Returns the number of rows cached at the coordinator before 
filtering/index queries log a warning. */
+     public int getCachedReplicaRowsWarnThreshold();
+ 
+     /** Sets the number of rows cached at the coordinator before 
filtering/index queries log a warning. */
+     public void setCachedReplicaRowsWarnThreshold(int threshold);
+ 
+     /** Returns the number of rows cached at the coordinator before 
filtering/index queries fail outright. */
+     public int getCachedReplicaRowsFailThreshold();
+ 
+     /** Sets the number of rows cached at the coordinator before 
filtering/index queries fail outright. */
+     public void setCachedReplicaRowsFailThreshold(int threshold);
+ 
 +    /** Returns the threshold for skipping the column index when caching 
partition info **/
 +    public int getColumnIndexCacheSize();
 +    /** Sets the threshold for skipping the column index when caching 
partition info **/
 +    public void setColumnIndexCacheSize(int cacheSizeInKB);
 +
      /** Returns the threshold for rejecting queries due to a large batch size 
*/
      public int getBatchSizeFailureThreshold();
      /** Sets the threshold for rejecting queries due to a large batch size */
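
Note: because the new threshold getters/setters live on StorageServiceMBean, the
replica_filtering_protection.cached_rows_warn_threshold and cached_rows_fail_threshold
settings (see the log messages added to StorageService above) can be adjusted at
runtime over JMX. A rough sketch, not part of this commit: it assumes the standard
StorageService object name and default JMX port, the attribute names follow from the
getter/setter pairs above, and the example value is illustrative only.

    import javax.management.Attribute;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class RfpThresholdTuner
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");

                // Backed by getCachedReplicaRowsWarnThreshold()/setCachedReplicaRowsWarnThreshold().
                int warn = (Integer) mbs.getAttribute(storageService, "CachedReplicaRowsWarnThreshold");
                System.out.println("current cached_rows_warn_threshold: " + warn);

                // Example value only; pick something informed by the
                // ReplicaFilteringProtectionRowsCachedPerQuery histogram.
                mbs.setAttribute(storageService, new Attribute("CachedReplicaRowsFailThreshold", 64000));
            }
        }
    }
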
diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java
index 30427d0,0000000..eeabb4b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@@ -1,405 -1,0 +1,404 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.function.UnaryOperator;
 +
 +import com.google.common.base.Joiner;
 +
++import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.statements.schema.IndexTarget;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.ReadResponse;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.PartitionIterators;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder;
 +import org.apache.cassandra.db.transform.Filter;
 +import org.apache.cassandra.db.transform.FilteredPartitions;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.index.sasi.SASIIndex;
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.schema.IndexMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
 +import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 +
 +import static com.google.common.collect.Iterables.*;
 +
 +public class DataResolver<E extends Endpoints<E>, P extends 
ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 +{
 +    private final boolean enforceStrictLiveness;
 +    private final ReadRepair<E, P> readRepair;
 +
 +    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> 
replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
 +    {
 +        super(command, replicaPlan, queryStartNanoTime);
 +        this.enforceStrictLiveness = 
command.metadata().enforceStrictLiveness();
 +        this.readRepair = readRepair;
 +    }
 +
 +    public PartitionIterator getData()
 +    {
 +        ReadResponse response = responses.get(0).payload;
 +        return 
UnfilteredPartitionIterators.filter(response.makeIterator(command), 
command.nowInSec());
 +    }
 +
 +    public boolean isDataPresent()
 +    {
 +        return !responses.isEmpty();
 +    }
 +
 +    public PartitionIterator resolve()
 +    {
 +        // We could get more responses while this method runs, which is ok 
(we're happy to ignore any response not here
 +        // at the beginning of this method), so grab the response count once 
and use that through the method.
 +        Collection<Message<ReadResponse>> messages = responses.snapshot();
 +        assert !any(messages, msg -> msg.payload.isDigestResponse());
 +
 +        E replicas = replicaPlan().candidates().select(transform(messages, 
Message::from), false);
 +
 +        // If requested, inspect each response for a digest of the replica's 
repaired data set
 +        RepairedDataTracker repairedDataTracker = 
command.isTrackingRepairedStatus()
 +                                                  ? new 
RepairedDataTracker(getRepairedDataVerifier(command))
 +                                                  : null;
 +        if (repairedDataTracker != null)
 +        {
 +            messages.forEach(msg -> {
 +                if (msg.payload.mayIncludeRepairedDigest() && 
replicas.byEndpoint().get(msg.from()).isFull())
 +                {
 +                    repairedDataTracker.recordDigest(msg.from(),
 +                                                     
msg.payload.repairedDataDigest(),
 +                                                     
msg.payload.isRepairedDigestConclusive());
 +                }
 +            });
 +        }
 +
 +        if (!needsReplicaFilteringProtection())
 +        {
 +            ResolveContext context = new ResolveContext(replicas);
 +            return resolveWithReadRepair(context,
 +                                         i -> shortReadProtectedResponse(i, 
context),
 +                                         UnaryOperator.identity(),
 +                                         repairedDataTracker);
 +        }
 +
 +        return resolveWithReplicaFilteringProtection(replicas, 
repairedDataTracker);
 +    }
 +
 +    private boolean needsReplicaFilteringProtection()
 +    {
 +        if (command.rowFilter().isEmpty())
 +            return false;
 +
 +        IndexMetadata indexDef = command.indexMetadata();
 +        if (indexDef != null && indexDef.isCustom())
 +        {
 +            String className = 
indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME);
 +            return !SASIIndex.class.getName().equals(className);
 +        }
 +
 +        return true;
 +    }
 +
 +    private class ResolveContext
 +    {
 +        private final E replicas;
 +        private final DataLimits.Counter mergedResultCounter;
 +
 +        private ResolveContext(E replicas)
 +        {
 +            this.replicas = replicas;
 +            this.mergedResultCounter = 
command.limits().newCounter(command.nowInSec(),
 +                                                                   true,
 +                                                                   
command.selectsFullPartition(),
 +                                                                   
enforceStrictLiveness);
 +        }
 +
 +        private boolean needsReadRepair()
 +        {
 +            return replicas.size() > 1;
 +        }
 +
 +        private boolean needShortReadProtection()
 +        {
 +            // If we have only one result, there is no read repair to do and 
we can't get short reads
 +            // Also, so-called "short reads" stems from nodes returning only 
a subset of the results they have for a
 +            // partition due to the limit, but that subset not being enough 
post-reconciliation. So if we don't have limit,
 +            // don't bother protecting against short reads.
 +            return replicas.size() > 1 && !command.limits().isUnlimited();
 +        }
 +    }
 +
 +    @FunctionalInterface
 +    private interface ResponseProvider
 +    {
 +        UnfilteredPartitionIterator getResponse(int i);
 +    }
 +
 +    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, 
ResolveContext context)
 +    {
 +        UnfilteredPartitionIterator originalResponse = 
responses.get(i).payload.makeIterator(command);
 +
 +        return context.needShortReadProtection()
 +               ? ShortReadProtection.extend(context.replicas.get(i),
++                                            () -> responses.clearUnsafe(i),
 +                                            originalResponse,
 +                                            command,
 +                                            context.mergedResultCounter,
 +                                            queryStartNanoTime,
 +                                            enforceStrictLiveness)
 +               : originalResponse;
 +    }
 +
 +    private PartitionIterator resolveWithReadRepair(ResolveContext context,
 +                                                    ResponseProvider 
responseProvider,
 +                                                    
UnaryOperator<PartitionIterator> preCountFilter,
 +                                                    RepairedDataTracker 
repairedDataTracker)
 +    {
 +        UnfilteredPartitionIterators.MergeListener listener = null;
 +        if (context.needsReadRepair())
 +        {
 +            P sources = replicaPlan.getWithContacts(context.replicas);
 +            listener = 
wrapMergeListener(readRepair.getMergeListener(sources), sources, 
repairedDataTracker);
 +        }
 +
 +        return resolveInternal(context, listener, responseProvider, 
preCountFilter);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private PartitionIterator resolveWithReplicaFilteringProtection(E 
replicas, RepairedDataTracker repairedDataTracker)
 +    {
 +        // Protecting against inconsistent replica filtering (some replica 
returning a row that is outdated but that
 +        // wouldn't be removed by normal reconciliation because up-to-date 
replicas have filtered the up-to-date version
-         // of that row) works in 3 steps:
-         //   1) we read the full response just to collect rows that may be 
outdated (the ones we got from some
-         //      replica but didn't got any response for other; it could be 
those other replica have filtered a more
-         //      up-to-date result). In doing so, we do not count any of such 
"potentially outdated" row towards the
-         //      query limit. This simulate the worst case scenario where all 
those "potentially outdated" rows are
-         //      indeed outdated, and thus make sure we are guaranteed to read 
enough results (thanks to short read
-         //      protection).
-         //   2) we query all the replica/rows we need to rule out whether 
those "potentially outdated" rows are outdated
-         //      or not.
-         //   3) we re-read cached copies of each replica response using the 
"normal" read path merge with read-repair,
-         //      but where for each replica we use their original response 
_plus_ the additional rows queried in the
-         //      previous step (and apply the command#rowFilter() on the full 
result). Since the first phase has
-         //      pessimistically collected enough results for the case where 
all potentially outdated results are indeed
-         //      outdated, we shouldn't need further short-read protection 
requests during this phase.
++        // of that row) involves 3 main elements:
++        //   1) We combine short-read protection and a merge listener that 
identifies potentially "out-of-date"
++        //      rows to create an iterator that is guaranteed to produce 
enough valid row results to satisfy the query
++        //      limit if enough actually exist. A row is considered 
out-of-date if its merged form is non-empty and we
++        //      receive no response from at least one replica. In this case, 
it is possible that filtering at the
++        //      "silent" replica has produced a more up-to-date result.
++        //   2) This iterator is passed to the standard resolution process 
with read-repair, but is first wrapped in a
++        //      response provider that lazily "completes" potentially 
out-of-date rows by directly querying them on the
++        //      replicas that were previously silent. As this iterator is 
consumed, it caches valid data for potentially
++        //      out-of-date rows, and this cached data is merged with the 
fetched data as rows are requested. If there
++        //      is no replica divergence, only rows in the partition being 
evaluated will be cached (then released
++        //      when the partition is consumed).
++        //   3) After a "complete" row is materialized, it must pass the row 
filter supplied by the original query
++        //      before it counts against the limit.
 +
 +        // We need separate contexts, as each context has its own counter
 +        ResolveContext firstPhaseContext = new ResolveContext(replicas);
 +        ResolveContext secondPhaseContext = new ResolveContext(replicas);
 +        ReplicaFilteringProtection<E> rfp = new 
ReplicaFilteringProtection<>(replicaPlan().keyspace(),
 +                                                                             
command,
 +                                                                             
replicaPlan().consistencyLevel(),
 +                                                                             
queryStartNanoTime,
-                                                                              
firstPhaseContext.replicas);
++                                                                             
firstPhaseContext.replicas,
++                                                                             
DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
++                                                                             
DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
++
 +        PartitionIterator firstPhasePartitions = 
resolveInternal(firstPhaseContext,
 +                                                                 
rfp.mergeController(),
 +                                                                 i -> 
shortReadProtectedResponse(i, firstPhaseContext),
 +                                                                 
UnaryOperator.identity());
 +
-         // Consume the first phase partitions to populate the replica 
filtering protection with both those materialized
-         // partitions and the primary keys to be fetched.
-         PartitionIterators.consume(firstPhasePartitions);
-         firstPhasePartitions.close();
- 
-         // After reading the entire query results the protection helper 
should have cached all the partitions so we can
-         // clear the responses accumulator for the sake of memory usage, 
given that the second phase might take long if
-         // it needs to query replicas.
-         responses.clearUnsafe();
++        PartitionIterator completedPartitions = 
resolveWithReadRepair(secondPhaseContext,
++                                                                      i -> 
rfp.queryProtectedPartitions(firstPhasePartitions, i),
++                                                                      results 
-> command.rowFilter().filter(results, command.metadata(), command.nowInSec()),
++                                                                      
repairedDataTracker);
 +
-         return resolveWithReadRepair(secondPhaseContext,
-                                      rfp::queryProtectedPartitions,
-                                      results -> 
command.rowFilter().filter(results, command.metadata(), command.nowInSec()),
-                                      repairedDataTracker);
++        // Ensure that the RFP instance has a chance to record metrics when 
the iterator closes.
++        return PartitionIterators.doOnClose(completedPartitions, 
firstPhasePartitions::close);
 +    }
 +
 +    @SuppressWarnings("resource")
 +    private PartitionIterator resolveInternal(ResolveContext context,
 +                                              
UnfilteredPartitionIterators.MergeListener mergeListener,
 +                                              ResponseProvider 
responseProvider,
 +                                              
UnaryOperator<PartitionIterator> preCountFilter)
 +    {
 +        int count = context.replicas.size();
 +        List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
 +        for (int i = 0; i < count; i++)
 +            results.add(responseProvider.getResponse(i));
 +
 +        /*
 +         * Even though every response, individually, will honor the limit, it 
is possible that we will, after the merge,
 +         * have more rows than the client requested. To make sure that we 
still conform to the original limit,
 +         * we apply a top-level post-reconciliation counter to the merged 
partition iterator.
 +         *
 +         * Short read protection logic 
(ShortReadRowsProtection.moreContents()) relies on this counter to be applied
 +         * to the current partition to work. For this reason we have to apply 
the counter transformation before
 +         * empty partition discard logic kicks in - for it will eagerly 
consume the iterator.
 +         *
 +         * That's why the order here is: 1) merge; 2) filter rows; 3) count; 
4) discard empty partitions
 +         *
 +         * See CASSANDRA-13747 for more details.
 +         */
 +
 +        UnfilteredPartitionIterator merged = 
UnfilteredPartitionIterators.merge(results, mergeListener);
-         FilteredPartitions filtered = FilteredPartitions.filter(merged, new 
Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
++        Filter filter = new Filter(command.nowInSec(), 
command.metadata().enforceStrictLiveness());
++        FilteredPartitions filtered = FilteredPartitions.filter(merged, 
filter);
 +        PartitionIterator counted = 
Transformation.apply(preCountFilter.apply(filtered), 
context.mergedResultCounter);
 +        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
 +    }
 +
 +    protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand 
command)
 +    {
 +        return RepairedDataVerifier.verifier(command);
 +    }
 +
 +    private String makeResponsesDebugString(DecoratedKey partitionKey)
 +    {
 +        return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> 
m.from() + " => " + m.payload.toDebugString(command, partitionKey)));
 +    }
 +
 +    private UnfilteredPartitionIterators.MergeListener 
wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
 +                                                                         P 
sources,
 +                                                                         
RepairedDataTracker repairedDataTracker)
 +    {
 +        // Avoid wrapping no-op listener as it doesn't throw, unless we're 
tracking repaired status
 +        // in which case we need to inject the tracker & verify on close
 +        if (partitionListener == 
UnfilteredPartitionIterators.MergeListener.NOOP)
 +        {
 +            if (repairedDataTracker == null)
 +                return partitionListener;
 +
 +            return new UnfilteredPartitionIterators.MergeListener()
 +            {
 +
 +                public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
 +                {
 +                    return UnfilteredRowIterators.MergeListener.NOOP;
 +                }
 +
 +                public void close()
 +                {
 +                    repairedDataTracker.verify();
 +                }
 +            };
 +        }
 +
 +        return new UnfilteredPartitionIterators.MergeListener()
 +        {
 +            public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
 +            {
 +                UnfilteredRowIterators.MergeListener rowListener = 
partitionListener.getRowMergeListener(partitionKey, versions);
 +
 +                return new UnfilteredRowIterators.MergeListener()
 +                {
 +                    public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
 +                    {
 +                        try
 +                        {
 +                            
rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's 
really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging 
partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug 
info:%n %s",
 +                                                           table,
 +                                                           mergedDeletion == 
null ? "null" : mergedDeletion.toString(),
 +                                                           '[' + Joiner.on(", 
").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : 
rt.toString())) + ']',
 +                                                           sources.contacts(),
 +                                                           
makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public Row onMergedRows(Row merged, Row[] versions)
 +                    {
 +                        try
 +                        {
 +                            return rowListener.onMergedRows(merged, versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +                            // The following can be pretty verbose, but it's 
really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging 
rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? 
"null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", 
").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : 
rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           
makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +                    }
 +
 +                    public void 
onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
 +                    {
 +                        try
 +                        {
 +                            // The code for merging range tombstones is a tad 
complex and we had the assertions there triggered
 +                            // unexpectedly on a few occasions 
(CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights
 +                            // when that happens without more context than 
what the assertion errors give us, hence the
 +                            // catch here that basically gathers as much 
context as reasonable.
 +                            rowListener.onMergedRangeTombstoneMarkers(merged, 
versions);
 +                        }
 +                        catch (AssertionError e)
 +                        {
 +
 +                            // The following can be pretty verbose, but it's 
really only triggered if a bug happens, so we'd
 +                            // rather get more info to debug than not.
 +                            TableMetadata table = command.metadata();
 +                            String details = String.format("Error merging RTs 
on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s",
 +                                                           table,
 +                                                           merged == null ? 
"null" : merged.toString(table),
 +                                                           '[' + Joiner.on(", 
").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : 
rt.toString(table))) + ']',
 +                                                           sources.contacts(),
 +                                                           
makeResponsesDebugString(partitionKey));
 +                            throw new AssertionError(details, e);
 +                        }
 +
 +                    }
 +
 +                    public void close()
 +                    {
 +                        rowListener.close();
 +                    }
 +                };
 +            }
 +
 +            public void close()
 +            {
 +                partitionListener.close();
 +                if (repairedDataTracker != null)
 +                    repairedDataTracker.verify();
 +            }
 +        };
 +    }
 +}
diff --cc 
src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
index b4a3cb5,0000000..4f0ad4d
mode 100644,000000..100644
--- 
a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java
@@@ -1,480 -1,0 +1,550 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
++import java.util.ArrayDeque;
 +import java.util.ArrayList;
 +import java.util.Arrays;
- import java.util.Iterator;
 +import java.util.List;
 +import java.util.NavigableSet;
- import java.util.SortedMap;
- import java.util.TreeMap;
- import java.util.stream.Collectors;
++import java.util.concurrent.TimeUnit;
++import java.util.Queue;
++import java.util.function.Function;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.Clustering;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.Columns;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.DeletionTime;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.RegularAndStaticColumns;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 +import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
++import org.apache.cassandra.db.partitions.PartitionIterator;
++import org.apache.cassandra.db.partitions.PartitionIterators;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.db.rows.EncodingStats;
 +import org.apache.cassandra.db.rows.RangeTombstoneMarker;
 +import org.apache.cassandra.db.rows.Row;
 +import org.apache.cassandra.db.rows.Rows;
 +import org.apache.cassandra.db.rows.Unfiltered;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterators;
++import org.apache.cassandra.exceptions.OverloadedException;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.exceptions.UnavailableException;
 +import org.apache.cassandra.locator.Endpoints;
++import org.apache.cassandra.locator.EndpointsForToken;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.schema.TableMetadata;
++import org.apache.cassandra.service.ClientWarn;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 +import org.apache.cassandra.tracing.Tracing;
++import org.apache.cassandra.utils.NoSpamLogger;
 +import org.apache.cassandra.utils.btree.BTreeSet;
 +
 +/**
 + * Helper in charge of collecting additional queries to be done on the 
coordinator to protect against invalid results
 + * being included due to replica-side filtering (secondary indexes or {@code 
ALLOW FILTERING}).
 + * <p>
 + * When using replica-side filtering with CL>ONE, a replica can send a stale 
result satisfying the filter, while updated
 + * replicas won't send a corresponding tombstone to discard that result 
during reconciliation. This helper identifies
 + * the rows in a replica response that don't have a corresponding row in 
other replica responses, and requests them by
 + * primary key to the "silent" replicas in a second fetch round.
 + * <p>
-  * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
++ * See CASSANDRA-8272, CASSANDRA-8273, and CASSANDRA-15907 for further 
details.
 + */
 +class ReplicaFilteringProtection<E extends Endpoints<E>>
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(ReplicaFilteringProtection.class);
++    private static final NoSpamLogger oneMinuteLogger = 
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
++
++    private static final Function<UnfilteredRowIterator, EncodingStats> 
NULL_TO_NO_STATS =
++        rowIterator -> rowIterator == null ? EncodingStats.NO_STATS : 
rowIterator.stats();
 +
 +    private final Keyspace keyspace;
 +    private final ReadCommand command;
 +    private final ConsistencyLevel consistency;
 +    private final long queryStartNanoTime;
 +    private final E sources;
 +    private final TableMetrics tableMetrics;
 +
-     /**
-      * Per-source primary keys of the rows that might be outdated so they 
need to be fetched.
-      * For outdated static rows we use an empty builder to signal it has to 
be queried.
-      */
-     private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> 
rowsToFetch;
++    private final int cachedRowsWarnThreshold;
++    private final int cachedRowsFailThreshold;
++
++    /** Tracks whether or not we've already hit the warning threshold while 
evaluating a partition. */
++    private boolean hitWarningThreshold = false;
++
++    private int currentRowsCached = 0; // tracks the current number of cached 
rows
++    private int maxRowsCached = 0; // tracks the high watermark for the 
number of cached rows
 +
 +    /**
-      * Per-source list of all the partitions seen by the merge listener, to 
be merged with the extra fetched rows.
++     * Per-source list of the pending partitions seen by the merge listener, 
to be merged with the extra fetched rows.
 +     */
-     private final List<List<PartitionBuilder>> originalPartitions;
++    private final List<Queue<PartitionBuilder>> originalPartitions;
 +
 +    ReplicaFilteringProtection(Keyspace keyspace,
 +                               ReadCommand command,
 +                               ConsistencyLevel consistency,
 +                               long queryStartNanoTime,
-                                E sources)
++                               E sources,
++                               int cachedRowsWarnThreshold,
++                               int cachedRowsFailThreshold)
 +    {
 +        this.keyspace = keyspace;
 +        this.command = command;
 +        this.consistency = consistency;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +        this.sources = sources;
-         this.rowsToFetch = new ArrayList<>(sources.size());
 +        this.originalPartitions = new ArrayList<>(sources.size());
 +
-         for (Replica ignored : sources)
++        for (int i = 0; i < sources.size(); i++)
 +        {
-             rowsToFetch.add(new TreeMap<>());
-             originalPartitions.add(new ArrayList<>());
++            originalPartitions.add(new ArrayDeque<>());
 +        }
 +
 +        tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().id);
-     }
 +
-     private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, 
DecoratedKey partitionKey)
-     {
-         return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> 
BTreeSet.builder(command.metadata().comparator));
++        this.cachedRowsWarnThreshold = cachedRowsWarnThreshold;
++        this.cachedRowsFailThreshold = cachedRowsFailThreshold;
 +    }
 +
-     /**
-      * Returns the protected results for the specified replica. These are 
generated fetching the extra rows and merging
-      * them with the cached original filtered results for that replica.
-      *
-      * @param source the source
-      * @return the protected results for the specified replica
-      */
-     UnfilteredPartitionIterator queryProtectedPartitions(int source)
++    private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, 
Replica source, ReplicaPlan.Shared<EndpointsForToken, ReplicaPlan.ForTokenRead> 
replicaPlan)
 +    {
-         UnfilteredPartitionIterator original = 
makeIterator(originalPartitions.get(source));
-         SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = 
rowsToFetch.get(source);
- 
-         if (toFetch.isEmpty())
-             return original;
++        @SuppressWarnings("unchecked")
++        DataResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> resolver =
++            new DataResolver<>(cmd, replicaPlan, 
(NoopReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead>) 
NoopReadRepair.instance, queryStartNanoTime);
 +
-         // TODO: this would be more efficient if we had multi-key queries 
internally
-         List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
-                                                            .stream()
-                                                            .map(k -> 
querySourceOnKey(source, k))
-                                                            
.collect(Collectors.toList());
- 
-         return UnfilteredPartitionIterators.merge(Arrays.asList(original, 
UnfilteredPartitionIterators.concat(fetched)), null);
-     }
- 
-     private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey 
key)
-     {
-         BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
-         assert builder != null; // We're calling this on the result of 
rowsToFetch.get(i).keySet()
- 
-         Replica source = sources.get(i);
-         NavigableSet<Clustering> clusterings = builder.build();
-         tableMetrics.replicaSideFilteringProtectionRequests.mark();
-         if (logger.isTraceEnabled())
-             logger.trace("Requesting rows {} in partition {} from {} for 
replica-side filtering protection",
-                          clusterings, key, source);
-         Tracing.trace("Requesting {} rows in partition {} from {} for 
replica-side filtering protection",
-                       clusterings.size(), key, source);
- 
-         // build the read command taking into account that we could be 
requesting only in the static row
-         DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : 
DataLimits.NONE;
-         ClusteringIndexFilter filter = new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
-         SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(command.metadata(),
-                                                                            
command.nowInSec(),
-                                                                            
command.columnFilter(),
-                                                                            
RowFilter.NONE,
-                                                                            
limits,
-                                                                            
key,
-                                                                            
filter);
- 
-         ReplicaPlan.ForTokenRead replicaPlan = 
ReplicaPlans.forSingleReplicaRead(keyspace, key.getToken(), source);
-         ReplicaPlan.SharedForTokenRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
-         try
-         {
-             return executeReadCommand(cmd, source, sharedReplicaPlan);
-         }
-         catch (ReadTimeoutException e)
-         {
-             int blockFor = consistency.blockFor(keyspace);
-             throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
-         }
-         catch (UnavailableException e)
-         {
-             int blockFor = consistency.blockFor(keyspace);
-             throw UnavailableException.create(consistency, blockFor, blockFor 
- 1);
-         }
-     }
- 
-     private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-     UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica 
source, ReplicaPlan.Shared<E, P> replicaPlan)
-     {
-         DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, 
(NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
-         ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, 
replicaPlan, queryStartNanoTime);
++        ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler = 
new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 +
 +        if (source.isSelf())
 +        {
 +            Stage.READ.maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));
 +        }
 +        else
 +        {
 +            if (source.isTransient())
 +                cmd = cmd.copyAsTransientQuery(source);
 +            
MessagingService.instance().sendWithCallback(cmd.createMessage(false), 
source.endpoint(), handler);
 +        }
 +
 +        // We don't call handler.get() because we want to preserve tombstones 
since we're still in the middle of merging node results.
 +        handler.awaitResults();
 +        assert resolver.getMessages().size() == 1;
 +        return resolver.getMessages().get(0).payload.makeIterator(command);
 +    }
 +
 +    /**
 +     * Returns a merge listener that skips the merged rows for which any of 
the replicas doesn't have a version,
 +     * pessimistically assuming that they are outdated. It is intended to be 
used during a first merge of per-replica
 +     * query results to ensure we fetch enough results from the replicas so 
that we don't miss any potentially
 +     * outdated result.
 +     * <p>
 +     * The listener will track both the accepted data and the primary keys of 
the rows that are considered outdated.
 +     * That way, once the query results have been merged using this 
listener, further calls to
-      * {@link #queryProtectedPartitions(int)} will use the collected data to 
return a copy of the
++     * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the 
collected data to return a copy of the
 +     * data originally collected from the specified replica, completed with 
the potentially outdated rows.
 +     */
 +    UnfilteredPartitionIterators.MergeListener mergeController()
 +    {
-         return (partitionKey, versions) -> {
++        return new UnfilteredPartitionIterators.MergeListener()
++        {
++            @Override
++            public void close()
++            {
++                // If we hit the failure threshold before consuming a single 
partition, record the current rows cached.
++                
tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, 
maxRowsCached));
++            }
 +
-             PartitionBuilder[] builders = new 
PartitionBuilder[sources.size()];
++            @Override
++            public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions)
++            {
++                List<PartitionBuilder> builders = new 
ArrayList<>(sources.size());
++                RegularAndStaticColumns columns = columns(versions);
++                EncodingStats stats = EncodingStats.merge(versions, 
NULL_TO_NO_STATS);
 +
-             for (int i = 0; i < sources.size(); i++)
-                 builders[i] = new PartitionBuilder(command, partitionKey, 
columns(versions), stats(versions));
++                for (int i = 0; i < sources.size(); i++)
++                    builders.add(i, new PartitionBuilder(partitionKey, 
sources.get(i), columns, stats));
 +
-             return new UnfilteredRowIterators.MergeListener()
-             {
-                 @Override
-                 public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
++                return new UnfilteredRowIterators.MergeListener()
 +                {
-                     // cache the deletion time versions to be able to 
regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].setDeletionTime(versions[i]);
-                 }
++                    @Override
++                    public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
++                    {
++                        // cache the deletion time versions to be able to 
regenerate the original row iterator
++                        for (int i = 0; i < versions.length; i++)
++                            builders.get(i).setDeletionTime(versions[i]);
++                    }
 +
-                 @Override
-                 public Row onMergedRows(Row merged, Row[] versions)
-                 {
-                     // cache the row versions to be able to regenerate the 
original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].addRow(versions[i]);
++                    @Override
++                    public Row onMergedRows(Row merged, Row[] versions)
++                    {
++                        // cache the row versions to be able to regenerate 
the original row iterator
++                        for (int i = 0; i < versions.length; i++)
++                            builders.get(i).addRow(versions[i]);
 +
-                     if (merged.isEmpty())
-                         return merged;
++                        if (merged.isEmpty())
++                            return merged;
 +
-                     boolean isPotentiallyOutdated = false;
-                     boolean isStatic = merged.isStatic();
-                     for (int i = 0; i < versions.length; i++)
-                     {
-                         Row version = versions[i];
-                         if (version == null || (isStatic && 
version.isEmpty()))
++                        boolean isPotentiallyOutdated = false;
++                        boolean isStatic = merged.isStatic();
++                        for (int i = 0; i < versions.length; i++)
 +                        {
-                             isPotentiallyOutdated = true;
-                             BTreeSet.Builder<Clustering> toFetch = 
getOrCreateToFetch(i, partitionKey);
-                             // Note that for static, we shouldn't add the 
clustering to the clustering set (the
-                             // ClusteringIndexNamesFilter we'll build from 
this later does not expect it), but the fact
-                             // we created a builder in the first place will 
act as a marker that the static row must be
-                             // fetched, even if no other rows are added for 
this partition.
-                             if (!isStatic)
-                                 toFetch.add(merged.clustering());
++                            Row version = versions[i];
++                            if (version == null || (isStatic && 
version.isEmpty()))
++                            {
++                                isPotentiallyOutdated = true;
++                                builders.get(i).addToFetch(merged);
++                            }
 +                        }
-                     }
 +
-                     // If the row is potentially outdated (because some 
replica didn't send anything and so it _may_ be
-                     // an outdated result that is only present because other 
replica have filtered the up-to-date result
-                     // out), then we skip the row. In other words, the 
results of the initial merging of results by this
-                     // protection assume the worst case scenario where every 
row that might be outdated actually is.
-                     // This ensures that during this first phase (collecting 
additional row to fetch) we are guaranteed
-                     // to look at enough data to ultimately fulfill the query 
limit.
-                     return isPotentiallyOutdated ? null : merged;
-                 }
++                        // If the row is potentially outdated (because some 
replica didn't send anything and so it _may_ be
++                        // an outdated result that is only present because 
other replicas have filtered the up-to-date result
++                        // out), then we skip the row. In other words, the 
results of the initial merging of results by this
++                        // protection assume the worst case scenario where 
every row that might be outdated actually is.
++                        // This ensures that during this first phase 
(collecting additional rows to fetch) we are guaranteed
++                        // to look at enough data to ultimately fulfill the 
query limit.
++                        return isPotentiallyOutdated ? null : merged;
++                    }
 +
-                 @Override
-                 public void 
onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
-                 {
-                     // cache the marker versions to be able to regenerate the 
original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].addRangeTombstoneMarker(versions[i]);
-                 }
++                    @Override
++                    public void 
onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, 
RangeTombstoneMarker[] versions)
++                    {
++                        // cache the marker versions to be able to regenerate 
the original row iterator
++                        for (int i = 0; i < versions.length; i++)
++                            
builders.get(i).addRangeTombstoneMarker(versions[i]);
++                    }
 +
-                 @Override
-                 public void close()
-                 {
-                     for (int i = 0; i < sources.size(); i++)
-                         originalPartitions.get(i).add(builders[i]);
-                 }
-             };
++                    @Override
++                    public void close()
++                    {
++                        for (int i = 0; i < sources.size(); i++)
++                            originalPartitions.get(i).add(builders.get(i));
++                    }
++                };
++            }
 +        };
 +    }
 +
++    private void incrementCachedRows()
++    {
++        currentRowsCached++;
++
++        if (currentRowsCached == cachedRowsFailThreshold + 1)
++        {
++            String message = String.format("Replica filtering protection has 
cached over %d rows during query %s. " +
++                                           "(See 
'cached_replica_rows_fail_threshold' in cassandra.yaml.)",
++                                           cachedRowsFailThreshold, 
command.toCQLString());
++
++            logger.error(message);
++            Tracing.trace(message);
++            throw new OverloadedException(message);
++        }
++        else if (currentRowsCached == cachedRowsWarnThreshold + 1 && 
!hitWarningThreshold)
++        {
++            hitWarningThreshold = true;
++
++            String message = String.format("Replica filtering protection has 
cached over %d rows during query %s. " +
++                                           "(See 
'cached_replica_rows_warn_threshold' in cassandra.yaml.)",
++                                           cachedRowsWarnThreshold, 
command.toCQLString());
++
++            ClientWarn.instance.warn(message);
++            oneMinuteLogger.warn(message);
++            Tracing.trace(message);
++        }
++    }
++
++    private void releaseCachedRows(int count)
++    {
++        maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
++        currentRowsCached -= count;
++    }
++
 +    private static RegularAndStaticColumns 
columns(List<UnfilteredRowIterator> versions)
 +    {
 +        Columns statics = Columns.NONE;
 +        Columns regulars = Columns.NONE;
 +        for (UnfilteredRowIterator iter : versions)
 +        {
 +            if (iter == null)
 +                continue;
 +
 +            RegularAndStaticColumns cols = iter.columns();
 +            statics = statics.mergeTo(cols.statics);
 +            regulars = regulars.mergeTo(cols.regulars);
 +        }
 +        return new RegularAndStaticColumns(statics, regulars);
 +    }
 +
-     private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
-     {
-         EncodingStats stats = EncodingStats.NO_STATS;
-         for (UnfilteredRowIterator iter : iterators)
-         {
-             if (iter == null)
-                 continue;
- 
-             stats = stats.mergeWith(iter.stats());
-         }
-         return stats;
-     }
- 
-     private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> 
builders)
++    /**
++     * Returns the protected results for the specified replica. These are 
generated by fetching the extra rows and merging
++     * them with the cached original filtered results for that replica.
++     *
++     * @param merged the first-phase partitions, which should have been 
read using the {@link #mergeController()}
++     * @param source the source
++     * @return the protected results for the specified replica
++     */
++    UnfilteredPartitionIterator queryProtectedPartitions(PartitionIterator 
merged, int source)
 +    {
 +        return new UnfilteredPartitionIterator()
 +        {
-             final Iterator<PartitionBuilder> iterator = builders.iterator();
++            final Queue<PartitionBuilder> partitions = 
originalPartitions.get(source);
 +
 +            @Override
 +            public TableMetadata metadata()
 +            {
 +                return command.metadata();
 +            }
 +
 +            @Override
-             public void close()
-             {
-                 // nothing to do here
-             }
++            public void close() { }
 +
 +            @Override
 +            public boolean hasNext()
 +            {
-                 return iterator.hasNext();
++                // If there are no cached partition builders for this source, 
advance the first phase iterator, which
++                // will force the RFP merge listener to load at least the 
next protected partition. Note that this may
++                // load more than one partition if any divergence between 
replicas is discovered by the merge listener.
++                if (partitions.isEmpty())
++                {
++                    PartitionIterators.consumeNext(merged);
++                }
++
++                return !partitions.isEmpty();
 +            }
 +
 +            @Override
 +            public UnfilteredRowIterator next()
 +            {
-                 return iterator.next().build();
++                PartitionBuilder builder = partitions.poll();
++                assert builder != null;
++                return builder.protectedPartition();
 +            }
 +        };
 +    }
 +
-     private static class PartitionBuilder
++    private class PartitionBuilder
 +    {
-         private final ReadCommand command;
-         private final DecoratedKey partitionKey;
++        private final DecoratedKey key;
++        private final Replica source;
 +        private final RegularAndStaticColumns columns;
 +        private final EncodingStats stats;
 +
 +        private DeletionTime deletionTime;
 +        private Row staticRow = Rows.EMPTY_STATIC_ROW;
-         private final List<Unfiltered> contents = new ArrayList<>();
++        private final Queue<Unfiltered> contents = new ArrayDeque<>();
++        private BTreeSet.Builder<Clustering> toFetch;
++        private int partitionRowsCached;
 +
-         private PartitionBuilder(ReadCommand command,
-                                  DecoratedKey partitionKey,
-                                  RegularAndStaticColumns columns,
-                                  EncodingStats stats)
++        private PartitionBuilder(DecoratedKey key, Replica source, 
RegularAndStaticColumns columns, EncodingStats stats)
 +        {
-             this.command = command;
-             this.partitionKey = partitionKey;
++            this.key = key;
++            this.source = source;
 +            this.columns = columns;
 +            this.stats = stats;
 +        }
 +
 +        private void setDeletionTime(DeletionTime deletionTime)
 +        {
 +            this.deletionTime = deletionTime;
 +        }
 +
 +        private void addRow(Row row)
 +        {
++            partitionRowsCached++;
++
++            incrementCachedRows();
++
++            // Note that even null rows are counted against the row caching 
limit. The assumption is that
++            // a subsequent protection query will later fetch the row onto 
the heap anyway.
 +            if (row == null)
 +                return;
 +
 +            if (row.isStatic())
 +                staticRow = row;
 +            else
 +                contents.add(row);
 +        }
 +
 +        private void addRangeTombstoneMarker(RangeTombstoneMarker marker)
 +        {
 +            if (marker != null)
 +                contents.add(marker);
 +        }
 +
-         private UnfilteredRowIterator build()
++        private void addToFetch(Row row)
++        {
++            if (toFetch == null)
++                toFetch = BTreeSet.builder(command.metadata().comparator);
++
++            // Note that for static, we shouldn't add the clustering to the 
clustering set (the
++            // ClusteringIndexNamesFilter we'll build from this later does 
not expect it), but the fact
++            // we created a builder in the first place will act as a marker 
that the static row must be
++            // fetched, even if no other rows are added for this partition.
++            if (!row.isStatic())
++                toFetch.add(row.clustering());
++        }
++
++        private UnfilteredRowIterator originalPartition()
 +        {
 +            return new UnfilteredRowIterator()
 +            {
-                 final Iterator<Unfiltered> iterator = contents.iterator();
- 
 +                @Override
 +                public DeletionTime partitionLevelDeletion()
 +                {
 +                    return deletionTime;
 +                }
 +
 +                @Override
 +                public EncodingStats stats()
 +                {
 +                    return stats;
 +                }
 +
 +                @Override
 +                public TableMetadata metadata()
 +                {
 +                    return command.metadata();
 +                }
 +
 +                @Override
 +                public boolean isReverseOrder()
 +                {
 +                    return command.isReversed();
 +                }
 +
 +                @Override
 +                public RegularAndStaticColumns columns()
 +                {
 +                    return columns;
 +                }
 +
 +                @Override
 +                public DecoratedKey partitionKey()
 +                {
-                     return partitionKey;
++                    return key;
 +                }
 +
 +                @Override
 +                public Row staticRow()
 +                {
 +                    return staticRow;
 +                }
 +
 +                @Override
 +                public void close()
 +                {
-                     // nothing to do here
++                    releaseCachedRows(partitionRowsCached);
 +                }
 +
 +                @Override
 +                public boolean hasNext()
 +                {
-                     return iterator.hasNext();
++                    return !contents.isEmpty();
 +                }
 +
 +                @Override
 +                public Unfiltered next()
 +                {
-                     return iterator.next();
++                    return contents.poll();
 +                }
 +            };
 +        }
++
++        private UnfilteredRowIterator protectedPartition()
++        {
++            UnfilteredRowIterator original = originalPartition();
++
++            if (toFetch != null)
++            {
++                try (UnfilteredPartitionIterator partitions = 
fetchFromSource())
++                {
++                    if (partitions.hasNext())
++                    {
++                        try (UnfilteredRowIterator fetchedRows = 
partitions.next())
++                        {
++                            return 
UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows));
++                        }
++                    }
++                }
++            }
++
++            return original;
++        }
++
++        private UnfilteredPartitionIterator fetchFromSource()
++        {
++            assert toFetch != null;
++
++            NavigableSet<Clustering> clusterings = toFetch.build();
++            tableMetrics.replicaFilteringProtectionRequests.mark();
++
++            if (logger.isTraceEnabled())
++                logger.trace("Requesting rows {} in partition {} from {} for 
replica filtering protection",
++                             clusterings, key, source);
++
++            Tracing.trace("Requesting {} rows in partition {} from {} for 
replica filtering protection",
++                          clusterings.size(), key, source);
++
++            // build the read command taking into account that we could be 
requesting only in the static row
++            DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE;
++            ClusteringIndexFilter filter = new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
++            SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(command.metadata(),
++                                                                              
 command.nowInSec(),
++                                                                              
 command.columnFilter(),
++                                                                              
 RowFilter.NONE,
++                                                                              
 limits,
++                                                                              
 key,
++                                                                              
 filter);
++
++            ReplicaPlan.ForTokenRead replicaPlan = 
ReplicaPlans.forSingleReplicaRead(keyspace, key.getToken(), source);
++            ReplicaPlan.SharedForTokenRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
++
++            try
++            {
++                return executeReadCommand(cmd, source, sharedReplicaPlan);
++            }
++            catch (ReadTimeoutException e)
++            {
++                int blockFor = consistency.blockFor(keyspace);
++                throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
++            }
++            catch (UnavailableException e)
++            {
++                int blockFor = consistency.blockFor(keyspace);
++                throw UnavailableException.create(consistency, blockFor, 
blockFor - 1);
++            }
++        }
 +    }
 +}
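
    For reviewers following the new flow end to end, the DataResolver fragment at the top of
    this diff and the class above fit together roughly as sketched below. This is an
    illustrative sketch rather than the patch itself: the local variable names and the
    first-phase response provider and pre-count filter are assumptions, while the
    ReplicaFilteringProtection constructor, mergeController(), queryProtectedPartitions(),
    and PartitionIterators.doOnClose() calls follow the signatures introduced in this diff.

        // Sketch only: assumed resolver-side wiring of the two read phases.
        ReplicaFilteringProtection<E> rfp =
            new ReplicaFilteringProtection<>(keyspace, command, consistency, queryStartNanoTime,
                                             context.replicas, cachedRowsWarnThreshold, cachedRowsFailThreshold);

        // Phase 1: merge the raw replica responses with rfp.mergeController(), which caches every row
        // version and pessimistically drops any merged row that some replica did not answer for.
        PartitionIterator firstPhasePartitions =
            resolveInternal(context, rfp.mergeController(), firstPhaseResponses, firstPhasePreCountFilter);

        // Phase 2: resolve again, but this time each "response" is the cached partition data for that
        // source, lazily completed with the rows fetched from the silent replicas; the row filter is
        // re-applied before the merged result is counted against the query limit.
        PartitionIterator completedPartitions =
            resolveInternal(context,
                            mergeListener,
                            i -> rfp.queryProtectedPartitions(firstPhasePartitions, i),
                            results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()));

        // Ensure the first-phase iterator (and with it the rows-cached metric) is closed with the result.
        return PartitionIterators.doOnClose(completedPartitions, firstPhasePartitions::close);

    The doOnClose() wrapper matters because the second phase consumes the first one lazily:
    queryProtectedPartitions() only advances firstPhasePartitions from hasNext(), so closing the
    first-phase iterator, and recording rfpRowsCachedPerQuery in the merge controller's close(),
    can only happen once the completed result itself is closed.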
diff --cc 
src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 59edc5a,0000000..42676b6
mode 100644,000000..100644
--- 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@@ -1,199 -1,0 +1,207 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.locator.ReplicaPlans;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.DataRange;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.PartitionPosition;
 +import org.apache.cassandra.db.PartitionRangeReadCommand;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.db.transform.MorePartitions;
 +import org.apache.cassandra.db.transform.MoreRows;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.dht.AbstractBounds;
 +import org.apache.cassandra.dht.ExcludingBounds;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +public class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowIterator> implements 
MorePartitions<UnfilteredPartitionIterator>
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(ShortReadPartitionsProtection.class);
 +    private final ReadCommand command;
 +    private final Replica source;
 +
++    private final Runnable preFetchCallback; // called immediately before 
fetching more contents
++
 +    private final DataLimits.Counter singleResultCounter; // unmerged 
per-source counter
 +    private final DataLimits.Counter mergedResultCounter; // merged 
end-result counter
 +
 +    private DecoratedKey lastPartitionKey; // key of the last observed 
partition
 +
 +    private boolean partitionsFetched; // whether we've seen any new 
partitions since iteration start or last moreContents() call
 +
 +    private final long queryStartNanoTime;
 +
-     public ShortReadPartitionsProtection(ReadCommand command, Replica source,
++    public ShortReadPartitionsProtection(ReadCommand command,
++                                         Replica source,
++                                         Runnable preFetchCallback,
 +                                         DataLimits.Counter 
singleResultCounter,
 +                                         DataLimits.Counter 
mergedResultCounter,
 +                                         long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.source = source;
++        this.preFetchCallback = preFetchCallback;
 +        this.singleResultCounter = singleResultCounter;
 +        this.mergedResultCounter = mergedResultCounter;
 +        this.queryStartNanoTime = queryStartNanoTime;
 +    }
 +
 +    @Override
 +    public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator 
partition)
 +    {
 +        partitionsFetched = true;
 +
 +        lastPartitionKey = partition.partitionKey();
 +
 +        /*
 +         * Extend for moreContents() then apply protection to track 
lastClustering by applyToRow().
 +         *
 +         * If we don't apply the transformation *after* extending the 
partition with MoreRows,
 +         * applyToRow() method of protection will not be called on the first 
row of the new extension iterator.
 +         */
 +        ReplicaPlan.ForTokenRead replicaPlan = 
ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), 
partition.partitionKey().getToken(), source);
 +        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = 
ReplicaPlan.shared(replicaPlan);
 +        ShortReadRowsProtection protection = new 
ShortReadRowsProtection(partition.partitionKey(),
 +                                                                         
command, source,
 +                                                                         
(cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
 +                                                                         
singleResultCounter,
 +                                                                         
mergedResultCounter);
 +        return Transformation.apply(MoreRows.extend(partition, protection), 
protection);
 +    }
 +
 +    /*
 +     * We only get here once all the rows and partitions in this iterator 
have been iterated over, and so
 +     * if the node had returned the requested number of rows but we still get 
here, then some results were
 +     * skipped during reconciliation.
 +     */
 +    public UnfilteredPartitionIterator moreContents()
 +    {
 +        // never try to request additional partitions from replicas if our 
reconciled partitions are already filled to the limit
 +        assert !mergedResultCounter.isDone();
 +
 +        // we do not apply short read protection when we have no limits at all
 +        assert !command.limits().isUnlimited();
 +
 +        /*
 +         * If this is a single partition read command or an (indexed) 
partition range read command with
 +         * a partition key specified, then we can't and shouldn't try to fetch 
more partitions.
 +         */
 +        assert !command.isLimitedToOnePartition();
 +
 +        /*
 +         * If the returned result doesn't have enough rows/partitions to 
satisfy even the original limit, don't ask for more.
 +         *
 +         * Can only take the short cut if there is no per partition limit 
set. Otherwise it's possible to hit false
 +         * positives due to some rows being unaccounted for in certain 
scenarios (see CASSANDRA-13911).
 +         */
 +        if (!singleResultCounter.isDone() && 
command.limits().perPartitionCount() == DataLimits.NO_LIMIT)
 +            return null;
 +
 +        /*
 +         * Either we had an empty iterator as the initial response, or our 
moreContents() call got us an empty iterator.
 +         * There is no point in asking the replica for more rows - it has no 
more in the requested range.
 +         */
 +        if (!partitionsFetched)
 +            return null;
 +        partitionsFetched = false;
 +
 +        /*
 +         * We are going to fetch one partition at a time for thrift and 
potentially more for CQL.
 +         * The row limit will either be set to the per partition limit - if 
the command has no total row limit set, or
 +         * the total # of rows remaining - if it has some. If we don't grab 
enough rows in some of the partitions,
 +         * then future ShortReadRowsProtection.moreContents() calls will 
fetch the missing ones.
 +         */
 +        int toQuery = command.limits().count() != DataLimits.NO_LIMIT
 +                      ? command.limits().count() - 
counted(mergedResultCounter)
 +                      : command.limits().perPartitionCount();
 +
 +        
ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark();
 +        Tracing.trace("Requesting {} extra rows from {} for short read 
protection", toQuery, source);
 +        logger.info("Requesting {} extra rows from {} for short read 
protection", toQuery, source);
 +
++        // If we've arrived here, all responses have been consumed, and we're 
about to request more.
++        preFetchCallback.run();
++
 +        return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery);
 +    }
 +
 +    // Counts the number of rows for regular queries and the number of groups 
for GROUP BY queries
 +    private int counted(DataLimits.Counter counter)
 +    {
 +        return command.limits().isGroupByLimit()
 +               ? counter.rowCounted()
 +               : counter.counted();
 +    }
 +
 +    private UnfilteredPartitionIterator 
makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery)
 +    {
 +        PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command;
 +
 +        DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery);
 +
 +        AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange();
 +        AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight()
 +                                                      ? new 
Range<>(lastPartitionKey, bounds.right)
 +                                                      : new 
ExcludingBounds<>(lastPartitionKey, bounds.right);
 +        DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 +
 +        ReplicaPlan.ForRangeRead replicaPlan = 
ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), 
cmd.dataRange().keyRange(), source, 1);
 +        return 
executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), 
ReplicaPlan.shared(replicaPlan));
 +    }
 +
 +    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 +    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, 
ReplicaPlan.Shared<E, P> replicaPlan)
 +    {
 +        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, 
(NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
 +        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, 
replicaPlan, queryStartNanoTime);
 +
 +        if (source.isSelf())
 +        {
 +            Stage.READ.maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));
 +        }
 +        else
 +        {
 +            if (source.isTransient())
 +                cmd = cmd.copyAsTransientQuery(source);
 +            
MessagingService.instance().sendWithCallback(cmd.createMessage(false), 
source.endpoint(), handler);
 +        }
 +
 +        // We don't call handler.get() because we want to preserve tombstones 
since we're still in the middle of merging node results.
 +        handler.awaitResults();
 +        assert resolver.getMessages().size() == 1;
 +        return resolver.getMessages().get(0).payload.makeIterator(command);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
index 1a454f9,0000000..a1bdc0e
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java
@@@ -1,77 -1,0 +1,86 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads;
 +
 +import java.net.InetAddress;
 +
 +
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.db.transform.MorePartitions;
 +import org.apache.cassandra.db.transform.Transformation;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.locator.Replica;
 +
 +/**
 + * We have a potential short read if the result from a given node contains 
the requested number of rows
 + * (i.e. it has stopped returning results due to the limit), but some of them 
haven't
 + * made it into the final post-reconciliation result due to other nodes' row, 
range, and/or partition tombstones.
 + *
 + * If that is the case, then that node may have more rows that we should 
fetch, as otherwise we could
 + * ultimately return fewer rows than required. Also, those additional rows 
may contain tombstones
 + * which we also need to fetch as they may shadow rows or partitions from 
other replicas' results, which we would
 + * otherwise return incorrectly.
 + */
 +public class ShortReadProtection
 +{
 +    @SuppressWarnings("resource")
-     public static UnfilteredPartitionIterator extend(Replica source, UnfilteredPartitionIterator partitions,
-                                                      ReadCommand command, DataLimits.Counter mergedResultCounter,
-                                                      long queryStartNanoTime, boolean enforceStrictLiveness)
++    public static UnfilteredPartitionIterator extend(Replica source,
++                                                     Runnable preFetchCallback,
++                                                     UnfilteredPartitionIterator partitions,
++                                                     ReadCommand command,
++                                                     DataLimits.Counter mergedResultCounter,
++                                                     long queryStartNanoTime,
++                                                     boolean enforceStrictLiveness)
 +    {
 +        DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(),
 +                                                                             false,
 +                                                                             command.selectsFullPartition(),
 +                                                                             enforceStrictLiveness).onlyCount();
 +
-         ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(command, source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
++        ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(command,
++                                                                                     source,
++                                                                                     preFetchCallback,
++                                                                                     singleResultCounter,
++                                                                                     mergedResultCounter,
++                                                                                     queryStartNanoTime);
 +
 +        /*
 +         * The order of extension and transformations is important here. Extending with more partitions has to happen
 +         * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will
 +         * be called on the first partition of the extended iterator.
 +         *
 +         * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will
 +         * be called last, after the extension done by the SRPP.applyToPartition() call. That way we preserve the same order
 +         * when it comes to calling SRPP.moreContents() and applyToRow() callbacks.
 +         *
 +         * See ShortReadPartitionsProtection.applyToPartition() for more details.
 +         */
 +
 +        // extend with moreContents() only if it's a range read command with no partition key specified
 +        if (!command.isLimitedToOnePartition())
 +            partitions = MorePartitions.extend(partitions, protection);     // register SRPP.moreContents()
 +
 +        partitions = Transformation.apply(partitions, protection);          // register SRPP.applyToPartition()
 +        partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter
 +
 +        return partitions;
 +    }
 +}
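For orientation, here is a minimal, hypothetical caller of the new extend(...) signature above. Only the method and its parameter order come from this commit; the wrapper class, its name, and the callback body are illustrative assumptions, and the callback's purpose is inferred from its name.

    package org.apache.cassandra.service.reads;

    import org.apache.cassandra.db.ReadCommand;
    import org.apache.cassandra.db.filter.DataLimits;
    import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
    import org.apache.cassandra.locator.Replica;

    // Illustrative sketch only: this wrapper class and the callback body are assumptions;
    // only ShortReadProtection.extend(...) and its parameter order come from the patch above.
    public final class ShortReadProtectionUsageSketch
    {
        private ShortReadProtectionUsageSketch() {}

        public static UnfilteredPartitionIterator protect(Replica source,
                                                          UnfilteredPartitionIterator partitions,
                                                          ReadCommand command,
                                                          DataLimits.Counter mergedResultCounter,
                                                          long queryStartNanoTime,
                                                          boolean enforceStrictLiveness)
        {
            // Hypothetical hook: the name suggests it runs before SRPP fetches additional
            // rows from this replica, so the caller can do per-query bookkeeping first.
            Runnable preFetchCallback = () -> { /* caller-specific bookkeeping */ };

            return ShortReadProtection.extend(source,
                                              preFetchCallback,
                                              partitions,
                                              command,
                                              mergedResultCounter,
                                              queryStartNanoTime,
                                              enforceStrictLiveness);
        }
    }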
diff --cc src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 2f82c22,0000000..b65f3fc
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@@ -1,83 -1,0 +1,82 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service.reads.repair;
 +
 +import java.util.Map;
 +import java.util.function.Consumer;
 +
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.Mutation;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.locator.Endpoints;
 +import org.apache.cassandra.locator.Replica;
 +import org.apache.cassandra.locator.ReplicaPlan;
 +import org.apache.cassandra.service.reads.DigestResolver;
 +
 +/**
 + * Bypasses the read repair path for short read protection and testing
 + */
- public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-         implements ReadRepair<E, P>
++public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements ReadRepair<E, P>
 +{
 +    public static final NoopReadRepair instance = new NoopReadRepair();
 +
 +    private NoopReadRepair() {}
 +
 +    @Override
 +    public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicas)
 +    {
 +        return UnfilteredPartitionIterators.MergeListener.NOOP;
 +    }
 +
 +    @Override
 +    public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
 +    {
 +        resultConsumer.accept(digestResolver.getData());
 +    }
 +
 +    public void awaitReads() throws ReadTimeoutException
 +    {
 +    }
 +
 +    @Override
 +    public void maybeSendAdditionalReads()
 +    {
 +
 +    }
 +
 +    @Override
 +    public void maybeSendAdditionalWrites()
 +    {
 +
 +    }
 +
 +    @Override
 +    public void awaitWrites()
 +    {
 +
 +    }
 +
 +    @Override
 +    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
 +    {
 +
 +    }
 +}
diff --cc src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index 7adb33b,15afdbe..3ebbdce
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@@ -18,9 -18,6 +18,8 @@@
  */
  package org.apache.cassandra.utils.concurrent;
  
 +import java.util.AbstractCollection;
 +import java.util.Collection;
- import java.util.Arrays;
  import java.util.Iterator;
  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  
@@@ -139,27 -136,8 +138,27 @@@ public class Accumulator<E
          return (E) values[i];
      }
  
 +    public Collection<E> snapshot()
 +    {
 +        int count = presentCount;
 +        return new AbstractCollection<E>()
 +        {
 +            @Override
 +            public Iterator<E> iterator()
 +            {
 +                return Accumulator.this.iterator(count);
 +            }
 +
 +            @Override
 +            public int size()
 +            {
 +                return count;
 +            }
 +        };
 +    }
 +
      /**
-      * Removes all of the elements from this accumulator.
+      * Removes the element at the specified index from this accumulator.
       *
       * This method is not thread-safe when used concurrently with {@link #add(Object)}.
       */
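As a side note on the snapshot() view added above, here is a small sketch of its fixed-size behaviour: the element count is captured when snapshot() is called, so later additions are not visible through the view. The wrapper class, the capacity, and the values are arbitrary, and the int-capacity Accumulator constructor is assumed from the existing API.

    import java.util.Collection;

    import org.apache.cassandra.utils.concurrent.Accumulator;

    // Illustrative sketch only: the Accumulator API is the one shown in the diff above.
    public final class AccumulatorSnapshotSketch
    {
        public static void main(String[] args)
        {
            Accumulator<String> responses = new Accumulator<>(4); // capacity is arbitrary here

            responses.add("r1");
            responses.add("r2");

            // snapshot() captures presentCount at call time, so the view is fixed at 2 elements...
            Collection<String> view = responses.snapshot();

            responses.add("r3"); // ...and this later addition is not visible through it.

            System.out.println(view.size());   // 2
            view.forEach(System.out::println); // r1, r2
        }
    }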
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 2ee209d,c449fdd..0b6fe6e
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -98,10 -104,13 +104,14 @@@ public class Coordinator implements ICo
                                                                   Integer.MAX_VALUE,
                                                                   null,
                                                                   null,
  -                                                                 ProtocolVersion.CURRENT),
  +                                                                 ProtocolVersion.V4,
  +                                                                 null),
                                               System.nanoTime());
  
+         // Collect warnings reported during the query.
+         if (res != null)
+             res.setWarnings(ClientWarn.instance.getWarnings());
+ 
          return RowUtil.toQueryResult(res);
      }
  
diff --cc test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
index 0000000,2a847bf..f891dfe
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
@@@ -1,0 -1,244 +1,244 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import java.io.IOException;
+ import java.util.List;
+ 
+ import org.junit.AfterClass;
+ import org.junit.BeforeClass;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.distributed.Cluster;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.api.SimpleQueryResult;
+ import org.apache.cassandra.exceptions.OverloadedException;
+ import org.apache.cassandra.service.StorageService;
+ 
+ import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_FAIL_THRESHOLD;
+ import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_WARN_THRESHOLD;
+ import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+ import static org.junit.Assert.assertEquals;
+ 
+ /**
 - * Exercises the functionality of {@link org.apache.cassandra.service.ReplicaFilteringProtection}, the
++ * Exercises the functionality of {@link org.apache.cassandra.service.reads.ReplicaFilteringProtection}, the
+  * mechanism that ensures distributed index and filtering queries at read consistency levels > ONE/LOCAL_ONE
+  * avoid stale replica results.
+  */
+ public class ReplicaFilteringProtectionTest extends TestBaseImpl
+ {
+     private static final int REPLICAS = 2;
+     private static final int ROWS = 3;
+ 
+     private static Cluster cluster;
+ 
+     @BeforeClass
+     public static void setup() throws IOException
+     {
+         cluster = init(Cluster.build()
+                               .withNodes(REPLICAS)
+                               .withConfig(config -> 
config.set("hinted_handoff_enabled", false)
+                                                           
.set("commitlog_sync", "batch")
+                                                           .set("num_tokens", 
1)).start());
+ 
+         // Make sure we start w/ the correct defaults:
+         cluster.get(1).runOnInstance(() -> 
assertEquals(DEFAULT_WARN_THRESHOLD, 
StorageService.instance.getCachedReplicaRowsWarnThreshold()));
+         cluster.get(1).runOnInstance(() -> 
assertEquals(DEFAULT_FAIL_THRESHOLD, 
StorageService.instance.getCachedReplicaRowsFailThreshold()));
+     }
+ 
+     @AfterClass
+     public static void teardown()
+     {
+         cluster.close();
+     }
+ 
+     @Test
+     public void testMissedUpdatesBelowCachingWarnThreshold()
+     {
+         String tableName = "missed_updates_no_warning";
+         cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " 
(k int PRIMARY KEY, v text)"));
+ 
+         // The warning threshold provided is one more than the total number 
of rows returned
+         // to the coordinator from all replicas and therefore should not be 
triggered.
+         testMissedUpdates(tableName, REPLICAS * ROWS, Integer.MAX_VALUE, 
false);
+     }
+ 
+     @Test
+     public void testMissedUpdatesAboveCachingWarnThreshold()
+     {
+         String tableName = "missed_updates_cache_warn";
+         cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " 
(k int PRIMARY KEY, v text)"));
+ 
+         // The warning threshold provided is one less than the total number 
of rows returned
+         // to the coordinator from all replicas and therefore should be 
triggered but not fail the query.
+         testMissedUpdates(tableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE, 
true);
+     }
+ 
+     @Test
+     public void testMissedUpdatesAroundCachingFailThreshold()
+     {
+         String tableName = "missed_updates_cache_fail";
+         cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " 
(k int PRIMARY KEY, v text)"));
+ 
+         // The failure threshold provided is exactly the total number of rows 
returned
+         // to the coordinator from all replicas and therefore should just 
warn.
+         testMissedUpdates(tableName, 1, REPLICAS * ROWS, true);
+ 
+         try
+         {
+             // The failure threshold provided is one less than the total 
number of rows returned
+             // to the coordinator from all replicas and therefore should fail 
the query.
+             testMissedUpdates(tableName, 1, REPLICAS * ROWS - 1, true);
+         }
+         catch (RuntimeException e)
+         {
 -            assertEquals(e.getCause().getClass().getName(), OverloadedException.class.getName());
++            assertEquals(e.getClass().getName(), OverloadedException.class.getName());
+         }
+     }
+ 
+     private void testMissedUpdates(String tableName, int warnThreshold, int 
failThreshold, boolean shouldWarn)
+     {
+         cluster.get(1).runOnInstance(() -> 
StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold));
+         cluster.get(1).runOnInstance(() -> 
StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold));
+ 
+         String fullTableName = KEYSPACE + '.' + tableName;
+ 
+         // Case 1: Insert and query rows at ALL to verify the baseline.
+         for (int i = 0; i < ROWS; i++)
+         {
+             cluster.coordinator(1).execute("INSERT INTO " + fullTableName + 
"(k, v) VALUES (?, 'old')", ALL, i);
+         }
+ 
+         long histogramSampleCount = rowsCachedPerQueryCount(cluster.get(1), 
tableName);
 -        
++
+         String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT 
? ALLOW FILTERING";
+ 
+         Object[][] initialRows = cluster.coordinator(1).execute(query, ALL, 
"old", ROWS);
+         assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old"));
+ 
+         // Make sure only one sample was recorded for the query.
+         assertEquals(histogramSampleCount + 1, 
rowsCachedPerQueryCount(cluster.get(1), tableName));
+ 
+         // Case 2: Update all rows on only one replica, leaving the entire 
dataset of the remaining replica out-of-date.
+         updateAllRowsOn(1, fullTableName, "new");
+ 
+         // The replica that missed the results creates a mismatch at every 
row, and we therefore cache a version
+         // of that row for all replicas.
+         SimpleQueryResult oldResult = 
cluster.coordinator(1).executeWithResult(query, ALL, "old", ROWS);
+         assertRows(oldResult.toObjectArrays());
+         verifyWarningState(shouldWarn, oldResult);
+ 
+         // We should have made 3 row "completion" requests.
+         assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName));
+ 
 -        // In all cases above, the queries should be caching 1 row per 
partition per replica, but 
++        // In all cases above, the queries should be caching 1 row per 
partition per replica, but
+         // 6 for the whole query, given every row is potentially stale.
+         assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), 
tableName));
+ 
+         // Make sure only one more sample was recorded for the query.
+         assertEquals(histogramSampleCount + 2, 
rowsCachedPerQueryCount(cluster.get(1), tableName));
+ 
+         // Case 3: Observe the effects of blocking read-repair.
 -        
++
+         // The previous query performs a blocking read-repair, which removes replica divergence. This
+         // will therefore only warn if the warning threshold is actually below the number of replicas.
+         // (i.e. the row cache counter is decremented/reset as each partition is consumed.)
+         SimpleQueryResult newResult = 
cluster.coordinator(1).executeWithResult(query, ALL, "new", ROWS);
+         Object[][] newRows = newResult.toObjectArrays();
+         assertRows(newRows, row(1, "new"), row(0, "new"), row(2, "new"));
 -        
++
+         verifyWarningState(warnThreshold < REPLICAS, newResult);
 -        
++
+         // We should still only have made 3 row "completion" requests, with no replica divergence in the last query.
+         assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName));
+ 
+         // With no replica divergence, we only cache a single partition at a 
time across 2 replicas.
+         assertEquals(REPLICAS, minRowsCachedPerQuery(cluster.get(1), 
tableName));
+ 
+         // Make sure only one more sample was recorded for the query.
+         assertEquals(histogramSampleCount + 3, 
rowsCachedPerQueryCount(cluster.get(1), tableName));
+ 
+         // Case 4: Introduce another mismatch by updating all rows on only 
one replica.
 -        
++
+         updateAllRowsOn(1, fullTableName, "future");
+ 
+         // Another mismatch is introduced, and we once again cache a version 
of each row during resolution.
+         SimpleQueryResult futureResult = 
cluster.coordinator(1).executeWithResult(query, ALL, "future", ROWS);
+         Object[][] futureRows = futureResult.toObjectArrays();
+         assertRows(futureRows, row(1, "future"), row(0, "future"), row(2, 
"future"));
 -        
++
+         verifyWarningState(shouldWarn, futureResult);
+ 
+         // We should have made 3 more row "completion" requests.
+         assertEquals(ROWS * 2, protectionQueryCount(cluster.get(1), 
tableName));
+ 
 -        // In all cases above, the queries should be caching 1 row per 
partition, but 6 for the 
++        // In all cases above, the queries should be caching 1 row per 
partition, but 6 for the
+         // whole query, given every row is potentially stale.
+         assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), 
tableName));
+ 
+         // Make sure only one more sample was recorded for the query.
+         assertEquals(histogramSampleCount + 4, 
rowsCachedPerQueryCount(cluster.get(1), tableName));
+     }
 -    
++
+     private void updateAllRowsOn(int node, String table, String value)
+     {
+         for (int i = 0; i < ROWS; i++)
+         {
+             cluster.get(node).executeInternal("UPDATE " + table + " SET v = ? 
WHERE k = ?", value, i);
+         }
+     }
 -    
++
+     private void verifyWarningState(boolean shouldWarn, SimpleQueryResult 
futureResult)
+     {
+         List<String> futureWarnings = futureResult.warnings();
+         assertEquals(shouldWarn, futureWarnings.stream().anyMatch(w -> 
w.contains("cached_replica_rows_warn_threshold")));
+         assertEquals(shouldWarn ? 1 : 0, futureWarnings.size());
+     }
+ 
+     private long protectionQueryCount(IInvokableInstance instance, String 
tableName)
+     {
+         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                      
.getColumnFamilyStore(tableName)
+                                                      
.metric.replicaFilteringProtectionRequests.getCount());
+     }
+ 
+     private long maxRowsCachedPerQuery(IInvokableInstance instance, String 
tableName)
+     {
+         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                      
.getColumnFamilyStore(tableName)
+                                                      
.metric.rfpRowsCachedPerQuery.getSnapshot().getMax());
+     }
+ 
+     private long minRowsCachedPerQuery(IInvokableInstance instance, String 
tableName)
+     {
+         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                      
.getColumnFamilyStore(tableName)
+                                                      
.metric.rfpRowsCachedPerQuery.getSnapshot().getMin());
+     }
+ 
+     private long rowsCachedPerQueryCount(IInvokableInstance instance, String 
tableName)
+     {
+         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+                                                      
.getColumnFamilyStore(tableName)
+                                                      
.metric.rfpRowsCachedPerQuery.getCount());
+     }
 -}
++}
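For readers skimming the dtest above: the thresholds it toggles are the same ones this commit exposes through StorageService (see the StorageServiceMBean changes in the diffstat). Below is a minimal sketch of those calls outside the in-JVM dtest harness; the class name and the numeric values are arbitrary examples, not the shipped defaults from ReplicaFilteringProtectionOptions.

    import org.apache.cassandra.service.StorageService;

    // Illustrative sketch only: the getters/setters exist per the diff above;
    // the chosen values are arbitrary examples rather than the defaults.
    public final class RfpThresholdTuningSketch
    {
        public static void tighten()
        {
            // Warn once a single query has cached more than 100 replica rows
            // for replica filtering protection...
            StorageService.instance.setCachedReplicaRowsWarnThreshold(100);

            // ...and fail the query (the test above expects an OverloadedException
            // at the coordinator) once it caches more than 1000.
            StorageService.instance.setCachedReplicaRowsFailThreshold(1000);

            System.out.println("warn=" + StorageService.instance.getCachedReplicaRowsWarnThreshold()
                             + " fail=" + StorageService.instance.getCachedReplicaRowsFailThreshold());
        }
    }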
diff --cc test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 9b34a23,ce0fe27..da5bd32
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@@ -105,26 -105,26 +105,26 @@@ public class AccumulatorTes
          accu.add("2");
          accu.add("3");
  
-         accu.clearUnsafe();
+         accu.clearUnsafe(1);
  
-         assertEquals(0, accu.size());
-         assertFalse(accu.snapshot().iterator().hasNext());
-         assertOutOfBonds(accu, 0);
+         assertEquals(3, accu.size());
 -        assertTrue(accu.iterator().hasNext());
++        assertTrue(accu.snapshot().iterator().hasNext());
  
          accu.add("4");
          accu.add("5");
  
-         assertEquals(2, accu.size());
+         assertEquals(5, accu.size());
  
-         assertEquals("4", accu.get(0));
-         assertEquals("5", accu.get(1));
-         assertOutOfBonds(accu, 2);
+         assertEquals("4", accu.get(3));
+         assertEquals("5", accu.get(4));
+         assertOutOfBonds(accu, 5);
  
 -        Iterator<String> iter = accu.iterator();
 +        Iterator<String> iter = accu.snapshot().iterator();
          assertTrue(iter.hasNext());
-         assertEquals("4", iter.next());
-         assertEquals("5", iter.next());
-         assertFalse(iter.hasNext());
+         assertEquals("1", iter.next());
+         assertNull(iter.next());
+         assertTrue(iter.hasNext());
+         assertEquals("3", iter.next());
      }
  
      private static void assertOutOfBonds(Accumulator<String> accumulator, int index)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
