This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new dd255ff  Fix replica-side filtering returning stale data with CL > 1
dd255ff is described below

commit dd255ffa07d0263521a1ca863fc2192db19bc04c
Author: Andrés de la Peña <[email protected]>
AuthorDate: Wed May 27 11:01:42 2020 +0100

    Fix replica-side filtering returning stale data with CL > 1
    
    patch by Andres de la Peña; reviewed by Benjamin Lerer, Caleb Rackliffe and 
ZhaoYang for CASSANDRA-8272
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/db/DataRange.java    |  10 +
 .../cassandra/db/PartitionRangeReadCommand.java    |   5 +
 src/java/org/apache/cassandra/db/ReadCommand.java  |   7 +
 .../cassandra/db/SinglePartitionReadCommand.java   |   5 +
 .../db/compaction/CompactionIterator.java          |   3 +-
 .../org/apache/cassandra/db/filter/RowFilter.java  |  87 ++--
 .../partitions/UnfilteredPartitionIterators.java   |  30 +-
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  32 +-
 .../org/apache/cassandra/metrics/TableMetrics.java |  18 +-
 .../org/apache/cassandra/service/DataResolver.java | 162 +++++--
 .../service/ReplicaFilteringProtection.java        | 465 +++++++++++++++++++++
 .../cassandra/utils/concurrent/Accumulator.java    |  13 +
 .../utils/concurrent/AccumulatorTest.java          |  54 ++-
 14 files changed, 800 insertions(+), 92 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 46b3f56..ff00579 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.21
+ * Fix replica-side filtering returning stale data with CL > ONE 
(CASSANDRA-8272, CASSANDRA-8273)
  * Fix duplicated row on 2.x upgrades when multi-rows range tombstones 
interact with collection ones (CASSANDRA-15805)
  * Rely on snapshotted session infos on StreamResultFuture.maybeComplete to 
avoid race conditions (CASSANDRA-15667)
  * EmptyType doesn't override writeValue so could attempt to write bytes when 
expected not to (CASSANDRA-15790)
diff --git a/src/java/org/apache/cassandra/db/DataRange.java 
b/src/java/org/apache/cassandra/db/DataRange.java
index d2f9c76..f6776c4 100644
--- a/src/java/org/apache/cassandra/db/DataRange.java
+++ b/src/java/org/apache/cassandra/db/DataRange.java
@@ -196,6 +196,16 @@ public class DataRange
     }
 
     /**
+     * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+     *
+     * @return whether the underlying {@code ClusteringIndexFilter} is 
reversed or not.
+     */
+    public boolean isReversed()
+    {
+        return clusteringIndexFilter.isReversed();
+    }
+
+    /**
      * The clustering index filter to use for the provided key.
      * <p>
      * This may or may not be the same filter for all keys (that is, paging 
range
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 4f936cc..1da66c1 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -206,6 +206,11 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DatabaseDescriptor.getRangeRpcTimeout();
     }
 
+    public boolean isReversed()
+    {
+        return dataRange.isReversed();
+    }
+
     public boolean selectsKey(DecoratedKey key)
     {
         if (!dataRange().contains(key))
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java 
b/src/java/org/apache/cassandra/db/ReadCommand.java
index b499daf..39a5402 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -330,6 +330,13 @@ public abstract class ReadCommand implements ReadQuery
 
     protected abstract int oldestUnrepairedTombstone();
 
+    /**
+     * Whether the underlying {@code ClusteringIndexFilter} is reversed or not.
+     *
+     * @return whether the underlying {@code ClusteringIndexFilter} is 
reversed or not.
+     */
+    public abstract boolean isReversed();
+
     public ReadResponse createResponse(UnfilteredPartitionIterator iterator)
     {
         // validate that the sequence of RT markers is correct: open is 
followed by close, deletion times for both
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 2e014ba..841c3b9 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -415,6 +415,11 @@ public class SinglePartitionReadCommand extends ReadCommand
         return DatabaseDescriptor.getReadRpcTimeout();
     }
 
+    public boolean isReversed()
+    {
+        return clusteringIndexFilter.isReversed();
+    }
+
     public boolean selectsKey(DecoratedKey key)
     {
         if (!this.partitionKey().equals(key))
diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
index b132d90..8c4732b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
@@ -199,11 +199,12 @@ public class CompactionIterator extends 
CompactionInfo.Holder implements Unfilte
                     {
                     }
 
-                    public void onMergedRows(Row merged, Row[] versions)
+                    public Row onMergedRows(Row merged, Row[] versions)
                     {
                         indexTransaction.start();
                         indexTransaction.onRowMerge(merged, versions);
                         indexTransaction.commit();
+                        return merged;
                     }
 
                     public void 
onMergedRangeTombstoneMarkers(RangeTombstoneMarker mergedMarker, 
RangeTombstoneMarker[] versions)
diff --git a/src/java/org/apache/cassandra/db/filter/RowFilter.java 
b/src/java/org/apache/cassandra/db/filter/RowFilter.java
index 4300651..774e4d3 100644
--- a/src/java/org/apache/cassandra/db/filter/RowFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/RowFilter.java
@@ -126,6 +126,8 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
         return false;
     }
 
+    protected abstract Transformation<BaseRowIterator<?>> filter(CFMetaData 
metadata, int nowInSec);
+
     /**
      * Filters the provided iterator so that only the row satisfying the 
expression of this filter
      * are included in the resulting iterator.
@@ -134,7 +136,23 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
      * @param nowInSec the time of query in seconds.
      * @return the filtered iterator.
      */
