Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 dfbe3fabd -> a7cb009f8 refs/heads/cassandra-3.11 809f3b30e -> 826ae9c91 refs/heads/trunk 326f3a7c7 -> 278906c6c
Fix AssertionError in short read protection patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-13747 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7cb009f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7cb009f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7cb009f Branch: refs/heads/cassandra-3.0 Commit: a7cb009f8a3f4d0e0293111bfcfff3d404a37a89 Parents: dfbe3fa Author: Aleksey Yeschenko <[email protected]> Authored: Sun Aug 6 19:42:47 2017 +0100 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue Aug 29 12:22:39 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../UnfilteredPartitionIterators.java | 6 --- .../db/transform/EmptyPartitionsDiscarder.java | 35 +++++++++++++++ .../apache/cassandra/db/transform/Filter.java | 28 +++--------- .../db/transform/FilteredPartitions.java | 18 +++++--- .../cassandra/db/transform/FilteredRows.java | 2 +- .../apache/cassandra/metrics/TableMetrics.java | 4 ++ .../apache/cassandra/service/DataResolver.java | 47 ++++++++++++++------ 8 files changed, 94 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5ccd5cd..6609b05 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Fix AssertionError in short read protection (CASSANDRA-13747) * Don't skip corrupted sstables on startup (CASSANDRA-13620) * Fix the merging of cells with different user type versions (CASSANDRA-13776) * Copy session properties on cqlsh.py do_login (CASSANDRA-13640) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java index 1abbb19..4e0ac1b 100644 --- a/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java +++ b/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java @@ -77,12 +77,6 @@ public abstract class UnfilteredPartitionIterators return Transformation.apply(toReturn, new Close()); } - public static PartitionIterator mergeAndFilter(List<UnfilteredPartitionIterator> iterators, int nowInSec, MergeListener listener) - { - // TODO: we could have a somewhat faster version if we were to merge the UnfilteredRowIterators directly as RowIterators - return filter(merge(iterators, nowInSec, listener), nowInSec); - } - public static PartitionIterator filter(final UnfilteredPartitionIterator iterator, final int nowInSec) { return FilteredPartitions.filter(iterator, nowInSec); http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java new file mode 100644 index 0000000..5e41cec --- /dev/null +++ b/src/java/org/apache/cassandra/db/transform/EmptyPartitionsDiscarder.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.transform; + +import org.apache.cassandra.db.rows.BaseRowIterator; + +public final class EmptyPartitionsDiscarder extends Transformation<BaseRowIterator<?>> +{ + @Override + protected BaseRowIterator applyToPartition(BaseRowIterator iterator) + { + if (iterator.isEmpty()) + { + iterator.close(); + return null; + } + + return iterator; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/Filter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/Filter.java b/src/java/org/apache/cassandra/db/transform/Filter.java index 48c8b1a..747983f 100644 --- a/src/java/org/apache/cassandra/db/transform/Filter.java +++ b/src/java/org/apache/cassandra/db/transform/Filter.java @@ -23,27 +23,21 @@ package org.apache.cassandra.db.transform; import org.apache.cassandra.db.DeletionPurger; import org.apache.cassandra.db.rows.*; -final class Filter extends Transformation +public final class Filter extends Transformation { - private final boolean filterEmpty; // generally maps to !isForThrift, but also false for direct row filtration private final int nowInSec; - public Filter(boolean filterEmpty, int nowInSec) + + public Filter(int nowInSec) { - this.filterEmpty = filterEmpty; this.nowInSec = nowInSec; } @Override protected RowIterator applyToPartition(BaseRowIterator iterator) { - RowIterator filtered = iterator instanceof UnfilteredRows - ? 
new FilteredRows(this, (UnfilteredRows) iterator) - : new FilteredRows((UnfilteredRowIterator) iterator, this); - - if (filterEmpty && closeIfEmpty(filtered)) - return null; - - return filtered; + return iterator instanceof UnfilteredRows + ? new FilteredRows(this, (UnfilteredRows) iterator) + : new FilteredRows((UnfilteredRowIterator) iterator, this); } @Override @@ -67,14 +61,4 @@ final class Filter extends Transformation { return null; } - - private static boolean closeIfEmpty(BaseRowIterator<?> iter) - { - if (iter.isEmpty()) - { - iter.close(); - return true; - } - return false; - } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java index 09e36b4..ad9446d 100644 --- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java +++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java @@ -50,11 +50,19 @@ public final class FilteredPartitions extends BasePartitions<RowIterator, BasePa /** * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator. */ - public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs) + public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs) { - Filter filter = new Filter(!iterator.isForThrift(), nowInSecs); - if (iterator instanceof UnfilteredPartitions) - return new FilteredPartitions(filter, (UnfilteredPartitions) iterator); - return new FilteredPartitions(iterator, filter); + FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs)); + + return iterator.isForThrift() + ? 
filtered + : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder()); + } + + public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter) + { + return iterator instanceof UnfilteredPartitions + ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator) + : new FilteredPartitions(iterator, filter); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/db/transform/FilteredRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/FilteredRows.java b/src/java/org/apache/cassandra/db/transform/FilteredRows.java index 818d3bb..5b635eb 100644 --- a/src/java/org/apache/cassandra/db/transform/FilteredRows.java +++ b/src/java/org/apache/cassandra/db/transform/FilteredRows.java @@ -55,6 +55,6 @@ public final class FilteredRows extends BaseRows<Row, BaseRowIterator<?>> implem */ public static RowIterator filter(UnfilteredRowIterator iterator, int nowInSecs) { - return new Filter(false, nowInSecs).applyToPartition(iterator); + return new Filter(nowInSecs).applyToPartition(iterator); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index a493836..fe88a63 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -151,6 +151,8 @@ public class TableMetrics public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write"); public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range"); + public final Meter shortReadProtectionRequests; + public 
final Map<Sampler, TopKSampler<ByteBuffer>> samplers; /** * stores metrics that will be rolled into a single global metric @@ -645,6 +647,8 @@ public class TableMetrics casPrepare = new LatencyMetrics(factory, "CasPrepare", cfs.keyspace.metric.casPrepare); casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose); casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit); + + shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests")); } public void updateSSTableIterated(int count) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7cb009f/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 26b1b2a..72c4950 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -27,16 +27,13 @@ import com.google.common.collect.Iterables; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.concurrent.StageManager; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.db.transform.MoreRows; -import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.db.transform.*; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.net.*; import org.apache.cassandra.tracing.Tracing; @@ -72,10 +69,29 @@ public class DataResolver extends 
ResponseResolver sources[i] = msg.from; } - // Even though every responses should honor the limit, we might have more than requested post reconciliation, - // so ensure we're respecting the limit. + /* + * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, + * have more rows than the client requested. To make sure that we still conform to the original limit, + * we apply a top-level post-reconciliation counter to the merged partition iterator. + * + * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied + * to the current partition to work. For this reason we have to apply the counter transformation before + * empty partition discard logic kicks in - for it will eagerly consume the iterator. + * + * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions + * + * See CASSANDRA-13747 for more details. + */ + DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition()); - return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter)); + + UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter); + FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec())); + PartitionIterator counted = counter.applyTo(filtered); + + return command.isForThrift() + ? 
counted + : Transformation.apply(counted, new EmptyPartitionsDiscarder()); } public void compareResponses() @@ -87,11 +103,13 @@ public class DataResolver extends ResponseResolver } } - private PartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, DataLimits.Counter resultCounter) + private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, + InetAddress[] sources, + DataLimits.Counter resultCounter) { // If we have only one results, there is no read repair to do and we can't get short reads if (results.size() == 1) - return UnfilteredPartitionIterators.filter(results.get(0), command.nowInSec()); + return results.get(0); UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources); @@ -103,7 +121,7 @@ public class DataResolver extends ResponseResolver results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter))); } - return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener); + return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener); } private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener @@ -510,7 +528,7 @@ public class DataResolver extends ResponseResolver // counting iterator. 
int n = postReconciliationCounter.countedInCurrentPartition(); int x = counter.countedInCurrentPartition(); - int toQuery = Math.max(((n * n) / x) - n, 1); + int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1); DataLimits retryLimits = command.limits().forShortReadRetry(toQuery); ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); @@ -523,6 +541,9 @@ public class DataResolver extends ResponseResolver partitionKey, retryFilter); + Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); + Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark(); + return doShortReadRetry(cmd); } @@ -531,7 +552,7 @@ public class DataResolver extends ResponseResolver DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1); ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source)); if (StorageProxy.canDoLocalRequest(source)) - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); else MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org For additional commands, e-mail: commits-help@cassandra.apache.org
