Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/278906c6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/278906c6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/278906c6

Branch: refs/heads/trunk
Commit: 278906c6c0424c1ce0d922c24747c97978b0aa14
Parents: 326f3a7 826ae9c
Author: Aleksey Yeschenko <[email protected]>
Authored: Tue Aug 29 12:33:33 2017 +0100
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Aug 29 12:33:50 2017 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../UnfilteredPartitionIterators.java           |  7 ---
 .../db/transform/EmptyPartitionsDiscarder.java  | 35 +++++++++++++++
 .../apache/cassandra/db/transform/Filter.java   | 28 +++---------
 .../db/transform/FilteredPartitions.java        | 15 ++++---
 .../cassandra/db/transform/FilteredRows.java    |  2 +-
 .../apache/cassandra/metrics/TableMetrics.java  |  4 ++
 .../apache/cassandra/service/DataResolver.java  | 45 ++++++++++++++------
 .../apache/cassandra/db/ReadCommandTest.java    | 23 +++++-----
 9 files changed, 101 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/CHANGES.txt
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/partitions/UnfilteredPartitionIterators.java
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
index ed643bb,ad9446d..fa12c9c
--- a/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
+++ b/src/java/org/apache/cassandra/db/transform/FilteredPartitions.java
@@@ -50,11 -50,19 +50,16 @@@ public final class FilteredPartitions e
      /**
       * Filter any RangeTombstoneMarker from the iterator's iterators, transforming it into a PartitionIterator.
       */
-     public static PartitionIterator filter(UnfilteredPartitionIterator iterator, int nowInSecs)
+     public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, int nowInSecs)
      {
-         Filter filter = new Filter(true, nowInSecs);
-         if (iterator instanceof UnfilteredPartitions)
-             return new FilteredPartitions(filter, (UnfilteredPartitions) iterator);
-         return new FilteredPartitions(iterator, filter);
+         FilteredPartitions filtered = filter(iterator, new Filter(nowInSecs));
 -
 -        return iterator.isForThrift()
 -             ? filtered
 -             : (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
++        return (FilteredPartitions) Transformation.apply(filtered, new EmptyPartitionsDiscarder());
+     }
+ 
+     public static FilteredPartitions filter(UnfilteredPartitionIterator iterator, Filter filter)
+     {
+         return iterator instanceof UnfilteredPartitions
+              ? new FilteredPartitions(filter, (UnfilteredPartitions) iterator)
+              : new FilteredPartitions(iterator, filter);
      }
  }
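
For reference, a minimal sketch (not part of this commit) of how the two filter()
overloads above compose. Per the javadoc, filtering turns an UnfilteredPartitionIterator
into a PartitionIterator by dropping range tombstone markers; after this change the
int-based overload additionally discards partitions that come out empty, while the
Filter-based overload keeps them. The helper method names below are illustrative only;
the types are the ones touched by this commit (org.apache.cassandra.db.transform.*,
org.apache.cassandra.db.partitions.*).

    // Sketch only -- assumes the classes from this commit are on the classpath.
    static PartitionIterator filterKeepingEmpties(UnfilteredPartitionIterator iter, int nowInSec)
    {
        // Strip range tombstone markers (per the javadoc above); partitions left
        // empty by the filtering are preserved.
        return FilteredPartitions.filter(iter, new Filter(nowInSec));
    }

    static PartitionIterator filterDroppingEmpties(UnfilteredPartitionIterator iter, int nowInSec)
    {
        // Equivalent to FilteredPartitions.filter(iter, nowInSec) after this change:
        // filter first, then drop partitions left with no rows.
        FilteredPartitions kept = FilteredPartitions.filter(iter, new Filter(nowInSec));
        return Transformation.apply(kept, new EmptyPartitionsDiscarder());
    }
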

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index 58b017e,b0f667c..7e6ca25
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -240,33 -201,8 +240,35 @@@ public class TableMetric
          }
      });
  
 +    public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"),
 +                                                                           new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isRepaired).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"),
 +                                                                             new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left;
 +        }
 +    });
 +
 +    public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"),
 +                                                                                new Gauge<Long>()
 +    {
 +        public Long getValue()
 +        {
 +            return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left;
 +        }
 +    });
 +
+     public final Meter shortReadProtectionRequests;
+ 
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
  
      /**
       * stores metrics that will be rolled into a single global metric
@@@ -810,25 -697,7 +812,27 @@@
          casPropose = new LatencyMetrics(factory, "CasPropose", cfs.keyspace.metric.casPropose);
          casCommit = new LatencyMetrics(factory, "CasCommit", cfs.keyspace.metric.casCommit);
  
 +        repairsStarted = createTableCounter("RepairJobsStarted");
 +        repairsCompleted = createTableCounter("RepairJobsCompleted");
 +
 +        anticompactionTime = createTableTimer("AnticompactionTime", cfs.keyspace.metric.anticompactionTime);
 +        validationTime = createTableTimer("ValidationTime", cfs.keyspace.metric.validationTime);
 +        syncTime = createTableTimer("SyncTime", cfs.keyspace.metric.repairSyncTime);
 +
 +        bytesValidated = createTableHistogram("BytesValidated", cfs.keyspace.metric.bytesValidated, false);
 +        partitionsValidated = createTableHistogram("PartitionsValidated", cfs.keyspace.metric.partitionsValidated, false);
 +        bytesAnticompacted = createTableCounter("BytesAnticompacted");
 +        bytesMutatedAnticompaction = createTableCounter("BytesMutatedAnticompaction");
 +        mutatedAnticompactionGauge = createTableGauge("MutatedAnticompactionGauge", () ->
 +        {
 +            double bytesMutated = bytesMutatedAnticompaction.getCount();
 +            double bytesAnticomp = bytesAnticompacted.getCount();
 +            if (bytesAnticomp + bytesMutated > 0)
 +                return bytesMutated / (bytesAnticomp + bytesMutated);
 +            return 0.0;
 +        });
++
+         shortReadProtectionRequests = Metrics.meter(factory.createMetricName("ShortReadProtectionRequests"));
      }
  
      public void updateSSTableIterated(int count)
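
As a side note, a minimal sketch (not part of this commit) of how the per-table meter
declared above, and the global repair-size gauges visible in the same hunk, could be
read in-process. The field chains mirror the diff; the wrapper method, the 'cfs'
parameter, and the getCount()/getValue() accessors of the underlying metrics library
are assumptions for illustration.

    // Sketch only: reading the metrics shown in the hunk above.
    static String metricsSnapshot(org.apache.cassandra.db.ColumnFamilyStore cfs)
    {
        // Per-table meter registered as "ShortReadProtectionRequests".
        long shortReadRetries = cfs.metric.shortReadProtectionRequests.getCount();
        // Global gauge splitting non-system table bytes by repair state.
        long bytesRepaired = org.apache.cassandra.metrics.TableMetrics.globalBytesRepaired.getValue();
        return "shortReadProtectionRequests=" + shortReadRetries + ", globalBytesRepaired=" + bytesRepaired;
    }
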

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 78bbe16,32b6d79..f4a472d
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -27,13 -27,9 +27,12 @@@ import com.google.common.collect.Iterab
  import org.apache.cassandra.concurrent.Stage;
  import org.apache.cassandra.concurrent.StageManager;
 -import org.apache.cassandra.config.*;
 +import org.apache.cassandra.schema.ColumnMetadata;
++import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.filter.ClusteringIndexFilter;
- import org.apache.cassandra.db.filter.ColumnFilter;
- import org.apache.cassandra.db.filter.DataLimits;
+ import org.apache.cassandra.db.filter.*;
  import org.apache.cassandra.db.filter.DataLimits.Counter;
  import org.apache.cassandra.db.partitions.*;
  import org.apache.cassandra.db.rows.*;
@@@ -76,10 -71,29 +74,26 @@@ public class DataResolver extends Respo
              sources[i] = msg.from;
          }
  
-         // Even though every responses should honor the limit, we might have more than requested post reconciliation,
-         // so ensure we're respecting the limit.
+         /*
+          * Even though every response, individually, will honor the limit, it is possible that we will, after the merge,
+          * have more rows than the client requested. To make sure that we still conform to the original limit,
+          * we apply a top-level post-reconciliation counter to the merged partition iterator.
+          *
+          * Short read protection logic (ShortReadRowProtection.moreContents()) relies on this counter to be applied
+          * to the current partition to work. For this reason we have to apply the counter transformation before
+          * empty partition discard logic kicks in - for it will eagerly consume the iterator.
+          *
+          * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions
+          *
+          * See CASSANDRA-13747 for more details.
+          */
+         DataLimits.Counter counter = command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());
-         return counter.applyTo(mergeWithShortReadProtection(iters, sources, counter));
+ 
+         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);
+         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));
+         PartitionIterator counted = counter.applyTo(filtered);
 -
 -        return command.isForThrift()
 -             ? counted
 -             : Transformation.apply(counted, new EmptyPartitionsDiscarder());
++        return Transformation.apply(counted, new EmptyPartitionsDiscarder());
      }
  
      public void compareResponses()
@@@ -541,6 -557,9 +557,9 @@@
                                                                           partitionKey,
                                                                           retryFilter);
  
+             Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source);
 -            Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().cfId).metric.shortReadProtectionRequests.mark();
++            Schema.instance.getColumnFamilyStoreInstance(cmd.metadata().id).metric.shortReadProtectionRequests.mark();
+ 
              return doShortReadRetry(cmd);
          }
  
@@@ -581,9 -600,9 +600,9 @@@
              DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
              ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
              if (StorageProxy.canDoLocalRequest(source))
 -                StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
 +                StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
              else
 -                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(MessagingService.current_version), source, handler);
 +                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), source, handler);
  
              // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results.
              handler.awaitResults();
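
To make the ordering constraint spelled out in the comment above easier to follow,
here is a condensed, annotated sketch (not part of the commit) of the
post-reconciliation pipeline. Every call already appears in the diff; the sketch
simply assumes command, iters and sources are in scope, as they are in DataResolver.

    DataLimits.Counter counter =
        command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition());

    // 1) merge the per-replica results, wiring the counter into short read protection
    UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters, sources, counter);

    // 2) filter rows, keeping empty partitions for now
    FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec()));

    // 3) enforce the client limit; this must happen before empty partitions are
    //    discarded, since the discarder eagerly consumes the iterator and would
    //    otherwise break ShortReadRowProtection.moreContents()
    PartitionIterator counted = counter.applyTo(filtered);

    // 4) only then drop partitions that ended up empty
    return Transformation.apply(counted, new EmptyPartitionsDiscarder());
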

http://git-wip-us.apache.org/repos/asf/cassandra/blob/278906c6/test/unit/org/apache/cassandra/db/ReadCommandTest.java
----------------------------------------------------------------------


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