-    public abstract UnfilteredPartitionIterator 
filter(UnfilteredPartitionIterator iter, int nowInSec);
+    public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator 
iter, int nowInSec)
+    {
+        return expressions.isEmpty() ? iter : Transformation.apply(iter, 
filter(iter.metadata(), nowInSec));
+    }
+
+    /**
+     * Filters the provided iterator so that only the row satisfying the 
expression of this filter
+     * are included in the resulting iterator.
+     *
+     * @param iter the iterator to filter
+     * @param nowInSec the time of query in seconds.
+     * @return the filtered iterator.
+     */
+    public PartitionIterator filter(PartitionIterator iter, CFMetaData 
metadata, int nowInSec)
+    {
+        return expressions.isEmpty() ? iter : Transformation.apply(iter, 
filter(metadata, nowInSec));
+    }
 
     /**
      * Whether the provided row in the provided partition satisfies this 
filter.
@@ -263,20 +281,16 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             super(expressions);
         }
 
-        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator 
iter, int nowInSec)
+        protected Transformation<BaseRowIterator<?>> filter(CFMetaData 
metadata, int nowInSec)
         {
-            if (expressions.isEmpty())
-                return iter;
-
-            final CFMetaData metadata = iter.metadata();
             long numberOfStaticColumnExpressions = 
expressions.stream().filter(e -> e.column.isStatic()).count();
             final boolean filterStaticColumns = 
numberOfStaticColumnExpressions != 0;
             final boolean filterNonStaticColumns = (expressions.size() - 
numberOfStaticColumnExpressions) > 0;
 
-            class IsSatisfiedFilter extends 
Transformation<UnfilteredRowIterator>
+            return new Transformation<BaseRowIterator<?>>()
             {
                 DecoratedKey pk;
-                public UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator partition)
+                protected BaseRowIterator<?> 
applyToPartition(BaseRowIterator<?> partition)
                 {
                     // The filter might be on static columns, so need to check 
static row first.
                     if (filterStaticColumns && 
applyToRow(partition.staticRow()) == null)
@@ -286,7 +300,9 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                     }
 
                     pk = partition.partitionKey();
-                    UnfilteredRowIterator iterator = 
Transformation.apply(partition, this);
+                    BaseRowIterator<?> iterator = partition instanceof 
UnfilteredRowIterator
+                                                  ? 
Transformation.apply((UnfilteredRowIterator) partition, this)
+                                                  : 
Transformation.apply((RowIterator) partition, this);
 
                     if (filterNonStaticColumns && !iterator.hasNext())
                     {
@@ -308,9 +324,7 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
                             return null;
                     return row;
                 }
-            }
-
-            return Transformation.apply(iter, new IsSatisfiedFilter());
+            };
         }
 
         protected RowFilter withNewExpressions(List<Expression> expressions)
@@ -326,36 +340,47 @@ public abstract class RowFilter implements 
Iterable<RowFilter.Expression>
             super(expressions);
         }
 
-        public UnfilteredPartitionIterator filter(UnfilteredPartitionIterator 
iter, final int nowInSec)
+        protected Transformation<BaseRowIterator<?>> filter(CFMetaData 
metadata, int nowInSec)
         {
-            if (expressions.isEmpty())
-                return iter;
-
-            class IsSatisfiedThriftFilter extends 
Transformation<UnfilteredRowIterator>
+            // Thrift does not filter rows, it filters entire partition if any 
of the expression is not
+            // satisfied, which forces us to materialize the result (in theory 
we could materialize only
+            // what we need which might or might not be everything, but we 
keep it simple since in practice
+            // it's not worth that it has ever been).
+            return new Transformation<BaseRowIterator<?>>()
             {
-                @Override
-                public UnfilteredRowIterator 
applyToPartition(UnfilteredRowIterator iter)
+                protected BaseRowIterator<?> 
applyToPartition(BaseRowIterator<?> partition)
+                {
+                    return partition instanceof UnfilteredRowIterator ? 
applyTo((UnfilteredRowIterator) partition)
+                                                                      : 
applyTo((RowIterator) partition);
+                }
+
+                private UnfilteredRowIterator applyTo(UnfilteredRowIterator 
partition)
+                {
+                    ImmutableBTreePartition result = 
ImmutableBTreePartition.create(partition);
+                    partition.close();
+                    return accepts(result) ? result.unfilteredIterator() : 
null;
+                }
+
+                private RowIterator applyTo(RowIterator partition)
                 {
-                    // Thrift does not filter rows, it filters entire 
partition if any of the expression is not
-                    // satisfied, which forces us to materialize the result 
(in theory we could materialize only
-                    // what we need which might or might not be everything, 
but we keep it simple since in practice
-                    // it's not worth that it has ever been).
-                    ImmutableBTreePartition result = 
ImmutableBTreePartition.create(iter);
-                    iter.close();
+                    FilteredPartition result = 
FilteredPartition.create(partition);
+                    return accepts(result) ? result.rowIterator() : null;
+                }
 
+                private boolean accepts(ImmutableBTreePartition result)
+                {
                     // The partition needs to have a row for every expression, 
and the expression needs to be valid.
                     for (Expression expr : expressions)
                     {
                         assert expr instanceof ThriftExpression;
-                        Row row = 
result.getRow(makeCompactClustering(iter.metadata(), expr.column().name.bytes));
-                        if (row == null || 
!expr.isSatisfiedBy(iter.metadata(), iter.partitionKey(), row))
-                            return null;
+                        Row row = 
result.getRow(makeCompactClustering(metadata, expr.column().name.bytes));
+                        if (row == null || !expr.isSatisfiedBy(metadata, 
result.partitionKey(), row))
+                            return false;
                     }
                     // If we get there, it means all expressions where 
satisfied, so return the original result
-                    return result.unfilteredIterator();
+                    return true;
                 }
-            }
-            return Transformation.apply(iter, new IsSatisfiedThriftFilter());
+            };
         }
 
         protected RowFilter withNewExpressions(List<Expression> expressions)
diff --git 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index 3f7d072..bff910e 100644
--- 
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++ 
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.db.transform.FilteredPartitions;
+import org.apache.cassandra.db.transform.MorePartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -47,7 +48,7 @@ public abstract class UnfilteredPartitionIterators
     public interface MergeListener
     {
         public UnfilteredRowIterators.MergeListener 
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> 
versions);
-        public void close();
+        public default void close() {}
     }
 
     @SuppressWarnings("resource") // The created resources are returned right 
away
@@ -77,6 +78,24 @@ public abstract class UnfilteredPartitionIterators
         return Transformation.apply(toReturn, new Close());
     }
 
+    public static UnfilteredPartitionIterator concat(final 
List<UnfilteredPartitionIterator> iterators)
+    {
+        if (iterators.size() == 1)
+            return iterators.get(0);
+
+        class Extend implements MorePartitions<UnfilteredPartitionIterator>
+        {
+            int i = 1;
+            public UnfilteredPartitionIterator moreContents()
+            {
+                if (i >= iterators.size())
+                    return null;
+                return iterators.get(i++);
+            }
+        }
+        return MorePartitions.extend(iterators.get(0), new Extend());
+    }
+
     public static PartitionIterator filter(final UnfilteredPartitionIterator 
iterator, final int nowInSec)
     {
         return FilteredPartitions.filter(iterator, nowInSec);
@@ -84,7 +103,6 @@ public abstract class UnfilteredPartitionIterators
 
     public static UnfilteredPartitionIterator merge(final List<? extends 
UnfilteredPartitionIterator> iterators, final int nowInSec, final MergeListener 
listener)
     {
-        assert listener != null;
         assert !iterators.isEmpty();
 
         final boolean isForThrift = iterators.get(0).isForThrift();
@@ -109,7 +127,9 @@ public abstract class UnfilteredPartitionIterators
 
             protected UnfilteredRowIterator getReduced()
             {
-                UnfilteredRowIterators.MergeListener rowListener = 
listener.getRowMergeListener(partitionKey, toMerge);
+                UnfilteredRowIterators.MergeListener rowListener = listener == 
null
+                                                                 ? null
+                                                                 : 
listener.getRowMergeListener(partitionKey, toMerge);
 
                 // Replace nulls by empty iterators
                 for (int i = 0; i < toMerge.size(); i++)
@@ -153,7 +173,9 @@ public abstract class UnfilteredPartitionIterators
             public void close()
             {
                 merged.close();
-                listener.close();
+
+                if (listener != null)
+                    listener.close();
             }
         };
     }
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index f42f675..b6dbf82 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -72,12 +72,17 @@ public abstract class UnfilteredRowIterators
          * particular, this may be called in cases where there is no row in 
the merged output (if a source has a row
          * that is shadowed by another source range tombstone or partition 
level deletion).
          *
-         * @param merged the result of the merge. This cannot be {@code null} 
but can be empty, in which case this is a
-         * placeholder for when at least one source has a row, but that row is 
shadowed in the merged output.
+         * @param merged the result of the merge. This cannot be {@code null} 
(so that listener can always access the
+         * clustering from this safely) but can be empty, in which case this is 
a placeholder for when at least one
+         * source has a row, but that row is shadowed in the merged output.
          * @param versions for each source, the row in that source 
corresponding to {@code merged}. This can be
          * {@code null} for some sources if the source has not such row.
+         * @return the row to use as result of the merge (can be {@code 
null}). Most implementations should simply
+         * return {@code merged}, but this allows some implementations to 
impact the merge result if necessary. If this
+         * returns either {@code null} or an empty row, then the row is 
skipped from the merge result. If this returns a
+         * non {@code null} result, then the returned row <b>must</b> have the 
same clustering as {@code merged}.
          */
