This is an automated email from the ASF dual-hosted git repository. adelapena pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 292650d968c30757a027905d12c7370c6342dbe3 Merge: 3e393f2 2ef1f1c Author: Caleb Rackliffe <[email protected]> AuthorDate: Thu Jul 30 14:10:48 2020 +0100 Merge branch 'cassandra-3.11' into trunk CHANGES.txt | 1 + build.xml | 3 +- conf/cassandra.yaml | 20 + doc/source/operating/metrics.rst | 144 +++---- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 20 + .../config/ReplicaFilteringProtectionOptions.java | 28 ++ .../db/partitions/PartitionIterators.java | 47 +++ .../org/apache/cassandra/metrics/TableMetrics.java | 21 +- .../apache/cassandra/service/StorageService.java | 28 +- .../cassandra/service/StorageServiceMBean.java | 12 + .../cassandra/service/reads/DataResolver.java | 57 ++- .../service/reads/ReplicaFilteringProtection.java | 444 ++++++++++++--------- .../reads/ShortReadPartitionsProtection.java | 10 +- .../service/reads/ShortReadProtection.java | 17 +- .../service/reads/repair/NoopReadRepair.java | 3 +- .../org/apache/cassandra/transport/Message.java | 2 +- .../cassandra/utils/concurrent/Accumulator.java | 9 +- .../cassandra/distributed/impl/Coordinator.java | 10 + .../apache/cassandra/distributed/impl/RowUtil.java | 7 +- .../test/ReplicaFilteringProtectionTest.java | 244 +++++++++++ .../utils/concurrent/AccumulatorTest.java | 34 +- 22 files changed, 839 insertions(+), 324 deletions(-) diff --cc CHANGES.txt index 4a9ae14,9dbbd1c..e74f330 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,11 +1,14 @@@ -3.11.8 +4.0-beta2 + * Forbid altering UDTs used in partition keys (CASSANDRA-15933) + * Fix version parsing logic when upgrading from 3.0 (CASSANDRA-15973) + * Optimize NoSpamLogger use in hot paths (CASSANDRA-15766) + * Verify sstable components on startup (CASSANDRA-15945) +Merged from 3.11: + * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) Merged from 3.0: + * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907) - * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) - * Forbid altering UDTs used in partition keys (CASSANDRA-15933) * Fix empty/null json string representation (CASSANDRA-15896) - * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970) Merged from 2.2: * Fix CQL parsing of collections when the column type is reversed (CASSANDRA-15814) diff --cc build.xml index f0948ae,50d278e..d861920 --- a/build.xml +++ b/build.xml @@@ -556,15 -412,19 +556,14 @@@ <dependency groupId="com.fasterxml.jackson.core" artifactId="jackson-annotations" version="2.9.10"/> <dependency groupId="com.googlecode.json-simple" artifactId="json-simple" version="1.1"/> <dependency groupId="com.boundary" artifactId="high-scale-lib" version="1.0.6"/> - <dependency groupId="com.github.jbellis" artifactId="jamm" version="0.3.0"/> + <dependency groupId="com.github.jbellis" artifactId="jamm" version="${jamm.version}"/> - <dependency groupId="com.thinkaurelius.thrift" artifactId="thrift-server" version="0.3.7"> - <exclusion groupId="org.slf4j" artifactId="slf4j-log4j12"/> - <exclusion groupId="junit" artifactId="junit"/> - </dependency> <dependency groupId="org.yaml" artifactId="snakeyaml" version="1.11"/> - <dependency groupId="org.apache.thrift" artifactId="libthrift" version="0.9.2"> - <exclusion groupId="commons-logging" artifactId="commons-logging"/> - 
</dependency> - <dependency groupId="junit" artifactId="junit" version="4.6" /> + <dependency groupId="junit" artifactId="junit" version="4.12" /> <dependency groupId="org.mockito" artifactId="mockito-core" version="3.2.4" /> + <dependency groupId="org.quicktheories" artifactId="quicktheories" version="0.25" /> + <dependency groupId="com.google.code.java-allocation-instrumenter" artifactId="java-allocation-instrumenter" version="${allocation-instrumenter.version}" /> - <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.3" /> - + <dependency groupId="org.apache.cassandra" artifactId="dtest-api" version="0.0.4" /> <dependency groupId="org.apache.rat" artifactId="apache-rat" version="0.10"> <exclusion groupId="commons-lang" artifactId="commons-lang"/> </dependency> diff --cc doc/source/operating/metrics.rst index fc37440,4bd0c08..0053cbc --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@@ -79,76 -75,65 +79,80 @@@ Reported name format **all** tables and keyspaces on the node. - ======================================= ============== =========== - Name Type Description - ======================================= ============== =========== - MemtableOnHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten. - MemtableOffHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten. - MemtableLiveDataSize Gauge<Long> Total amount of live data stored in the memtable, excluding any data structure overhead. - AllMemtablesOnHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap. - AllMemtablesOffHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap. - AllMemtablesLiveDataSize Gauge<Long> Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead. - MemtableColumnsCount Gauge<Long> Total number of columns present in the memtable. - MemtableSwitchCount Counter Number of times flush has resulted in the memtable being switched out. - CompressionRatio Gauge<Double> Current compression ratio for all SSTables. - EstimatedPartitionSizeHistogram Gauge<long[]> Histogram of estimated partition size (in bytes). - EstimatedPartitionCount Gauge<Long> Approximate number of keys in table. - EstimatedColumnCountHistogram Gauge<long[]> Histogram of estimated number of columns. - SSTablesPerReadHistogram Histogram Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into acoount. - ReadLatency Latency Local read latency for this table. - RangeLatency Latency Local range scan latency for this table. - WriteLatency Latency Local write latency for this table. - CoordinatorReadLatency Timer Coordinator read latency for this table. - CoordinatorWriteLatency Timer Coordinator write latency for this table. - CoordinatorScanLatency Timer Coordinator range scan latency for this table. - PendingFlushes Counter Estimated number of flush tasks pending for this table. - BytesFlushed Counter Total number of bytes flushed since server [re]start. 
- CompactionBytesWritten Counter Total number of bytes written by compaction since server [re]start. - PendingCompactions Gauge<Integer> Estimate of number of pending compactions for this table. - LiveSSTableCount Gauge<Integer> Number of SSTables on disk for this table. - LiveDiskSpaceUsed Counter Disk space used by SSTables belonging to this table (in bytes). - TotalDiskSpaceUsed Counter Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd. - MinPartitionSize Gauge<Long> Size of the smallest compacted partition (in bytes). - MaxPartitionSize Gauge<Long> Size of the largest compacted partition (in bytes). - MeanPartitionSize Gauge<Long> Size of the average compacted partition (in bytes). - BloomFilterFalsePositives Gauge<Long> Number of false positives on table's bloom filter. - BloomFilterFalseRatio Gauge<Double> False positive ratio of table's bloom filter. - BloomFilterDiskSpaceUsed Gauge<Long> Disk space used by bloom filter (in bytes). - BloomFilterOffHeapMemoryUsed Gauge<Long> Off-heap memory used by bloom filter. - IndexSummaryOffHeapMemoryUsed Gauge<Long> Off-heap memory used by index summary. - CompressionMetadataOffHeapMemoryUsed Gauge<Long> Off-heap memory used by compression meta data. - KeyCacheHitRate Gauge<Double> Key cache hit rate for this table. - TombstoneScannedHistogram Histogram Histogram of tombstones scanned in queries on this table. - LiveScannedHistogram Histogram Histogram of live cells scanned in queries on this table. - ColUpdateTimeDeltaHistogram Histogram Histogram of column update time delta on this table. - ViewLockAcquireTime Timer Time taken acquiring a partition lock for materialized view updates on this table. - ViewReadTime Timer Time taken during the local read of a materialized view update. - TrueSnapshotsSize Gauge<Long> Disk space used by snapshots of this table including all SSTable components. - RowCacheHitOutOfRange Counter Number of table row cache hits that do not satisfy the query filter, thus went to disk. - RowCacheHit Counter Number of table row cache hits. - RowCacheMiss Counter Number of table row cache misses. - CasPrepare Latency Latency of paxos prepare round. - CasPropose Latency Latency of paxos propose round. - CasCommit Latency Latency of paxos commit round. - PercentRepaired Gauge<Double> Percent of table data that is repaired on disk. - BytesRepaired Gauge<Long> Size of table data repaired on disk - BytesUnrepaired Gauge<Long> Size of table data unrepaired on disk - BytesPendingRepair Gauge<Long> Size of table data isolated for an ongoing incremental repair - SpeculativeRetries Counter Number of times speculative retries were sent for this table. - SpeculativeFailedRetries Counter Number of speculative retries that failed to prevent a timeout - SpeculativeInsufficientReplicas Counter Number of speculative retries that couldn't be attempted due to lack of replicas - SpeculativeSampleLatencyNanos Gauge<Long> Number of nanoseconds to wait before speculation is attempted. Value may be statically configured or updated periodically based on coordinator latency. - WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap. - DroppedMutations Counter Number of dropped mutations on this table. - AnticompactionTime Timer Time spent anticompacting before a consistent repair. - ValidationTime Timer Time spent doing validation compaction during repair. - SyncTime Timer Time spent doing streaming during repair. 
- BytesValidated Histogram Histogram over the amount of bytes read during validation. - PartitionsValidated Histogram Histogram over the number of partitions read during validation. - BytesAnticompacted Counter How many bytes we anticompacted. - BytesMutatedAnticompaction Counter How many bytes we avoided anticompacting because the sstable was fully contained in the repaired range. - MutatedAnticompactionGauge Gauge<Double> Ratio of bytes mutated vs total bytes repaired. - ======================================= ============== =========== + =============================================== ============== =========== + Name Type Description + =============================================== ============== =========== + MemtableOnHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten. + MemtableOffHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten. + MemtableLiveDataSize Gauge<Long> Total amount of live data stored in the memtable, excluding any data structure overhead. + AllMemtablesOnHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap. + AllMemtablesOffHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap. + AllMemtablesLiveDataSize Gauge<Long> Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead. + MemtableColumnsCount Gauge<Long> Total number of columns present in the memtable. + MemtableSwitchCount Counter Number of times flush has resulted in the memtable being switched out. + CompressionRatio Gauge<Double> Current compression ratio for all SSTables. + EstimatedPartitionSizeHistogram Gauge<long[]> Histogram of estimated partition size (in bytes). + EstimatedPartitionCount Gauge<Long> Approximate number of keys in table. + EstimatedColumnCountHistogram Gauge<long[]> Histogram of estimated number of columns. + SSTablesPerReadHistogram Histogram Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into acoount. + ReadLatency Latency Local read latency for this table. + RangeLatency Latency Local range scan latency for this table. + WriteLatency Latency Local write latency for this table. + CoordinatorReadLatency Timer Coordinator read latency for this table. ++CoordinatorWriteLatency Timer Coordinator write latency for this table. + CoordinatorScanLatency Timer Coordinator range scan latency for this table. + PendingFlushes Counter Estimated number of flush tasks pending for this table. + BytesFlushed Counter Total number of bytes flushed since server [re]start. + CompactionBytesWritten Counter Total number of bytes written by compaction since server [re]start. + PendingCompactions Gauge<Integer> Estimate of number of pending compactions for this table. + LiveSSTableCount Gauge<Integer> Number of SSTables on disk for this table. + LiveDiskSpaceUsed Counter Disk space used by SSTables belonging to this table (in bytes). + TotalDiskSpaceUsed Counter Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd. 
+ MinPartitionSize Gauge<Long> Size of the smallest compacted partition (in bytes). + MaxPartitionSize Gauge<Long> Size of the largest compacted partition (in bytes). + MeanPartitionSize Gauge<Long> Size of the average compacted partition (in bytes). + BloomFilterFalsePositives Gauge<Long> Number of false positives on table's bloom filter. + BloomFilterFalseRatio Gauge<Double> False positive ratio of table's bloom filter. + BloomFilterDiskSpaceUsed Gauge<Long> Disk space used by bloom filter (in bytes). + BloomFilterOffHeapMemoryUsed Gauge<Long> Off-heap memory used by bloom filter. + IndexSummaryOffHeapMemoryUsed Gauge<Long> Off-heap memory used by index summary. + CompressionMetadataOffHeapMemoryUsed Gauge<Long> Off-heap memory used by compression meta data. + KeyCacheHitRate Gauge<Double> Key cache hit rate for this table. + TombstoneScannedHistogram Histogram Histogram of tombstones scanned in queries on this table. + LiveScannedHistogram Histogram Histogram of live cells scanned in queries on this table. + ColUpdateTimeDeltaHistogram Histogram Histogram of column update time delta on this table. + ViewLockAcquireTime Timer Time taken acquiring a partition lock for materialized view updates on this table. + ViewReadTime Timer Time taken during the local read of a materialized view update. + TrueSnapshotsSize Gauge<Long> Disk space used by snapshots of this table including all SSTable components. + RowCacheHitOutOfRange Counter Number of table row cache hits that do not satisfy the query filter, thus went to disk. + RowCacheHit Counter Number of table row cache hits. + RowCacheMiss Counter Number of table row cache misses. + CasPrepare Latency Latency of paxos prepare round. + CasPropose Latency Latency of paxos propose round. + CasCommit Latency Latency of paxos commit round. + PercentRepaired Gauge<Double> Percent of table data that is repaired on disk. ++BytesRepaired Gauge<Long> Size of table data repaired on disk ++BytesUnrepaired Gauge<Long> Size of table data unrepaired on disk ++BytesPendingRepair Gauge<Long> Size of table data isolated for an ongoing incremental repair + SpeculativeRetries Counter Number of times speculative retries were sent for this table. ++SpeculativeFailedRetries Counter Number of speculative retries that failed to prevent a timeout ++SpeculativeInsufficientReplicas Counter Number of speculative retries that couldn't be attempted due to lack of replicas ++SpeculativeSampleLatencyNanos Gauge<Long> Number of nanoseconds to wait before speculation is attempted. Value may be statically configured or updated periodically based on coordinator latency. + WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap. + DroppedMutations Counter Number of dropped mutations on this table. ++AnticompactionTime Timer Time spent anticompacting before a consistent repair. ++ValidationTime Timer Time spent doing validation compaction during repair. ++SyncTime Timer Time spent doing streaming during repair. ++BytesValidated Histogram Histogram over the amount of bytes read during validation. ++PartitionsValidated Histogram Histogram over the number of partitions read during validation. ++BytesAnticompacted Counter How many bytes we anticompacted. ++BytesMutatedAnticompaction Counter How many bytes we avoided anticompacting because the sstable was fully contained in the repaired range. ++MutatedAnticompactionGauge Gauge<Double> Ratio of bytes mutated vs total bytes repaired. 
+ ReadRepairRequests Meter Throughput for mutations generated by read-repair. + ShortReadProtectionRequests Meter Throughput for requests to get extra rows during short read protection. + ReplicaFilteringProtectionRequests Meter Throughput for row completion requests during replica filtering protection. + ReplicaFilteringProtectionRowsCachedPerQuery Histogram Histogram of the number of rows cached per query when replica filtering protection is engaged. + ============================================ ============== =========== Keyspace Metrics ^^^^^^^^^^^^^^^^ diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java index bfb261d,5521bf7..24550b0 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@@ -299,38 -201,20 +299,47 @@@ public class TableMetric } }); + public static final Gauge<Long> globalBytesRepaired = Metrics.register(globalFactory.createMetricName("BytesRepaired"), + new Gauge<Long>() + { + public Long getValue() + { + return totalNonSystemTablesSize(SSTableReader::isRepaired).left; + } + }); + + public static final Gauge<Long> globalBytesUnrepaired = Metrics.register(globalFactory.createMetricName("BytesUnrepaired"), + new Gauge<Long>() + { + public Long getValue() + { + return totalNonSystemTablesSize(s -> !s.isRepaired() && !s.isPendingRepair()).left; + } + }); + + public static final Gauge<Long> globalBytesPendingRepair = Metrics.register(globalFactory.createMetricName("BytesPendingRepair"), + new Gauge<Long>() + { + public Long getValue() + { + return totalNonSystemTablesSize(SSTableReader::isPendingRepair).left; + } + }); + public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; - public final Meter replicaSideFilteringProtectionRequests; + + public final Meter replicaFilteringProtectionRequests; + + /** + * This histogram records the maximum number of rows {@link org.apache.cassandra.service.ReplicaFilteringProtection} + * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of + * cached rows in a single partition during a query. It can be helpful when choosing appropriate values for the + * replica_filtering_protection thresholds in cassandra.yaml. 
+ */ + public final Histogram rfpRowsCachedPerQuery; - public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; + public final EnumMap<SamplerType, Sampler<?>> samplers; /** * stores metrics that will be rolled into a single global metric */ @@@ -942,23 -710,8 +951,24 @@@ readRepairRequests = createTableMeter("ReadRepairRequests"); shortReadProtectionRequests = createTableMeter("ShortReadProtectionRequests"); - replicaSideFilteringProtectionRequests = createTableMeter("ReplicaSideFilteringProtectionRequests"); + replicaFilteringProtectionRequests = createTableMeter("ReplicaFilteringProtectionRequests"); + rfpRowsCachedPerQuery = createHistogram("ReplicaFilteringProtectionRowsCachedPerQuery", true); + + confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies); + unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies); + + repairedDataTrackingOverreadRows = createTableHistogram("RepairedDataTrackingOverreadRows", cfs.keyspace.metric.repairedDataTrackingOverreadRows, false); + repairedDataTrackingOverreadTime = createTableTimer("RepairedDataTrackingOverreadTime", cfs.keyspace.metric.repairedDataTrackingOverreadTime); + + unleveledSSTables = createTableGauge("UnleveledSSTables", cfs::getUnleveledSSTables, () -> { + // global gauge + int cnt = 0; + for (Metric cfGauge : allTableMetrics.get("UnleveledSSTables")) + { + cnt += ((Gauge<? extends Number>) cfGauge).getValue().intValue(); + } + return cnt; + }); } public void updateSSTableIterated(int count) diff --cc src/java/org/apache/cassandra/service/StorageService.java index a1b3b82,240a15e..0d10418 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@@ -5397,19 -5265,31 +5398,42 @@@ public class StorageService extends Not public void setTombstoneFailureThreshold(int threshold) { DatabaseDescriptor.setTombstoneFailureThreshold(threshold); + logger.info("updated tombstone_failure_threshold to {}", threshold); + } + + public int getCachedReplicaRowsWarnThreshold() + { + return DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(); + } + + public void setCachedReplicaRowsWarnThreshold(int threshold) + { + DatabaseDescriptor.setCachedReplicaRowsWarnThreshold(threshold); + logger.info("updated replica_filtering_protection.cached_rows_warn_threshold to {}", threshold); + } + + public int getCachedReplicaRowsFailThreshold() + { + return DatabaseDescriptor.getCachedReplicaRowsFailThreshold(); + } + + public void setCachedReplicaRowsFailThreshold(int threshold) + { + DatabaseDescriptor.setCachedReplicaRowsFailThreshold(threshold); + logger.info("updated replica_filtering_protection.cached_rows_fail_threshold to {}", threshold); } + public int getColumnIndexCacheSize() + { + return DatabaseDescriptor.getColumnIndexCacheSizeInKB(); + } + + public void setColumnIndexCacheSize(int cacheSizeInKB) + { + DatabaseDescriptor.setColumnIndexCacheSize(cacheSizeInKB); + logger.info("Updated column_index_cache_size_in_kb to {}", cacheSizeInKB); + } + public int getBatchSizeFailureThreshold() { return DatabaseDescriptor.getBatchSizeFailThresholdInKB(); @@@ -5418,147 -5298,15 +5442,147 @@@ public void setBatchSizeFailureThreshold(int threshold) { DatabaseDescriptor.setBatchSizeFailThresholdInKB(threshold); - logger.info("Updated batch_size_fail_threshold_in_kb to {}", threshold); + logger.info("updated 
batch_size_fail_threshold_in_kb to {}", threshold); } + public int getBatchSizeWarnThreshold() + { + return DatabaseDescriptor.getBatchSizeWarnThresholdInKB(); + } + + public void setBatchSizeWarnThreshold(int threshold) + { + DatabaseDescriptor.setBatchSizeWarnThresholdInKB(threshold); + logger.info("Updated batch_size_warn_threshold_in_kb to {}", threshold); + } + + public int getInitialRangeTombstoneListAllocationSize() + { + return DatabaseDescriptor.getInitialRangeTombstoneListAllocationSize(); + } + + public void setInitialRangeTombstoneListAllocationSize(int size) + { + if (size < 0 || size > 1024) + { + throw new IllegalStateException("Not updating initial_range_tombstone_allocation_size as it must be in the range [0, 1024] inclusive"); + } + int originalSize = DatabaseDescriptor.getInitialRangeTombstoneListAllocationSize(); + DatabaseDescriptor.setInitialRangeTombstoneListAllocationSize(size); + logger.info("Updated initial_range_tombstone_allocation_size from {} to {}", originalSize, size); + } + + public double getRangeTombstoneResizeListGrowthFactor() + { + return DatabaseDescriptor.getRangeTombstoneListGrowthFactor(); + } + + public void setRangeTombstoneListResizeGrowthFactor(double growthFactor) throws IllegalStateException + { + if (growthFactor < 1.2 || growthFactor > 5) + { + throw new IllegalStateException("Not updating range_tombstone_resize_factor as growth factor must be in the range [1.2, 5.0] inclusive"); + } + else + { + double originalGrowthFactor = DatabaseDescriptor.getRangeTombstoneListGrowthFactor(); + DatabaseDescriptor.setRangeTombstoneListGrowthFactor(growthFactor); + logger.info("Updated range_tombstone_resize_factor from {} to {}", originalGrowthFactor, growthFactor); + } + } + public void setHintedHandoffThrottleInKB(int throttleInKB) { DatabaseDescriptor.setHintedHandoffThrottleInKB(throttleInKB); - logger.info("Updated hinted_handoff_throttle_in_kb to {}", throttleInKB); + logger.info("updated hinted_handoff_throttle_in_kb to {}", throttleInKB); } + @Override + public void clearConnectionHistory() + { + daemon.clearConnectionHistory(); + logger.info("Cleared connection history"); + } + public void disableAuditLog() + { + AuditLogManager.instance.disableAuditLog(); + logger.info("Auditlog is disabled"); + } + + public void enableAuditLog(String loggerName, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, + String includedUsers, String excludedUsers) throws ConfigurationException, IllegalStateException + { + enableAuditLog(loggerName, Collections.emptyMap(), includedKeyspaces, excludedKeyspaces, includedCategories, excludedCategories, includedUsers, excludedUsers); + } + + public void enableAuditLog(String loggerName, Map<String, String> parameters, String includedKeyspaces, String excludedKeyspaces, String includedCategories, String excludedCategories, + String includedUsers, String excludedUsers) throws ConfigurationException, IllegalStateException + { + loggerName = loggerName != null ? 
loggerName : DatabaseDescriptor.getAuditLoggingOptions().logger.class_name; + + Preconditions.checkNotNull(loggerName, "cassandra.yaml did not have logger in audit_logging_option and not set as parameter"); + Preconditions.checkState(FBUtilities.isAuditLoggerClassExists(loggerName), "Unable to find AuditLogger class: "+loggerName); + + AuditLogOptions auditLogOptions = new AuditLogOptions(); + auditLogOptions.enabled = true; + auditLogOptions.logger = new ParameterizedClass(loggerName, parameters); + auditLogOptions.included_keyspaces = includedKeyspaces != null ? includedKeyspaces : DatabaseDescriptor.getAuditLoggingOptions().included_keyspaces; + auditLogOptions.excluded_keyspaces = excludedKeyspaces != null ? excludedKeyspaces : DatabaseDescriptor.getAuditLoggingOptions().excluded_keyspaces; + auditLogOptions.included_categories = includedCategories != null ? includedCategories : DatabaseDescriptor.getAuditLoggingOptions().included_categories; + auditLogOptions.excluded_categories = excludedCategories != null ? excludedCategories : DatabaseDescriptor.getAuditLoggingOptions().excluded_categories; + auditLogOptions.included_users = includedUsers != null ? includedUsers : DatabaseDescriptor.getAuditLoggingOptions().included_users; + auditLogOptions.excluded_users = excludedUsers != null ? excludedUsers : DatabaseDescriptor.getAuditLoggingOptions().excluded_users; + + AuditLogManager.instance.enable(auditLogOptions); + + logger.info("AuditLog is enabled with logger: [{}], included_keyspaces: [{}], excluded_keyspaces: [{}], " + + "included_categories: [{}], excluded_categories: [{}], included_users: [{}], " + + "excluded_users: [{}], archive_command: [{}]", auditLogOptions.logger, auditLogOptions.included_keyspaces, auditLogOptions.excluded_keyspaces, + auditLogOptions.included_categories, auditLogOptions.excluded_categories, auditLogOptions.included_users, auditLogOptions.excluded_users, + auditLogOptions.archive_command); + + } + + public boolean isAuditLogEnabled() + { + return AuditLogManager.instance.isEnabled(); + } + + public String getCorruptedTombstoneStrategy() + { + return DatabaseDescriptor.getCorruptedTombstoneStrategy().toString(); + } + + public void setCorruptedTombstoneStrategy(String strategy) + { + DatabaseDescriptor.setCorruptedTombstoneStrategy(Config.CorruptedTombstoneStrategy.valueOf(strategy)); + logger.info("Setting corrupted tombstone strategy to {}", strategy); + } + + @Override + public long getNativeTransportMaxConcurrentRequestsInBytes() + { + return Server.EndpointPayloadTracker.getGlobalLimit(); + } + + @Override + public void setNativeTransportMaxConcurrentRequestsInBytes(long newLimit) + { + Server.EndpointPayloadTracker.setGlobalLimit(newLimit); + } + + @Override + public long getNativeTransportMaxConcurrentRequestsInBytesPerIp() + { + return Server.EndpointPayloadTracker.getEndpointLimit(); + } + + @Override + public void setNativeTransportMaxConcurrentRequestsInBytesPerIp(long newLimit) + { + Server.EndpointPayloadTracker.setEndpointLimit(newLimit); + } + @VisibleForTesting public void shutdownServer() { diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java index 9664769,50d5c2c..3c6bbf9 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@@ -697,11 -678,18 +697,23 @@@ public interface StorageServiceMBean ex /** Sets the threshold for abandoning queries with many tombstones */ public void setTombstoneFailureThreshold(int tombstoneDebugThreshold); + 
/** Returns the number of rows cached at the coordinator before filtering/index queries log a warning. */ + public int getCachedReplicaRowsWarnThreshold(); + + /** Sets the number of rows cached at the coordinator before filtering/index queries log a warning. */ + public void setCachedReplicaRowsWarnThreshold(int threshold); + + /** Returns the number of rows cached at the coordinator before filtering/index queries fail outright. */ + public int getCachedReplicaRowsFailThreshold(); + + /** Sets the number of rows cached at the coordinator before filtering/index queries fail outright. */ + public void setCachedReplicaRowsFailThreshold(int threshold); + + /** Returns the threshold for skipping the column index when caching partition info **/ + public int getColumnIndexCacheSize(); + /** Sets the threshold for skipping the column index when caching partition info **/ + public void setColumnIndexCacheSize(int cacheSizeInKB); + /** Returns the threshold for rejecting queries due to a large batch size */ public int getBatchSizeFailureThreshold(); /** Sets the threshold for rejecting queries due to a large batch size */ diff --cc src/java/org/apache/cassandra/service/reads/DataResolver.java index 30427d0,0000000..eeabb4b mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/DataResolver.java +++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java @@@ -1,405 -1,0 +1,404 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.service.reads; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.function.UnaryOperator; + +import com.google.common.base.Joiner; + ++import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.statements.schema.IndexTarget; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadResponse; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.db.transform.EmptyPartitionsDiscarder; +import org.apache.cassandra.db.transform.Filter; +import org.apache.cassandra.db.transform.FilteredPartitions; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.index.sasi.SASIIndex; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.repair.ReadRepair; +import org.apache.cassandra.service.reads.repair.RepairedDataTracker; +import org.apache.cassandra.service.reads.repair.RepairedDataVerifier; + +import static com.google.common.collect.Iterables.*; + +public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P> +{ + private final boolean enforceStrictLiveness; + private final ReadRepair<E, P> readRepair; + + public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime) + { + super(command, replicaPlan, queryStartNanoTime); + this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); + this.readRepair = readRepair; + } + + public PartitionIterator getData() + { + ReadResponse response = responses.get(0).payload; + return UnfilteredPartitionIterators.filter(response.makeIterator(command), command.nowInSec()); + } + + public boolean isDataPresent() + { + return !responses.isEmpty(); + } + + public PartitionIterator resolve() + { + // We could get more responses while this method runs, which is ok (we're happy to ignore any response not here + // at the beginning of this method), so grab the response count once and use that through the method. + Collection<Message<ReadResponse>> messages = responses.snapshot(); + assert !any(messages, msg -> msg.payload.isDigestResponse()); + + E replicas = replicaPlan().candidates().select(transform(messages, Message::from), false); + + // If requested, inspect each response for a digest of the replica's repaired data set + RepairedDataTracker repairedDataTracker = command.isTrackingRepairedStatus() + ? 
new RepairedDataTracker(getRepairedDataVerifier(command)) + : null; + if (repairedDataTracker != null) + { + messages.forEach(msg -> { + if (msg.payload.mayIncludeRepairedDigest() && replicas.byEndpoint().get(msg.from()).isFull()) + { + repairedDataTracker.recordDigest(msg.from(), + msg.payload.repairedDataDigest(), + msg.payload.isRepairedDigestConclusive()); + } + }); + } + + if (!needsReplicaFilteringProtection()) + { + ResolveContext context = new ResolveContext(replicas); + return resolveWithReadRepair(context, + i -> shortReadProtectedResponse(i, context), + UnaryOperator.identity(), + repairedDataTracker); + } + + return resolveWithReplicaFilteringProtection(replicas, repairedDataTracker); + } + + private boolean needsReplicaFilteringProtection() + { + if (command.rowFilter().isEmpty()) + return false; + + IndexMetadata indexDef = command.indexMetadata(); + if (indexDef != null && indexDef.isCustom()) + { + String className = indexDef.options.get(IndexTarget.CUSTOM_INDEX_OPTION_NAME); + return !SASIIndex.class.getName().equals(className); + } + + return true; + } + + private class ResolveContext + { + private final E replicas; + private final DataLimits.Counter mergedResultCounter; + + private ResolveContext(E replicas) + { + this.replicas = replicas; + this.mergedResultCounter = command.limits().newCounter(command.nowInSec(), + true, + command.selectsFullPartition(), + enforceStrictLiveness); + } + + private boolean needsReadRepair() + { + return replicas.size() > 1; + } + + private boolean needShortReadProtection() + { + // If we have only one result, there is no read repair to do and we can't get short reads + // Also, so-called "short reads" stems from nodes returning only a subset of the results they have for a + // partition due to the limit, but that subset not being enough post-reconciliation. So if we don't have limit, + // don't bother protecting against short reads. + return replicas.size() > 1 && !command.limits().isUnlimited(); + } + } + + @FunctionalInterface + private interface ResponseProvider + { + UnfilteredPartitionIterator getResponse(int i); + } + + private UnfilteredPartitionIterator shortReadProtectedResponse(int i, ResolveContext context) + { + UnfilteredPartitionIterator originalResponse = responses.get(i).payload.makeIterator(command); + + return context.needShortReadProtection() + ? 
ShortReadProtection.extend(context.replicas.get(i), ++ () -> responses.clearUnsafe(i), + originalResponse, + command, + context.mergedResultCounter, + queryStartNanoTime, + enforceStrictLiveness) + : originalResponse; + } + + private PartitionIterator resolveWithReadRepair(ResolveContext context, + ResponseProvider responseProvider, + UnaryOperator<PartitionIterator> preCountFilter, + RepairedDataTracker repairedDataTracker) + { + UnfilteredPartitionIterators.MergeListener listener = null; + if (context.needsReadRepair()) + { + P sources = replicaPlan.getWithContacts(context.replicas); + listener = wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker); + } + + return resolveInternal(context, listener, responseProvider, preCountFilter); + } + + @SuppressWarnings("resource") + private PartitionIterator resolveWithReplicaFilteringProtection(E replicas, RepairedDataTracker repairedDataTracker) + { + // Protecting against inconsistent replica filtering (some replica returning a row that is outdated but that + // wouldn't be removed by normal reconciliation because up-to-date replica have filtered the up-to-date version - // of that row) works in 3 steps: - // 1) we read the full response just to collect rows that may be outdated (the ones we got from some - // replica but didn't got any response for other; it could be those other replica have filtered a more - // up-to-date result). In doing so, we do not count any of such "potentially outdated" row towards the - // query limit. This simulate the worst case scenario where all those "potentially outdated" rows are - // indeed outdated, and thus make sure we are guaranteed to read enough results (thanks to short read - // protection). - // 2) we query all the replica/rows we need to rule out whether those "potentially outdated" rows are outdated - // or not. - // 3) we re-read cached copies of each replica response using the "normal" read path merge with read-repair, - // but where for each replica we use their original response _plus_ the additional rows queried in the - // previous step (and apply the command#rowFilter() on the full result). Since the first phase has - // pessimistically collected enough results for the case where all potentially outdated results are indeed - // outdated, we shouldn't need further short-read protection requests during this phase. ++ // of that row) involves 3 main elements: ++ // 1) We combine short-read protection and a merge listener that identifies potentially "out-of-date" ++ // rows to create an iterator that is guaranteed to produce enough valid row results to satisfy the query ++ // limit if enough actually exist. A row is considered out-of-date if its merged form is non-empty and we ++ // receive no response from at least one replica. In this case, it is possible that filtering at the ++ // "silent" replica has produced a more up-to-date result. ++ // 2) This iterator is passed to the standard resolution process with read-repair, but is first wrapped in a ++ // response provider that lazily "completes" potentially out-of-date rows by directly querying them on the ++ // replicas that were previously silent. As this iterator is consumed, it caches valid data for potentially ++ // out-of-date rows, and this cached data is merged with the fetched data as rows are requested. If there ++ // is no replica divergence, only rows in the partition being evaluated will be cached (then released ++ when the partition is consumed). 
++ // 3) After a "complete" row is materialized, it must pass the row filter supplied by the original query ++ // before it counts against the limit. + + // We need separate contexts, as each context has his own counter + ResolveContext firstPhaseContext = new ResolveContext(replicas); + ResolveContext secondPhaseContext = new ResolveContext(replicas); + ReplicaFilteringProtection<E> rfp = new ReplicaFilteringProtection<>(replicaPlan().keyspace(), + command, + replicaPlan().consistencyLevel(), + queryStartNanoTime, - firstPhaseContext.replicas); ++ firstPhaseContext.replicas, ++ DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(), ++ DatabaseDescriptor.getCachedReplicaRowsFailThreshold()); ++ + PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext, + rfp.mergeController(), + i -> shortReadProtectedResponse(i, firstPhaseContext), + UnaryOperator.identity()); + - // Consume the first phase partitions to populate the replica filtering protection with both those materialized - // partitions and the primary keys to be fetched. - PartitionIterators.consume(firstPhasePartitions); - firstPhasePartitions.close(); - - // After reading the entire query results the protection helper should have cached all the partitions so we can - // clear the responses accumulator for the sake of memory usage, given that the second phase might take long if - // it needs to query replicas. - responses.clearUnsafe(); ++ PartitionIterator completedPartitions = resolveWithReadRepair(secondPhaseContext, ++ i -> rfp.queryProtectedPartitions(firstPhasePartitions, i), ++ results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()), ++ repairedDataTracker); + - return resolveWithReadRepair(secondPhaseContext, - rfp::queryProtectedPartitions, - results -> command.rowFilter().filter(results, command.metadata(), command.nowInSec()), - repairedDataTracker); ++ // Ensure that the RFP instance has a chance to record metrics when the iterator closes. ++ return PartitionIterators.doOnClose(completedPartitions, firstPhasePartitions::close); + } + + @SuppressWarnings("resource") + private PartitionIterator resolveInternal(ResolveContext context, + UnfilteredPartitionIterators.MergeListener mergeListener, + ResponseProvider responseProvider, + UnaryOperator<PartitionIterator> preCountFilter) + { + int count = context.replicas.size(); + List<UnfilteredPartitionIterator> results = new ArrayList<>(count); + for (int i = 0; i < count; i++) + results.add(responseProvider.getResponse(i)); + + /* + * Even though every response, individually, will honor the limit, it is possible that we will, after the merge, + * have more rows than the client requested. To make sure that we still conform to the original limit, + * we apply a top-level post-reconciliation counter to the merged partition iterator. + * + * Short read protection logic (ShortReadRowsProtection.moreContents()) relies on this counter to be applied + * to the current partition to work. For this reason we have to apply the counter transformation before + * empty partition discard logic kicks in - for it will eagerly consume the iterator. + * + * That's why the order here is: 1) merge; 2) filter rows; 3) count; 4) discard empty partitions + * + * See CASSANDRA-13747 for more details. 
+ */ + + UnfilteredPartitionIterator merged = UnfilteredPartitionIterators.merge(results, mergeListener); - FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness())); ++ Filter filter = new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()); ++ FilteredPartitions filtered = FilteredPartitions.filter(merged, filter); + PartitionIterator counted = Transformation.apply(preCountFilter.apply(filtered), context.mergedResultCounter); + return Transformation.apply(counted, new EmptyPartitionsDiscarder()); + } + + protected RepairedDataVerifier getRepairedDataVerifier(ReadCommand command) + { + return RepairedDataVerifier.verifier(command); + } + + private String makeResponsesDebugString(DecoratedKey partitionKey) + { + return Joiner.on(",\n").join(transform(getMessages().snapshot(), m -> m.from() + " => " + m.payload.toDebugString(command, partitionKey))); + } + + private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener, + P sources, + RepairedDataTracker repairedDataTracker) + { + // Avoid wrapping no-op listener as it doesn't throw, unless we're tracking repaired status + // in which case we need to inject the tracker & verify on close + if (partitionListener == UnfilteredPartitionIterators.MergeListener.NOOP) + { + if (repairedDataTracker == null) + return partitionListener; + + return new UnfilteredPartitionIterators.MergeListener() + { + + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + return UnfilteredRowIterators.MergeListener.NOOP; + } + + public void close() + { + repairedDataTracker.verify(); + } + }; + } + + return new UnfilteredPartitionIterators.MergeListener() + { + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + UnfilteredRowIterators.MergeListener rowListener = partitionListener.getRowMergeListener(partitionKey, versions); + + return new UnfilteredRowIterators.MergeListener() + { + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + try + { + rowListener.onMergedPartitionLevelDeletion(mergedDeletion, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging partition level deletion on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + mergedDeletion == null ? "null" : mergedDeletion.toString(), + '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']', + sources.contacts(), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public Row onMergedRows(Row merged, Row[] versions) + { + try + { + return rowListener.onMergedRows(merged, versions); + } + catch (AssertionError e) + { + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging rows on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? 
"null" : merged.toString(table), + '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + sources.contacts(), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + } + + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + try + { + // The code for merging range tombstones is a tad complex and we had the assertions there triggered + // unexpectedly in a few occasions (CASSANDRA-13237, CASSANDRA-13719). It's hard to get insights + // when that happen without more context that what the assertion errors give us however, hence the + // catch here that basically gather as much as context as reasonable. + rowListener.onMergedRangeTombstoneMarkers(merged, versions); + } + catch (AssertionError e) + { + + // The following can be pretty verbose, but it's really only triggered if a bug happen, so we'd + // rather get more info to debug than not. + TableMetadata table = command.metadata(); + String details = String.format("Error merging RTs on %s: merged=%s, versions=%s, sources={%s}, debug info:%n %s", + table, + merged == null ? "null" : merged.toString(table), + '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']', + sources.contacts(), + makeResponsesDebugString(partitionKey)); + throw new AssertionError(details, e); + } + + } + + public void close() + { + rowListener.close(); + } + }; + } + + public void close() + { + partitionListener.close(); + if (repairedDataTracker != null) + repairedDataTracker.verify(); + } + }; + } +} diff --cc src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java index b4a3cb5,0000000..4f0ad4d mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java @@@ -1,480 -1,0 +1,550 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + ++import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; - import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; - import java.util.SortedMap; - import java.util.TreeMap; - import java.util.stream.Collectors; ++import java.util.concurrent.TimeUnit; ++import java.util.Queue; ++import java.util.function.Function; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Columns; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.RegularAndStaticColumns; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; ++import org.apache.cassandra.db.partitions.PartitionIterator; ++import org.apache.cassandra.db.partitions.PartitionIterators; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; ++import org.apache.cassandra.exceptions.OverloadedException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.locator.Endpoints; ++import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.TableMetadata; ++import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.reads.repair.NoopReadRepair; +import org.apache.cassandra.tracing.Tracing; ++import org.apache.cassandra.utils.NoSpamLogger; +import org.apache.cassandra.utils.btree.BTreeSet; + +/** + * Helper in charge of collecting additional queries to be done on the coordinator to protect against invalid results + * being included due to replica-side filtering (secondary indexes or {@code ALLOW * FILTERING}). + * <p> + * When using replica-side filtering with CL>ONE, a replica can send a stale result satisfying the filter, while updated + * replicas won't send a corresponding tombstone to discard that result during reconciliation. This helper identifies + * the rows in a replica response that don't have a corresponding row in other replica responses, and requests them by + * primary key to the "silent" replicas in a second fetch round. + * <p> - * See CASSANDRA-8272 and CASSANDRA-8273 for further details. 
++ * See CASSANDRA-8272, CASSANDRA-8273, and CASSANDRA-15907 for further details. + */ +class ReplicaFilteringProtection<E extends Endpoints<E>> +{ + private static final Logger logger = LoggerFactory.getLogger(ReplicaFilteringProtection.class); ++ private static final NoSpamLogger oneMinuteLogger = NoSpamLogger.getLogger(logger, 1, TimeUnit.MINUTES); ++ ++ private static final Function<UnfilteredRowIterator, EncodingStats> NULL_TO_NO_STATS = ++ rowIterator -> rowIterator == null ? EncodingStats.NO_STATS : rowIterator.stats(); + + private final Keyspace keyspace; + private final ReadCommand command; + private final ConsistencyLevel consistency; + private final long queryStartNanoTime; + private final E sources; + private final TableMetrics tableMetrics; + - /** - * Per-source primary keys of the rows that might be outdated so they need to be fetched. - * For outdated static rows we use an empty builder to signal it has to be queried. - */ - private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch; ++ private final int cachedRowsWarnThreshold; ++ private final int cachedRowsFailThreshold; ++ ++ /** Tracks whether or not we've already hit the warning threshold while evaluating a partition. */ ++ private boolean hitWarningThreshold = false; ++ ++ private int currentRowsCached = 0; // tracks the current number of cached rows ++ private int maxRowsCached = 0; // tracks the high watermark for the number of cached rows + + /** - * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows. ++ * Per-source list of the pending partitions seen by the merge listener, to be merged with the extra fetched rows. + */ - private final List<List<PartitionBuilder>> originalPartitions; ++ private final List<Queue<PartitionBuilder>> originalPartitions; + + ReplicaFilteringProtection(Keyspace keyspace, + ReadCommand command, + ConsistencyLevel consistency, + long queryStartNanoTime, - E sources) ++ E sources, ++ int cachedRowsWarnThreshold, ++ int cachedRowsFailThreshold) + { + this.keyspace = keyspace; + this.command = command; + this.consistency = consistency; + this.queryStartNanoTime = queryStartNanoTime; + this.sources = sources; - this.rowsToFetch = new ArrayList<>(sources.size()); + this.originalPartitions = new ArrayList<>(sources.size()); + - for (Replica ignored : sources) ++ for (int i = 0; i < sources.size(); i++) + { - rowsToFetch.add(new TreeMap<>()); - originalPartitions.add(new ArrayList<>()); ++ originalPartitions.add(new ArrayDeque<>()); + } + + tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().id); - } + - private BTreeSet.Builder<Clustering> getOrCreateToFetch(int source, DecoratedKey partitionKey) - { - return rowsToFetch.get(source).computeIfAbsent(partitionKey, k -> BTreeSet.builder(command.metadata().comparator)); ++ this.cachedRowsWarnThreshold = cachedRowsWarnThreshold; ++ this.cachedRowsFailThreshold = cachedRowsFailThreshold; + } + - /** - * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging - * them with the cached original filtered results for that replica. 
- * - * @param source the source - * @return the protected results for the specified replica - */ - UnfilteredPartitionIterator queryProtectedPartitions(int source) ++ private UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica source, ReplicaPlan.Shared<EndpointsForToken, ReplicaPlan.ForTokenRead> replicaPlan) + { - UnfilteredPartitionIterator original = makeIterator(originalPartitions.get(source)); - SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>> toFetch = rowsToFetch.get(source); - - if (toFetch.isEmpty()) - return original; ++ @SuppressWarnings("unchecked") ++ DataResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> resolver = ++ new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead>) NoopReadRepair.instance, queryStartNanoTime); + - // TODO: this would be more efficient if we had multi-key queries internally - List<UnfilteredPartitionIterator> fetched = toFetch.keySet() - .stream() - .map(k -> querySourceOnKey(source, k)) - .collect(Collectors.toList()); - - return UnfilteredPartitionIterators.merge(Arrays.asList(original, UnfilteredPartitionIterators.concat(fetched)), null); - } - - private UnfilteredPartitionIterator querySourceOnKey(int i, DecoratedKey key) - { - BTreeSet.Builder<Clustering> builder = rowsToFetch.get(i).get(key); - assert builder != null; // We're calling this on the result of rowsToFetch.get(i).keySet() - - Replica source = sources.get(i); - NavigableSet<Clustering> clusterings = builder.build(); - tableMetrics.replicaSideFilteringProtectionRequests.mark(); - if (logger.isTraceEnabled()) - logger.trace("Requesting rows {} in partition {} from {} for replica-side filtering protection", - clusterings, key, source); - Tracing.trace("Requesting {} rows in partition {} from {} for replica-side filtering protection", - clusterings.size(), key, source); - - // build the read command taking into account that we could be requesting only in the static row - DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE; - ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed()); - SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), - command.nowInSec(), - command.columnFilter(), - RowFilter.NONE, - limits, - key, - filter); - - ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(keyspace, key.getToken(), source); - ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); - try - { - return executeReadCommand(cmd, source, sharedReplicaPlan); - } - catch (ReadTimeoutException e) - { - int blockFor = consistency.blockFor(keyspace); - throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); - } - catch (UnavailableException e) - { - int blockFor = consistency.blockFor(keyspace); - throw UnavailableException.create(consistency, blockFor, blockFor - 1); - } - } - - private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> - UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, Replica source, ReplicaPlan.Shared<E, P> replicaPlan) - { - DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime); - ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); ++ ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); + + if (source.isSelf()) + { + Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); + } + else + { + if (source.isTransient()) + cmd = cmd.copyAsTransientQuery(source); + MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler); + } + + // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. + handler.awaitResults(); + assert resolver.getMessages().size() == 1; + return resolver.getMessages().get(0).payload.makeIterator(command); + } + + /** + * Returns a merge listener that skips the merged rows for which any of the replicas doesn't have a version, + * pessimistically assuming that they are outdated. It is intended to be used during a first merge of per-replica + * query results to ensure we fetch enough results from the replicas to ensure we don't miss any potentially + * outdated result. + * <p> + * The listener will track both the accepted data and the primary keys of the rows that are considered as outdated. + * That way, once the query results would have been merged using this listener, further calls to - * {@link #queryProtectedPartitions(int)} will use the collected data to return a copy of the ++ * {@link #queryProtectedPartitions(PartitionIterator, int)} will use the collected data to return a copy of the + * data originally collected from the specified replica, completed with the potentially outdated rows. + */ + UnfilteredPartitionIterators.MergeListener mergeController() + { - return (partitionKey, versions) -> { ++ return new UnfilteredPartitionIterators.MergeListener() ++ { ++ @Override ++ public void close() ++ { ++ // If we hit the failure threshold before consuming a single partition, record the current rows cached. 
++ tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached)); ++ } + - PartitionBuilder[] builders = new PartitionBuilder[sources.size()]; ++ @Override ++ public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) ++ { ++ List<PartitionBuilder> builders = new ArrayList<>(sources.size()); ++ RegularAndStaticColumns columns = columns(versions); ++ EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS); + - for (int i = 0; i < sources.size(); i++) - builders[i] = new PartitionBuilder(command, partitionKey, columns(versions), stats(versions)); ++ for (int i = 0; i < sources.size(); i++) ++ builders.add(i, new PartitionBuilder(partitionKey, sources.get(i), columns, stats)); + - return new UnfilteredRowIterators.MergeListener() - { - @Override - public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) ++ return new UnfilteredRowIterators.MergeListener() + { - // cache the deletion time versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].setDeletionTime(versions[i]); - } ++ @Override ++ public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) ++ { ++ // cache the deletion time versions to be able to regenerate the original row iterator ++ for (int i = 0; i < versions.length; i++) ++ builders.get(i).setDeletionTime(versions[i]); ++ } + - @Override - public Row onMergedRows(Row merged, Row[] versions) - { - // cache the row versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].addRow(versions[i]); ++ @Override ++ public Row onMergedRows(Row merged, Row[] versions) ++ { ++ // cache the row versions to be able to regenerate the original row iterator ++ for (int i = 0; i < versions.length; i++) ++ builders.get(i).addRow(versions[i]); + - if (merged.isEmpty()) - return merged; ++ if (merged.isEmpty()) ++ return merged; + - boolean isPotentiallyOutdated = false; - boolean isStatic = merged.isStatic(); - for (int i = 0; i < versions.length; i++) - { - Row version = versions[i]; - if (version == null || (isStatic && version.isEmpty())) ++ boolean isPotentiallyOutdated = false; ++ boolean isStatic = merged.isStatic(); ++ for (int i = 0; i < versions.length; i++) + { - isPotentiallyOutdated = true; - BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey); - // Note that for static, we shouldn't add the clustering to the clustering set (the - // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact - // we created a builder in the first place will act as a marker that the static row must be - // fetched, even if no other rows are added for this partition. - if (!isStatic) - toFetch.add(merged.clustering()); ++ Row version = versions[i]; ++ if (version == null || (isStatic && version.isEmpty())) ++ { ++ isPotentiallyOutdated = true; ++ builders.get(i).addToFetch(merged); ++ } + } - } + - // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be - // an outdated result that is only present because other replica have filtered the up-to-date result - // out), then we skip the row. In other words, the results of the initial merging of results by this - // protection assume the worst case scenario where every row that might be outdated actually is. 
- // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed - // to look at enough data to ultimately fulfill the query limit. - return isPotentiallyOutdated ? null : merged; - } ++ // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be ++ // an outdated result that is only present because other replica have filtered the up-to-date result ++ // out), then we skip the row. In other words, the results of the initial merging of results by this ++ // protection assume the worst case scenario where every row that might be outdated actually is. ++ // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed ++ // to look at enough data to ultimately fulfill the query limit. ++ return isPotentiallyOutdated ? null : merged; ++ } + - @Override - public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) - { - // cache the marker versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].addRangeTombstoneMarker(versions[i]); - } ++ @Override ++ public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) ++ { ++ // cache the marker versions to be able to regenerate the original row iterator ++ for (int i = 0; i < versions.length; i++) ++ builders.get(i).addRangeTombstoneMarker(versions[i]); ++ } + - @Override - public void close() - { - for (int i = 0; i < sources.size(); i++) - originalPartitions.get(i).add(builders[i]); - } - }; ++ @Override ++ public void close() ++ { ++ for (int i = 0; i < sources.size(); i++) ++ originalPartitions.get(i).add(builders.get(i)); ++ } ++ }; ++ } + }; + } + ++ private void incrementCachedRows() ++ { ++ currentRowsCached++; ++ ++ if (currentRowsCached == cachedRowsFailThreshold + 1) ++ { ++ String message = String.format("Replica filtering protection has cached over %d rows during query %s. " + ++ "(See 'cached_replica_rows_fail_threshold' in cassandra.yaml.)", ++ cachedRowsFailThreshold, command.toCQLString()); ++ ++ logger.error(message); ++ Tracing.trace(message); ++ throw new OverloadedException(message); ++ } ++ else if (currentRowsCached == cachedRowsWarnThreshold + 1 && !hitWarningThreshold) ++ { ++ hitWarningThreshold = true; ++ ++ String message = String.format("Replica filtering protection has cached over %d rows during query %s. 
" + ++ "(See 'cached_replica_rows_warn_threshold' in cassandra.yaml.)", ++ cachedRowsWarnThreshold, command.toCQLString()); ++ ++ ClientWarn.instance.warn(message); ++ oneMinuteLogger.warn(message); ++ Tracing.trace(message); ++ } ++ } ++ ++ private void releaseCachedRows(int count) ++ { ++ maxRowsCached = Math.max(maxRowsCached, currentRowsCached); ++ currentRowsCached -= count; ++ } ++ + private static RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions) + { + Columns statics = Columns.NONE; + Columns regulars = Columns.NONE; + for (UnfilteredRowIterator iter : versions) + { + if (iter == null) + continue; + + RegularAndStaticColumns cols = iter.columns(); + statics = statics.mergeTo(cols.statics); + regulars = regulars.mergeTo(cols.regulars); + } + return new RegularAndStaticColumns(statics, regulars); + } + - private static EncodingStats stats(List<UnfilteredRowIterator> iterators) - { - EncodingStats stats = EncodingStats.NO_STATS; - for (UnfilteredRowIterator iter : iterators) - { - if (iter == null) - continue; - - stats = stats.mergeWith(iter.stats()); - } - return stats; - } - - private UnfilteredPartitionIterator makeIterator(List<PartitionBuilder> builders) ++ /** ++ * Returns the protected results for the specified replica. These are generated fetching the extra rows and merging ++ * them with the cached original filtered results for that replica. ++ * ++ * @param merged the first iteration partitions, that should have been read used with the {@link #mergeController()} ++ * @param source the source ++ * @return the protected results for the specified replica ++ */ ++ UnfilteredPartitionIterator queryProtectedPartitions(PartitionIterator merged, int source) + { + return new UnfilteredPartitionIterator() + { - final Iterator<PartitionBuilder> iterator = builders.iterator(); ++ final Queue<PartitionBuilder> partitions = originalPartitions.get(source); + + @Override + public TableMetadata metadata() + { + return command.metadata(); + } + + @Override - public void close() - { - // nothing to do here - } ++ public void close() { } + + @Override + public boolean hasNext() + { - return iterator.hasNext(); ++ // If there are no cached partition builders for this source, advance the first phase iterator, which ++ // will force the RFP merge listener to load at least the next protected partition. Note that this may ++ // load more than one partition if any divergence between replicas is discovered by the merge listener. 
++ if (partitions.isEmpty()) ++ { ++ PartitionIterators.consumeNext(merged); ++ } ++ ++ return !partitions.isEmpty(); + } + + @Override + public UnfilteredRowIterator next() + { - return iterator.next().build(); ++ PartitionBuilder builder = partitions.poll(); ++ assert builder != null; ++ return builder.protectedPartition(); + } + }; + } + - private static class PartitionBuilder ++ private class PartitionBuilder + { - private final ReadCommand command; - private final DecoratedKey partitionKey; ++ private final DecoratedKey key; ++ private final Replica source; + private final RegularAndStaticColumns columns; + private final EncodingStats stats; + + private DeletionTime deletionTime; + private Row staticRow = Rows.EMPTY_STATIC_ROW; - private final List<Unfiltered> contents = new ArrayList<>(); ++ private final Queue<Unfiltered> contents = new ArrayDeque<>(); ++ private BTreeSet.Builder<Clustering> toFetch; ++ private int partitionRowsCached; + - private PartitionBuilder(ReadCommand command, - DecoratedKey partitionKey, - RegularAndStaticColumns columns, - EncodingStats stats) ++ private PartitionBuilder(DecoratedKey key, Replica source, RegularAndStaticColumns columns, EncodingStats stats) + { - this.command = command; - this.partitionKey = partitionKey; ++ this.key = key; ++ this.source = source; + this.columns = columns; + this.stats = stats; + } + + private void setDeletionTime(DeletionTime deletionTime) + { + this.deletionTime = deletionTime; + } + + private void addRow(Row row) + { ++ partitionRowsCached++; ++ ++ incrementCachedRows(); ++ ++ // Note that even null rows are counted against the row caching limit. The assumption is that ++ // a subsequent protection query will later fetch the row onto the heap anyway. + if (row == null) + return; + + if (row.isStatic()) + staticRow = row; + else + contents.add(row); + } + + private void addRangeTombstoneMarker(RangeTombstoneMarker marker) + { + if (marker != null) + contents.add(marker); + } + - private UnfilteredRowIterator build() ++ private void addToFetch(Row row) ++ { ++ if (toFetch == null) ++ toFetch = BTreeSet.builder(command.metadata().comparator); ++ ++ // Note that for static, we shouldn't add the clustering to the clustering set (the ++ // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact ++ // we created a builder in the first place will act as a marker that the static row must be ++ // fetched, even if no other rows are added for this partition. 
++ if (!row.isStatic()) ++ toFetch.add(row.clustering()); ++ } ++ ++ private UnfilteredRowIterator originalPartition() + { + return new UnfilteredRowIterator() + { - final Iterator<Unfiltered> iterator = contents.iterator(); - + @Override + public DeletionTime partitionLevelDeletion() + { + return deletionTime; + } + + @Override + public EncodingStats stats() + { + return stats; + } + + @Override + public TableMetadata metadata() + { + return command.metadata(); + } + + @Override + public boolean isReverseOrder() + { + return command.isReversed(); + } + + @Override + public RegularAndStaticColumns columns() + { + return columns; + } + + @Override + public DecoratedKey partitionKey() + { - return partitionKey; ++ return key; + } + + @Override + public Row staticRow() + { + return staticRow; + } + + @Override + public void close() + { - // nothing to do here ++ releaseCachedRows(partitionRowsCached); + } + + @Override + public boolean hasNext() + { - return iterator.hasNext(); ++ return !contents.isEmpty(); + } + + @Override + public Unfiltered next() + { - return iterator.next(); ++ return contents.poll(); + } + }; + } ++ ++ private UnfilteredRowIterator protectedPartition() ++ { ++ UnfilteredRowIterator original = originalPartition(); ++ ++ if (toFetch != null) ++ { ++ try (UnfilteredPartitionIterator partitions = fetchFromSource()) ++ { ++ if (partitions.hasNext()) ++ { ++ try (UnfilteredRowIterator fetchedRows = partitions.next()) ++ { ++ return UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows)); ++ } ++ } ++ } ++ } ++ ++ return original; ++ } ++ ++ private UnfilteredPartitionIterator fetchFromSource() ++ { ++ assert toFetch != null; ++ ++ NavigableSet<Clustering> clusterings = toFetch.build(); ++ tableMetrics.replicaFilteringProtectionRequests.mark(); ++ ++ if (logger.isTraceEnabled()) ++ logger.trace("Requesting rows {} in partition {} from {} for replica filtering protection", ++ clusterings, key, source); ++ ++ Tracing.trace("Requesting {} rows in partition {} from {} for replica filtering protection", ++ clusterings.size(), key, source); ++ ++ // build the read command taking into account that we could be requesting only in the static row ++ DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE; ++ ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed()); ++ SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), ++ command.nowInSec(), ++ command.columnFilter(), ++ RowFilter.NONE, ++ limits, ++ key, ++ filter); ++ ++ ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(keyspace, key.getToken(), source); ++ ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); ++ ++ try ++ { ++ return executeReadCommand(cmd, source, sharedReplicaPlan); ++ } ++ catch (ReadTimeoutException e) ++ { ++ int blockFor = consistency.blockFor(keyspace); ++ throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); ++ } ++ catch (UnavailableException e) ++ { ++ int blockFor = consistency.blockFor(keyspace); ++ throw UnavailableException.create(consistency, blockFor, blockFor - 1); ++ } ++ } + } +} diff --cc src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java index 59edc5a,0000000..42676b6 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java @@@ -1,199 -1,0 +1,207 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.locator.ReplicaPlans; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.PartitionRangeReadCommand; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.MorePartitions; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.ExcludingBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.reads.repair.NoopReadRepair; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.tracing.Tracing; + +public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowIterator> implements MorePartitions<UnfilteredPartitionIterator> +{ + private static final Logger logger = LoggerFactory.getLogger(ShortReadPartitionsProtection.class); + private final ReadCommand command; + private final Replica source; + ++ private final Runnable preFetchCallback; // called immediately before fetching more contents ++ + private final DataLimits.Counter singleResultCounter; // unmerged per-source counter + private final DataLimits.Counter mergedResultCounter; // merged end-result counter + + private DecoratedKey lastPartitionKey; // key of the last observed partition + + private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call + + private final long queryStartNanoTime; + - public ShortReadPartitionsProtection(ReadCommand command, Replica source, ++ public ShortReadPartitionsProtection(ReadCommand command, ++ Replica source, ++ Runnable preFetchCallback, + DataLimits.Counter singleResultCounter, + DataLimits.Counter mergedResultCounter, + long queryStartNanoTime) + { + this.command = command; + this.source = source; ++ this.preFetchCallback = preFetchCallback; + this.singleResultCounter = singleResultCounter; + this.mergedResultCounter = mergedResultCounter; + this.queryStartNanoTime = queryStartNanoTime; + } + + @Override + public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + partitionsFetched = true; + + lastPartitionKey = partition.partitionKey(); + + /* + * Extend for moreContents() then apply protection to track lastClustering by applyToRow(). + * + * If we don't apply the transformation *after* extending the partition with MoreRows, + * applyToRow() method of protection will not be called on the first row of the new extension iterator. 
+ */ + ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source); + ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); + ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(), + command, source, + (cmd) -> executeReadCommand(cmd, sharedReplicaPlan), + singleResultCounter, + mergedResultCounter); + return Transformation.apply(MoreRows.extend(partition, protection), protection); + } + + /* + * We only get here once all the rows and partitions in this iterator have been iterated over, and so + * if the node had returned the requested number of rows but we still get here, then some results were + * skipped during reconciliation. + */ + public UnfilteredPartitionIterator moreContents() + { + // never try to request additional partitions from replicas if our reconciled partitions are already filled to the limit + assert !mergedResultCounter.isDone(); + + // we do not apply short read protection when we have no limits at all + assert !command.limits().isUnlimited(); + + /* + * If this is a single partition read command or an (indexed) partition range read command with + * a partition key specified, then we can't and shouldn't try fetch more partitions. + */ + assert !command.isLimitedToOnePartition(); + + /* + * If the returned result doesn't have enough rows/partitions to satisfy even the original limit, don't ask for more. + * + * Can only take the short cut if there is no per partition limit set. Otherwise it's possible to hit false + * positives due to some rows being uncounted for in certain scenarios (see CASSANDRA-13911). + */ + if (!singleResultCounter.isDone() && command.limits().perPartitionCount() == DataLimits.NO_LIMIT) + return null; + + /* + * Either we had an empty iterator as the initial response, or our moreContents() call got us an empty iterator. + * There is no point to ask the replica for more rows - it has no more in the requested range. + */ + if (!partitionsFetched) + return null; + partitionsFetched = false; + + /* + * We are going to fetch one partition at a time for thrift and potentially more for CQL. + * The row limit will either be set to the per partition limit - if the command has no total row limit set, or + * the total # of rows remaining - if it has some. If we don't grab enough rows in some of the partitions, + * then future ShortReadRowsProtection.moreContents() calls will fetch the missing ones. + */ + int toQuery = command.limits().count() != DataLimits.NO_LIMIT + ? command.limits().count() - counted(mergedResultCounter) + : command.limits().perPartitionCount(); + + ColumnFamilyStore.metricsFor(command.metadata().id).shortReadProtectionRequests.mark(); + Tracing.trace("Requesting {} extra rows from {} for short read protection", toQuery, source); + logger.info("Requesting {} extra rows from {} for short read protection", toQuery, source); + ++ // If we've arrived here, all responses have been consumed, and we're about to request more. ++ preFetchCallback.run(); ++ + return makeAndExecuteFetchAdditionalPartitionReadCommand(toQuery); + } + + // Counts the number of rows for regular queries and the number of groups for GROUP BY queries + private int counted(DataLimits.Counter counter) + { + return command.limits().isGroupByLimit() + ? 
counter.rowCounted() + : counter.counted(); + } + + private UnfilteredPartitionIterator makeAndExecuteFetchAdditionalPartitionReadCommand(int toQuery) + { + PartitionRangeReadCommand cmd = (PartitionRangeReadCommand) command; + + DataLimits newLimits = cmd.limits().forShortReadRetry(toQuery); + + AbstractBounds<PartitionPosition> bounds = cmd.dataRange().keyRange(); + AbstractBounds<PartitionPosition> newBounds = bounds.inclusiveRight() + ? new Range<>(lastPartitionKey, bounds.right) + : new ExcludingBounds<>(lastPartitionKey, bounds.right); + DataRange newDataRange = cmd.dataRange().forSubRange(newBounds); + + ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source, 1); + return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan)); + } + + private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> + UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan) + { + DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime); + ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime); + + if (source.isSelf()) + { + Stage.READ.maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler)); + } + else + { + if (source.isTransient()) + cmd = cmd.copyAsTransientQuery(source); + MessagingService.instance().sendWithCallback(cmd.createMessage(false), source.endpoint(), handler); + } + + // We don't call handler.get() because we want to preserve tombstones since we're still in the middle of merging node results. + handler.awaitResults(); + assert resolver.getMessages().size() == 1; + return resolver.getMessages().get(0).payload.makeIterator(command); + } +} diff --cc src/java/org/apache/cassandra/service/reads/ShortReadProtection.java index 1a454f9,0000000..a1bdc0e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ShortReadProtection.java @@@ -1,77 -1,0 +1,86 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import java.net.InetAddress; + + +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.transform.MorePartitions; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; + +/** + * We have a potential short read if the result from a given node contains the requested number of rows + * (i.e. it has stopped returning results due to the limit), but some of them haven't + * made it into the final post-reconciliation result due to other nodes' row, range, and/or partition tombstones. + * + * If that is the case, then that node may have more rows that we should fetch, as otherwise we could + * ultimately return fewer rows than required. Also, those additional rows may contain tombstones + * which we also need to fetch as they may shadow rows or partitions from other replicas' results, which we would + * otherwise return incorrectly. + */ +public class ShortReadProtection +{ + @SuppressWarnings("resource") - public static UnfilteredPartitionIterator extend(Replica source, UnfilteredPartitionIterator partitions, - ReadCommand command, DataLimits.Counter mergedResultCounter, - long queryStartNanoTime, boolean enforceStrictLiveness) ++ public static UnfilteredPartitionIterator extend(Replica source, ++ Runnable preFetchCallback, ++ UnfilteredPartitionIterator partitions, ++ ReadCommand command, ++ DataLimits.Counter mergedResultCounter, ++ long queryStartNanoTime, ++ boolean enforceStrictLiveness) + { + DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(), + false, + command.selectsFullPartition(), + enforceStrictLiveness).onlyCount(); + - ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(command, source, singleResultCounter, mergedResultCounter, queryStartNanoTime); ++ ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(command, ++ source, ++ preFetchCallback, ++ singleResultCounter, ++ mergedResultCounter, ++ queryStartNanoTime); + + /* + * The order of extension and transformations is important here. Extending with more partitions has to happen + * first due to the way BaseIterator.hasMoreContents() works: only transformations applied after extension will + * be called on the first partition of the extended iterator. + * + * Additionally, we want singleResultCounter to be applied after SRPP, so that its applyToPartition() method will + * be called last, after the extension done by SRPP.applyToPartition() call. That way we preserve the same order + * when it comes to calling SRPP.moreContents() and applyToRow() callbacks. + * + * See ShortReadPartitionsProtection.applyToPartition() for more details.
+ */ + + // extend with moreContents() only if it's a range read command with no partition key specified + if (!command.isLimitedToOnePartition()) + partitions = MorePartitions.extend(partitions, protection); // register SRPP.moreContents() + + partitions = Transformation.apply(partitions, protection); // register SRPP.applyToPartition() + partitions = Transformation.apply(partitions, singleResultCounter); // register the per-source counter + + return partitions; + } +} diff --cc src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index 2f82c22,0000000..b65f3fc mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@@ -1,83 -1,0 +1,82 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.reads.repair; + +import java.util.Map; +import java.util.function.Consumer; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.service.reads.DigestResolver; + +/** + * Bypasses the read repair path for short read protection and testing + */ - public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> - implements ReadRepair<E, P> ++public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements ReadRepair<E, P> +{ + public static final NoopReadRepair instance = new NoopReadRepair(); + + private NoopReadRepair() {} + + @Override + public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicas) + { + return UnfilteredPartitionIterators.MergeListener.NOOP; + } + + @Override + public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer) + { + resultConsumer.accept(digestResolver.getData()); + } + + public void awaitReads() throws ReadTimeoutException + { + } + + @Override + public void maybeSendAdditionalReads() + { + + } + + @Override + public void maybeSendAdditionalWrites() + { + + } + + @Override + public void awaitWrites() + { + + } + + @Override + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan) + { + + } +} diff --cc src/java/org/apache/cassandra/utils/concurrent/Accumulator.java index 7adb33b,15afdbe..3ebbdce --- 
a/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java +++ b/src/java/org/apache/cassandra/utils/concurrent/Accumulator.java @@@ -18,9 -18,6 +18,8 @@@ */ package org.apache.cassandra.utils.concurrent; +import java.util.AbstractCollection; +import java.util.Collection; - import java.util.Arrays; import java.util.Iterator; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@@ -139,27 -136,8 +138,27 @@@ public class Accumulator<E return (E) values[i]; } + public Collection<E> snapshot() + { + int count = presentCount; + return new AbstractCollection<E>() + { + @Override + public Iterator<E> iterator() + { + return Accumulator.this.iterator(count); + } + + @Override + public int size() + { + return count; + } + }; + } + /** - * Removes all of the elements from this accumulator. + * Removes element at the specified index from this accumulator. * * This method is not thread-safe when used concurrently with {@link #add(Object)}. */ diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 2ee209d,c449fdd..0b6fe6e --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@@ -98,10 -104,13 +104,14 @@@ public class Coordinator implements ICo Integer.MAX_VALUE, null, null, - ProtocolVersion.CURRENT), + ProtocolVersion.V4, + null), System.nanoTime()); + // Collect warnings reported during the query. + if (res != null) + res.setWarnings(ClientWarn.instance.getWarnings()); + return RowUtil.toQueryResult(res); } diff --cc test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java index 0000000,2a847bf..f891dfe mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/ReplicaFilteringProtectionTest.java @@@ -1,0 -1,244 +1,244 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.cassandra.distributed.test; + + import java.io.IOException; + import java.util.List; + + import org.junit.AfterClass; + import org.junit.BeforeClass; + import org.junit.Test; + + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.distributed.Cluster; + import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.distributed.api.SimpleQueryResult; + import org.apache.cassandra.exceptions.OverloadedException; + import org.apache.cassandra.service.StorageService; + + import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_FAIL_THRESHOLD; + import static org.apache.cassandra.config.ReplicaFilteringProtectionOptions.DEFAULT_WARN_THRESHOLD; + import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL; + import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + import static org.apache.cassandra.distributed.shared.AssertUtils.row; + import static org.junit.Assert.assertEquals; + + /** - * Exercises the functionality of {@link org.apache.cassandra.service.ReplicaFilteringProtection}, the ++ * Exercises the functionality of {@link org.apache.cassandra.service.reads.ReplicaFilteringProtection}, the + * mechanism that ensures distributed index and filtering queries at read consistency levels > ONE/LOCAL_ONE + * avoid stale replica results. + */ + public class ReplicaFilteringProtectionTest extends TestBaseImpl + { + private static final int REPLICAS = 2; + private static final int ROWS = 3; + + private static Cluster cluster; + + @BeforeClass + public static void setup() throws IOException + { + cluster = init(Cluster.build() + .withNodes(REPLICAS) + .withConfig(config -> config.set("hinted_handoff_enabled", false) + .set("commitlog_sync", "batch") + .set("num_tokens", 1)).start()); + + // Make sure we start w/ the correct defaults: + cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_WARN_THRESHOLD, StorageService.instance.getCachedReplicaRowsWarnThreshold())); + cluster.get(1).runOnInstance(() -> assertEquals(DEFAULT_FAIL_THRESHOLD, StorageService.instance.getCachedReplicaRowsFailThreshold())); + } + + @AfterClass + public static void teardown() + { + cluster.close(); + } + + @Test + public void testMissedUpdatesBelowCachingWarnThreshold() + { + String tableName = "missed_updates_no_warning"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); + + // The warning threshold provided is one more than the total number of rows returned + // to the coordinator from all replicas and therefore should not be triggered. + testMissedUpdates(tableName, REPLICAS * ROWS, Integer.MAX_VALUE, false); + } + + @Test + public void testMissedUpdatesAboveCachingWarnThreshold() + { + String tableName = "missed_updates_cache_warn"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." + tableName + " (k int PRIMARY KEY, v text)")); + + // The warning threshold provided is one less than the total number of rows returned + // to the coordinator from all replicas and therefore should be triggered but not fail the query. + testMissedUpdates(tableName, REPLICAS * ROWS - 1, Integer.MAX_VALUE, true); + } + + @Test + public void testMissedUpdatesAroundCachingFailThreshold() + { + String tableName = "missed_updates_cache_fail"; + cluster.schemaChange(withKeyspace("CREATE TABLE %s." 
+ tableName + " (k int PRIMARY KEY, v text)")); + + // The failure threshold provided is exactly the total number of rows returned + // to the coordinator from all replicas and therefore should just warn. + testMissedUpdates(tableName, 1, REPLICAS * ROWS, true); + + try + { + // The failure threshold provided is one less than the total number of rows returned + // to the coordinator from all replicas and therefore should fail the query. + testMissedUpdates(tableName, 1, REPLICAS * ROWS - 1, true); + } + catch (RuntimeException e) + { - assertEquals(e.getCause().getClass().getName(), OverloadedException.class.getName()); ++ assertEquals(e.getClass().getName(), OverloadedException.class.getName()); + } + } + + private void testMissedUpdates(String tableName, int warnThreshold, int failThreshold, boolean shouldWarn) + { + cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsWarnThreshold(warnThreshold)); + cluster.get(1).runOnInstance(() -> StorageService.instance.setCachedReplicaRowsFailThreshold(failThreshold)); + + String fullTableName = KEYSPACE + '.' + tableName; + + // Case 1: Insert and query rows at ALL to verify base line. + for (int i = 0; i < ROWS; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + fullTableName + "(k, v) VALUES (?, 'old')", ALL, i); + } + + long histogramSampleCount = rowsCachedPerQueryCount(cluster.get(1), tableName); - ++ + String query = "SELECT * FROM " + fullTableName + " WHERE v = ? LIMIT ? ALLOW FILTERING"; + + Object[][] initialRows = cluster.coordinator(1).execute(query, ALL, "old", ROWS); + assertRows(initialRows, row(1, "old"), row(0, "old"), row(2, "old")); + + // Make sure only one sample was recorded for the query. + assertEquals(histogramSampleCount + 1, rowsCachedPerQueryCount(cluster.get(1), tableName)); + + // Case 2: Update all rows on only one replica, leaving the entire dataset of the remaining replica out-of-date. + updateAllRowsOn(1, fullTableName, "new"); + + // The replica that missed the results creates a mismatch at every row, and we therefore cache a version + // of that row for all replicas. + SimpleQueryResult oldResult = cluster.coordinator(1).executeWithResult(query, ALL, "old", ROWS); + assertRows(oldResult.toObjectArrays()); + verifyWarningState(shouldWarn, oldResult); + + // We should have made 3 row "completion" requests. + assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName)); + - // In all cases above, the queries should be caching 1 row per partition per replica, but ++ // In all cases above, the queries should be caching 1 row per partition per replica, but + // 6 for the whole query, given every row is potentially stale. + assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName)); + + // Make sure only one more sample was recorded for the query. + assertEquals(histogramSampleCount + 2, rowsCachedPerQueryCount(cluster.get(1), tableName)); + + // Case 3: Observe the effects of blocking read-repair. - ++ + // The previous query peforms a blocking read-repair, which removes replica divergence. This + // will only warn, therefore, if the warning threshold is actually below the number of replicas. + // (i.e. The row cache counter is decremented/reset as each partition is consumed.) 
+ SimpleQueryResult newResult = cluster.coordinator(1).executeWithResult(query, ALL, "new", ROWS); + Object[][] newRows = newResult.toObjectArrays(); + assertRows(newRows, row(1, "new"), row(0, "new"), row(2, "new")); - ++ + verifyWarningState(warnThreshold < REPLICAS, newResult); - ++ + // We still sould only have made 3 row "completion" requests, with no replica divergence in the last query. + assertEquals(ROWS, protectionQueryCount(cluster.get(1), tableName)); + + // With no replica divergence, we only cache a single partition at a time across 2 replicas. + assertEquals(REPLICAS, minRowsCachedPerQuery(cluster.get(1), tableName)); + + // Make sure only one more sample was recorded for the query. + assertEquals(histogramSampleCount + 3, rowsCachedPerQueryCount(cluster.get(1), tableName)); + + // Case 4: Introduce another mismatch by updating all rows on only one replica. - ++ + updateAllRowsOn(1, fullTableName, "future"); + + // Another mismatch is introduced, and we once again cache a version of each row during resolution. + SimpleQueryResult futureResult = cluster.coordinator(1).executeWithResult(query, ALL, "future", ROWS); + Object[][] futureRows = futureResult.toObjectArrays(); + assertRows(futureRows, row(1, "future"), row(0, "future"), row(2, "future")); - ++ + verifyWarningState(shouldWarn, futureResult); + + // We sould have made 3 more row "completion" requests. + assertEquals(ROWS * 2, protectionQueryCount(cluster.get(1), tableName)); + - // In all cases above, the queries should be caching 1 row per partition, but 6 for the ++ // In all cases above, the queries should be caching 1 row per partition, but 6 for the + // whole query, given every row is potentially stale. + assertEquals(ROWS * REPLICAS, maxRowsCachedPerQuery(cluster.get(1), tableName)); + + // Make sure only one more sample was recorded for the query. + assertEquals(histogramSampleCount + 4, rowsCachedPerQueryCount(cluster.get(1), tableName)); + } - ++ + private void updateAllRowsOn(int node, String table, String value) + { + for (int i = 0; i < ROWS; i++) + { + cluster.get(node).executeInternal("UPDATE " + table + " SET v = ? WHERE k = ?", value, i); + } + } - ++ + private void verifyWarningState(boolean shouldWarn, SimpleQueryResult futureResult) + { + List<String> futureWarnings = futureResult.warnings(); + assertEquals(shouldWarn, futureWarnings.stream().anyMatch(w -> w.contains("cached_replica_rows_warn_threshold"))); + assertEquals(shouldWarn ? 
1 : 0, futureWarnings.size()); + } + + private long protectionQueryCount(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.replicaFilteringProtectionRequests.getCount()); + } + + private long maxRowsCachedPerQuery(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.rfpRowsCachedPerQuery.getSnapshot().getMax()); + } + + private long minRowsCachedPerQuery(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.rfpRowsCachedPerQuery.getSnapshot().getMin()); + } + + private long rowsCachedPerQueryCount(IInvokableInstance instance, String tableName) + { + return instance.callOnInstance(() -> Keyspace.open(KEYSPACE) + .getColumnFamilyStore(tableName) + .metric.rfpRowsCachedPerQuery.getCount()); + } -} ++} diff --cc test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java index 9b34a23,ce0fe27..da5bd32 --- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java @@@ -105,26 -105,26 +105,26 @@@ public class AccumulatorTes accu.add("2"); accu.add("3"); - accu.clearUnsafe(); + accu.clearUnsafe(1); - assertEquals(0, accu.size()); - assertFalse(accu.snapshot().iterator().hasNext()); - assertOutOfBonds(accu, 0); + assertEquals(3, accu.size()); - assertTrue(accu.iterator().hasNext()); ++ assertTrue(accu.snapshot().iterator().hasNext()); accu.add("4"); accu.add("5"); - assertEquals(2, accu.size()); + assertEquals(5, accu.size()); - assertEquals("4", accu.get(0)); - assertEquals("5", accu.get(1)); - assertOutOfBonds(accu, 2); + assertEquals("4", accu.get(3)); + assertEquals("5", accu.get(4)); + assertOutOfBonds(accu, 5); - Iterator<String> iter = accu.iterator(); + Iterator<String> iter = accu.snapshot().iterator(); assertTrue(iter.hasNext()); - assertEquals("4", iter.next()); - assertEquals("5", iter.next()); - assertFalse(iter.hasNext()); + assertEquals("1", iter.next()); + assertNull(iter.next()); + assertTrue(iter.hasNext()); + assertEquals("3", iter.next()); } private static void assertOutOfBonds(Accumulator<String> accumulator, int index) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
