This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 2ef1f1c150e3d3f297e86c2b2efedd964a43b3c9 Merge: 86b7727 e5c3d08 Author: Caleb Rackliffe <[email protected]> AuthorDate: Thu Jul 30 13:27:53 2020 +0100 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + build.xml | 2 +- conf/cassandra.yaml | 20 + doc/source/operating/metrics.rst | 114 +++--- src/java/org/apache/cassandra/config/Config.java | 2 + .../cassandra/config/DatabaseDescriptor.java | 20 + .../config/ReplicaFilteringProtectionOptions.java | 28 ++ .../db/partitions/PartitionIterators.java | 48 ++- .../apache/cassandra/db/rows/EncodingStats.java | 24 ++ .../org/apache/cassandra/metrics/TableMetrics.java | 21 +- .../org/apache/cassandra/service/DataResolver.java | 87 +++-- .../service/ReplicaFilteringProtection.java | 421 ++++++++++++--------- .../apache/cassandra/service/StorageService.java | 27 +- .../cassandra/service/StorageServiceMBean.java | 12 + .../org/apache/cassandra/utils/FBUtilities.java | 4 +- .../cassandra/utils/concurrent/Accumulator.java | 9 +- .../cassandra/distributed/impl/Coordinator.java | 10 + .../apache/cassandra/distributed/impl/RowUtil.java | 7 +- .../test/ReplicaFilteringProtectionTest.java | 244 ++++++++++++ .../cassandra/db/rows/EncodingStatsTest.java | 145 +++++++ .../utils/concurrent/AccumulatorTest.java | 34 +- 21 files changed, 980 insertions(+), 300 deletions(-) diff --cc CHANGES.txt index 73f1a11,182dca3..9dbbd1c --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,10 -1,9 +1,11 @@@ -3.0.22: +3.11.8 + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) +Merged from 3.0: + * Operational improvements and hardening for replica filtering protection (CASSANDRA-15907) * stop_paranoid disk failure policy is ignored on CorruptSSTableException after node is up (CASSANDRA-15191) - * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970) * Forbid altering UDTs used in partition keys (CASSANDRA-15933) * Fix empty/null json string representation (CASSANDRA-15896) + * 3.x fails to start if commit log has range tombstones from a column which is also deleted (CASSANDRA-15970) Merged from 2.2: * Fix CQL parsing of collections when the column type is reversed (CASSANDRA-15814) diff --cc conf/cassandra.yaml index 9182008,bb96f18..29442f5 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@@ -1119,69 -996,6 +1119,89 @@@ enable_scripted_user_defined_functions # setting. windows_timer_interval: 1 + +# Enables encrypting data at-rest (on disk). Different key providers can be plugged in, but the default reads from +# a JCE-style keystore. A single keystore can hold multiple keys, but the one referenced by +# the "key_alias" is the only key that will be used for encrypt opertaions; previously used keys +# can still (and should!) be in the keystore and will be used on decrypt operations +# (to handle the case of key rotation). +# +# It is strongly recommended to download and install Java Cryptography Extension (JCE) +# Unlimited Strength Jurisdiction Policy Files for your version of the JDK. 
+# (current link: http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html) +# +# Currently, only the following file types are supported for transparent data encryption, although +# more are coming in future cassandra releases: commitlog, hints +transparent_data_encryption_options: + enabled: false + chunk_length_kb: 64 + cipher: AES/CBC/PKCS5Padding + key_alias: testing:1 + # CBC IV length for AES needs to be 16 bytes (which is also the default size) + # iv_length: 16 + key_provider: + - class_name: org.apache.cassandra.security.JKSKeyProvider + parameters: + - keystore: conf/.keystore + keystore_password: cassandra + store_type: JCEKS + key_password: cassandra + + +##################### +# SAFETY THRESHOLDS # +##################### + +# When executing a scan, within or across a partition, we need to keep the +# tombstones seen in memory so we can return them to the coordinator, which +# will use them to make sure other replicas also know about the deleted rows. +# With workloads that generate a lot of tombstones, this can cause performance +# problems and even exaust the server heap. +# (http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets) +# Adjust the thresholds here if you understand the dangers and want to +# scan more tombstones anyway. These thresholds may also be adjusted at runtime +# using the StorageService mbean. +tombstone_warn_threshold: 1000 +tombstone_failure_threshold: 100000 + ++# Filtering and secondary index queries at read consistency levels above ONE/LOCAL_ONE use a ++# mechanism called replica filtering protection to ensure that results from stale replicas do ++# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more details.) This ++# mechanism materializes replica results by partition on-heap at the coordinator. The more possibly ++# stale results returned by the replicas, the more rows materialized during the query. ++replica_filtering_protection: ++ # These thresholds exist to limit the damage severely out-of-date replicas can cause during these ++ # queries. They limit the number of rows from all replicas individual index and filtering queries ++ # can materialize on-heap to return correct results at the desired read consistency level. ++ # ++ # "cached_replica_rows_warn_threshold" is the per-query threshold at which a warning will be logged. ++ # "cached_replica_rows_fail_threshold" is the per-query threshold at which the query will fail. ++ # ++ # These thresholds may also be adjusted at runtime using the StorageService mbean. ++ # ++ # If the failure threshold is breached, it is likely that either the current page/fetch size ++ # is too large or one or more replicas is severely out-of-sync and in need of repair. ++ cached_rows_warn_threshold: 2000 ++ cached_rows_fail_threshold: 32000 ++ +# Log WARN on any multiple-partition batch size exceeding this value. 5kb per batch by default. +# Caution should be taken on increasing the size of this threshold as it can lead to node instability. +batch_size_warn_threshold_in_kb: 5 + +# Fail any multiple-partition batch exceeding this value. 50kb (10x warn threshold) by default. 
+batch_size_fail_threshold_in_kb: 50 + +# Log WARN on any batches not of type LOGGED than span across more partitions than this limit +unlogged_batch_across_partitions_warn_threshold: 10 + +# Log a warning when compacting partitions larger than this value +compaction_large_partition_warning_threshold_mb: 100 + +# GC Pauses greater than gc_warn_threshold_in_ms will be logged at WARN level +# Adjust the threshold based on your application throughput requirement +# By default, Cassandra logs GC Pauses greater than 200 ms at INFO level +gc_warn_threshold_in_ms: 1000 + # Maximum size of any value in SSTables. Safety measure to detect SSTable corruption # early. Any value size larger than this threshold will result into marking an SSTable # as corrupted. This should be positive and less than 2048. diff --cc doc/source/operating/metrics.rst index 04abb48,0000000..4bd0c08 mode 100644,000000..100644 --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@@ -1,706 -1,0 +1,710 @@@ +.. Licensed to the Apache Software Foundation (ASF) under one +.. or more contributor license agreements. See the NOTICE file +.. distributed with this work for additional information +.. regarding copyright ownership. The ASF licenses this file +.. to you under the Apache License, Version 2.0 (the +.. "License"); you may not use this file except in compliance +.. with the License. You may obtain a copy of the License at +.. +.. http://www.apache.org/licenses/LICENSE-2.0 +.. +.. Unless required by applicable law or agreed to in writing, software +.. distributed under the License is distributed on an "AS IS" BASIS, +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +.. See the License for the specific language governing permissions and +.. limitations under the License. + +.. highlight:: none + +Monitoring +---------- + +Metrics in Cassandra are managed using the `Dropwizard Metrics <http://metrics.dropwizard.io>`__ library. These metrics +can be queried via JMX or pushed to external monitoring systems using a number of `built in +<http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party +<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins. + +Metrics are collected for a single node. It's up to the operator to use an external monitoring system to aggregate them. + +Metric Types +^^^^^^^^^^^^ +All metrics reported by cassandra fit into one of the following types. + +``Gauge`` + An instantaneous measurement of a value. + +``Counter`` + A gauge for an ``AtomicLong`` instance. Typically this is consumed by monitoring the change since the last call to + see if there is a large increase compared to the norm. + +``Histogram`` + Measures the statistical distribution of values in a stream of data. + + In addition to minimum, maximum, mean, etc., it also measures median, 75th, 90th, 95th, 98th, 99th, and 99.9th + percentiles. + +``Timer`` + Measures both the rate that a particular piece of code is called and the histogram of its duration. + +``Latency`` + Special type that tracks latency (in microseconds) with a ``Timer`` plus a ``Counter`` that tracks the total latency + accrued since starting. The former is useful if you track the change in total latency since the last check. Each + metric name of this type will have 'Latency' and 'TotalLatency' appended to it. + +``Meter`` + A meter metric which measures mean throughput and one-, five-, and fifteen-minute exponentially-weighted moving + average throughputs. 
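
[Editor's illustrative sketch, not part of this commit: every metric type described above is exposed over JMX (and optionally via the reporter plugins mentioned earlier). The standalone Java snippet below shows one way to read a table-level gauge with the standard javax.management client API, using the MBean name format documented in the "Table Metrics" section that follows. The JMX port (7199, the Cassandra default), keyspace name "ks", and table name "t" are assumptions for the example.]

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class ReadTableMetric
    {
        public static void main(String[] args) throws Exception
        {
            // Default Cassandra JMX port is 7199; adjust host/port for your deployment.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbeans = connector.getMBeanServerConnection();

                // Name format from the documentation below:
                // org.apache.cassandra.metrics:type=Table keyspace=<Keyspace> scope=<Table> name=<MetricName>
                ObjectName gauge = new ObjectName(
                    "org.apache.cassandra.metrics:type=Table,keyspace=ks,scope=t,name=EstimatedPartitionCount");

                // Gauges expose their current reading through a "Value" attribute.
                Object value = mbeans.getAttribute(gauge, "Value");
                System.out.println("EstimatedPartitionCount = " + value);
            }
        }
    }

[Counters and meters typically expose a "Count" attribute instead of "Value"; the exact MBean name for each metric group is given in its "JMX MBean" entry below.]
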
+ +Table Metrics +^^^^^^^^^^^^^ + +Each table in Cassandra has metrics responsible for tracking its state and performance. + +The metric names are all appended with the specific ``Keyspace`` and ``Table`` name. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Table.<MetricName>.<Keyspace>.<Table>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Table keyspace=<Keyspace> scope=<Table> name=<MetricName>`` + +.. NOTE:: + There is a special table called '``all``' without a keyspace. This represents the aggregation of metrics across + **all** tables and keyspaces on the node. + + - ======================================= ============== =========== - Name Type Description - ======================================= ============== =========== - MemtableOnHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten. - MemtableOffHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten. - MemtableLiveDataSize Gauge<Long> Total amount of live data stored in the memtable, excluding any data structure overhead. - AllMemtablesOnHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap. - AllMemtablesOffHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap. - AllMemtablesLiveDataSize Gauge<Long> Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead. - MemtableColumnsCount Gauge<Long> Total number of columns present in the memtable. - MemtableSwitchCount Counter Number of times flush has resulted in the memtable being switched out. - CompressionRatio Gauge<Double> Current compression ratio for all SSTables. - EstimatedPartitionSizeHistogram Gauge<long[]> Histogram of estimated partition size (in bytes). - EstimatedPartitionCount Gauge<Long> Approximate number of keys in table. - EstimatedColumnCountHistogram Gauge<long[]> Histogram of estimated number of columns. - SSTablesPerReadHistogram Histogram Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into acoount. - ReadLatency Latency Local read latency for this table. - RangeLatency Latency Local range scan latency for this table. - WriteLatency Latency Local write latency for this table. - CoordinatorReadLatency Timer Coordinator read latency for this table. - CoordinatorScanLatency Timer Coordinator range scan latency for this table. - PendingFlushes Counter Estimated number of flush tasks pending for this table. - BytesFlushed Counter Total number of bytes flushed since server [re]start. - CompactionBytesWritten Counter Total number of bytes written by compaction since server [re]start. - PendingCompactions Gauge<Integer> Estimate of number of pending compactions for this table. - LiveSSTableCount Gauge<Integer> Number of SSTables on disk for this table. - LiveDiskSpaceUsed Counter Disk space used by SSTables belonging to this table (in bytes). - TotalDiskSpaceUsed Counter Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd. - MinPartitionSize Gauge<Long> Size of the smallest compacted partition (in bytes). 
- MaxPartitionSize Gauge<Long> Size of the largest compacted partition (in bytes). - MeanPartitionSize Gauge<Long> Size of the average compacted partition (in bytes). - BloomFilterFalsePositives Gauge<Long> Number of false positives on table's bloom filter. - BloomFilterFalseRatio Gauge<Double> False positive ratio of table's bloom filter. - BloomFilterDiskSpaceUsed Gauge<Long> Disk space used by bloom filter (in bytes). - BloomFilterOffHeapMemoryUsed Gauge<Long> Off-heap memory used by bloom filter. - IndexSummaryOffHeapMemoryUsed Gauge<Long> Off-heap memory used by index summary. - CompressionMetadataOffHeapMemoryUsed Gauge<Long> Off-heap memory used by compression meta data. - KeyCacheHitRate Gauge<Double> Key cache hit rate for this table. - TombstoneScannedHistogram Histogram Histogram of tombstones scanned in queries on this table. - LiveScannedHistogram Histogram Histogram of live cells scanned in queries on this table. - ColUpdateTimeDeltaHistogram Histogram Histogram of column update time delta on this table. - ViewLockAcquireTime Timer Time taken acquiring a partition lock for materialized view updates on this table. - ViewReadTime Timer Time taken during the local read of a materialized view update. - TrueSnapshotsSize Gauge<Long> Disk space used by snapshots of this table including all SSTable components. - RowCacheHitOutOfRange Counter Number of table row cache hits that do not satisfy the query filter, thus went to disk. - RowCacheHit Counter Number of table row cache hits. - RowCacheMiss Counter Number of table row cache misses. - CasPrepare Latency Latency of paxos prepare round. - CasPropose Latency Latency of paxos propose round. - CasCommit Latency Latency of paxos commit round. - PercentRepaired Gauge<Double> Percent of table data that is repaired on disk. - SpeculativeRetries Counter Number of times speculative retries were sent for this table. - WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap. - DroppedMutations Counter Number of dropped mutations on this table. - ======================================= ============== =========== ++=============================================== ============== =========== ++Name Type Description ++=============================================== ============== =========== ++MemtableOnHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **on**-heap, including column related overhead and partitions overwritten. ++MemtableOffHeapSize Gauge<Long> Total amount of data stored in the memtable that resides **off**-heap, including column related overhead and partitions overwritten. ++MemtableLiveDataSize Gauge<Long> Total amount of live data stored in the memtable, excluding any data structure overhead. ++AllMemtablesOnHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **on**-heap. ++AllMemtablesOffHeapSize Gauge<Long> Total amount of data stored in the memtables (2i and pending flush memtables included) that resides **off**-heap. ++AllMemtablesLiveDataSize Gauge<Long> Total amount of live data stored in the memtables (2i and pending flush memtables included) that resides off-heap, excluding any data structure overhead. ++MemtableColumnsCount Gauge<Long> Total number of columns present in the memtable. ++MemtableSwitchCount Counter Number of times flush has resulted in the memtable being switched out. ++CompressionRatio Gauge<Double> Current compression ratio for all SSTables. 
++EstimatedPartitionSizeHistogram Gauge<long[]> Histogram of estimated partition size (in bytes). ++EstimatedPartitionCount Gauge<Long> Approximate number of keys in table. ++EstimatedColumnCountHistogram Gauge<long[]> Histogram of estimated number of columns. ++SSTablesPerReadHistogram Histogram Histogram of the number of sstable data files accessed per single partition read. SSTables skipped due to Bloom Filters, min-max key or partition index lookup are not taken into acoount. ++ReadLatency Latency Local read latency for this table. ++RangeLatency Latency Local range scan latency for this table. ++WriteLatency Latency Local write latency for this table. ++CoordinatorReadLatency Timer Coordinator read latency for this table. ++CoordinatorScanLatency Timer Coordinator range scan latency for this table. ++PendingFlushes Counter Estimated number of flush tasks pending for this table. ++BytesFlushed Counter Total number of bytes flushed since server [re]start. ++CompactionBytesWritten Counter Total number of bytes written by compaction since server [re]start. ++PendingCompactions Gauge<Integer> Estimate of number of pending compactions for this table. ++LiveSSTableCount Gauge<Integer> Number of SSTables on disk for this table. ++LiveDiskSpaceUsed Counter Disk space used by SSTables belonging to this table (in bytes). ++TotalDiskSpaceUsed Counter Total disk space used by SSTables belonging to this table, including obsolete ones waiting to be GC'd. ++MinPartitionSize Gauge<Long> Size of the smallest compacted partition (in bytes). ++MaxPartitionSize Gauge<Long> Size of the largest compacted partition (in bytes). ++MeanPartitionSize Gauge<Long> Size of the average compacted partition (in bytes). ++BloomFilterFalsePositives Gauge<Long> Number of false positives on table's bloom filter. ++BloomFilterFalseRatio Gauge<Double> False positive ratio of table's bloom filter. ++BloomFilterDiskSpaceUsed Gauge<Long> Disk space used by bloom filter (in bytes). ++BloomFilterOffHeapMemoryUsed Gauge<Long> Off-heap memory used by bloom filter. ++IndexSummaryOffHeapMemoryUsed Gauge<Long> Off-heap memory used by index summary. ++CompressionMetadataOffHeapMemoryUsed Gauge<Long> Off-heap memory used by compression meta data. ++KeyCacheHitRate Gauge<Double> Key cache hit rate for this table. ++TombstoneScannedHistogram Histogram Histogram of tombstones scanned in queries on this table. ++LiveScannedHistogram Histogram Histogram of live cells scanned in queries on this table. ++ColUpdateTimeDeltaHistogram Histogram Histogram of column update time delta on this table. ++ViewLockAcquireTime Timer Time taken acquiring a partition lock for materialized view updates on this table. ++ViewReadTime Timer Time taken during the local read of a materialized view update. ++TrueSnapshotsSize Gauge<Long> Disk space used by snapshots of this table including all SSTable components. ++RowCacheHitOutOfRange Counter Number of table row cache hits that do not satisfy the query filter, thus went to disk. ++RowCacheHit Counter Number of table row cache hits. ++RowCacheMiss Counter Number of table row cache misses. ++CasPrepare Latency Latency of paxos prepare round. ++CasPropose Latency Latency of paxos propose round. ++CasCommit Latency Latency of paxos commit round. ++PercentRepaired Gauge<Double> Percent of table data that is repaired on disk. ++SpeculativeRetries Counter Number of times speculative retries were sent for this table. 
++WaitingOnFreeMemtableSpace Histogram Histogram of time spent waiting for free memtable space, either on- or off-heap. ++DroppedMutations Counter Number of dropped mutations on this table. ++ReadRepairRequests Meter Throughput for mutations generated by read-repair. ++ShortReadProtectionRequests Meter Throughput for requests to get extra rows during short read protection. ++ReplicaFilteringProtectionRequests Meter Throughput for row completion requests during replica filtering protection. ++ReplicaFilteringProtectionRowsCachedPerQuery Histogram Histogram of the number of rows cached per query when replica filtering protection is engaged. ++============================================ ============== =========== + +Keyspace Metrics +^^^^^^^^^^^^^^^^ +Each keyspace in Cassandra has metrics responsible for tracking its state and performance. + +These metrics are the same as the ``Table Metrics`` above, only they are aggregated at the Keyspace level. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.keyspace.<MetricName>.<Keyspace>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Keyspace scope=<Keyspace> name=<MetricName>`` + +ThreadPool Metrics +^^^^^^^^^^^^^^^^^^ + +Cassandra splits work of a particular type into its own thread pool. This provides back-pressure and asynchrony for +requests on a node. It's important to monitor the state of these thread pools since they can tell you how saturated a +node is. + +The metric names are all appended with the specific ``ThreadPool`` name. The thread pools are also categorized under a +specific type. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.ThreadPools.<MetricName>.<Path>.<ThreadPoolName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=ThreadPools scope=<ThreadPoolName> type=<Type> name=<MetricName>`` + +===================== ============== =========== +Name Type Description +===================== ============== =========== +ActiveTasks Gauge<Integer> Number of tasks being actively worked on by this pool. +PendingTasks Gauge<Integer> Number of queued tasks queued up on this pool. +CompletedTasks Counter Number of tasks completed. +TotalBlockedTasks Counter Number of tasks that were blocked due to queue saturation. +CurrentlyBlockedTask Counter Number of tasks that are currently blocked due to queue saturation but on retry will become unblocked. +MaxPoolSize Gauge<Integer> The maximum number of threads in this pool. +===================== ============== =========== + +The following thread pools can be monitored. 
+ +============================ ============== =========== +Name Type Description +============================ ============== =========== +Native-Transport-Requests transport Handles client CQL requests +CounterMutationStage request Responsible for counter writes +ViewMutationStage request Responsible for materialized view writes +MutationStage request Responsible for all other writes +ReadRepairStage request ReadRepair happens on this thread pool +ReadStage request Local reads run on this thread pool +RequestResponseStage request Coordinator requests to the cluster run on this thread pool +AntiEntropyStage internal Builds merkle tree for repairs +CacheCleanupExecutor internal Cache maintenance performed on this thread pool +CompactionExecutor internal Compactions are run on these threads +GossipStage internal Handles gossip requests +HintsDispatcher internal Performs hinted handoff +InternalResponseStage internal Responsible for intra-cluster callbacks +MemtableFlushWriter internal Writes memtables to disk +MemtablePostFlush internal Cleans up commit log after memtable is written to disk +MemtableReclaimMemory internal Memtable recycling +MigrationStage internal Runs schema migrations +MiscStage internal Misceleneous tasks run here +PendingRangeCalculator internal Calculates token range +PerDiskMemtableFlushWriter_0 internal Responsible for writing a spec (there is one of these per disk 0-N) +Sampler internal Responsible for re-sampling the index summaries of SStables +SecondaryIndexManagement internal Performs updates to secondary indexes +ValidationExecutor internal Performs validation compaction or scrubbing +============================ ============== =========== + +.. |nbsp| unicode:: 0xA0 .. nonbreaking space + +Client Request Metrics +^^^^^^^^^^^^^^^^^^^^^^ + +Client requests have their own set of metrics that encapsulate the work happening at coordinator level. + +Different types of client requests are broken down by ``RequestType``. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.ClientRequest.<MetricName>.<RequestType>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=ClientRequest scope=<RequestType> name=<MetricName>`` + + +:RequestType: CASRead +:Description: Metrics related to transactional read requests. +:Metrics: + ===================== ============== ============================================================= + Name Type Description + ===================== ============== ============================================================= + Timeouts Counter Number of timeouts encountered. + Failures Counter Number of transaction failures encountered. + |nbsp| Latency Transaction read latency. + Unavailables Counter Number of unavailable exceptions encountered. + UnfinishedCommit Counter Number of transactions that were committed on read. + ConditionNotMet Counter Number of transaction preconditions did not match current values. + ContentionHistogram Histogram How many contended reads were encountered + ===================== ============== ============================================================= + +:RequestType: CASWrite +:Description: Metrics related to transactional write requests. +:Metrics: + ===================== ============== ============================================================= + Name Type Description + ===================== ============== ============================================================= + Timeouts Counter Number of timeouts encountered. + Failures Counter Number of transaction failures encountered. 
+ |nbsp| Latency Transaction write latency. + UnfinishedCommit Counter Number of transactions that were committed on write. + ConditionNotMet Counter Number of transaction preconditions did not match current values. + ContentionHistogram Histogram How many contended writes were encountered + ===================== ============== ============================================================= + + +:RequestType: Read +:Description: Metrics related to standard read requests. +:Metrics: + ===================== ============== ============================================================= + Name Type Description + ===================== ============== ============================================================= + Timeouts Counter Number of timeouts encountered. + Failures Counter Number of read failures encountered. + |nbsp| Latency Read latency. + Unavailables Counter Number of unavailable exceptions encountered. + ===================== ============== ============================================================= + +:RequestType: RangeSlice +:Description: Metrics related to token range read requests. +:Metrics: + ===================== ============== ============================================================= + Name Type Description + ===================== ============== ============================================================= + Timeouts Counter Number of timeouts encountered. + Failures Counter Number of range query failures encountered. + |nbsp| Latency Range query latency. + Unavailables Counter Number of unavailable exceptions encountered. + ===================== ============== ============================================================= + +:RequestType: Write +:Description: Metrics related to regular write requests. +:Metrics: + ===================== ============== ============================================================= + Name Type Description + ===================== ============== ============================================================= + Timeouts Counter Number of timeouts encountered. + Failures Counter Number of write failures encountered. + |nbsp| Latency Write latency. + Unavailables Counter Number of unavailable exceptions encountered. + ===================== ============== ============================================================= + + +:RequestType: ViewWrite +:Description: Metrics related to materialized view write wrtes. +:Metrics: + ===================== ============== ============================================================= + Timeouts Counter Number of timeouts encountered. + Failures Counter Number of transaction failures encountered. + Unavailables Counter Number of unavailable exceptions encountered. + ViewReplicasAttempted Counter Total number of attempted view replica writes. + ViewReplicasSuccess Counter Total number of succeded view replica writes. + ViewPendingMutations Gauge<Long> ViewReplicasAttempted - ViewReplicasSuccess. + ViewWriteLatency Timer Time between when mutation is applied to base table and when CL.ONE is achieved on view. + ===================== ============== ============================================================= + +Cache Metrics +^^^^^^^^^^^^^ + +Cassandra caches have metrics to track the effectivness of the caches. Though the ``Table Metrics`` might be more useful. 
+ +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Cache.<MetricName>.<CacheName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Cache scope=<CacheName> name=<MetricName>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +Capacity Gauge<Long> Cache capacity in bytes. +Entries Gauge<Integer> Total number of cache entries. +FifteenMinuteCacheHitRate Gauge<Double> 15m cache hit rate. +FiveMinuteCacheHitRate Gauge<Double> 5m cache hit rate. +OneMinuteCacheHitRate Gauge<Double> 1m cache hit rate. +HitRate Gauge<Double> All time cache hit rate. +Hits Meter Total number of cache hits. +Misses Meter Total number of cache misses. +MissLatency Timer Latency of misses. +Requests Gauge<Long> Total number of cache requests. +Size Gauge<Long> Total size of occupied cache, in bytes. +========================== ============== =========== + +The following caches are covered: + +============================ =========== +Name Description +============================ =========== +CounterCache Keeps hot counters in memory for performance. +ChunkCache In process uncompressed page cache. +KeyCache Cache for partition to sstable offsets. +RowCache Cache for rows kept in memory. +============================ =========== + +.. NOTE:: + Misses and MissLatency are only defined for the ChunkCache + +CQL Metrics +^^^^^^^^^^^ + +Metrics specific to CQL prepared statement caching. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.CQL.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=CQL name=<MetricName>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +PreparedStatementsCount Gauge<Integer> Number of cached prepared statements. +PreparedStatementsEvicted Counter Number of prepared statements evicted from the prepared statement cache +PreparedStatementsExecuted Counter Number of prepared statements executed. +RegularStatementsExecuted Counter Number of **non** prepared statements executed. +PreparedStatementsRatio Gauge<Double> Percentage of statements that are prepared vs unprepared. +========================== ============== =========== + + +DroppedMessage Metrics +^^^^^^^^^^^^^^^^^^^^^^ + +Metrics specific to tracking dropped messages for different types of requests. +Dropped writes are stored and retried by ``Hinted Handoff`` + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.DroppedMessages.<MetricName>.<Type>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=DroppedMetrics scope=<Type> name=<MetricName>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +CrossNodeDroppedLatency Timer The dropped latency across nodes. +InternalDroppedLatency Timer The dropped latency within node. +Dropped Meter Number of dropped messages. 
+========================== ============== =========== + +The different types of messages tracked are: + +============================ =========== +Name Description +============================ =========== +BATCH_STORE Batchlog write +BATCH_REMOVE Batchlog cleanup (after succesfully applied) +COUNTER_MUTATION Counter writes +HINT Hint replay +MUTATION Regular writes +READ Regular reads +READ_REPAIR Read repair +PAGED_SLICE Paged read +RANGE_SLICE Token range read +REQUEST_RESPONSE RPC Callbacks +_TRACE Tracing writes +============================ =========== + +Streaming Metrics +^^^^^^^^^^^^^^^^^ + +Metrics reported during ``Streaming`` operations, such as repair, bootstrap, rebuild. + +These metrics are specific to a peer endpoint, with the source node being the node you are pulling the metrics from. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Streaming.<MetricName>.<PeerIP>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Streaming scope=<PeerIP> name=<MetricName>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +IncomingBytes Counter Number of bytes streamed to this node from the peer. +OutgoingBytes Counter Number of bytes streamed to the peer endpoint from this node. +========================== ============== =========== + + +Compaction Metrics +^^^^^^^^^^^^^^^^^^ + +Metrics specific to ``Compaction`` work. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Compaction.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Compaction name=<MetricName>`` + +========================== ======================================== =============================================== +Name Type Description +========================== ======================================== =============================================== +BytesCompacted Counter Total number of bytes compacted since server [re]start. +PendingTasks Gauge<Integer> Estimated number of compactions remaining to perform. +CompletedTasks Gauge<Long> Number of completed compactions since server [re]start. +TotalCompactionsCompleted Meter Throughput of completed compactions since server [re]start. +PendingTasksByTableName Gauge<Map<String, Map<String, Integer>>> Estimated number of compactions remaining to perform, grouped by keyspace and then table name. This info is also kept in ``Table Metrics``. +========================== ======================================== =============================================== + +CommitLog Metrics +^^^^^^^^^^^^^^^^^ + +Metrics specific to the ``CommitLog`` + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.CommitLog.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=CommitLog name=<MetricName>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +CompletedTasks Gauge<Long> Total number of commit log messages written since [re]start. +PendingTasks Gauge<Long> Number of commit log messages written but yet to be fsync'd. +TotalCommitLogSize Gauge<Long> Current size, in bytes, used by all the commit log segments. +WaitingOnSegmentAllocation Timer Time spent waiting for a CommitLogSegment to be allocated - under normal conditions this should be zero. +WaitingOnCommit Timer The time spent waiting on CL fsync; for Periodic this is only occurs when the sync is lagging its sync interval. 
+========================== ============== =========== + +Storage Metrics +^^^^^^^^^^^^^^^ + +Metrics specific to the storage engine. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Storage.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Storage name=<MetricName>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +Exceptions Counter Number of internal exceptions caught. Under normal exceptions this should be zero. +Load Counter Size, in bytes, of the on disk data size this node manages. +TotalHints Counter Number of hint messages written to this node since [re]start. Includes one entry for each host to be hinted per hint. +TotalHintsInProgress Counter Number of hints attemping to be sent currently. +========================== ============== =========== + +HintedHandoff Metrics +^^^^^^^^^^^^^^^^^^^^^ + +Metrics specific to Hinted Handoff. There are also some metrics related to hints tracked in ``Storage Metrics`` + +These metrics include the peer endpoint **in the metric name** + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.HintedHandOffManager.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=HintedHandOffManager name=<MetricName>`` + +=========================== ============== =========== +Name Type Description +=========================== ============== =========== +Hints_created-<PeerIP> Counter Number of hints on disk for this peer. +Hints_not_stored-<PeerIP> Counter Number of hints not stored for this peer, due to being down past the configured hint window. +=========================== ============== =========== + +SSTable Index Metrics +^^^^^^^^^^^^^^^^^^^^^ + +Metrics specific to the SSTable index metadata. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Index.<MetricName>.RowIndexEntry`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Index scope=RowIndexEntry name=<MetricName>`` + +=========================== ============== =========== +Name Type Description +=========================== ============== =========== +IndexedEntrySize Histogram Histogram of the on-heap size, in bytes, of the index across all SSTables. +IndexInfoCount Histogram Histogram of the number of on-heap index entries managed across all SSTables. +IndexInfoGets Histogram Histogram of the number index seeks performed per SSTable. +=========================== ============== =========== + +BufferPool Metrics +^^^^^^^^^^^^^^^^^^ + +Metrics specific to the internal recycled buffer pool Cassandra manages. This pool is meant to keep allocations and GC +lower by recycling on and off heap buffers. + +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.BufferPool.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=BufferPool name=<MetricName>`` + +=========================== ============== =========== +Name Type Description +=========================== ============== =========== +Size Gauge<Long> Size, in bytes, of the managed buffer pool +Misses Meter The rate of misses in the pool. The higher this is the more allocations incurred. +=========================== ============== =========== + + +Client Metrics +^^^^^^^^^^^^^^ + +Metrics specifc to client managment. 
+ +Reported name format: + +**Metric Name** + ``org.apache.cassandra.metrics.Client.<MetricName>`` + +**JMX MBean** + ``org.apache.cassandra.metrics:type=Client name=<MetricName>`` + +=========================== ============== =========== +Name Type Description +=========================== ============== =========== +connectedNativeClients Counter Number of clients connected to this nodes native protocol server +connectedThriftClients Counter Number of clients connected to this nodes thrift protocol server +=========================== ============== =========== + +JVM Metrics +^^^^^^^^^^^ + +JVM metrics such as memory and garbage collection statistics can either be accessed by connecting to the JVM using JMX or can be exported using `Metric Reporters`_. + +BufferPool +++++++++++ + +**Metric Name** + ``jvm.buffers.<direct|mapped>.<MetricName>`` + +**JMX MBean** + ``java.nio:type=BufferPool name=<direct|mapped>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +Capacity Gauge<Long> Estimated total capacity of the buffers in this pool +Count Gauge<Long> Estimated number of buffers in the pool +Used Gauge<Long> Estimated memory that the Java virtual machine is using for this buffer pool +========================== ============== =========== + +FileDescriptorRatio ++++++++++++++++++++ + +**Metric Name** + ``jvm.fd.<MetricName>`` + +**JMX MBean** + ``java.lang:type=OperatingSystem name=<OpenFileDescriptorCount|MaxFileDescriptorCount>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +Usage Ratio Ratio of used to total file descriptors +========================== ============== =========== + +GarbageCollector +++++++++++++++++ + +**Metric Name** + ``jvm.gc.<gc_type>.<MetricName>`` + +**JMX MBean** + ``java.lang:type=GarbageCollector name=<gc_type>`` + +========================== ============== =========== +Name Type Description +========================== ============== =========== +Count Gauge<Long> Total number of collections that have occurred +Time Gauge<Long> Approximate accumulated collection elapsed time in milliseconds +========================== ============== =========== + +Memory +++++++ + +**Metric Name** + ``jvm.memory.<heap/non-heap/total>.<MetricName>`` + +**JMX MBean** + ``java.lang:type=Memory`` + +========================== ============== =========== +Committed Gauge<Long> Amount of memory in bytes that is committed for the JVM to use +Init Gauge<Long> Amount of memory in bytes that the JVM initially requests from the OS +Max Gauge<Long> Maximum amount of memory in bytes that can be used for memory management +Usage Ratio Ratio of used to maximum memory +Used Gauge<Long> Amount of used memory in bytes +========================== ============== =========== + +MemoryPool +++++++++++ + +**Metric Name** + ``jvm.memory.pools.<memory_pool>.<MetricName>`` + +**JMX MBean** + ``java.lang:type=MemoryPool name=<memory_pool>`` + +========================== ============== =========== +Committed Gauge<Long> Amount of memory in bytes that is committed for the JVM to use +Init Gauge<Long> Amount of memory in bytes that the JVM initially requests from the OS +Max Gauge<Long> Maximum amount of memory in bytes that can be used for memory management +Usage Ratio Ratio of used to maximum memory +Used Gauge<Long> Amount of used memory in bytes +========================== ============== =========== + +JMX +^^^ + +Any JMX based 
client can access metrics from cassandra. + +If you wish to access JMX metrics over http it's possible to download `Mx4jTool <http://mx4j.sourceforge.net/>`__ and +place ``mx4j-tools.jar`` into the classpath. On startup you will see in the log:: + + HttpAdaptor version 3.0.2 started on port 8081 + +To choose a different port (8081 is the default) or a different listen address (0.0.0.0 is not the default) edit +``conf/cassandra-env.sh`` and uncomment:: + + #MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0" + + #MX4J_PORT="-Dmx4jport=8081" + + +Metric Reporters +^^^^^^^^^^^^^^^^ + +As mentioned at the top of this section on monitoring the Cassandra metrics can be exported to a number of monitoring +system a number of `built in <http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party +<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins. + +The configuration of these plugins is managed by the `metrics reporter config project +<https://github.com/addthis/metrics-reporter-config>`__. There is a sample configuration file located at +``conf/metrics-reporter-config-sample.yaml``. + +Once configured, you simply start cassandra with the flag +``-Dcassandra.metricsReporterConfigFile=metrics-reporter-config.yaml``. The specified .yaml file plus any 3rd party +reporter jars must all be in Cassandra's classpath. diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java index a02539a,a551a78..5521bf7 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@@ -167,43 -152,18 +167,52 @@@ public class TableMetric public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write"); public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range"); + public final static Gauge<Double> globalPercentRepaired = Metrics.register(globalFactory.createMetricName("PercentRepaired"), + new Gauge<Double>() + { + public Double getValue() + { + double repaired = 0; + double total = 0; + for (String keyspace : Schema.instance.getNonSystemKeyspaces()) + { + Keyspace k = Schema.instance.getKeyspaceInstance(keyspace); + if (SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName())) + continue; + if (k.getReplicationStrategy().getReplicationFactor() < 2) + continue; + + for (ColumnFamilyStore cf : k.getColumnFamilyStores()) + { + if (!SecondaryIndexManager.isIndexColumnFamily(cf.name)) + { + for (SSTableReader sstable : cf.getSSTables(SSTableSet.CANONICAL)) + { + if (sstable.isRepaired()) + { + repaired += sstable.uncompressedLength(); + } + total += sstable.uncompressedLength(); + } + } + } + } + return total > 0 ? (repaired / total) * 100 : 100.0; + } + }); + public final Meter readRepairRequests; public final Meter shortReadProtectionRequests; - public final Meter replicaSideFilteringProtectionRequests; + + public final Meter replicaFilteringProtectionRequests; + + /** + * This histogram records the maximum number of rows {@link org.apache.cassandra.service.ReplicaFilteringProtection} + * caches at a point in time per query. With no replica divergence, this is equivalent to the maximum number of + * cached rows in a single partition during a query. It can be helpful when choosing appropriate values for the + * replica_filtering_protection thresholds in cassandra.yaml. 
+ */ + public final Histogram rfpRowsCachedPerQuery; public final Map<Sampler, TopKSampler<ByteBuffer>> samplers; /** @@@ -820,34 -782,15 +830,41 @@@ register(name, alias, tableMeter); return tableMeter; } + + private Histogram createHistogram(String name, boolean considerZeroes) + { + Histogram histogram = Metrics.histogram(factory.createMetricName(name), aliasFactory.createMetricName(name), considerZeroes); + register(name, name, histogram); + return histogram; + } /** + * Computes the compression ratio for the specified SSTables + * + * @param sstables the SSTables + * @return the compression ratio for the specified SSTables + */ + private static Double computeCompressionRatio(Iterable<SSTableReader> sstables) + { + double compressedLengthSum = 0; + double dataLengthSum = 0; + for (SSTableReader sstable : sstables) + { + if (sstable.compression) + { + // We should not have any sstable which are in an open early mode as the sstable were selected + // using SSTableSet.CANONICAL. + assert sstable.openReason != SSTableReader.OpenReason.EARLY; + + CompressionMetadata compressionMetadata = sstable.getCompressionMetadata(); + compressedLengthSum += compressionMetadata.compressedFileLength; + dataLengthSum += compressionMetadata.dataLength; + } + } + return dataLengthSum != 0 ? compressedLengthSum / dataLengthSum : MetadataCollector.NO_COMPRESSION_RATIO; + } + + /** * Create a histogram-like interface that will register both a CF, keyspace and global level * histogram and forward any updates to both */ diff --cc src/java/org/apache/cassandra/service/DataResolver.java index 3a2d54d,1d0bb47..29fa003 --- a/src/java/org/apache/cassandra/service/DataResolver.java +++ b/src/java/org/apache/cassandra/service/DataResolver.java @@@ -52,14 -49,13 +52,14 @@@ public class DataResolver extends Respo Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations"); @VisibleForTesting - final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>()); + final List<AsyncOneResponse<?>> repairResults = Collections.synchronizedList(new ArrayList<>()); - + private final long queryStartNanoTime; private final boolean enforceStrictLiveness; - DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount) + DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime) { super(keyspace, command, consistency, maxResponseCount); + this.queryStartNanoTime = queryStartNanoTime; this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); } @@@ -185,7 -171,14 +185,15 @@@ // We need separate contexts, as each context has his own counter ResolveContext firstPhaseContext = new ResolveContext(count); ResolveContext secondPhaseContext = new ResolveContext(count); - ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, queryStartNanoTime, firstPhaseContext.sources); + + ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, + command, + consistency, ++ queryStartNanoTime, + firstPhaseContext.sources, + DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(), + DatabaseDescriptor.getCachedReplicaRowsFailThreshold()); + PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext, rfp.mergeController(), i -> shortReadProtectedResponse(i, firstPhaseContext), @@@ -630,8 -604,11 +631,12 @@@ DataLimits.Counter singleResultCounter = command.limits().newCounter(command.nowInSec(), false, 
command.selectsFullPartition(), enforceStrictLiveness).onlyCount(); - ShortReadPartitionsProtection protection = - new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime); + // The pre-fetch callback used here makes the initial round of responses for this replica collectable. + ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(context.sources[i], + () -> responses.clearUnsafe(i), + singleResultCounter, - context.mergedResultCounter); ++ context.mergedResultCounter, ++ queryStartNanoTime); /* * The order of extention and transformations is important here. Extending with more partitions has to happen @@@ -676,17 -654,15 +682,19 @@@ private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call + private final long queryStartNanoTime; + private ShortReadPartitionsProtection(InetAddress source, + Runnable preFetchCallback, DataLimits.Counter singleResultCounter, - DataLimits.Counter mergedResultCounter) + DataLimits.Counter mergedResultCounter, + long queryStartNanoTime) { this.source = source; + this.preFetchCallback = preFetchCallback; this.singleResultCounter = singleResultCounter; this.mergedResultCounter = mergedResultCounter; + this.queryStartNanoTime = queryStartNanoTime; } @Override diff --cc src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java index 428c603,0a57e66..f764439 --- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java +++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java @@@ -87,35 -94,36 +95,38 @@@ class ReplicaFilteringProtectio private final InetAddress[] sources; private final TableMetrics tableMetrics; - /** - * Per-source primary keys of the rows that might be outdated so they need to be fetched. - * For outdated static rows we use an empty builder to signal it has to be queried. - */ - private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch; + private final int cachedRowsWarnThreshold; + private final int cachedRowsFailThreshold; - ++ + /** Tracks whether or not we've already hit the warning threshold while evaluating a partition. */ + private boolean hitWarningThreshold = false; + + private int currentRowsCached = 0; // tracks the current number of cached rows + private int maxRowsCached = 0; // tracks the high watermark for the number of cached rows /** - * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows. + * Per-source list of the pending partitions seen by the merge listener, to be merged with the extra fetched rows. 
*/ - private final List<List<PartitionBuilder>> originalPartitions; + private final List<Queue<PartitionBuilder>> originalPartitions; ReplicaFilteringProtection(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, + long queryStartNanoTime, - InetAddress[] sources) + InetAddress[] sources, + int cachedRowsWarnThreshold, + int cachedRowsFailThreshold) { this.keyspace = keyspace; this.command = command; this.consistency = consistency; + this.queryStartNanoTime = queryStartNanoTime; this.sources = sources; - this.rowsToFetch = new ArrayList<>(sources.length); this.originalPartitions = new ArrayList<>(sources.length); - for (InetAddress ignored : sources) + for (int i = 0; i < sources.length; i++) { - rowsToFetch.add(new TreeMap<>()); - originalPartitions.add(new ArrayList<>()); + originalPartitions.add(new ArrayDeque<>()); } tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId); @@@ -220,78 -161,119 +164,119 @@@ */ UnfilteredPartitionIterators.MergeListener mergeController() { - return (partitionKey, versions) -> { + return new UnfilteredPartitionIterators.MergeListener() + { + @Override + public void close() + { + // If we hit the failure threshold before consuming a single partition, record the current rows cached. + tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached)); + } - PartitionBuilder[] builders = new PartitionBuilder[sources.length]; + @Override + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + PartitionBuilder[] builders = new PartitionBuilder[sources.length]; + PartitionColumns columns = columns(versions); + EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS); - + - for (int i = 0; i < sources.length; i++) - builders[i] = new PartitionBuilder(partitionKey, columns(versions), stats(versions)); + for (int i = 0; i < sources.length; i++) + builders[i] = new PartitionBuilder(partitionKey, sources[i], columns, stats); - return new UnfilteredRowIterators.MergeListener() - { - @Override - public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + return new UnfilteredRowIterators.MergeListener() { - // cache the deletion time versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].setDeletionTime(versions[i]); - } + @Override + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions) + { + // cache the deletion time versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].setDeletionTime(versions[i]); + } - @Override - public Row onMergedRows(Row merged, Row[] versions) - { - // cache the row versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].addRow(versions[i]); + @Override + public Row onMergedRows(Row merged, Row[] versions) + { + // cache the row versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].addRow(versions[i]); - if (merged.isEmpty()) - return merged; + if (merged.isEmpty()) + return merged; - boolean isPotentiallyOutdated = false; - boolean isStatic = merged.isStatic(); - for (int i = 0; i < versions.length; i++) - { - Row version = versions[i]; - if (version == null || (isStatic && version.isEmpty())) + boolean isPotentiallyOutdated = false; + boolean isStatic = 
merged.isStatic(); + for (int i = 0; i < versions.length; i++) { - isPotentiallyOutdated = true; - BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey); - // Note that for static, we shouldn't add the clustering to the clustering set (the - // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact - // we created a builder in the first place will act as a marker that the static row must be - // fetched, even if no other rows are added for this partition. - if (!isStatic) - toFetch.add(merged.clustering()); + Row version = versions[i]; + if (version == null || (isStatic && version.isEmpty())) + { + isPotentiallyOutdated = true; + builders[i].addToFetch(merged); + } } - } - // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be - // an outdated result that is only present because other replica have filtered the up-to-date result - // out), then we skip the row. In other words, the results of the initial merging of results by this - // protection assume the worst case scenario where every row that might be outdated actually is. - // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed - // to look at enough data to ultimately fulfill the query limit. - return isPotentiallyOutdated ? null : merged; - } + // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be + // an outdated result that is only present because other replica have filtered the up-to-date result + // out), then we skip the row. In other words, the results of the initial merging of results by this + // protection assume the worst case scenario where every row that might be outdated actually is. + // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed + // to look at enough data to ultimately fulfill the query limit. + return isPotentiallyOutdated ? null : merged; + } - @Override - public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) - { - // cache the marker versions to be able to regenerate the original row iterator - for (int i = 0; i < versions.length; i++) - builders[i].addRangeTombstoneMarker(versions[i]); - } + @Override + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions) + { + // cache the marker versions to be able to regenerate the original row iterator + for (int i = 0; i < versions.length; i++) + builders[i].addRangeTombstoneMarker(versions[i]); + } - @Override - public void close() - { - for (int i = 0; i < sources.length; i++) - originalPartitions.get(i).add(builders[i]); - } - }; + @Override + public void close() + { + for (int i = 0; i < sources.length; i++) + originalPartitions.get(i).add(builders[i]); + } + }; + } }; } + private void incrementCachedRows() + { + currentRowsCached++; - ++ + if (currentRowsCached == cachedRowsFailThreshold + 1) + { + String message = String.format("Replica filtering protection has cached over %d rows during query %s. 
" + + "(See 'cached_replica_rows_fail_threshold' in cassandra.yaml.)", + cachedRowsFailThreshold, command.toCQLString()); + + logger.error(message); + Tracing.trace(message); + throw new OverloadedException(message); + } + else if (currentRowsCached == cachedRowsWarnThreshold + 1 && !hitWarningThreshold) + { + hitWarningThreshold = true; - ++ + String message = String.format("Replica filtering protection has cached over %d rows during query %s. " + + "(See 'cached_replica_rows_warn_threshold' in cassandra.yaml.)", + cachedRowsWarnThreshold, command.toCQLString()); + + ClientWarn.instance.warn(message); + oneMinuteLogger.warn(message); + Tracing.trace(message); + } + } + + private void releaseCachedRows(int count) + { + maxRowsCached = Math.max(maxRowsCached, currentRowsCached); + currentRowsCached -= count; + } + private static PartitionColumns columns(List<UnfilteredRowIterator> versions) { Columns statics = Columns.NONE; @@@ -348,7 -322,15 +325,15 @@@ @Override public boolean hasNext() { - return iterator.hasNext(); + // If there are no cached partition builders for this source, advance the first phase iterator, which + // will force the RFP merge listener to load at least the next protected partition. Note that this may + // load more than one partition if any divergence between replicas is discovered by the merge listener. + if (partitions.isEmpty()) + { + PartitionIterators.consumeNext(merged); + } - ++ + return !partitions.isEmpty(); } @Override @@@ -464,5 -469,66 +472,66 @@@ } }; } + + private UnfilteredRowIterator protectedPartition() + { + UnfilteredRowIterator original = originalPartition(); + + if (toFetch != null) + { + try (UnfilteredPartitionIterator partitions = fetchFromSource()) + { + if (partitions.hasNext()) + { + try (UnfilteredRowIterator fetchedRows = partitions.next()) + { + return UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows), command.nowInSec()); + } + } + } + } + + return original; + } + + private UnfilteredPartitionIterator fetchFromSource() + { + assert toFetch != null; + + NavigableSet<Clustering> clusterings = toFetch.build(); + tableMetrics.replicaFilteringProtectionRequests.mark(); - ++ + if (logger.isTraceEnabled()) + logger.trace("Requesting rows {} in partition {} from {} for replica filtering protection", + clusterings, key, source); - ++ + Tracing.trace("Requesting {} rows in partition {} from {} for replica filtering protection", + clusterings.size(), key, source); + + // build the read command taking into account that we could be requesting only in the static row + DataLimits limits = clusterings.isEmpty() ? 
DataLimits.cqlLimits(1) : DataLimits.NONE; + ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed()); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(), + command.nowInSec(), + command.columnFilter(), + RowFilter.NONE, + limits, + key, + filter); + try + { + return executeReadCommand(cmd, source); + } + catch (ReadTimeoutException e) + { + int blockFor = consistency.blockFor(keyspace); + throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true); + } + catch (UnavailableException e) + { + int blockFor = consistency.blockFor(keyspace); + throw new UnavailableException(consistency, blockFor, blockFor - 1); + } + } } } diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java index 329fa37,51a08e6..c449fdd --- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java @@@ -39,9 -39,10 +39,10 @@@ import org.apache.cassandra.distributed import org.apache.cassandra.distributed.api.QueryResults; import org.apache.cassandra.distributed.api.SimpleQueryResult; import org.apache.cassandra.service.ClientState; + import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.pager.QueryPager; -import org.apache.cassandra.transport.Server; +import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.transport.messages.ResultMessage; import org.apache.cassandra.utils.ByteBufferUtil; @@@ -98,9 -104,12 +104,13 @@@ public class Coordinator implements ICo Integer.MAX_VALUE, null, null, - Server.CURRENT_VERSION)); + ProtocolVersion.CURRENT), + System.nanoTime()); + // Collect warnings reported during the query. + if (res != null) + res.setWarnings(ClientWarn.instance.getWarnings()); + return RowUtil.toQueryResult(res); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
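
A minimal sketch of the threshold accounting added to ReplicaFilteringProtection in this patch, shown here in isolation: a per-query counter is incremented for every replica row cached, a client warning is issued once the counter passes the warn threshold, and the query is failed once it passes the fail threshold (the messages above point readers at the 'cached_replica_rows_warn_threshold' and 'cached_replica_rows_fail_threshold' settings). The class and member names below are hypothetical, and IllegalStateException plus System.out stand in for the OverloadedException, ClientWarn, tracing and rate-limited logging used by the actual change; see incrementCachedRows and releaseCachedRows in the diff for the real logic.

public final class RowCacheAccounting
{
    private final int warnThreshold; // analogous to cachedRowsWarnThreshold in the patch
    private final int failThreshold; // analogous to cachedRowsFailThreshold in the patch

    private long currentRowsCached = 0; // rows currently materialized for the in-flight query
    private long maxRowsCached = 0;     // high-water mark; the patch reports this to a per-table metric
    private boolean hitWarningThreshold = false;

    public RowCacheAccounting(int warnThreshold, int failThreshold)
    {
        this.warnThreshold = warnThreshold;
        this.failThreshold = failThreshold;
    }

    // Called once per replica row cached while collecting results in the first phase of the query.
    public void increment()
    {
        currentRowsCached++;

        if (currentRowsCached == failThreshold + 1)
        {
            // The patch throws OverloadedException and records a tracing event at this point.
            throw new IllegalStateException("cached over " + failThreshold + " rows; failing query");
        }
        else if (currentRowsCached == warnThreshold + 1 && !hitWarningThreshold)
        {
            hitWarningThreshold = true;
            // The patch issues a client warning plus a rate-limited WARN log at this point.
            System.out.println("cached over " + warnThreshold + " rows; a smaller page size or repair may help");
        }
    }

    // Called when a cached partition has been consumed and its rows can be released.
    public void release(int count)
    {
        maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
        currentRowsCached -= count;
    }

    public long maxRowsCached()
    {
        return Math.max(maxRowsCached, currentRowsCached);
    }
}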