-        public void onMergedRows(Row merged, Row[] versions);
+        public Row onMergedRows(Row merged, Row[] versions);
 
         /**
          * Called once for every range tombstone marker participating in the 
merge.
@@ -500,9 +505,12 @@ public abstract class UnfilteredRowIterators
             Row merged = merger.merge(partitionDeletion);
             if (merged == null)
                 merged = Rows.EMPTY_STATIC_ROW;
-            if (listener != null)
-                listener.onMergedRows(merged, merger.mergedRows());
-            return merged;
+            if (listener == null)
+                return merged;
+
+            merged = listener.onMergedRows(merged, merger.mergedRows());
+            // Note that onMergedRows may have returned null even though its 
input wasn't null
+            return merged == null ? Rows.EMPTY_STATIC_ROW : merged;
         }
 
         private static PartitionColumns 
collectColumns(List<UnfilteredRowIterator> iterators)
@@ -586,9 +594,15 @@ public abstract class UnfilteredRowIterators
                 if (nextKind == Unfiltered.Kind.ROW)
                 {
                     Row merged = 
rowMerger.merge(markerMerger.activeDeletion());
-                    if (listener != null)
-                        listener.onMergedRows(merged == null ? 
BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, 
rowMerger.mergedRows());
-                    return merged;
+                    if (listener == null)
+                        return merged;
+
+                    merged = listener.onMergedRows(merged == null
+                                                   ? 
BTreeRow.emptyRow(rowMerger.mergedClustering())
+                                                   : merged,
+                                                   rowMerger.mergedRows());
+
+                    return merged == null || merged.isEmpty() ? null : merged;
                 }
                 else
                 {
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index eb56ed9..1f4803e 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -153,6 +153,7 @@ public class TableMetrics
 
     public final Meter readRepairRequests;
     public final Meter shortReadProtectionRequests;
+    public final Meter replicaSideFilteringProtectionRequests;
 
     public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
     /**
@@ -649,8 +650,9 @@ public class TableMetrics
         casPropose = new LatencyMetrics(factory, "CasPropose", 
cfs.keyspace.metric.casPropose);
         casCommit = new LatencyMetrics(factory, "CasCommit", 
cfs.keyspace.metric.casCommit);
 
-        readRepairRequests = 
Metrics.meter(factory.createMetricName("ReadRepairRequests"));
-        shortReadProtectionRequests = 
Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
+        readRepairRequests = createTableMeter("ReadRepairRequests");
+        shortReadProtectionRequests = 
createTableMeter("ShortReadProtectionRequests");
+        replicaSideFilteringProtectionRequests = 
createTableMeter("ReplicaSideFilteringProtectionRequests");
     }
 
     public void updateSSTableIterated(int count)
@@ -758,6 +760,18 @@ public class TableMetrics
         return cfCounter;
     }
 
+    private Meter createTableMeter(final String name)
+    {
+        return createTableMeter(name, name);
+    }
+
+    private Meter createTableMeter(final String name, final String alias)
+    {
+        Meter tableMeter = Metrics.meter(factory.createMetricName(name), 
aliasFactory.createMetricName(alias));
+        register(name, alias, tableMeter);
+        return tableMeter;
+    }
+
     /**
      * Create a histogram-like interface that will register both a CF, 
keyspace and global level
      * histogram and forward any updates to both
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java 
b/src/java/org/apache/cassandra/service/DataResolver.java
index 522c57b..02d355e 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.service;
 import java.net.InetAddress;
 import java.util.*;
 import java.util.concurrent.TimeoutException;
+import java.util.function.UnaryOperator;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
@@ -80,17 +81,126 @@ public class DataResolver extends ResponseResolver
 
     public PartitionIterator resolve()
     {
+        if (!needsReplicaFilteringProtection())
+        {
+            ResolveContext context = new ResolveContext(responses.size());
+            return resolveWithReadRepair(context,
+                                         i -> shortReadProtectedResponse(i, 
context),
+                                         UnaryOperator.identity());
+        }
+
+        return resolveWithReplicaFilteringProtection();
+    }
+
+    private boolean needsReplicaFilteringProtection()
+    {
+        return !command.rowFilter().isEmpty();
+    }
+
+    private class ResolveContext
+    {
+        private final InetAddress[] sources;
+        private final DataLimits.Counter mergedResultCounter;
+
+        private ResolveContext(int count)
+        {
+            assert count <= responses.size();
+            this.sources = new InetAddress[count];
+            for (int i = 0; i < count; i++)
+                sources[i] = responses.get(i).from;
+            this.mergedResultCounter = 
command.limits().newCounter(command.nowInSec(),
+                                                                   true,
+                                                                   
command.selectsFullPartition(),
+                                                                   
enforceStrictLiveness);
+        }
+
+        private boolean needShortReadProtection()
+        {
+            // If we have only one result, there is no read repair to do and 
we can't get short reads
+            // Also, so-called "short reads" stem from nodes returning only a 
subset of the results they have for a
+            // partition due to the limit, but that subset not being enough 
post-reconciliation. So if we don't have limit,
+            // don't bother protecting against short reads.
+            return sources.length > 1 && !command.limits().isUnlimited();
+        }
+    }
+
+    @FunctionalInterface
+    private interface ResponseProvider
+    {
+        UnfilteredPartitionIterator getResponse(int i);
+    }
+
+    private UnfilteredPartitionIterator shortReadProtectedResponse(int i, 
ResolveContext context)
+    {
+        UnfilteredPartitionIterator originalResponse = 
responses.get(i).payload.makeIterator(command);
+
+        return context.needShortReadProtection()
+               ? extendWithShortReadProtection(originalResponse, 
context.sources[i], context.mergedResultCounter)
+               : originalResponse;
+    }
+
+    private PartitionIterator resolveWithReadRepair(ResolveContext context,
+                                                    ResponseProvider 
responseProvider,
+                                                    
UnaryOperator<PartitionIterator> preCountFilter)
+    {
+        return resolveInternal(context, new 
RepairMergeListener(context.sources), responseProvider, preCountFilter);
+    }
+
+    private PartitionIterator resolveWithReplicaFilteringProtection()
+    {
+        // Protecting against inconsistent replica filtering (some replica 
returning a row that is outdated but that
+        // wouldn't be removed by normal reconciliation because up-to-date 
replicas have filtered the up-to-date version
+        // of that row) works in 3 steps:
+        //   1) we read the full response just to collect rows that may be 
outdated (the ones we got from some
+        //      replicas but not from others; it could be 
those other replicas have filtered a more
+        //      up-to-date result). In doing so, we do not count any such 
"potentially outdated" rows towards the
+        //      query limit. This simulates the worst case scenario where all 
those "potentially outdated" rows are
+        //      indeed outdated, and thus makes sure we are guaranteed to read 
enough results (thanks to short read
+        //      protection).
+        //   2) we query all the replicas/rows we need to determine whether 
those "potentially outdated" rows are outdated
+        //      or not.
+        //   3) we re-read cached copies of each replica response using the 
"normal" read path merge with read-repair,
+        //      but where for each replica we use their original response 
_plus_ the additional rows queried in the
+        //      previous step (and apply the command#rowFilter() on the full 
result). Since the first phase has
+        //      pessimistically collected enough results for the case where 
all potentially outdated results are indeed
+        //      outdated, we shouldn't need further short-read protection 
requests during this phase.
+
         // We could get more responses while this method runs, which is ok 
(we're happy to ignore any response not here
         // at the beginning of this method), so grab the response count once 
and use that through the method.
         int count = responses.size();
-        List<UnfilteredPartitionIterator> iters = new ArrayList<>(count);
-        InetAddress[] sources = new InetAddress[count];
+        // We need separate contexts, as each context has its own counter
+        ResolveContext firstPhaseContext = new ResolveContext(count);
+        ResolveContext secondPhaseContext = new ResolveContext(count);
+        ReplicaFilteringProtection rfp = new 
ReplicaFilteringProtection(keyspace, command, consistency, 
firstPhaseContext.sources);
+        PartitionIterator firstPhasePartitions = 
resolveInternal(firstPhaseContext,
+                                                                 
rfp.mergeController(),
+                                                                 i -> 
shortReadProtectedResponse(i, firstPhaseContext),
+                                                                 
UnaryOperator.identity());
+
+        // Consume the first phase partitions to populate the replica 
filtering protection with both those materialized
+        // partitions and the primary keys to be fetched.
+        PartitionIterators.consume(firstPhasePartitions);
+        firstPhasePartitions.close();
+
+        // After reading the entire query results the protection helper should 
have cached all the partitions so we can
+        // clear the responses accumulator for the sake of memory usage, given 
that the second phase might take long if
+        // it needs to query replicas.
+        responses.clearUnsafe();
+
+        return resolveWithReadRepair(secondPhaseContext,
+                                     rfp::queryProtectedPartitions,
+                                     results -> 
command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+    }
+
+    private PartitionIterator resolveInternal(ResolveContext context,
+                                              
UnfilteredPartitionIterators.MergeListener mergeListener,
+                                              ResponseProvider 
responseProvider,
+                                              UnaryOperator<PartitionIterator> 
preCountFilter)
+    {
+        int count = context.sources.length;
+        List<UnfilteredPartitionIterator> results = new ArrayList<>(count);
         for (int i = 0; i < count; i++)
-        {
-            MessageIn<ReadResponse> msg = responses.get(i);
-            iters.add(msg.payload.makeIterator(command));
-            sources[i] = msg.from;
-        }
+            results.add(responseProvider.getResponse(i));
 
         /*
          * Even though every response, individually, will honor the limit, it 
is possible that we will, after the merge,
@@ -106,36 +216,14 @@ public class DataResolver extends ResponseResolver
          * See CASSANDRA-13747 for more details.
          */
 
