This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new e5c3d08  Operational improvements and hardening for replica filtering protection
                  patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907
e5c3d08 is described below
commit e5c3d08a1428d378b6690f0419a2b25724b9736e
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Mon Jun 29 13:54:23 2020 -0500
Operational improvements and hardening for replica filtering protection
patch by Caleb Rackliffe; reviewed by Andrés de la Peña for CASSANDRA-15907
---
CHANGES.txt | 1 +
build.xml | 2 +-
conf/cassandra.yaml | 20 +
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 20 +
.../config/ReplicaFilteringProtectionOptions.java | 28 ++
.../db/partitions/PartitionIterators.java | 49 ++-
.../partitions/UnfilteredPartitionIterators.java | 19 -
.../apache/cassandra/db/rows/EncodingStats.java | 24 ++
.../org/apache/cassandra/metrics/TableMetrics.java | 22 +-
.../org/apache/cassandra/service/DataResolver.java | 89 +++--
.../service/ReplicaFilteringProtection.java | 423 ++++++++++++---------
.../apache/cassandra/service/StorageService.java | 27 +-
.../cassandra/service/StorageServiceMBean.java | 12 +
.../org/apache/cassandra/utils/FBUtilities.java | 4 +-
.../cassandra/utils/concurrent/Accumulator.java | 9 +-
.../cassandra/distributed/impl/Coordinator.java | 10 +
.../apache/cassandra/distributed/impl/RowUtil.java | 7 +-
.../test/ReplicaFilteringProtectionTest.java | 244 ++++++++++++
.../cassandra/db/rows/EncodingStatsTest.java | 145 +++++++
.../utils/concurrent/AccumulatorTest.java | 34 +-
21 files changed, 924 insertions(+), 267 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index a755b7e..182dca3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.22:
+ * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907)
 * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191)
 * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970)
* Forbid altering UDTs used in partition keys (CASSANDRA-15933)
diff --git a/build.xml b/build.xml
index 6492767..6c1d148 100644
--- a/build.xml
+++ b/build.xml
@@ -398,7 +398,7 @@
</dependency>
<dependency groupId="junit" artifactId="junit" version="4.6" />
<dependency groupId="org.mockito" artifactId="mockito-core"
version="3.2.4" />
- <dependency groupId="org.apache.cassandra" artifactId="dtest-api"
version="0.0.3" />
+ <dependency groupId="org.apache.cassandra" artifactId="dtest-api"
version="0.0.4" />
<dependency groupId="org.apache.rat" artifactId="apache-rat"
version="0.10">
<exclusion groupId="commons-lang" artifactId="commons-lang"/>
</dependency>
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index c321a72..bb96f18 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -661,6 +661,26 @@ auto_snapshot: true
tombstone_warn_threshold: 1000
tombstone_failure_threshold: 100000
+# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a
+# mechanism called replica filtering protection to ensure that results from stale replicas do
+# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This
+# mechanism materializes replica results by partition on-heap at the coordinator. The more possibly
+# stale results returned by the replicas, the more rows materialized during the query.
+replica_filtering_protection:
+    # These thresholds exist to limit the damage severely out-of-date replicas can cause during these
+    # queries. They limit the number of rows from all replicas individual index and filtering queries
+    # can materialize on-heap to return correct results at the desired read consistency level.
+    #
+    # "cached_replica_rows_warn_threshold" is the per-query threshold at which a warning will be logged.
+    # "cached_replica_rows_fail_threshold" is the per-query threshold at which the query will fail.
+    #
+    # These thresholds may also be adjusted at runtime using the StorageService mbean.
+    #
+    # If the failure threshold is breached, it is likely that either the current page/fetch size
+    # is too large or one or more replicas is severely out-of-sync and in need of repair.
+    cached_rows_warn_threshold: 2000
+    cached_rows_fail_threshold: 32000
+
# Granularity of the collation index of rows within a partition.
# Increase if your rows are large, or if you have a very large
# number of rows per partition. The competing goals are these:
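For a rough sense of how the new thresholds relate to a workload, a simple back-of-the-envelope estimate can help: with roughly in-sync replicas, replica filtering protection caches on the order of one row per contacted replica for each row on the current page, and badly out-of-sync replicas can push the real number higher. The sketch below is illustrative only; the class name, page size, and replica count are assumptions, not part of this patch.

    // Illustrative sizing sketch (hypothetical class, not from this patch).
    public final class RfpThresholdEstimate
    {
        // Approximate per-query peak of cached rows when replicas are healthy;
        // divergent replicas can exceed this, so treat it as a starting point only.
        static int estimatePeakCachedRows(int pageSize, int contactedReplicas)
        {
            return pageSize * contactedReplicas;
        }

        public static void main(String[] args)
        {
            int estimate = estimatePeakCachedRows(5000, 2); // e.g. 5000-row pages at QUORUM with RF=3
            System.out.println("estimated peak cached rows per query: " + estimate); // 10000
            // Compare against cached_rows_warn_threshold (2000) and cached_rows_fail_threshold (32000).
        }
    }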
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 6003bd1..2218ee2 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -280,6 +280,8 @@ public class Config
public volatile int tombstone_warn_threshold = 1000;
public volatile int tombstone_failure_threshold = 100000;
+    public final ReplicaFilteringProtectionOptions replica_filtering_protection = new ReplicaFilteringProtectionOptions();
+
public volatile Long index_summary_capacity_in_mb;
public volatile int index_summary_resize_interval_in_minutes = 60;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 4b732c2..9369229 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1368,6 +1368,26 @@ public class DatabaseDescriptor
conf.tombstone_failure_threshold = threshold;
}
+    public static int getCachedReplicaRowsWarnThreshold()
+    {
+        return conf.replica_filtering_protection.cached_rows_warn_threshold;
+    }
+
+    public static void setCachedReplicaRowsWarnThreshold(int threshold)
+    {
+        conf.replica_filtering_protection.cached_rows_warn_threshold = threshold;
+    }
+
+    public static int getCachedReplicaRowsFailThreshold()
+    {
+        return conf.replica_filtering_protection.cached_rows_fail_threshold;
+    }
+
+    public static void setCachedReplicaRowsFailThreshold(int threshold)
+    {
+        conf.replica_filtering_protection.cached_rows_fail_threshold = threshold;
+    }
+
/**
* size of commitlog segments to allocate
*/
diff --git
a/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java
b/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java
new file mode 100644
index 0000000..7a755ab
--- /dev/null
+++
b/src/java/org/apache/cassandra/config/ReplicaFilteringProtectionOptions.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.config;
+
+public class ReplicaFilteringProtectionOptions
+{
+ public static final int DEFAULT_WARN_THRESHOLD = 2000;
+ public static final int DEFAULT_FAIL_THRESHOLD = 32000;
+
+ public volatile int cached_rows_warn_threshold = DEFAULT_WARN_THRESHOLD;
+ public volatile int cached_rows_fail_threshold = DEFAULT_FAIL_THRESHOLD;
+}
diff --git
a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
index a3cf746..c7f20c5 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionIterators.java
@@ -18,10 +18,8 @@
package org.apache.cassandra.db.partitions;
import java.util.*;
-import java.security.MessageDigest;
import org.apache.cassandra.db.EmptyIterators;
-import org.apache.cassandra.db.transform.FilteredPartitions;
import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.utils.AbstractIterator;
@@ -98,6 +96,21 @@ public abstract class PartitionIterators
}
/**
+     * Consumes all rows in the next partition of the provided partition iterator.
+     */
+    public static void consumeNext(PartitionIterator iterator)
+ {
+ if (iterator.hasNext())
+ {
+ try (RowIterator partition = iterator.next())
+ {
+ while (partition.hasNext())
+ partition.next();
+ }
+ }
+ }
+
+ /**
* Wraps the provided iterator so it logs the returned rows for debugging
purposes.
* <p>
* Note that this is only meant for debugging as this can log a very large
amount of
@@ -116,6 +129,38 @@ public abstract class PartitionIterators
return Transformation.apply(iterator, new Logger());
}
+    /**
+     * Wraps the provided iterator to run a specified action on close. Note that the action will be
+     * run even if closure of the provided iterator throws an exception.
+     */
+    public static PartitionIterator doOnClose(PartitionIterator delegate, Runnable action)
+ {
+ return new PartitionIterator()
+ {
+ public void close()
+ {
+ try
+ {
+ delegate.close();
+ }
+ finally
+ {
+ action.run();
+ }
+ }
+
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
+ }
+
+ public RowIterator next()
+ {
+ return delegate.next();
+ }
+ };
+ }
+
private static class SingletonPartitionIterator extends
AbstractIterator<RowIterator> implements PartitionIterator
{
private final RowIterator iterator;
diff --git
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
index bff910e..4af53e2 100644
---
a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
+++
b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
@@ -27,7 +27,6 @@ import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.transform.FilteredPartitions;
-import org.apache.cassandra.db.transform.MorePartitions;
import org.apache.cassandra.db.transform.Transformation;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -78,24 +77,6 @@ public abstract class UnfilteredPartitionIterators
return Transformation.apply(toReturn, new Close());
}
- public static UnfilteredPartitionIterator concat(final
List<UnfilteredPartitionIterator> iterators)
- {
- if (iterators.size() == 1)
- return iterators.get(0);
-
- class Extend implements MorePartitions<UnfilteredPartitionIterator>
- {
- int i = 1;
- public UnfilteredPartitionIterator moreContents()
- {
- if (i >= iterators.size())
- return null;
- return iterators.get(i++);
- }
- }
- return MorePartitions.extend(iterators.get(0), new Extend());
- }
-
public static PartitionIterator filter(final UnfilteredPartitionIterator
iterator, final int nowInSec)
{
return FilteredPartitions.filter(iterator, nowInSec);
diff --git a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
index 955ffc7..c1235e4 100644
--- a/src/java/org/apache/cassandra/db/rows/EncodingStats.java
+++ b/src/java/org/apache/cassandra/db/rows/EncodingStats.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
import java.util.*;
+import java.util.function.Function;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.partitions.PartitionStatisticsCollector;
@@ -107,6 +108,29 @@ public class EncodingStats
return new EncodingStats(minTimestamp, minDelTime, minTTL);
}
+    /**
+     * Merges one or more EncodingStats that are lazily materialized from a list of values of arbitrary type by the
+     * provided function.
+     */
+    public static <V, F extends Function<V, EncodingStats>> EncodingStats merge(List<V> values, F function)
+ {
+ if (values.size() == 1)
+ return function.apply(values.get(0));
+
+ Collector collector = new Collector();
+ for (int i = 0, iSize = values.size(); i < iSize; i++)
+ {
+ V v = values.get(i);
+ EncodingStats stats = function.apply(v);
+ if (stats.minTimestamp != TIMESTAMP_EPOCH)
+ collector.updateTimestamp(stats.minTimestamp);
+ if (stats.minLocalDeletionTime != DELETION_TIME_EPOCH)
+ collector.updateLocalDeletionTime(stats.minLocalDeletionTime);
+ if (stats.minTTL != TTL_EPOCH)
+ collector.updateTTL(stats.minTTL);
+ }
+ return collector.get();
+ }
+
@Override
public boolean equals(Object o)
{
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 1f4803e..a551a78 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import com.codahale.metrics.*;
import com.codahale.metrics.Timer;
import com.google.common.collect.Maps;
+
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Memtable;
@@ -153,7 +154,16 @@ public class TableMetrics
public final Meter readRepairRequests;
public final Meter shortReadProtectionRequests;
- public final Meter replicaSideFilteringProtectionRequests;
+
+ public final Meter replicaFilteringProtectionRequests;
+
+    /**
+     * This histogram records the maximum number of rows {@link org.apache.cassandra.service.ReplicaFilteringProtection}
+     * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of
+     * cached rows in a single partition during a query. It can be helpful when choosing appropriate values for the
+     * replica_filtering_protection thresholds in cassandra.yaml.
+     */
+    public final Histogram rfpRowsCachedPerQuery;
public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
/**
@@ -652,7 +662,8 @@ public class TableMetrics
readRepairRequests = createTableMeter("ReadRepairRequests");
shortReadProtectionRequests =
createTableMeter("ShortReadProtectionRequests");
- replicaSideFilteringProtectionRequests =
createTableMeter("ReplicaSideFilteringProtectionRequests");
+ replicaFilteringProtectionRequests =
createTableMeter("ReplicaFilteringProtectionRequests");
+ rfpRowsCachedPerQuery =
createHistogram("ReplicaFilteringProtectionRowsCachedPerQuery", true);
}
public void updateSSTableIterated(int count)
@@ -771,6 +782,13 @@ public class TableMetrics
register(name, alias, tableMeter);
return tableMeter;
}
+
+ private Histogram createHistogram(String name, boolean considerZeroes)
+ {
+ Histogram histogram =
Metrics.histogram(factory.createMetricName(name),
aliasFactory.createMetricName(name), considerZeroes);
+ register(name, name, histogram);
+ return histogram;
+ }
/**
* Create a histogram-like interface that will register both a CF,
keyspace and global level
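The new ReplicaFilteringProtectionRowsCachedPerQuery histogram can be inspected over JMX to guide threshold tuning. A minimal sketch follows; the JMX port, the keyspace/table names ("ks"/"tbl"), the object-name pattern, and the "Max" attribute are assumptions based on how Cassandra typically exposes table metrics, not something this patch defines.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public final class RfpHistogramReader
    {
        public static void main(String[] args) throws Exception
        {
            // Assumes an unauthenticated local JMX endpoint on the default port 7199.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // Hypothetical keyspace/table; the name pattern follows Cassandra's usual table metrics layout.
                ObjectName histogram = new ObjectName(
                    "org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=tbl,name=ReplicaFilteringProtectionRowsCachedPerQuery");
                Object max = mbs.getAttribute(histogram, "Max");
                System.out.println("max rows cached per query so far: " + max);
            }
            finally
            {
                connector.close();
            }
        }
    }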
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java
b/src/java/org/apache/cassandra/service/DataResolver.java
index 02d355e..1d0bb47 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -49,7 +49,7 @@ public class DataResolver extends ResponseResolver
Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
@VisibleForTesting
- final List<AsyncOneResponse> repairResults =
Collections.synchronizedList(new ArrayList<>());
+ final List<AsyncOneResponse<?>> repairResults =
Collections.synchronizedList(new ArrayList<>());
private final boolean enforceStrictLiveness;
@@ -135,7 +135,7 @@ public class DataResolver extends ResponseResolver
UnfilteredPartitionIterator originalResponse =
responses.get(i).payload.makeIterator(command);
return context.needShortReadProtection()
- ? extendWithShortReadProtection(originalResponse,
context.sources[i], context.mergedResultCounter)
+ ? extendWithShortReadProtection(originalResponse, context, i)
: originalResponse;
}
@@ -150,20 +150,20 @@ public class DataResolver extends ResponseResolver
{
// Protecting against inconsistent replica filtering (some replica
returning a row that is outdated but that
// wouldn't be removed by normal reconciliation because up-to-date
replica have filtered the up-to-date version
- // of that row) works in 3 steps:
- // 1) we read the full response just to collect rows that may be
outdated (the ones we got from some
- // replica but didn't got any response for other; it could be
those other replica have filtered a more
- // up-to-date result). In doing so, we do not count any of such
"potentially outdated" row towards the
- // query limit. This simulate the worst case scenario where all
those "potentially outdated" rows are
- // indeed outdated, and thus make sure we are guaranteed to read
enough results (thanks to short read
- // protection).
- // 2) we query all the replica/rows we need to rule out whether
those "potentially outdated" rows are outdated
- // or not.
- // 3) we re-read cached copies of each replica response using the
"normal" read path merge with read-repair,
- // but where for each replica we use their original response
_plus_ the additional rows queried in the
- // previous step (and apply the command#rowFilter() on the full
result). Since the first phase has
- // pessimistically collected enough results for the case where
all potentially outdated results are indeed
- // outdated, we shouldn't need further short-read protection
requests during this phase.
+        // of that row) involves 3 main elements:
+        //   1) We combine short-read protection and a merge listener that identifies potentially "out-of-date"
+        //      rows to create an iterator that is guaranteed to produce enough valid row results to satisfy the query
+        //      limit if enough actually exist. A row is considered out-of-date if its merged form is non-empty and we
+        //      receive no response from at least one replica. In this case, it is possible that filtering at the
+        //      "silent" replica has produced a more up-to-date result.
+        //   2) This iterator is passed to the standard resolution process with read-repair, but is first wrapped in a
+        //      response provider that lazily "completes" potentially out-of-date rows by directly querying them on the
+        //      replicas that were previously silent. As this iterator is consumed, it caches valid data for potentially
+        //      out-of-date rows, and this cached data is merged with the fetched data as rows are requested. If there
+        //      is no replica divergence, only rows in the partition being evaluated will be cached (then released
+        //      when the partition is consumed).
+        //   3) After a "complete" row is materialized, it must pass the row filter supplied by the original query
+        //      before it counts against the limit.
        // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here
        // at the beginning of this method), so grab the response count once and use that through the method.
@@ -171,25 +171,25 @@ public class DataResolver extends ResponseResolver
// We need separate contexts, as each context has his own counter
ResolveContext firstPhaseContext = new ResolveContext(count);
ResolveContext secondPhaseContext = new ResolveContext(count);
- ReplicaFilteringProtection rfp = new
ReplicaFilteringProtection(keyspace, command, consistency,
firstPhaseContext.sources);
+
+ ReplicaFilteringProtection rfp = new
ReplicaFilteringProtection(keyspace,
+
command,
+
consistency,
+
firstPhaseContext.sources,
+
DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
+
DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
+
PartitionIterator firstPhasePartitions =
resolveInternal(firstPhaseContext,
rfp.mergeController(),
i ->
shortReadProtectedResponse(i, firstPhaseContext),
UnaryOperator.identity());
-
- // Consume the first phase partitions to populate the replica
filtering protection with both those materialized
- // partitions and the primary keys to be fetched.
- PartitionIterators.consume(firstPhasePartitions);
- firstPhasePartitions.close();
-
- // After reading the entire query results the protection helper should
have cached all the partitions so we can
- // clear the responses accumulator for the sake of memory usage, given
that the second phase might take long if
- // it needs to query replicas.
- responses.clearUnsafe();
-
- return resolveWithReadRepair(secondPhaseContext,
- rfp::queryProtectedPartitions,
- results ->
command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+
+ PartitionIterator completedPartitions =
resolveWithReadRepair(secondPhaseContext,
+ i ->
rfp.queryProtectedPartitions(firstPhasePartitions, i),
+ results
-> command.rowFilter().filter(results, command.metadata(), command.nowInSec()));
+
+ // Ensure that the RFP instance has a chance to record metrics when
the iterator closes.
+ return PartitionIterators.doOnClose(completedPartitions,
firstPhasePartitions::close);
}
private PartitionIterator resolveInternal(ResolveContext context,
@@ -217,8 +217,8 @@ public class DataResolver extends ResponseResolver
*/
UnfilteredPartitionIterator merged =
UnfilteredPartitionIterators.merge(results, command.nowInSec(), mergeListener);
- FilteredPartitions filtered =
- FilteredPartitions.filter(merged, new Filter(command.nowInSec(),
command.metadata().enforceStrictLiveness()));
+ Filter filter = new Filter(command.nowInSec(),
command.metadata().enforceStrictLiveness());
+ FilteredPartitions filtered = FilteredPartitions.filter(merged,
filter);
PartitionIterator counted =
Transformation.apply(preCountFilter.apply(filtered),
context.mergedResultCounter);
return command.isForThrift()
@@ -598,14 +598,17 @@ public class DataResolver extends ResponseResolver
}
private UnfilteredPartitionIterator
extendWithShortReadProtection(UnfilteredPartitionIterator partitions,
-
InetAddress source,
-
DataLimits.Counter mergedResultCounter)
+
ResolveContext context,
+ int i)
{
DataLimits.Counter singleResultCounter =
command.limits().newCounter(command.nowInSec(), false,
command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
- ShortReadPartitionsProtection protection =
- new ShortReadPartitionsProtection(source, singleResultCounter,
mergedResultCounter);
+ // The pre-fetch callback used here makes the initial round of
responses for this replica collectable.
+ ShortReadPartitionsProtection protection = new
ShortReadPartitionsProtection(context.sources[i],
+
() -> responses.clearUnsafe(i),
+
singleResultCounter,
+
context.mergedResultCounter);
/*
* The order of extention and transformations is important here.
Extending with more partitions has to happen
@@ -642,6 +645,7 @@ public class DataResolver extends ResponseResolver
private class ShortReadPartitionsProtection extends
Transformation<UnfilteredRowIterator> implements
MorePartitions<UnfilteredPartitionIterator>
{
private final InetAddress source;
+ private final Runnable preFetchCallback; // called immediately before
fetching more contents
private final DataLimits.Counter singleResultCounter; // unmerged
per-source counter
private final DataLimits.Counter mergedResultCounter; // merged
end-result counter
@@ -650,9 +654,13 @@ public class DataResolver extends ResponseResolver
private boolean partitionsFetched; // whether we've seen any new
partitions since iteration start or last moreContents() call
- private ShortReadPartitionsProtection(InetAddress source,
DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter)
+ private ShortReadPartitionsProtection(InetAddress source,
+ Runnable preFetchCallback,
+ DataLimits.Counter
singleResultCounter,
+ DataLimits.Counter
mergedResultCounter)
{
this.source = source;
+ this.preFetchCallback = preFetchCallback;
this.singleResultCounter = singleResultCounter;
this.mergedResultCounter = mergedResultCounter;
}
@@ -723,6 +731,9 @@ public class DataResolver extends ResponseResolver
ColumnFamilyStore.metricsFor(command.metadata().cfId).shortReadProtectionRequests.mark();
Tracing.trace("Requesting {} extra rows from {} for short read
protection", toQuery, source);
+ // If we've arrived here, all responses have been consumed, and
we're about to request more.
+ preFetchCallback.run();
+
PartitionRangeReadCommand cmd =
makeFetchAdditionalPartitionReadCommand(toQuery);
return executeReadCommand(cmd);
}
@@ -742,7 +753,7 @@ public class DataResolver extends ResponseResolver
return cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange);
}
- private class ShortReadRowsProtection extends Transformation
implements MoreRows<UnfilteredRowIterator>
+ private class ShortReadRowsProtection extends
Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator>
{
private final CFMetaData metadata;
private final DecoratedKey partitionKey;
diff --git
a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
index 36d51cc..0a57e66 100644
--- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@ -19,15 +19,15 @@
package org.apache.cassandra.service;
import java.net.InetAddress;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
+import java.util.Queue;
+import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,6 +49,8 @@ import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter;
import org.apache.cassandra.db.filter.DataLimits;
import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.PartitionIterators;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
import org.apache.cassandra.db.rows.EncodingStats;
@@ -58,11 +60,13 @@ import org.apache.cassandra.db.rows.Rows;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
+import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.ReadTimeoutException;
import org.apache.cassandra.exceptions.UnavailableException;
import org.apache.cassandra.metrics.TableMetrics;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.btree.BTreeSet;
/**
@@ -74,11 +78,15 @@ import org.apache.cassandra.utils.btree.BTreeSet;
* the rows in a replica response that don't have a corresponding row in other
replica responses, and requests them by
* primary key to the "silent" replicas in a second fetch round.
* <p>
- * See CASSANDRA-8272 and CASSANDRA-8273 for further details.
+ * See CASSANDRA-8272, CASSANDRA-8273, and CASSANDRA-15907 for further details.
*/
class ReplicaFilteringProtection
{
private static final Logger logger =
LoggerFactory.getLogger(ReplicaFilteringProtection.class);
+ private static final NoSpamLogger oneMinuteLogger =
NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES);
+
+ private static final Function<UnfilteredRowIterator, EncodingStats>
NULL_TO_NO_STATS =
+ rowIterator -> rowIterator == null ? EncodingStats.NO_STATS :
rowIterator.stats();
private final Keyspace keyspace;
private final ReadCommand command;
@@ -86,106 +94,42 @@ class ReplicaFilteringProtection
private final InetAddress[] sources;
private final TableMetrics tableMetrics;
- /**
- * Per-source primary keys of the rows that might be outdated so they need
to be fetched.
- * For outdated static rows we use an empty builder to signal it has to be
queried.
- */
- private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>>
rowsToFetch;
+ private final int cachedRowsWarnThreshold;
+ private final int cachedRowsFailThreshold;
+
+ /** Tracks whether or not we've already hit the warning threshold while
evaluating a partition. */
+ private boolean hitWarningThreshold = false;
+
+ private int currentRowsCached = 0; // tracks the current number of cached
rows
+ private int maxRowsCached = 0; // tracks the high watermark for the number
of cached rows
/**
- * Per-source list of all the partitions seen by the merge listener, to be
merged with the extra fetched rows.
+ * Per-source list of the pending partitions seen by the merge listener,
to be merged with the extra fetched rows.
*/
- private final List<List<PartitionBuilder>> originalPartitions;
+ private final List<Queue<PartitionBuilder>> originalPartitions;
ReplicaFilteringProtection(Keyspace keyspace,
ReadCommand command,
ConsistencyLevel consistency,
- InetAddress[] sources)
+ InetAddress[] sources,
+ int cachedRowsWarnThreshold,
+ int cachedRowsFailThreshold)
{
this.keyspace = keyspace;
this.command = command;
this.consistency = consistency;
this.sources = sources;
- this.rowsToFetch = new ArrayList<>(sources.length);
this.originalPartitions = new ArrayList<>(sources.length);
- for (InetAddress ignored : sources)
+ for (int i = 0; i < sources.length; i++)
{
- rowsToFetch.add(new TreeMap<>());
- originalPartitions.add(new ArrayList<>());
+ originalPartitions.add(new ArrayDeque<>());
}
tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
- }
- private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source,
DecoratedKey partitionKey)
- {
- return rowsToFetch.get(source).computeIfAbsent(partitionKey, k ->
BTreeSet.builder(command.metadata().comparator));
- }
-
- /**
- * Returns the protected results for the specified replica. These are
generated fetching the extra rows and merging
- * them with the cached original filtered results for that replica.
- *
- * @param source the source
- * @return the protected results for the specified replica
- */
- UnfilteredPartitionIterator queryProtectedPartitions(int source)
- {
- UnfilteredPartitionIterator original =
makeIterator(originalPartitions.get(source));
- SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch =
rowsToFetch.get(source);
-
- if (toFetch.isEmpty())
- return original;
-
- // TODO: this would be more efficient if we had multi-key queries
internally
- List<UnfilteredPartitionIterator> fetched = toFetch.keySet()
- .stream()
- .map(k ->
querySourceOnKey(source, k))
-
.collect(Collectors.toList());
-
- return UnfilteredPartitionIterators.merge(Arrays.asList(original,
UnfilteredPartitionIterators.concat(fetched)),
- command.nowInSec(), null);
- }
-
- private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey
key)
- {
- BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key);
- assert builder != null; // We're calling this on the result of
rowsToFetch.get(i).keySet()
-
- InetAddress source = sources[i];
- NavigableSet<Clustering> clusterings = builder.build();
- tableMetrics.replicaSideFilteringProtectionRequests.mark();
- if (logger.isTraceEnabled())
- logger.trace("Requesting rows {} in partition {} from {} for
replica-side filtering protection",
- clusterings, key, source);
- Tracing.trace("Requesting {} rows in partition {} from {} for
replica-side filtering protection",
- clusterings.size(), key, source);
-
- // build the read command taking into account that we could be
requesting only in the static row
- DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) :
DataLimits.NONE;
- ClusteringIndexFilter filter = new
ClusteringIndexNamesFilter(clusterings, command.isReversed());
- SinglePartitionReadCommand cmd =
SinglePartitionReadCommand.create(command.metadata(),
-
command.nowInSec(),
-
command.columnFilter(),
-
RowFilter.NONE,
-
limits,
- key,
-
filter);
- try
- {
- return executeReadCommand(cmd, source);
- }
- catch (ReadTimeoutException e)
- {
- int blockFor = consistency.blockFor(keyspace);
- throw new ReadTimeoutException(consistency, blockFor - 1,
blockFor, true);
- }
- catch (UnavailableException e)
- {
- int blockFor = consistency.blockFor(keyspace);
- throw new UnavailableException(consistency, blockFor, blockFor -
1);
- }
+ this.cachedRowsWarnThreshold = cachedRowsWarnThreshold;
+ this.cachedRowsFailThreshold = cachedRowsFailThreshold;
}
private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd,
InetAddress source)
@@ -212,83 +156,124 @@ class ReplicaFilteringProtection
* <p>
* The listener will track both the accepted data and the primary keys of
the rows that are considered as outdated.
* That way, once the query results would have been merged using this
listener, further calls to
- * {@link #queryProtectedPartitions(int)} will use the collected data to
return a copy of the
+ * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the
collected data to return a copy of the
* data originally collected from the specified replica, completed with
the potentially outdated rows.
*/
UnfilteredPartitionIterators.MergeListener mergeController()
{
- return (partitionKey, versions) -> {
-
- PartitionBuilder[] builders = new PartitionBuilder[sources.length];
-
- for (int i = 0; i < sources.length; i++)
- builders[i] = new PartitionBuilder(partitionKey,
columns(versions), stats(versions));
+ return new UnfilteredPartitionIterators.MergeListener()
+ {
+ @Override
+ public void close()
+ {
+ // If we hit the failure threshold before consuming a single
partition, record the current rows cached.
+
tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached,
maxRowsCached));
+ }
- return new UnfilteredRowIterators.MergeListener()
+ @Override
+ public UnfilteredRowIterators.MergeListener
getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator>
versions)
{
- @Override
- public void onMergedPartitionLevelDeletion(DeletionTime
mergedDeletion, DeletionTime[] versions)
+ PartitionBuilder[] builders = new
PartitionBuilder[sources.length];
+ PartitionColumns columns = columns(versions);
+ EncodingStats stats = EncodingStats.merge(versions,
NULL_TO_NO_STATS);
+
+ for (int i = 0; i < sources.length; i++)
+ builders[i] = new PartitionBuilder(partitionKey,
sources[i], columns, stats);
+
+ return new UnfilteredRowIterators.MergeListener()
{
- // cache the deletion time versions to be able to
regenerate the original row iterator
- for (int i = 0; i < versions.length; i++)
- builders[i].setDeletionTime(versions[i]);
- }
+ @Override
+ public void onMergedPartitionLevelDeletion(DeletionTime
mergedDeletion, DeletionTime[] versions)
+ {
+ // cache the deletion time versions to be able to
regenerate the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].setDeletionTime(versions[i]);
+ }
- @Override
- public Row onMergedRows(Row merged, Row[] versions)
- {
- // cache the row versions to be able to regenerate the
original row iterator
- for (int i = 0; i < versions.length; i++)
- builders[i].addRow(versions[i]);
+ @Override
+ public Row onMergedRows(Row merged, Row[] versions)
+ {
+ // cache the row versions to be able to regenerate the
original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].addRow(versions[i]);
- if (merged.isEmpty())
- return merged;
+ if (merged.isEmpty())
+ return merged;
- boolean isPotentiallyOutdated = false;
- boolean isStatic = merged.isStatic();
- for (int i = 0; i < versions.length; i++)
- {
- Row version = versions[i];
- if (version == null || (isStatic && version.isEmpty()))
+ boolean isPotentiallyOutdated = false;
+ boolean isStatic = merged.isStatic();
+ for (int i = 0; i < versions.length; i++)
{
- isPotentiallyOutdated = true;
- BTreeSet.Builder<Clustering> toFetch =
getOrCreateToFetch(i, partitionKey);
- // Note that for static, we shouldn't add the
clustering to the clustering set (the
- // ClusteringIndexNamesFilter we'll build from
this later does not expect it), but the fact
- // we created a builder in the first place will
act as a marker that the static row must be
- // fetched, even if no other rows are added for
this partition.
- if (!isStatic)
- toFetch.add(merged.clustering());
+ Row version = versions[i];
+ if (version == null || (isStatic &&
version.isEmpty()))
+ {
+ isPotentiallyOutdated = true;
+ builders[i].addToFetch(merged);
+ }
}
- }
- // If the row is potentially outdated (because some
replica didn't send anything and so it _may_ be
- // an outdated result that is only present because other
replica have filtered the up-to-date result
- // out), then we skip the row. In other words, the results
of the initial merging of results by this
- // protection assume the worst case scenario where every
row that might be outdated actually is.
- // This ensures that during this first phase (collecting
additional row to fetch) we are guaranteed
- // to look at enough data to ultimately fulfill the query
limit.
- return isPotentiallyOutdated ? null : merged;
- }
+ // If the row is potentially outdated (because some
replica didn't send anything and so it _may_ be
+ // an outdated result that is only present because
other replica have filtered the up-to-date result
+ // out), then we skip the row. In other words, the
results of the initial merging of results by this
+ // protection assume the worst case scenario where
every row that might be outdated actually is.
+ // This ensures that during this first phase
(collecting additional row to fetch) we are guaranteed
+ // to look at enough data to ultimately fulfill the
query limit.
+ return isPotentiallyOutdated ? null : merged;
+ }
- @Override
- public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker
merged, RangeTombstoneMarker[] versions)
- {
- // cache the marker versions to be able to regenerate the
original row iterator
- for (int i = 0; i < versions.length; i++)
- builders[i].addRangeTombstoneMarker(versions[i]);
- }
+ @Override
+ public void
onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged,
RangeTombstoneMarker[] versions)
+ {
+ // cache the marker versions to be able to regenerate
the original row iterator
+ for (int i = 0; i < versions.length; i++)
+ builders[i].addRangeTombstoneMarker(versions[i]);
+ }
- @Override
- public void close()
- {
- for (int i = 0; i < sources.length; i++)
- originalPartitions.get(i).add(builders[i]);
- }
- };
+ @Override
+ public void close()
+ {
+ for (int i = 0; i < sources.length; i++)
+ originalPartitions.get(i).add(builders[i]);
+ }
+ };
+ }
};
}
+ private void incrementCachedRows()
+ {
+ currentRowsCached++;
+
+ if (currentRowsCached == cachedRowsFailThreshold + 1)
+ {
+ String message = String.format("Replica filtering protection has
cached over %d rows during query %s. " +
+ "(See
'cached_replica_rows_fail_threshold' in cassandra.yaml.)",
+ cachedRowsFailThreshold,
command.toCQLString());
+
+ logger.error(message);
+ Tracing.trace(message);
+ throw new OverloadedException(message);
+ }
+ else if (currentRowsCached == cachedRowsWarnThreshold + 1 &&
!hitWarningThreshold)
+ {
+ hitWarningThreshold = true;
+
+ String message = String.format("Replica filtering protection has
cached over %d rows during query %s. " +
+ "(See
'cached_replica_rows_warn_threshold' in cassandra.yaml.)",
+ cachedRowsWarnThreshold,
command.toCQLString());
+
+ ClientWarn.instance.warn(message);
+ oneMinuteLogger.warn(message);
+ Tracing.trace(message);
+ }
+ }
+
+ private void releaseCachedRows(int count)
+ {
+ maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
+ currentRowsCached -= count;
+ }
+
private static PartitionColumns columns(List<UnfilteredRowIterator>
versions)
{
Columns statics = Columns.NONE;
@@ -305,24 +290,19 @@ class ReplicaFilteringProtection
return new PartitionColumns(statics, regulars);
}
- private static EncodingStats stats(List<UnfilteredRowIterator> iterators)
- {
- EncodingStats stats = EncodingStats.NO_STATS;
- for (UnfilteredRowIterator iter : iterators)
- {
- if (iter == null)
- continue;
-
- stats = stats.mergeWith(iter.stats());
- }
- return stats;
- }
-
- private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder>
builders)
+    /**
+     * Returns the protected results for the specified replica. These are generated by fetching the extra rows and
+     * merging them with the cached original filtered results for that replica.
+     *
+     * @param merged the first-phase partitions, which should have been read using the {@link #mergeController()}
+     * @param source the source replica index
+     * @return the protected results for the specified replica
+     */
+    UnfilteredPartitionIterator queryProtectedPartitions(PartitionIterator merged, int source)
{
return new UnfilteredPartitionIterator()
{
- final Iterator<PartitionBuilder> iterator = builders.iterator();
+ final Queue<PartitionBuilder> partitions =
originalPartitions.get(source);
@Override
public boolean isForThrift()
@@ -337,37 +317,48 @@ class ReplicaFilteringProtection
}
@Override
- public void close()
- {
- // nothing to do here
- }
+ public void close() { }
@Override
public boolean hasNext()
{
- return iterator.hasNext();
+ // If there are no cached partition builders for this source,
advance the first phase iterator, which
+ // will force the RFP merge listener to load at least the next
protected partition. Note that this may
+ // load more than one partition if any divergence between
replicas is discovered by the merge listener.
+ if (partitions.isEmpty())
+ {
+ PartitionIterators.consumeNext(merged);
+ }
+
+ return !partitions.isEmpty();
}
@Override
public UnfilteredRowIterator next()
{
- return iterator.next().build();
+ PartitionBuilder builder = partitions.poll();
+ assert builder != null;
+ return builder.protectedPartition();
}
};
}
private class PartitionBuilder
{
- private final DecoratedKey partitionKey;
+ private final DecoratedKey key;
+ private final InetAddress source;
private final PartitionColumns columns;
private final EncodingStats stats;
private DeletionTime deletionTime;
private Row staticRow = Rows.EMPTY_STATIC_ROW;
- private final List<Unfiltered> contents = new ArrayList<>();
+ private final Queue<Unfiltered> contents = new ArrayDeque<>();
+ private BTreeSet.Builder<Clustering> toFetch;
+ private int partitionRowsCached;
- private PartitionBuilder(DecoratedKey partitionKey, PartitionColumns
columns, EncodingStats stats)
+ private PartitionBuilder(DecoratedKey key, InetAddress source,
PartitionColumns columns, EncodingStats stats)
{
- this.partitionKey = partitionKey;
+ this.key = key;
+ this.source = source;
this.columns = columns;
this.stats = stats;
}
@@ -379,6 +370,12 @@ class ReplicaFilteringProtection
private void addRow(Row row)
{
+ partitionRowsCached++;
+
+ incrementCachedRows();
+
+ // Note that even null rows are counted against the row caching
limit. The assumption is that
+ // a subsequent protection query will later fetch the row onto the
heap anyway.
if (row == null)
return;
@@ -394,12 +391,23 @@ class ReplicaFilteringProtection
contents.add(marker);
}
- private UnfilteredRowIterator build()
+ private void addToFetch(Row row)
+ {
+ if (toFetch == null)
+ toFetch = BTreeSet.builder(command.metadata().comparator);
+
+ // Note that for static, we shouldn't add the clustering to the
clustering set (the
+ // ClusteringIndexNamesFilter we'll build from this later does not
expect it), but the fact
+ // we created a builder in the first place will act as a marker
that the static row must be
+ // fetched, even if no other rows are added for this partition.
+ if (!row.isStatic())
+ toFetch.add(row.clustering());
+ }
+
+ private UnfilteredRowIterator originalPartition()
{
return new UnfilteredRowIterator()
{
- final Iterator<Unfiltered> iterator = contents.iterator();
-
@Override
public DeletionTime partitionLevelDeletion()
{
@@ -433,7 +441,7 @@ class ReplicaFilteringProtection
@Override
public DecoratedKey partitionKey()
{
- return partitionKey;
+ return key;
}
@Override
@@ -445,21 +453,82 @@ class ReplicaFilteringProtection
@Override
public void close()
{
- // nothing to do here
+ releaseCachedRows(partitionRowsCached);
}
@Override
public boolean hasNext()
{
- return iterator.hasNext();
+ return !contents.isEmpty();
}
@Override
public Unfiltered next()
{
- return iterator.next();
+ return contents.poll();
}
};
}
+
+ private UnfilteredRowIterator protectedPartition()
+ {
+ UnfilteredRowIterator original = originalPartition();
+
+ if (toFetch != null)
+ {
+ try (UnfilteredPartitionIterator partitions =
fetchFromSource())
+ {
+ if (partitions.hasNext())
+ {
+ try (UnfilteredRowIterator fetchedRows =
partitions.next())
+ {
+ return
UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows),
command.nowInSec());
+ }
+ }
+ }
+ }
+
+ return original;
+ }
+
+ private UnfilteredPartitionIterator fetchFromSource()
+ {
+ assert toFetch != null;
+
+ NavigableSet<Clustering> clusterings = toFetch.build();
+ tableMetrics.replicaFilteringProtectionRequests.mark();
+
+ if (logger.isTraceEnabled())
+ logger.trace("Requesting rows {} in partition {} from {} for
replica filtering protection",
+ clusterings, key, source);
+
+ Tracing.trace("Requesting {} rows in partition {} from {} for
replica filtering protection",
+ clusterings.size(), key, source);
+
+ // build the read command taking into account that we could be
requesting only in the static row
+ DataLimits limits = clusterings.isEmpty() ?
DataLimits.cqlLimits(1) : DataLimits.NONE;
+ ClusteringIndexFilter filter = new
ClusteringIndexNamesFilter(clusterings, command.isReversed());
+ SinglePartitionReadCommand cmd =
SinglePartitionReadCommand.create(command.metadata(),
+
command.nowInSec(),
+
command.columnFilter(),
+
RowFilter.NONE,
+
limits,
+
key,
+
filter);
+ try
+ {
+ return executeReadCommand(cmd, source);
+ }
+ catch (ReadTimeoutException e)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ throw new ReadTimeoutException(consistency, blockFor - 1,
blockFor, true);
+ }
+ catch (UnavailableException e)
+ {
+ int blockFor = consistency.blockFor(keyspace);
+ throw new UnavailableException(consistency, blockFor, blockFor
- 1);
+ }
+ }
}
}
\ No newline at end of file
diff --git a/src/java/org/apache/cassandra/service/StorageService.java
b/src/java/org/apache/cassandra/service/StorageService.java
index d287788..f0b183d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4799,6 +4799,7 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public void setTombstoneWarnThreshold(int threshold)
{
DatabaseDescriptor.setTombstoneWarnThreshold(threshold);
+ logger.info("updated tombstone_warn_threshold to {}", threshold);
}
public int getTombstoneFailureThreshold()
@@ -4809,6 +4810,29 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public void setTombstoneFailureThreshold(int threshold)
{
DatabaseDescriptor.setTombstoneFailureThreshold(threshold);
+ logger.info("updated tombstone_failure_threshold to {}", threshold);
+ }
+
+ public int getCachedReplicaRowsWarnThreshold()
+ {
+ return DatabaseDescriptor.getCachedReplicaRowsWarnThreshold();
+ }
+
+ public void setCachedReplicaRowsWarnThreshold(int threshold)
+ {
+ DatabaseDescriptor.setCachedReplicaRowsWarnThreshold(threshold);
+ logger.info("updated
replica_filtering_protection.cached_rows_warn_threshold to {}", threshold);
+ }
+
+ public int getCachedReplicaRowsFailThreshold()
+ {
+ return DatabaseDescriptor.getCachedReplicaRowsFailThreshold();
+ }
+
+ public void setCachedReplicaRowsFailThreshold(int threshold)
+ {
+ DatabaseDescriptor.setCachedReplicaRowsFailThreshold(threshold);
+ logger.info("updated
replica_filtering_protection.cached_rows_fail_threshold to {}", threshold);
}
public int getBatchSizeFailureThreshold()
@@ -4819,12 +4843,13 @@ public class StorageService extends
NotificationBroadcasterSupport implements IE
public void setBatchSizeFailureThreshold(int threshold)
{
DatabaseDescriptor.setBatchSizeFailThresholdInKB(threshold);
+ logger.info("updated batch_size_fail_threshold_in_kb to {}",
threshold);
}
public void setHintedHandoffThrottleInKB(int throttleInKB)
{
DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB);
- logger.info(String.format("Updated hinted_handoff_throttle_in_kb to
%d", throttleInKB));
+ logger.info("updated hinted_handoff_throttle_in_kb to {}",
throttleInKB);
}
@VisibleForTesting
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e22b094..1afa48e 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -604,6 +604,18 @@ public interface StorageServiceMBean extends
NotificationEmitter
/** Sets the threshold for abandoning queries with many tombstones */
public void setTombstoneFailureThreshold(int tombstoneDebugThreshold);
+ /** Returns the number of rows cached at the coordinator before
filtering/index queries log a warning. */
+ public int getCachedReplicaRowsWarnThreshold();
+
+ /** Sets the number of rows cached at the coordinator before
filtering/index queries log a warning. */
+ public void setCachedReplicaRowsWarnThreshold(int threshold);
+
+ /** Returns the number of rows cached at the coordinator before
filtering/index queries fail outright. */
+ public int getCachedReplicaRowsFailThreshold();
+
+ /** Sets the number of rows cached at the coordinator before
filtering/index queries fail outright. */
+ public void setCachedReplicaRowsFailThreshold(int threshold);
+
/** Returns the threshold for rejecting queries due to a large batch size
*/
public int getBatchSizeFailureThreshold();
/** Sets the threshold for rejecting queries due to a large batch size */
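These MBean operations allow the warn/fail thresholds to be adjusted at runtime without a restart. A minimal sketch, assuming the default unauthenticated JMX endpoint on port 7199 and the standard org.apache.cassandra.db:type=StorageService object name; the class name and chosen values are illustrative.

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public final class RfpThresholdTuner
    {
        public static void main(String[] args) throws Exception
        {
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            JMXConnector connector = JMXConnectorFactory.connect(url);
            try
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");

                // Lower the per-query warning threshold, then read back both thresholds.
                mbs.invoke(storageService, "setCachedReplicaRowsWarnThreshold",
                           new Object[]{ 1000 }, new String[]{ "int" });
                System.out.println("warn threshold: " + mbs.getAttribute(storageService, "CachedReplicaRowsWarnThreshold"));
                System.out.println("fail threshold: " + mbs.getAttribute(storageService, "CachedReplicaRowsFailThreshold"));
            }
            finally
            {
                connector.close();
            }
        }
    }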
diff --git a/src/java/org/apache/cassandra/utils/FBUtilities.java
b/src/java/org/apache/cassandra/utils/FBUtilities.java
index b560adf..d3633fd 100644
--- a/src/java/org/apache/cassandra/utils/FBUtilities.java
+++ b/src/java/org/apache/cassandra/utils/FBUtilities.java
@@ -437,9 +437,9 @@ public class FBUtilities
}
}
- public static void waitOnFutures(List<AsyncOneResponse> results, long ms)
throws TimeoutException
+ public static void waitOnFutures(List<AsyncOneResponse<?>> results, long
ms) throws TimeoutException
{
- for (AsyncOneResponse result : results)
+ for (AsyncOneResponse<?> result : results)
result.get(ms, TimeUnit.MILLISECONDS);
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
index ca9bb09..15afdbe 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java
@@ -18,7 +18,6 @@
*/
package org.apache.cassandra.utils.concurrent;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
@@ -138,14 +137,12 @@ public class Accumulator<E> implements Iterable<E>
}
/**
- * Removes all of the elements from this accumulator.
+     * Removes the element at the specified index from this accumulator.
*
* This method is not thread-safe when used concurrently with {@link
#add(Object)}.
*/
- public void clearUnsafe()
+ public void clearUnsafe(int i)
{
- nextIndexUpdater.set(this, 0);
- presentCountUpdater.set(this, 0);
- Arrays.fill(values, null);
+ values[i] = null;
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 2f2b525..51a08e6 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.distributed.api.QueryResult;
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.pager.QueryPager;
import org.apache.cassandra.transport.Server;
@@ -91,6 +92,11 @@ public class Coordinator implements ICoordinator
boundBBValues.add(ByteBufferUtil.objectToBytes(boundValue));
prepared.validate(QueryState.forInternalCalls().getClientState());
+
+ // Start capturing warnings on this thread. Note that this will
implicitly clear out any previous
+ // warnings as it sets a new State instance on the ThreadLocal.
+ ClientWarn.instance.captureWarnings();
+
ResultMessage res = prepared.execute(QueryState.forInternalCalls(),
QueryOptions.create(toCassandraCL(consistencyLevel),
boundBBValues,
@@ -100,6 +106,10 @@ public class Coordinator implements ICoordinator
null,
Server.CURRENT_VERSION));
+ // Collect warnings reported during the query.
+ if (res != null)
+ res.setWarnings(ClientWarn.instance.getWarnings());
+
return RowUtil.toQueryResult(res);
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
index 50d501e..876ce29 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/RowUtil.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.distributed.impl;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -41,7 +42,11 @@ public class RowUtil
ResultMessage.Rows rows = (ResultMessage.Rows) res;
String[] names = getColumnNames(rows.result.metadata.names);
Object[][] results = RowUtil.toObjects(rows);
- return new SimpleQueryResult(names, results);
+
+ // Warnings may be null here, due to ClientWarn#getWarnings()
handling of empty warning lists.
+ List<String> warnings = res.getWarnings();
+
+ return new SimpleQueryResult(names, results, warnings == null ?
Collections.emptyList() : warnings);
}
else
{
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
new file mode 100644
index 0000000..2a847bf
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+import org.apache.cassandra.exceptions.OverloadedException;
+import org.apache.cassandra.service.StorageService;
+
+import static
org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_FAIL_THRESHOLD;
+import static
org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_WARN_THRESHOLD;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.shared.AssertUtils.row;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Exercises the functionality of {@link
org.apache.cassandra.service.ReplicaFilteringProtection}, the
+ * mechanism that ensures distributed index and filtering queries at read
consistency levels > ONE/LOCAL_ONE
+ * avoid stale replica results.
+ */
+public class ReplicaFilteringProtectionTest extends TestBaseImpl
+{
+ private static final int REPLICAS = 2;
+ private static final int ROWS = 3;
+
+ private static Cluster cluster;
+
+ @BeforeClass
+ public static void setup() throws IOException
+ {
+ cluster = init(Cluster.build()
+ .withNodes(REPLICAS)
+ .withConfig(config ->
config.set("hinted_handoff_enabled", false)
+
.set("commitlog_sync", "batch")
+ .set("num_tokens",
1)).start());
+
+ // Make sure we start w/ the correct defaults:
+ cluster.get(1).runOnInstance(() ->
assertEquals(DEFAULT_WARN_THRESHOLD,
StorageService.instance.getCachedReplicaRowsWarnThreshold()));
+ cluster.get(1).runOnInstance(() ->
assertEquals(DEFAULT_FAIL_THRESHOLD,
StorageService.instance.getCachedReplicaRowsFailThreshold()));
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ cluster.close();
+ }
+
+ @Test
+ public void testMissedUpdatesBelowCachingWarnThreshold()
+ {
+ String tableName = "missed_updates_no_warning";
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + "
(k int PRIMARY KEY, v text)"));
+
+ // The warning threshold provided is exactly the total number of rows returned
+ // to the coordinator from all replicas and therefore should not be triggered.
+ testMissedUpdates(tableName, REPLICAS * ROWS, Integer.MAX_VALUE,
false);
+ }
+
+ @Test
+ public void testMissedUpdatesAboveCachingWarnThreshold()
+ {
+ String tableName = "missed_updates_cache_warn";
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + "
(k int PRIMARY KEY, v text)"));
+
+ // The warning threshold provided is one less than the total number of
rows returned
+ // to the coordinator from all replicas and therefore should be
triggered but not fail the query.
+ testMissedUpdates(tableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE,
true);
+ }
+
+ @Test
+ public void testMissedUpdatesAroundCachingFailThreshold()
+ {
+ String tableName = "missed_updates_cache_fail";
+ cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + "
(k int PRIMARY KEY, v text)"));
+
+ // The failure threshold provided is exactly the total number of rows
returned
+ // to the coordinator from all replicas and therefore should just warn.
+ testMissedUpdates(tableName, 1, REPLICAS * ROWS, true);
+
+ try
+ {
+ // The failure threshold provided is one less than the total
number of rows returned
+ // to the coordinator from all replicas and therefore should fail
the query.
+ testMissedUpdates(tableName, 1, REPLICAS * ROWS - 1, true);
+ }
+ catch (RuntimeException e)
+ {
+ assertEquals(e.getCause().getClass().getName(),
OverloadedException.class.getName());
+ }
+ }
+
+ private void testMissedUpdates(String tableName, int warnThreshold, int
failThreshold, boolean shouldWarn)
+ {
+ cluster.get(1).runOnInstance(() ->
StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold));
+ cluster.get(1).runOnInstance(() ->
StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold));
+
+ String fullTableName = KEYSPACE + '.' + tableName;
+
+ // Case 1: Insert and query rows at ALL to verify base line.
+ for (int i = 0; i < ROWS; i++)
+ {
+ cluster.coordinator(1).execute("INSERT INTO " + fullTableName +
"(k, v) VALUES (?, 'old')", ALL, i);
+ }
+
+ long histogramSampleCount = rowsCachedPerQueryCount(cluster.get(1),
tableName);
+
+ String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT
? ALLOW FILTERING";
+
+ Object[][] initialRows = cluster.coordinator(1).execute(query, ALL,
"old", ROWS);
+ assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old"));
+
+ // Make sure only one sample was recorded for the query.
+ assertEquals(histogramSampleCount + 1,
rowsCachedPerQueryCount(cluster.get(1), tableName));
+
+ // Case 2: Update all rows on only one replica, leaving the entire
dataset of the remaining replica out-of-date.
+ updateAllRowsOn(1, fullTableName, "new");
+
+ // The replica that missed the results creates a mismatch at every
row, and we therefore cache a version
+ // of that row for all replicas.
+ SimpleQueryResult oldResult =
cluster.coordinator(1).executeWithResult(query, ALL, "old", ROWS);
+ assertRows(oldResult.toObjectArrays());
+ verifyWarningState(shouldWarn, oldResult);
+
+ // We should have made 3 row "completion" requests.
+ assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName));
+
+ // In all cases above, the queries should be caching 1 row per
partition per replica, but
+ // 6 for the whole query, given every row is potentially stale.
+ assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1),
tableName));
+
+ // Make sure only one more sample was recorded for the query.
+ assertEquals(histogramSampleCount + 2,
rowsCachedPerQueryCount(cluster.get(1), tableName));
+
+ // Case 3: Observe the effects of blocking read-repair.
+
+ // The previous query performs a blocking read-repair, which removes
replica divergence. This
+ // will only warn, therefore, if the warning threshold is actually
below the number of replicas.
+ // (i.e. The row cache counter is decremented/reset as each partition
is consumed.)
+ SimpleQueryResult newResult =
cluster.coordinator(1).executeWithResult(query, ALL, "new", ROWS);
+ Object[][] newRows = newResult.toObjectArrays();
+ assertRows(newRows, row(1, "new"), row(0, "new"), row(2, "new"));
+
+ verifyWarningState(warnThreshold < REPLICAS, newResult);
+
+ // We still should only have made 3 row "completion" requests, with no
replica divergence in the last query.
+ assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName));
+
+ // With no replica divergence, we only cache a single partition at a
time across 2 replicas.
+ assertEquals(REPLICAS, minRowsCachedPerQuery(cluster.get(1),
tableName));
+
+ // Make sure only one more sample was recorded for the query.
+ assertEquals(histogramSampleCount + 3,
rowsCachedPerQueryCount(cluster.get(1), tableName));
+
+ // Case 4: Introduce another mismatch by updating all rows on only one
replica.
+
+ updateAllRowsOn(1, fullTableName, "future");
+
+ // Another mismatch is introduced, and we once again cache a version
of each row during resolution.
+ SimpleQueryResult futureResult =
cluster.coordinator(1).executeWithResult(query, ALL, "future", ROWS);
+ Object[][] futureRows = futureResult.toObjectArrays();
+ assertRows(futureRows, row(1, "future"), row(0, "future"), row(2,
"future"));
+
+ verifyWarningState(shouldWarn, futureResult);
+
+ // We should have made 3 more row "completion" requests.
+ assertEquals(ROWS * 2, protectionQueryCount(cluster.get(1),
tableName));
+
+ // In all cases above, the queries should be caching 1 row per
partition, but 6 for the
+ // whole query, given every row is potentially stale.
+ assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1),
tableName));
+
+ // Make sure only one more sample was recorded for the query.
+ assertEquals(histogramSampleCount + 4,
rowsCachedPerQueryCount(cluster.get(1), tableName));
+ }
+
+ private void updateAllRowsOn(int node, String table, String value)
+ {
+ for (int i = 0; i < ROWS; i++)
+ {
+ cluster.get(node).executeInternal("UPDATE " + table + " SET v = ?
WHERE k = ?", value, i);
+ }
+ }
+
+ private void verifyWarningState(boolean shouldWarn, SimpleQueryResult
futureResult)
+ {
+ List<String> futureWarnings = futureResult.warnings();
+ assertEquals(shouldWarn, futureWarnings.stream().anyMatch(w ->
w.contains("cached_replica_rows_warn_threshold")));
+ assertEquals(shouldWarn ? 1 : 0, futureWarnings.size());
+ }
+
+ private long protectionQueryCount(IInvokableInstance instance, String
tableName)
+ {
+ return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+
.getColumnFamilyStore(tableName)
+
.metric.replicaFilteringProtectionRequests.getCount());
+ }
+
+ private long maxRowsCachedPerQuery(IInvokableInstance instance, String
tableName)
+ {
+ return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+
.getColumnFamilyStore(tableName)
+
.metric.rfpRowsCachedPerQuery.getSnapshot().getMax());
+ }
+
+ private long minRowsCachedPerQuery(IInvokableInstance instance, String
tableName)
+ {
+ return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+
.getColumnFamilyStore(tableName)
+
.metric.rfpRowsCachedPerQuery.getSnapshot().getMin());
+ }
+
+ private long rowsCachedPerQueryCount(IInvokableInstance instance, String
tableName)
+ {
+ return instance.callOnInstance(() -> Keyspace.open(KEYSPACE)
+
.getColumnFamilyStore(tableName)
+
.metric.rfpRowsCachedPerQuery.getCount());
+ }
+}
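
The test drives the new operator knobs through the StorageService setters added in this patch (setCachedReplicaRowsWarnThreshold / setCachedReplicaRowsFailThreshold). As a rough sketch, assuming only the calls the test itself uses, the same setters can be applied to every node of an in-JVM dtest cluster before running an expensive filtering query; the helper name, the node-count parameter, and the threshold values are illustrative, not part of the patch.

    import org.apache.cassandra.distributed.Cluster;
    import org.apache.cassandra.service.StorageService;

    final class ThresholdTuning
    {
        // Adjust the replica filtering protection caching thresholds on the
        // first `nodes` instances of the cluster. Both setters are the ones
        // exercised by testMissedUpdates() above.
        static void setThresholds(Cluster cluster, int nodes, int warnThreshold, int failThreshold)
        {
            for (int node = 1; node <= nodes; node++)
            {
                cluster.get(node).runOnInstance(() -> {
                    StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold);
                    StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold);
                });
            }
        }
    }
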
diff --git a/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
new file mode 100644
index 0000000..559ce10
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/rows/EncodingStatsTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.rows;
+
+import java.util.function.Function;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.cassandra.db.LivenessInfo;
+
+public class EncodingStatsTest
+{
+ @Test
+ public void testCollectWithNoStats()
+ {
+ EncodingStats none = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(none, EncodingStats.NO_STATS);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithEmpty()
+ {
+ EncodingStats none = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS,
+ new EncodingStats(LivenessInfo.NO_TIMESTAMP,
LivenessInfo.NO_EXPIRATION_TIME, 0)
+ ), Function.identity());
+ Assert.assertEquals(none, EncodingStats.NO_STATS);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithTimestamp()
+ {
+ EncodingStats single = new EncodingStats(1,
LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ EncodingStats.NO_STATS,
+ single,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(single, result);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithExpires()
+ {
+ EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1,
0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ single,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(single, result);
+ }
+
+ @Test
+ public void testCollectWithNoStatsWithTTL()
+ {
+ EncodingStats single = new EncodingStats(LivenessInfo.NO_TIMESTAMP,
LivenessInfo.NO_EXPIRATION_TIME, 1);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ EncodingStats.NO_STATS,
+ single,
+ EncodingStats.NO_STATS
+ ), Function.identity());
+ Assert.assertEquals(single, result);
+ }
+
+ @Test
+ public void testCollectOneEach()
+ {
+ EncodingStats tsp = new EncodingStats(1,
LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats exp = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+ EncodingStats ttl = new EncodingStats(LivenessInfo.NO_TIMESTAMP,
LivenessInfo.NO_EXPIRATION_TIME, 1);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ tsp,
+ exp,
+ ttl
+ ), Function.identity());
+ Assert.assertEquals(new EncodingStats(1, 1, 1), result);
+ }
+
+ @Test
+ public void testTimestamp()
+ {
+ EncodingStats one = new EncodingStats(1,
LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats two = new EncodingStats(2,
LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats thr = new EncodingStats(3,
LivenessInfo.NO_EXPIRATION_TIME, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ one,
+ two,
+ thr
+ ), Function.identity());
+ Assert.assertEquals(one, result);
+ }
+
+ @Test
+ public void testExpires()
+ {
+ EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 1, 0);
+ EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 2, 0);
+ EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, 3, 0);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ one,
+ two,
+ thr
+ ), Function.identity());
+ Assert.assertEquals(one, result);
+ }
+
+ @Test
+ public void testTTL()
+ {
+ EncodingStats one = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 1);
+ EncodingStats two = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 2);
+ EncodingStats thr = new EncodingStats(LivenessInfo.NO_TIMESTAMP, LivenessInfo.NO_EXPIRATION_TIME, 3);
+ EncodingStats result = EncodingStats.merge(ImmutableList.of(
+ thr,
+ one,
+ two
+ ), Function.identity());
+ Assert.assertEquals(one, result);
+ }
+}
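
EncodingStatsTest above exercises EncodingStats.merge(values, getter) with Function.identity(), i.e. a list that already contains the stats themselves. The generic getter lets a caller merge stats held inside other objects without copying them into an intermediate list first, which appears to be why this patch introduces the method for per-replica merging. A small sketch under that assumption, where ReplicaStats is a made-up holder type:

    import java.util.List;

    import org.apache.cassandra.db.rows.EncodingStats;

    final class EncodingStatsMergeExample
    {
        // Hypothetical holder pairing a replica id with the stats it reported.
        static final class ReplicaStats
        {
            final int replica;
            final EncodingStats stats;

            ReplicaStats(int replica, EncodingStats stats)
            {
                this.replica = replica;
                this.stats = stats;
            }
        }

        // Merge the per-replica stats directly, passing a getter instead of
        // Function.identity() as the unit tests above do.
        static EncodingStats mergeAll(List<ReplicaStats> perReplica)
        {
            return EncodingStats.merge(perReplica, r -> r.stats);
        }
    }
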
diff --git
a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
index 33daca7..ce0fe27 100644
--- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
+++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java
@@ -29,7 +29,7 @@ public class AccumulatorTest
@Test
public void testAddMoreThanCapacity()
{
- Accumulator<Integer> accu = new Accumulator(4);
+ Accumulator<Integer> accu = new Accumulator<>(4);
accu.add(1);
accu.add(2);
@@ -50,7 +50,7 @@ public class AccumulatorTest
@Test
public void testIsEmptyAndSize()
{
- Accumulator<Integer> accu = new Accumulator(4);
+ Accumulator<Integer> accu = new Accumulator<>(4);
assertTrue(accu.isEmpty());
assertEquals(0, accu.size());
@@ -58,20 +58,20 @@ public class AccumulatorTest
accu.add(1);
accu.add(2);
- assertTrue(!accu.isEmpty());
+ assertFalse(accu.isEmpty());
assertEquals(2, accu.size());
accu.add(3);
accu.add(4);
- assertTrue(!accu.isEmpty());
+ assertFalse(accu.isEmpty());
assertEquals(4, accu.size());
}
@Test
public void testGetAndIterator()
{
- Accumulator<String> accu = new Accumulator(4);
+ Accumulator<String> accu = new Accumulator<>(4);
accu.add("3");
accu.add("2");
@@ -99,32 +99,32 @@ public class AccumulatorTest
@Test
public void testClearUnsafe()
{
- Accumulator<String> accu = new Accumulator<>(3);
+ Accumulator<String> accu = new Accumulator<>(5);
accu.add("1");
accu.add("2");
accu.add("3");
- accu.clearUnsafe();
+ accu.clearUnsafe(1);
- assertEquals(0, accu.size());
- assertFalse(accu.iterator().hasNext());
- assertOutOfBonds(accu, 0);
+ assertEquals(3, accu.size());
+ assertTrue(accu.iterator().hasNext());
accu.add("4");
accu.add("5");
- assertEquals(2, accu.size());
+ assertEquals(5, accu.size());
- assertEquals("4", accu.get(0));
- assertEquals("5", accu.get(1));
- assertOutOfBonds(accu, 2);
+ assertEquals("4", accu.get(3));
+ assertEquals("5", accu.get(4));
+ assertOutOfBonds(accu, 5);
Iterator<String> iter = accu.iterator();
assertTrue(iter.hasNext());
- assertEquals("4", iter.next());
- assertEquals("5", iter.next());
- assertFalse(iter.hasNext());
+ assertEquals("1", iter.next());
+ assertNull(iter.next());
+ assertTrue(iter.hasNext());
+ assertEquals("3", iter.next());
}
private static void assertOutOfBonds(Accumulator<String> accumulator, int
index)
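
The rewritten testClearUnsafe reflects a behavioural change to Accumulator#clearUnsafe: it now takes an index and nulls out a single slot rather than resetting the whole accumulator, so size() is unchanged, later adds keep appending after the last used slot, and iteration yields null for the cleared position. A minimal sketch of that behaviour, assuming only what the test asserts:

    import java.util.Iterator;

    import org.apache.cassandra.utils.concurrent.Accumulator;

    final class ClearUnsafeExample
    {
        static void example()
        {
            Accumulator<String> acc = new Accumulator<>(3);
            acc.add("a");
            acc.add("b");
            acc.add("c");

            // Clear only the slot at index 1; size() stays at 3 and the
            // accumulator keeps its capacity.
            acc.clearUnsafe(1);

            Iterator<String> iter = acc.iterator();
            while (iter.hasNext())
                System.out.println(iter.next()); // prints "a", null, "c"
        }
    }
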
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]