Fix short read protection performance

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for CASSANDRA-13794
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f93e6e34 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f93e6e34 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f93e6e34 Branch: refs/heads/cassandra-3.11 Commit: f93e6e3401c343dec74687d8b079b5697813ab28 Parents: ab5084a Author: Aleksey Yeschenko <alek...@yeschenko.com> Authored: Thu Aug 31 20:51:08 2017 +0100 Committer: Aleksey Yeschenko <alek...@yeschenko.com> Committed: Wed Sep 20 16:11:18 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 5 + .../apache/cassandra/service/DataResolver.java | 272 ++++++++++++------- 3 files changed, 181 insertions(+), 97 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 2d11a3e..07742ef 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Improve short read protection performance (CASSANDRA-13794) * Fix sstable reader to support range-tombstone-marker for multi-slices (CASSANDRA-13787) * Fix short read protection for tables with no clustering columns (CASSANDRA-13880) * Make isBuilt volatile in PartitionUpdate (CASSANDRA-13619) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 983d6b1..e6e46b2 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -2470,4 +2470,9 @@ public class ColumnFamilyStore 
implements ColumnFamilyStoreMBean return keyspace.getColumnFamilyStore(id); } + + public static TableMetrics metricsFor(UUID tableId) + { + return getIfExists(tableId).metric; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f93e6e34/src/java/org/apache/cassandra/service/DataResolver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java index 99399a3..9a98ee5 100644 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@ -44,7 +44,7 @@ public class DataResolver extends ResponseResolver @VisibleForTesting final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); - public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) { super(keyspace, command, consistency, maxResponseCount); } @@ -55,6 +55,20 @@ public class DataResolver extends ResponseResolver return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); } + public boolean isDataPresent() + { + return !responses.isEmpty(); + } + + public void compareResponses() + { + // We need to fully consume the results to trigger read repairs if appropriate + try (PartitionIterator iterator = resolve()) + { + PartitionIterators.consume(iterator); + } + } + public PartitionIterator resolve() { // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here @@ -83,54 +97,56 @@ public class DataResolver extends ResponseResolver * See CASSANDRA-13747 for more details. 
*/ - DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition()); + DataLimits.Counter mergedResultCounter = + command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition()); - UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter); - FilteredPartitions filtered = FilteredPartitions.filter(merged, - new Filter(command.nowInSec(), - command.metadata().enforceStrictLiveness())); - PartitionIterator counted = counter.applyTo(filtered); + UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, mergedResultCounter); + FilteredPartitions filtered = + FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); + PartitionIterator counted = Transformation.apply(filtered, mergedResultCounter); return command.isForThrift() ? counted : Transformation.apply(counted, new EmptyPartitionsDiscarder()); } - public void compareResponses() - { - // We need to fully consume the results to trigger read repairs if appropriate - try (PartitionIterator iterator = resolve()) - { - PartitionIterators.consume(iterator); - } - } - private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results, InetAddress[] sources, - DataLimits.Counter resultCounter) + DataLimits.Counter mergedResultCounter) { // If we have only one results, there is no read repair to do and we can't get short reads if (results.size() == 1) return results.get(0); - UnfilteredPartitionIterators.MergeListener listener = new RepairMergeListener(sources); - // So-called "short reads" stems from nodes returning only a subset of the results they have for a partition due to the limit, // but that subset not being enough post-reconciliation. So if we don't have limit, don't bother. 
if (!command.limits().isUnlimited()) { for (int i = 0; i < results.size(); i++) - results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter))); + { + DataLimits.Counter singleResultCounter = + command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); + + ShortReadResponseProtection protection = + new ShortReadResponseProtection(sources[i], singleResultCounter, mergedResultCounter); + + /* + * The order of transformations is important here. See ShortReadResponseProtection.applyToPartition() + * comments for details. We want singleResultCounter.applyToPartition() to be called after SRRP applies + * its transformations, so that this order is preserved when calling applyToRows() too. + */ + results.set(i, Transformation.apply(Transformation.apply(results.get(i), protection), singleResultCounter)); + } } - return UnfilteredPartitionIterators.merge(results, command.nowInSec(), listener); + return UnfilteredPartitionIterators.merge(results, command.nowInSec(), new RepairMergeListener(sources)); } private class RepairMergeListener implements UnfilteredPartitionIterators.MergeListener { private final InetAddress[] sources; - public RepairMergeListener(InetAddress[] sources) + private RepairMergeListener(InetAddress[] sources) { this.sources = sources; } @@ -209,7 +225,7 @@ public class DataResolver extends ResponseResolver // For each source, record if there is an open range to send as repair, and from where. 
private final Slice.Bound[] markerToRepair = new Slice.Bound[sources.length]; - public MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) + private MergeListener(DecoratedKey partitionKey, PartitionColumns columns, boolean isReversed) { this.partitionKey = partitionKey; this.columns = columns; @@ -457,17 +473,18 @@ public class DataResolver extends ResponseResolver } } - private class ShortReadProtection extends Transformation<UnfilteredRowIterator> + private class ShortReadResponseProtection extends Transformation<UnfilteredRowIterator> { private final InetAddress source; - private final DataLimits.Counter counter; - private final DataLimits.Counter postReconciliationCounter; - private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter) + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + + private ShortReadResponseProtection(InetAddress source, DataLimits.Counter singleResultCounter, DataLimits.Counter mergedResultCounter) { this.source = source; - this.counter = command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition()).onlyCount(); - this.postReconciliationCounter = postReconciliationCounter; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; } @Override @@ -475,24 +492,25 @@ public class DataResolver extends ResponseResolver { ShortReadRowProtection protection = new ShortReadRowProtection(partition.metadata(), partition.partitionKey()); - partition = MoreRows.extend(partition, protection); // enable moreContents() - /* - * if we don't apply these transformations *after* extending the partition with MoreRows, - * their applyToRow() method will not be called on the first row of the new extension iterator + * Extend for moreContents() then apply protection to track lastClustering. 
+ * + * If we don't apply the transformation *after* extending the partition with MoreRows, + * applyToRow() method of protection will not be called on the first row of the new extension iterator. */ - partition = Transformation.apply(partition, protection); // track lastClustering - partition = Transformation.apply(partition, counter); // do the counting - - return partition; + return Transformation.apply(MoreRows.extend(partition, protection), protection); } private class ShortReadRowProtection extends Transformation implements MoreRows<UnfilteredRowIterator> { - final CFMetaData metadata; - final DecoratedKey partitionKey; - Clustering lastClustering; - int lastCount = 0; + private final CFMetaData metadata; + private final DecoratedKey partitionKey; + + private Clustering lastClustering; + + private int lastCounted = 0; // last seen recorded # before attempting to fetch more rows + private int lastFetched = 0; // # rows returned by last attempt to get more (or by the original read command) + private int lastQueried = 0; // # extra rows requested from the replica last time private ShortReadRowProtection(CFMetaData metadata, DecoratedKey partitionKey) { @@ -507,79 +525,139 @@ public class DataResolver extends ResponseResolver return row; } - @Override + /* + * We have a potential short read if the result from a given node contains the requested number of rows + * for that partition (i.e. it has stopped returning results due to the limit), but some of them haven't + * made it into the final post-reconciliation result due to other nodes' tombstones. + * + * If that is the case, then that node may have more rows that we should fetch, as otherwise we could + * ultimately return fewer rows than required. Also, those additional rows may contain tombstones + * which we also need to fetch as they may shadow rows from other replicas' results, which we would + * otherwise return incorrectly. 
+ * + * Also note that we only get here once all the rows for this partition have been iterated over, and so + * if the node had returned the requested number of rows but we still get here, then some results were + * skipped during reconciliation. + */ public UnfilteredRowIterator moreContents() { - assert !postReconciliationCounter.isDoneForPartition(); - - // We have a short read if the node this is the result of has returned the requested number of - // rows for that partition (i.e. it has stopped returning results due to the limit), but some of - // those results haven't made it in the final result post-reconciliation due to other nodes - // tombstones. If that is the case, then the node might have more results that we should fetch - // as otherwise we might return less results than required, or results that shouldn't be returned - // (because the node has tombstone that hides future results from other nodes but that haven't - // been returned due to the limit). - // Also note that we only get here once all the results for this node have been returned, and so - // if the node had returned the requested number but we still get there, it imply some results were - // skipped during reconciliation. - if (lastCount == counter.counted() || !counter.isDoneForPartition()) + // never try to request additional rows from replicas if our reconciled partition is already filled to the limit + assert !mergedResultCounter.isDoneForPartition(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + // if the returned partition doesn't have enough rows to satisfy even the original limit, don't ask for more + if (!singleResultCounter.isDoneForPartition()) + return null; + + /* + * If the replica has no live rows in the partition, don't try to fetch more. 
+ * + * Note that the previous branch [if (!singleResultCounter.isDoneForPartition()) return null] doesn't + * always cover this scenario: + * isDoneForPartition() is defined as [isDone() || rowInCurrentPartition >= perPartitionLimit], + * and will return true if isDone() returns true, even if there are 0 rows counted in the current partition. + * + * This can happen with a range read if after 1+ rounds of short read protection requests we managed to fetch + * enough extra rows for other partitions to satisfy the singleResultCounter's total row limit, but only + * have tombstones in the current partition. + * + * One other way we can hit this condition is when the partition only has a live static row and no regular + * rows. In that scenario the counter will remain at 0 until the partition is closed - which happens after + * the moreContents() call. + */ + if (singleResultCounter.countedInCurrentPartition() == 0) return null; - // clustering of the last row returned is empty, meaning that there is only one row per partition, - // and we already have it. - if (lastClustering == Clustering.EMPTY) + /* + * This is a table with no clustering columns, and has at most one row per partition - with EMPTY clustering. + * We already have the row, so there is no point in asking for more from the partition. + */ + if (Clustering.EMPTY == lastClustering) return null; - lastCount = counter.counted(); - - // We need to try to query enough additional results to fulfill our query, but because we could still - // get short reads on that additional query, just querying the number of results we miss may not be - // enough. But we know that when this node answered n rows (counter.countedInCurrentPartition), only - // x rows (postReconciliationCounter.countedInCurrentPartition()) made it in the final result. - // So our ratio of live rows to requested rows is x/n, so since we miss n-x rows, we estimate that - // we should request m rows so that m * x/n = n-x, that is m = (n^2/x) - n. 
- // Also note that it's ok if we retrieve more results that necessary since our top level iterator is a - // counting iterator. - int n = postReconciliationCounter.countedInCurrentPartition(); - int x = counter.countedInCurrentPartition(); - int toQuery = Math.max(((n * n) / Math.max(x, 1)) - n, 1); - - DataLimits retryLimits = command.limits().forShortReadRetry(toQuery); + lastFetched = singleResultCounter.countedInCurrentPartition() - lastCounted; + lastCounted = singleResultCounter.countedInCurrentPartition(); + + // getting back fewer rows than we asked for means the partition on the replica has been fully consumed + if (lastQueried > 0 && lastFetched < lastQueried) + return null; + + /* + * At this point we know that: + * 1. the replica returned [repeatedly?] as many rows as we asked for and potentially has more + * rows in the partition + * 2. at least one of those returned rows was shadowed by a tombstone returned from another + * replica + * 3. we haven't satisfied the client's limits yet, and should attempt to query for more rows to + * avoid a short read + * + * In the ideal scenario, we would get exactly min(a, b) or fewer rows from the next request, where a and b + * are defined as follows: + * [a] limits.count() - mergedResultCounter.counted() + * [b] limits.perPartitionCount() - mergedResultCounter.countedInCurrentPartition() + * + * It would be naive to query for exactly that many rows, as it's possible and not unlikely + * that some of the returned rows would also be shadowed by tombstones from other hosts. + * + * Note: we don't know, nor do we care, how many rows from the replica made it into the reconciled result; + * we can only tell how many in total we queried for, and that [0, mrc.countedInCurrentPartition()) made it. 
+ * + * In general, our goal should be to minimise the number of extra requests - *not* to minimise the number + * of rows fetched: there is a high transactional cost for every individual request, but a relatively low + * marginal cost for each extra row requested. + * + * As such it's better to overfetch than to underfetch extra rows from a host; but at the same + * time we want to respect paging limits and not blow up spectacularly. + * + * Note: it's ok to retrieve more rows than necessary since singleResultCounter is not stopping and only + * counts. + * + * With that in mind, we'll just request the minimum of (count(), perPartitionCount()) limits, + * but no fewer than 8 rows (an arbitrary round lower bound), to ensure that we won't fetch row by row + * for SELECT DISTINCT queries (that set per partition limit to 1). + * + * See CASSANDRA-13794 for more details. + */ + lastQueried = Math.max(Math.min(command.limits().count(), command.limits().perPartitionCount()), 8); + + ColumnFamilyStore.metricsFor(metadata.cfId).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", lastQueried, source); + + return executeReadCommand(makeFetchAdditionalRowsReadCommand(lastQueried)); + } + + private SinglePartitionReadCommand makeFetchAdditionalRowsReadCommand(int toQuery) + { ClusteringIndexFilter filter = command.clusteringIndexFilter(partitionKey); - ClusteringIndexFilter retryFilter = lastClustering == null ? 
filter : filter.forPaging(metadata.comparator, lastClustering, false); - SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), - command.nowInSec(), - command.columnFilter(), - command.rowFilter(), - retryLimits, - partitionKey, - retryFilter); - - Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); - Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark(); - - return doShortReadRetry(cmd); + if (null != lastClustering) + filter = filter.forPaging(metadata.comparator, lastClustering, false); + + return SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + command.limits().forShortReadRetry(toQuery), + partitionKey, + filter); } - private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand) + private UnfilteredRowIterator executeReadCommand(SinglePartitionReadCommand cmd) { - DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1); - ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source)); + DataResolver resolver = new DataResolver(keyspace, cmd, ConsistencyLevel.ONE, 1); + ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, cmd, Collections.singletonList(source)); + if (StorageProxy.canDoLocalRequest(source)) - StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler)); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); else - MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler); + MessagingService.instance().sendRRWithFailure(cmd.createMessage(MessagingService.current_version), source, handler); // We don't call handler.get() because we 
want to preserve tombstones since we're still in the middle of merging node results. handler.awaitResults(); assert resolver.responses.size() == 1; - return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), retryCommand); + return UnfilteredPartitionIterators.getOnlyElement(resolver.responses.get(0).payload.makeIterator(command), cmd); } } } - - public boolean isDataPresent() - { - return !responses.isEmpty(); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org