-        DataLimits.Counter mergedResultCounter =
-            command.limits().newCounter(command.nowInSec(), true, 
command.selectsFullPartition(), enforceStrictLiveness);
-
-        UnfilteredPartitionIterator merged = 
mergeWithShortReadProtection(iters, sources, mergedResultCounter);
+        UnfilteredPartitionIterator merged = 
UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener);
         FilteredPartitions filtered =
-            FilteredPartitions.filter(merged, new Filter(command.nowInSec(), 
command.metadata().enforceStrictLiveness()));
-        PartitionIterator counted = Transformation.apply(filtered, 
mergedResultCounter);
+        FilteredPartitions.filter(merged, new Filter(command.nowInSec(), 
command.metadata().enforceStrictLiveness()));
+        PartitionIterator counted = 
Transformation.apply(preCountFilter.apply(filtered), 
context.mergedResultCounter);
 
         return command.isForThrift()
-             ? counted
-             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
-    }
-
-    private UnfilteredPartitionIterator 
mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     
InetAddress[] sources,
-                                                                     
DataLimits.Counter mergedResultCounter)
-    {
-        // If we have only one results, there is no read repair to do and we 
can't get short reads
-        if (results.size() == 1)
-            return results.get(0);
-
-        /*
-         * So-called short reads stems from nodes returning only a subset of 
the results they have due to the limit,
-         * but that subset not being enough post-reconciliation. So if we 
don't have a limit, don't bother.
-         */
-        if (!command.limits().isUnlimited())
-            for (int i = 0; i < results.size(); i++)
-                results.set(i, extendWithShortReadProtection(results.get(i), 
sources[i], mergedResultCounter));
-
-        return UnfilteredPartitionIterators.merge(results, command.nowInSec(), 
new RepairMergeListener(sources));
+               ? counted
+               : Transformation.apply(counted, new EmptyPartitionsDiscarder());
     }
 
     private class RepairMergeListener implements 
UnfilteredPartitionIterators.MergeListener
@@ -294,13 +382,13 @@ public class DataResolver extends ResponseResolver
                 }
             }
 
-            public void onMergedRows(Row merged, Row[] versions)
+            public Row onMergedRows(Row merged, Row[] versions)
             {
                 // If a row was shadowed post merged, it must be by a 
partition level or range tombstone, and we handle
                 // those case directly in their respective methods (in other 
words, it would be inefficient to send a row
                 // deletion as repair when we know we've already send a 
partition level or range tombstone that covers it).
                 if (merged.isEmpty())
-                    return;
+                    return merged;
 
                 Rows.diff(diffListener, merged, versions);
                 for (int i = 0; i < currentRows.length; i++)
@@ -309,6 +397,8 @@ public class DataResolver extends ResponseResolver
                         update(i).add(currentRows[i].build());
                 }
                 Arrays.fill(currentRows, null);
+
+                return merged;
             }
 
             private DeletionTime currentDeletion()
diff --git 
a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java 
b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
new file mode 100644
index 0000000..36d51cc
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Columns;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionColumns;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.db.rows.EncodingStats;
+import org.apache.cassandra.db.rows.RangeTombstoneMarker;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.Rows;
+import org.apache.cassandra.db.rows.Unfiltered;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.metrics.TableMetrics;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.btree.BTreeSet;
+
+/**
+ * Helper in charge of collecting additional queries to be done on the 
coordinator to protect against invalid results
+ * being included due to replica-side filtering (secondary indexes or {@code 
ALLOW FILTERING}).
+ * <p>
+ * When using replica-side filtering with CL>ONE, a replica can send a stale 
result satisfying the filter, while updated
+ * replicas won't send a corresponding tombstone to discard that result during 
reconciliation. This helper identifies
+ * the rows in a replica response that don't have a corresponding row in other 
replica responses, and requests them by
+ * primary key to the "silent" replicas in a second fetch round.
+ * <p>
+ * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
+ */
+class ReplicaFilteringProtection
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(ReplicaFilteringProtection.class);
+
+    private final Keyspace keyspace;
+    private final ReadCommand command;
+    private final ConsistencyLevel consistency;
+    private final InetAddress[] sources;
+    private final TableMetrics tableMetrics;
+
+    /**
+     * Per-source primary keys of the rows that might be outdated so they need 
to be fetched.
+     * For outdated static rows we use an empty builder to signal it has to be 
queried.
+     */
+    private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> 
rowsToFetch;
+
+    /**
+     * Per-source list of all the partitions seen by the merge listener, to be 
merged with the extra fetched rows.
+     */
+    private final List<List<PartitionBuilder>> originalPartitions;
+
+    ReplicaFilteringProtection(Keyspace keyspace,
+                               ReadCommand command,
+                               ConsistencyLevel consistency,
+                               InetAddress[] sources)
+    {
+        this.keyspace = keyspace;
+        this.command = command;
+        this.consistency = consistency;
+        this.sources = sources;
+        this.rowsToFetch = new ArrayList<>(sources.length);
+        this.originalPartitions = new ArrayList<>(sources.length);
+
+        for (InetAddress ignored : sources)
+        {
+            rowsToFetch.add(new TreeMap<>());
+            originalPartitions.add(new ArrayList<>());
+        }
+
+        tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
+    }
+
+    private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, 
DecoratedKey partitionKey)
+    {
+        return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> 
BTreeSet.builder(command.metadata().comparator));
+    }
+
+    /**
+     * Returns the protected results for the specified replica. These are 
generated by fetching the extra rows and merging
+     * them with the cached original filtered results for that replica.
+     *
+     * @param source the source
+     * @return the protected results for the specified replica
+     */
+    UnfilteredPartitionIterator queryProtectedPartitions(int source)
+    {
+        UnfilteredPartitionIterator original = 
makeIterator(originalPartitions.get(source));
+        SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = 
rowsToFetch.get(source);
+
+        if (toFetch.isEmpty())
+            return original;
+
+        // TODO: this would be more efficient if we had multi-key queries 
internally
+        List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
+                                                           .stream()
+                                                           .map(k -> 
querySourceOnKey(source, k))
+                                                           
.collect(Collectors.toList());
+
+        return UnfilteredPartitionIterators.merge(Arrays.asList(original, 
UnfilteredPartitionIterators.concat(fetched)),
+                                                  command.nowInSec(), null);
+    }
+
+    private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey 
key)
+    {
+        BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
+        assert builder != null; // We're calling this on the result of 
rowsToFetch.get(i).keySet()
+
+        InetAddress source = sources[i];
+        NavigableSet<Clustering> clusterings = builder.build();
+        tableMetrics.replicaSideFilteringProtectionRequests.mark();
+        if (logger.isTraceEnabled())
+            logger.trace("Requesting rows {} in partition {} from {} for 
replica-side filtering protection",
+                         clusterings, key, source);
+        Tracing.trace("Requesting {} rows in partition {} from {} for 
replica-side filtering protection",
+                      clusterings.size(), key, source);
+
+        // build the read command taking into account that we could be 
requesting only the static row
+        DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : 
DataLimits.NONE;
+        ClusteringIndexFilter filter = new 
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+        SinglePartitionReadCommand cmd = 
SinglePartitionReadCommand.create(command.metadata(),
+                                                                           
command.nowInSec(),
+                                                                           
command.columnFilter(),
+                                                                           
RowFilter.NONE,
+                                                                           
limits,
+                                                                           key,
+                                                                           
filter);
+        try
+        {
+            return executeReadCommand(cmd, source);
+        }
+        catch (ReadTimeoutException e)
+        {
+            int blockFor = consistency.blockFor(keyspace);
+            throw new ReadTimeoutException(consistency, blockFor - 1, 
blockFor, true);
+        }
+        catch (UnavailableException e)
+        {
+            int blockFor = consistency.blockFor(keyspace);
+            throw new UnavailableException(consistency, blockFor, blockFor - 
1);
+        }
+    }
+
+    private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, 
InetAddress source)
+    {
+        DataResolver resolver = new DataResolver(keyspace, cmd, 
ConsistencyLevel.ONE, 1);
+        ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, cmd, Collections.singletonList(source));
+
+        if (StorageProxy.canDoLocalRequest(source))
+            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));
+        else
+            
MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version),
 source, handler);
+
+        // We don't call handler.get() because we want to preserve tombstones
+        handler.awaitResults();
+        assert resolver.responses.size() == 1;
+        return resolver.responses.get(0).payload.makeIterator(command);
+    }
+
+    /**
+     * Returns a merge listener that skips the merged rows for which any of 
the replicas doesn't have a version,
+     * pessimistically assuming that they are outdated. It is intended to be 
used during a first merge of per-replica
+     * query results so that we fetch enough results from the replicas to 
ensure we don't miss any potentially
+     * outdated result.
+     * <p>
+     * The listener will track both the accepted data and the primary keys of 
the rows that are considered as outdated.
+     * That way, once the query results have been merged using this 
listener, further calls to
+     * {@link #queryProtectedPartitions(int)} will use the collected data to 
return a copy of the
+     * data originally collected from the specified replica, completed with 
the potentially outdated rows.
+     */
+    UnfilteredPartitionIterators.MergeListener mergeController()
+    {
+        return (partitionKey, versions) -> {
+
+            PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+
+            for (int i = 0; i < sources.length; i++)
+                builders[i] = new PartitionBuilder(partitionKey, 
columns(versions), stats(versions));
+
+            return new UnfilteredRowIterators.MergeListener()
+            {
+                @Override
+                public void onMergedPartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions)
+                {
+                    // cache the deletion time versions to be able to 
regenerate the original row iterator
+                    for (int i = 0; i < versions.length; i++)
+                        builders[i].setDeletionTime(versions[i]);
+                }
+
+                @Override
+                public Row onMergedRows(Row merged, Row[] versions)
+                {
+                    // cache the row versions to be able to regenerate the 
original row iterator
+                    for (int i = 0; i < versions.length; i++)
+                        builders[i].addRow(versions[i]);
+
+                    if (merged.isEmpty())
+                        return merged;
+
+                    boolean isPotentiallyOutdated = false;
+                    boolean isStatic = merged.isStatic();
+                    for (int i = 0; i < versions.length; i++)
+                    {
+                        Row version = versions[i];
+                        if (version == null || (isStatic && version.isEmpty()))
+                        {
+                            isPotentiallyOutdated = true;
+                            BTreeSet.Builder<Clustering> toFetch = 
getOrCreateToFetch(i, partitionKey);
+                            // Note that for static, we shouldn't add the 
clustering to the clustering set (the
+                            // ClusteringIndexNamesFilter we'll build from 
this later does not expect it), but the fact
+                            // we created a builder in the first place will 
act as a marker that the static row must be
+                            // fetched, even if no other rows are added for 
this partition.
+                            if (!isStatic)
+                                toFetch.add(merged.clustering());
+                        }
+                    }
+
+                    // If the row is potentially outdated (because some 
replica didn't send anything and so it _may_ be
+                    // an outdated result that is only present because other 
replicas have filtered the up-to-date result
+                    // out), then we skip the row. In other words, the results 
of the initial merging of results by this
+                    // protection assume the worst case scenario where every 
row that might be outdated actually is.
+                    // This ensures that during this first phase (collecting 
additional rows to fetch) we are guaranteed
+                    // to look at enough data to ultimately fulfill the query 
limit.
+                    return isPotentiallyOutdated ? null : merged;
+                }
+
+                @Override
+                public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker 
merged, RangeTombstoneMarker[] versions)
+                {
+                    // cache the marker versions to be able to regenerate the 
original row iterator
+                    for (int i = 0; i < versions.length; i++)
+                        builders[i].addRangeTombstoneMarker(versions[i]);
+                }
+
+                @Override
+                public void close()
+                {
+                    for (int i = 0; i < sources.length; i++)
+                        originalPartitions.get(i).add(builders[i]);
+                }
+            };
+        };
+    }
+
+    private static PartitionColumns columns(List<UnfilteredRowIterator> 
versions)
+    {
+        Columns statics = Columns.NONE;
+        Columns regulars = Columns.NONE;
+        for (UnfilteredRowIterator iter : versions)
+        {
+            if (iter == null)
+                continue;
+
+            PartitionColumns cols = iter.columns();
+            statics = statics.mergeTo(cols.statics);
+            regulars = regulars.mergeTo(cols.regulars);
+        }
+        return new PartitionColumns(statics, regulars);
+    }
+
+    private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
+    {
+        EncodingStats stats = EncodingStats.NO_STATS;
+        for (UnfilteredRowIterator iter : iterators)
+        {
+            if (iter == null)
+                continue;
+
+            stats = stats.mergeWith(iter.stats());
+        }
+        return stats;
+    }
+
+    private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> 
builders)
+    {
+        return new UnfilteredPartitionIterator()
+        {
+            final Iterator<PartitionBuilder> iterator = builders.iterator();
+
+            @Override
+            public boolean isForThrift()
+            {
+                return command.isForThrift();
+            }
+
+            @Override
+            public CFMetaData metadata()
+            {
+                return command.metadata();
+            }
+
+            @Override
+            public void close()
+            {
+                // nothing to do here
+            }
+
+            @Override
+            public boolean hasNext()
+            {
+                return iterator.hasNext();
+            }
+
+            @Override
+            public UnfilteredRowIterator next()
+            {
+                return iterator.next().build();
+            }
+        };
+    }
+
+    private class PartitionBuilder
+    {
+        private final DecoratedKey partitionKey;
+        private final PartitionColumns columns;
+        private final EncodingStats stats;
+        private DeletionTime deletionTime;
+        private Row staticRow = Rows.EMPTY_STATIC_ROW;
+        private final List<Unfiltered> contents = new ArrayList<>();
+
+        private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns 
columns, EncodingStats stats)
+        {
+            this.partitionKey = partitionKey;
+            this.columns = columns;
+            this.stats = stats;
+        }
+
+        private void setDeletionTime(DeletionTime deletionTime)
+        {
+            this.deletionTime = deletionTime;
+        }
+
+        private void addRow(Row row)
+        {
+            if (row == null)
+                return;
+
+            if (row.isStatic())
+                staticRow = row;
+            else
+                contents.add(row);
+        }
+
+        private void addRangeTombstoneMarker(RangeTombstoneMarker marker)
+        {
+            if (marker != null)
+                contents.add(marker);
+        }
+
+        private UnfilteredRowIterator build()
+        {
+            return new UnfilteredRowIterator()
+            {
+                final Iterator<Unfiltered> iterator = contents.iterator();
+
+                @Override
+                public DeletionTime partitionLevelDeletion()
+                {
+                    return deletionTime;
+                }
+
+                @Override
+                public EncodingStats stats()
+                {
+                    return stats;
+                }
+
+                @Override
+                public CFMetaData metadata()
+                {
+                    return command.metadata();
+                }
+
+                @Override
+                public boolean isReverseOrder()
+                {
+                    return command.isReversed();
+                }
+
+                @Override
+                public PartitionColumns columns()
+                {
+                    return columns;
+                }
+
+                @Override
+                public DecoratedKey partitionKey()
+                {
+                    return partitionKey;
+                }
+
+                @Override
+                public Row staticRow()
+                {
+                    return staticRow;
+                }
+
+                @Override
+                public void close()
+                {
+                    // nothing to do here
+                }
+
+                @Override
+                public boolean hasNext()
+                {
+                    return iterator.hasNext();
+                }
+
+                @Override
+                public Unfiltered next()
+                {
+                    return iterator.next();
+                }
+            };
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java 
b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index e80faca..ca9bb09 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,6 +18,7 @@
 */
 package org.apache.cassandra.utils.concurrent;
 
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
@@ -135,4 +136,16 @@ public class Accumulator<E> implements Iterable<E>
             throw new IndexOutOfBoundsException();
         return (E) values[i];
     }
+
+    /**
+     * Removes all of the elements from this accumulator.
+     *
+     * This method is not thread-safe when used concurrently with {@link 
#add(Object)}.
+     */
+    public void clearUnsafe()
+    {
+        nextIndexUpdater.set(this, 0);
+        presentCountUpdater.set(this, 0);
+        Arrays.fill(values, null);
+    }
 }
diff --git 
a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java 
b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 2842374..33daca7 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@ -81,15 +81,7 @@ public class AccumulatorTest
         assertEquals("2", accu.get(1));
         assertEquals("4", accu.get(2));
 
-        try
-        {
-            assertEquals(null, accu.get(3));
-            fail();
-        }
-        catch (IndexOutOfBoundsException e)
-        {
-            // Expected
-        }
+        assertOutOfBonds(accu, 3);
 
         accu.add("0");
 
@@ -103,4 +95,48 @@ public class AccumulatorTest
         assertEquals("0", iter.next());
         assertFalse(iter.hasNext());
     }
+
+    @Test
+    public void testClearUnsafe()
+    {
+        Accumulator<String> accu = new Accumulator<>(3);
+
+        accu.add("1");
+        accu.add("2");
+        accu.add("3");
+
+        accu.clearUnsafe();
+
+        assertEquals(0, accu.size());
+        assertFalse(accu.iterator().hasNext());
+        assertOutOfBonds(accu, 0);
+
+        accu.add("4");
+        accu.add("5");
+
+        assertEquals(2, accu.size());
+
+        assertEquals("4", accu.get(0));
+        assertEquals("5", accu.get(1));
+        assertOutOfBonds(accu, 2);
+
+        Iterator<String> iter = accu.iterator();
+        assertTrue(iter.hasNext());
+        assertEquals("4", iter.next());
+        assertEquals("5", iter.next());
+        assertFalse(iter.hasNext());
+    }
+
+    private static void assertOutOfBonds(Accumulator<String> accumulator, int 
index)
+    {
+        try
+        {
+            assertNull(accumulator.get(index));
+            fail();
+        }
+        catch (IndexOutOfBoundsException e)
+        {
+            // Expected
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to