This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2ef1f1c150e3d3f297e86c2b2efedd964a43b3c9
Merge: 86b7727 e5c3d08
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Jul 30 13:27:53 2020 +0100

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 build.xml                                          |   2 +-
 conf/cassandra.yaml                                |  20 +
 doc/source/operating/metrics.rst                   | 114 +++---
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  20 +
 .../config/ReplicaFilteringProtectionOptions.java  |  28 ++
 .../db/partitions/PartitionIterators.java          |  48 ++-
 .../apache/cassandra/db/rows/EncodingStats.java    |  24 ++
 .../org/apache/cassandra/metrics/TableMetrics.java |  21 +-
 .../org/apache/cassandra/service/DataResolver.java |  87 +++--
 .../service/ReplicaFilteringProtection.java        | 421 ++++++++++++---------
 .../apache/cassandra/service/StorageService.java   |  27 +-
 .../cassandra/service/StorageServiceMBean.java     |  12 +
 .../org/apache/cassandra/utils/FBUtilities.java    |   4 +-
 .../cassandra/utils/concurrent/Accumulator.java    |   9 +-
 .../cassandra/distributed/impl/Coordinator.java    |  10 +
 .../apache/cassandra/distributed/impl/RowUtil.java |   7 +-
 .../test/ReplicaFilteringProtectionTest.java       | 244 ++++++++++++
 .../cassandra/db/rows/EncodingStatsTest.java       | 145 +++++++
 .../utils/concurrent/AccumulatorTest.java          |  34 +-
 21 files changed, 980 insertions(+), 300 deletions(-)

diff --cc CHANGES.txt
index 73f1a11,182dca3..9dbbd1c
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,10 -1,9 +1,11 @@@
 -3.0.22:
 +3.11.8
 + * Frozen RawTuple is not annotated with frozen in the toString method 
(CASSANDRA-15857)
 +Merged from 3.0:
+  * Operational improvements and hardening for replica filtering protection 
(CASSANDRA-15907)
   * stop_paranoid disk failure policy is ignored on CorruptSSTableException 
after node is up (CASSANDRA-15191)
 - * 3.x fails to start if commit log has range tombstones from a column which 
is also deleted (CASSANDRA-15970)
   * Forbid altering UDTs used in partition keys (CASSANDRA-15933)
   * Fix empty/null json string representation (CASSANDRA-15896)
 + * 3.x fails to start if commit log has range tombstones from a column which 
is also deleted (CASSANDRA-15970)
  Merged from 2.2:
   * Fix CQL parsing of collections when the column type is reversed 
(CASSANDRA-15814)
  
diff --cc conf/cassandra.yaml
index 9182008,bb96f18..29442f5
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -1119,69 -996,6 +1119,89 @@@ enable_scripted_user_defined_functions
  # setting.
  windows_timer_interval: 1
  
 +
 +# Enables encrypting data at-rest (on disk). Different key providers can be 
plugged in, but the default reads from
 +# a JCE-style keystore. A single keystore can hold multiple keys, but the one 
referenced by
 +# the "key_alias" is the only key that will be used for encrypt opertaions; 
previously used keys
 +# can still (and should!) be in the keystore and will be used on decrypt 
operations
 +# (to handle the case of key rotation).
 +#
 +# It is strongly recommended to download and install Java Cryptography 
Extension (JCE)
 +# Unlimited Strength Jurisdiction Policy Files for your version of the JDK.
 +# (current link: 
http://www.oracle.com/technetwork/java/javase/downloads/jce8-download-2133166.html)
 +#
 +# Currently, only the following file types are supported for transparent data 
encryption, although
 +# more are coming in future cassandra releases: commitlog, hints
 +transparent_data_encryption_options:
 +    enabled: false
 +    chunk_length_kb: 64
 +    cipher: AES/CBC/PKCS5Padding
 +    key_alias: testing:1
 +    # CBC IV length for AES needs to be 16 bytes (which is also the default 
size)
 +    # iv_length: 16
 +    key_provider: 
 +      - class_name: org.apache.cassandra.security.JKSKeyProvider
 +        parameters: 
 +          - keystore: conf/.keystore
 +            keystore_password: cassandra
 +            store_type: JCEKS
 +            key_password: cassandra
 +
 +
 +#####################
 +# SAFETY THRESHOLDS #
 +#####################
 +
 +# When executing a scan, within or across a partition, we need to keep the
 +# tombstones seen in memory so we can return them to the coordinator, which
 +# will use them to make sure other replicas also know about the deleted rows.
 +# With workloads that generate a lot of tombstones, this can cause performance
 +# problems and even exhaust the server heap.
 +# 
(http://www.datastax.com/dev/blog/cassandra-anti-patterns-queues-and-queue-like-datasets)
 +# Adjust the thresholds here if you understand the dangers and want to
 +# scan more tombstones anyway.  These thresholds may also be adjusted at 
runtime
 +# using the StorageService mbean.
 +tombstone_warn_threshold: 1000
 +tombstone_failure_threshold: 100000
 +
++# Filtering and secondary index queries at read consistency levels above 
ONE/LOCAL_ONE use a
++# mechanism called replica filtering protection to ensure that results from 
stale replicas do
++# not violate consistency. (See CASSANDRA-8272 and CASSANDRA-15907 for more 
details.) This
++# mechanism materializes replica results by partition on-heap at the 
coordinator. The more possibly
++# stale results returned by the replicas, the more rows materialized during 
the query.
++replica_filtering_protection:
++    # These thresholds exist to limit the damage severely out-of-date 
replicas can cause during these
++    # queries. They limit the number of rows from all replicas that individual index and filtering
++    # queries can materialize on-heap to return correct results at the desired read consistency level.
++    #
++    # "cached_replica_rows_warn_threshold" is the per-query threshold at 
which a warning will be logged.
++    # "cached_replica_rows_fail_threshold" is the per-query threshold at 
which the query will fail.
++    #
++    # These thresholds may also be adjusted at runtime using the 
StorageService mbean.
++    #
++    # If the failure threshold is breached, it is likely that either the 
current page/fetch size
++    # is too large or one or more replicas is severely out-of-sync and in 
need of repair.
++    cached_rows_warn_threshold: 2000
++    cached_rows_fail_threshold: 32000
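++    #
++    # As a sketch only (assumed names, not text from this patch): the thresholds can
++    # also be changed at runtime through the StorageService mbean, e.g. with a small
++    # JMX client. The object name and setter below mirror the methods added to
++    # StorageServiceMBean by this change, but verify them before relying on this.
++    #
++    #   import javax.management.*;
++    #   import javax.management.remote.*;
++    #
++    #   public class SetRfpThresholds
++    #   {
++    #       public static void main(String[] args) throws Exception
++    #       {
++    #           JMXServiceURL url = new JMXServiceURL(
++    #               "service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
++    #           try (JMXConnector jmxc = JMXConnectorFactory.connect(url))
++    #           {
++    #               MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
++    #               ObjectName ss = new ObjectName("org.apache.cassandra.db:type=StorageService");
++    #               // Raise the warn threshold, e.g. ahead of a known-large filtering query.
++    #               mbs.invoke(ss, "setCachedReplicaRowsWarnThreshold",
++    #                          new Object[]{ 4000 }, new String[]{ "int" });
++    #           }
++    #       }
++    #   }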
++
 +# Log WARN on any multiple-partition batch size exceeding this value. 5kb per 
batch by default.
 +# Caution should be taken on increasing the size of this threshold as it can 
lead to node instability.
 +batch_size_warn_threshold_in_kb: 5
 +
 +# Fail any multiple-partition batch exceeding this value. 50kb (10x warn 
threshold) by default.
 +batch_size_fail_threshold_in_kb: 50
 +
 +# Log WARN on any batches not of type LOGGED than span across more partitions 
than this limit
 +unlogged_batch_across_partitions_warn_threshold: 10
 +
 +# Log a warning when compacting partitions larger than this value
 +compaction_large_partition_warning_threshold_mb: 100
 +
 +# GC Pauses greater than gc_warn_threshold_in_ms will be logged at WARN level
 +# Adjust the threshold based on your application throughput requirement
 +# By default, Cassandra logs GC Pauses greater than 200 ms at INFO level
 +gc_warn_threshold_in_ms: 1000
 +
  # Maximum size of any value in SSTables. Safety measure to detect SSTable 
corruption
  # early. Any value size larger than this threshold will result into marking 
an SSTable
  # as corrupted. This should be positive and less than 2048.
diff --cc doc/source/operating/metrics.rst
index 04abb48,0000000..4bd0c08
mode 100644,000000..100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@@ -1,706 -1,0 +1,710 @@@
 +.. Licensed to the Apache Software Foundation (ASF) under one
 +.. or more contributor license agreements.  See the NOTICE file
 +.. distributed with this work for additional information
 +.. regarding copyright ownership.  The ASF licenses this file
 +.. to you under the Apache License, Version 2.0 (the
 +.. "License"); you may not use this file except in compliance
 +.. with the License.  You may obtain a copy of the License at
 +..
 +..     http://www.apache.org/licenses/LICENSE-2.0
 +..
 +.. Unless required by applicable law or agreed to in writing, software
 +.. distributed under the License is distributed on an "AS IS" BASIS,
 +.. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 +.. See the License for the specific language governing permissions and
 +.. limitations under the License.
 +
 +.. highlight:: none
 +
 +Monitoring
 +----------
 +
 +Metrics in Cassandra are managed using the `Dropwizard Metrics 
<http://metrics.dropwizard.io>`__ library. These metrics
 +can be queried via JMX or pushed to external monitoring systems using a 
number of `built in
 +<http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and 
`third party
 +<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins.
 +
 +Metrics are collected for a single node. It's up to the operator to use an 
external monitoring system to aggregate them.
 +
 +Metric Types
 +^^^^^^^^^^^^
 +All metrics reported by cassandra fit into one of the following types.
 +
 +``Gauge``
 +    An instantaneous measurement of a value.
 +
 +``Counter``
 +    A gauge for an ``AtomicLong`` instance. Typically this is consumed by 
monitoring the change since the last call to
 +    see if there is a large increase compared to the norm.
 +
 +``Histogram``
 +    Measures the statistical distribution of values in a stream of data.
 +
 +    In addition to minimum, maximum, mean, etc., it also measures median, 
75th, 90th, 95th, 98th, 99th, and 99.9th
 +    percentiles.
 +
 +``Timer``
 +    Measures both the rate that a particular piece of code is called and the 
histogram of its duration.
 +
 +``Latency``
 +    Special type that tracks latency (in microseconds) with a ``Timer`` plus 
a ``Counter`` that tracks the total latency
 +    accrued since starting. The latter is useful if you track the change in 
total latency since the last check. Each
 +    metric name of this type will have 'Latency' and 'TotalLatency' appended 
to it.
 +
 +``Meter``
 +    A meter metric which measures mean throughput and one-, five-, and 
fifteen-minute exponentially-weighted moving
 +    average throughputs.
 +
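 +As a quick illustration of the semantics above, here is a minimal standalone
 +sketch against the Dropwizard API (the registry and metric names are invented
 +for the example and are not Cassandra's internal ones)::
 +
 +    import com.codahale.metrics.Histogram;
 +    import com.codahale.metrics.MetricRegistry;
 +    import com.codahale.metrics.Snapshot;
 +    import com.codahale.metrics.Timer;
 +
 +    public class MetricTypesDemo
 +    {
 +        public static void main(String[] args)
 +        {
 +            MetricRegistry registry = new MetricRegistry();
 +            Timer reads = registry.timer("reads");            // rate + duration distribution
 +            Histogram rows = registry.histogram("rows-read"); // distribution of a plain value
 +
 +            try (Timer.Context ignored = reads.time())        // times this block
 +            {
 +                rows.update(42);                              // record one observation
 +            }
 +
 +            Snapshot snapshot = reads.getSnapshot();          // Timer durations are in nanoseconds
 +            System.out.printf("count=%d p99=%.0fus%n",
 +                              reads.getCount(), snapshot.get99thPercentile() / 1000);
 +        }
 +    }
 +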
 +Table Metrics
 +^^^^^^^^^^^^^
 +
 +Each table in Cassandra has metrics responsible for tracking its state and 
performance.
 +
 +The metric names are all appended with the specific ``Keyspace`` and 
``Table`` name.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Table.<MetricName>.<Keyspace>.<Table>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Table keyspace=<Keyspace> 
scope=<Table> name=<MetricName>``
 +
 +.. NOTE::
 +    There is a special table called '``all``' without a keyspace. This 
represents the aggregation of metrics across
 +    **all** tables and keyspaces on the node.
 +
 +
- ======================================= ============== ===========
- Name                                    Type           Description
- ======================================= ============== ===========
- MemtableOnHeapSize                      Gauge<Long>    Total amount of data 
stored in the memtable that resides **on**-heap, including column related 
overhead and partitions overwritten.
- MemtableOffHeapSize                     Gauge<Long>    Total amount of data 
stored in the memtable that resides **off**-heap, including column related 
overhead and partitions overwritten.
- MemtableLiveDataSize                    Gauge<Long>    Total amount of live 
data stored in the memtable, excluding any data structure overhead.
- AllMemtablesOnHeapSize                  Gauge<Long>    Total amount of data 
stored in the memtables (2i and pending flush memtables included) that resides 
**on**-heap.
- AllMemtablesOffHeapSize                 Gauge<Long>    Total amount of data 
stored in the memtables (2i and pending flush memtables included) that resides 
**off**-heap.
- AllMemtablesLiveDataSize                Gauge<Long>    Total amount of live 
data stored in the memtables (2i and pending flush memtables included) that 
resides off-heap, excluding any data structure overhead.
- MemtableColumnsCount                    Gauge<Long>    Total number of 
columns present in the memtable.
- MemtableSwitchCount                     Counter        Number of times flush 
has resulted in the memtable being switched out.
- CompressionRatio                        Gauge<Double>  Current compression 
ratio for all SSTables.
- EstimatedPartitionSizeHistogram         Gauge<long[]>  Histogram of estimated 
partition size (in bytes).
- EstimatedPartitionCount                 Gauge<Long>    Approximate number of 
keys in table.
- EstimatedColumnCountHistogram           Gauge<long[]>  Histogram of estimated 
number of columns.
- SSTablesPerReadHistogram                Histogram      Histogram of the 
number of sstable data files accessed per single partition read. SSTables 
skipped due to Bloom Filters, min-max key or partition index lookup are not 
taken into account.
- ReadLatency                             Latency        Local read latency for 
this table.
- RangeLatency                            Latency        Local range scan 
latency for this table.
- WriteLatency                            Latency        Local write latency 
for this table.
- CoordinatorReadLatency                  Timer          Coordinator read 
latency for this table.
- CoordinatorScanLatency                  Timer          Coordinator range scan 
latency for this table.
- PendingFlushes                          Counter        Estimated number of 
flush tasks pending for this table.
- BytesFlushed                            Counter        Total number of bytes 
flushed since server [re]start.
- CompactionBytesWritten                  Counter        Total number of bytes 
written by compaction since server [re]start.
- PendingCompactions                      Gauge<Integer> Estimate of number of 
pending compactions for this table.
- LiveSSTableCount                        Gauge<Integer> Number of SSTables on 
disk for this table.
- LiveDiskSpaceUsed                       Counter        Disk space used by 
SSTables belonging to this table (in bytes).
- TotalDiskSpaceUsed                      Counter        Total disk space used 
by SSTables belonging to this table, including obsolete ones waiting to be GC'd.
- MinPartitionSize                        Gauge<Long>    Size of the smallest 
compacted partition (in bytes).
- MaxPartitionSize                        Gauge<Long>    Size of the largest 
compacted partition (in bytes).
- MeanPartitionSize                       Gauge<Long>    Size of the average 
compacted partition (in bytes).
- BloomFilterFalsePositives               Gauge<Long>    Number of false 
positives on table's bloom filter.
- BloomFilterFalseRatio                   Gauge<Double>  False positive ratio 
of table's bloom filter.
- BloomFilterDiskSpaceUsed                Gauge<Long>    Disk space used by 
bloom filter (in bytes).
- BloomFilterOffHeapMemoryUsed            Gauge<Long>    Off-heap memory used 
by bloom filter.
- IndexSummaryOffHeapMemoryUsed           Gauge<Long>    Off-heap memory used 
by index summary.
- CompressionMetadataOffHeapMemoryUsed    Gauge<Long>    Off-heap memory used 
by compression meta data.
- KeyCacheHitRate                         Gauge<Double>  Key cache hit rate for 
this table.
- TombstoneScannedHistogram               Histogram      Histogram of 
tombstones scanned in queries on this table.
- LiveScannedHistogram                    Histogram      Histogram of live 
cells scanned in queries on this table.
- ColUpdateTimeDeltaHistogram             Histogram      Histogram of column 
update time delta on this table.
- ViewLockAcquireTime                     Timer          Time taken acquiring a 
partition lock for materialized view updates on this table.
- ViewReadTime                            Timer          Time taken during the 
local read of a materialized view update.
- TrueSnapshotsSize                       Gauge<Long>    Disk space used by 
snapshots of this table including all SSTable components.
- RowCacheHitOutOfRange                   Counter        Number of table row 
cache hits that do not satisfy the query filter, thus went to disk.
- RowCacheHit                             Counter        Number of table row 
cache hits.
- RowCacheMiss                            Counter        Number of table row 
cache misses.
- CasPrepare                              Latency        Latency of paxos 
prepare round.
- CasPropose                              Latency        Latency of paxos 
propose round.
- CasCommit                               Latency        Latency of paxos 
commit round.
- PercentRepaired                         Gauge<Double>  Percent of table data 
that is repaired on disk.
- SpeculativeRetries                      Counter        Number of times 
speculative retries were sent for this table.
- WaitingOnFreeMemtableSpace              Histogram      Histogram of time 
spent waiting for free memtable space, either on- or off-heap.
- DroppedMutations                        Counter        Number of dropped 
mutations on this table.
- ======================================= ============== ===========
++=============================================== ============== ===========
++Name                                            Type           Description
++=============================================== ============== ===========
++MemtableOnHeapSize                              Gauge<Long>    Total amount 
of data stored in the memtable that resides **on**-heap, including column 
related overhead and partitions overwritten.
++MemtableOffHeapSize                             Gauge<Long>    Total amount 
of data stored in the memtable that resides **off**-heap, including column 
related overhead and partitions overwritten.
++MemtableLiveDataSize                            Gauge<Long>    Total amount 
of live data stored in the memtable, excluding any data structure overhead.
++AllMemtablesOnHeapSize                          Gauge<Long>    Total amount 
of data stored in the memtables (2i and pending flush memtables included) that 
resides **on**-heap.
++AllMemtablesOffHeapSize                         Gauge<Long>    Total amount 
of data stored in the memtables (2i and pending flush memtables included) that 
resides **off**-heap.
++AllMemtablesLiveDataSize                        Gauge<Long>    Total amount 
of live data stored in the memtables (2i and pending flush memtables included) 
that resides off-heap, excluding any data structure overhead.
++MemtableColumnsCount                            Gauge<Long>    Total number 
of columns present in the memtable.
++MemtableSwitchCount                             Counter        Number of 
times flush has resulted in the memtable being switched out.
++CompressionRatio                                Gauge<Double>  Current 
compression ratio for all SSTables.
++EstimatedPartitionSizeHistogram                 Gauge<long[]>  Histogram of 
estimated partition size (in bytes).
++EstimatedPartitionCount                         Gauge<Long>    Approximate 
number of keys in table.
++EstimatedColumnCountHistogram                   Gauge<long[]>  Histogram of 
estimated number of columns.
++SSTablesPerReadHistogram                        Histogram      Histogram of 
the number of sstable data files accessed per single partition read. SSTables 
skipped due to Bloom Filters, min-max key or partition index lookup are not 
taken into account.
++ReadLatency                                     Latency        Local read 
latency for this table.
++RangeLatency                                    Latency        Local range 
scan latency for this table.
++WriteLatency                                    Latency        Local write 
latency for this table.
++CoordinatorReadLatency                          Timer          Coordinator 
read latency for this table.
++CoordinatorScanLatency                          Timer          Coordinator 
range scan latency for this table.
++PendingFlushes                                  Counter        Estimated 
number of flush tasks pending for this table.
++BytesFlushed                                    Counter        Total number 
of bytes flushed since server [re]start.
++CompactionBytesWritten                          Counter        Total number 
of bytes written by compaction since server [re]start.
++PendingCompactions                              Gauge<Integer> Estimate of 
number of pending compactions for this table.
++LiveSSTableCount                                Gauge<Integer> Number of 
SSTables on disk for this table.
++LiveDiskSpaceUsed                               Counter        Disk space 
used by SSTables belonging to this table (in bytes).
++TotalDiskSpaceUsed                              Counter        Total disk 
space used by SSTables belonging to this table, including obsolete ones waiting 
to be GC'd.
++MinPartitionSize                                Gauge<Long>    Size of the 
smallest compacted partition (in bytes).
++MaxPartitionSize                                Gauge<Long>    Size of the 
largest compacted partition (in bytes).
++MeanPartitionSize                               Gauge<Long>    Size of the 
average compacted partition (in bytes).
++BloomFilterFalsePositives                       Gauge<Long>    Number of 
false positives on table's bloom filter.
++BloomFilterFalseRatio                           Gauge<Double>  False positive 
ratio of table's bloom filter.
++BloomFilterDiskSpaceUsed                        Gauge<Long>    Disk space 
used by bloom filter (in bytes).
++BloomFilterOffHeapMemoryUsed                    Gauge<Long>    Off-heap 
memory used by bloom filter.
++IndexSummaryOffHeapMemoryUsed                   Gauge<Long>    Off-heap 
memory used by index summary.
++CompressionMetadataOffHeapMemoryUsed            Gauge<Long>    Off-heap 
memory used by compression meta data.
++KeyCacheHitRate                                 Gauge<Double>  Key cache hit 
rate for this table.
++TombstoneScannedHistogram                       Histogram      Histogram of 
tombstones scanned in queries on this table.
++LiveScannedHistogram                            Histogram      Histogram of 
live cells scanned in queries on this table.
++ColUpdateTimeDeltaHistogram                     Histogram      Histogram of 
column update time delta on this table.
++ViewLockAcquireTime                             Timer          Time taken 
acquiring a partition lock for materialized view updates on this table.
++ViewReadTime                                    Timer          Time taken 
during the local read of a materialized view update.
++TrueSnapshotsSize                               Gauge<Long>    Disk space 
used by snapshots of this table including all SSTable components.
++RowCacheHitOutOfRange                           Counter        Number of 
table row cache hits that do not satisfy the query filter, thus went to disk.
++RowCacheHit                                     Counter        Number of 
table row cache hits.
++RowCacheMiss                                    Counter        Number of 
table row cache misses.
++CasPrepare                                      Latency        Latency of 
paxos prepare round.
++CasPropose                                      Latency        Latency of 
paxos propose round.
++CasCommit                                       Latency        Latency of 
paxos commit round.
++PercentRepaired                                 Gauge<Double>  Percent of 
table data that is repaired on disk.
++SpeculativeRetries                              Counter        Number of 
times speculative retries were sent for this table.
++WaitingOnFreeMemtableSpace                      Histogram      Histogram of 
time spent waiting for free memtable space, either on- or off-heap.
++DroppedMutations                                Counter        Number of 
dropped mutations on this table.
++ReadRepairRequests                              Meter          Throughput for 
mutations generated by read-repair.
++ShortReadProtectionRequests                     Meter          Throughput for 
requests to get extra rows during short read protection.
++ReplicaFilteringProtectionRequests              Meter          Throughput for 
row completion requests during replica filtering protection.
++ReplicaFilteringProtectionRowsCachedPerQuery    Histogram      Histogram of 
the number of rows cached per query when replica filtering protection is 
engaged.
++=============================================== ============== ===========
 +
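 +As a sketch, any of the table metrics above can be read remotely with a plain
 +JMX client; the keyspace and table names below are placeholders, and gauges
 +registered by Cassandra expose their reading as the ``Value`` attribute::
 +
 +    import javax.management.MBeanServerConnection;
 +    import javax.management.ObjectName;
 +    import javax.management.remote.JMXConnector;
 +    import javax.management.remote.JMXConnectorFactory;
 +    import javax.management.remote.JMXServiceURL;
 +
 +    public class ReadTableMetric
 +    {
 +        public static void main(String[] args) throws Exception
 +        {
 +            JMXServiceURL url = new JMXServiceURL(
 +                "service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
 +            try (JMXConnector jmxc = JMXConnectorFactory.connect(url))
 +            {
 +                MBeanServerConnection mbs = jmxc.getMBeanServerConnection();
 +                ObjectName gauge = new ObjectName(
 +                    "org.apache.cassandra.metrics:type=Table,keyspace=my_ks,scope=my_table,name=LiveSSTableCount");
 +                System.out.println("LiveSSTableCount = " + mbs.getAttribute(gauge, "Value"));
 +            }
 +        }
 +    }
 +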
 +Keyspace Metrics
 +^^^^^^^^^^^^^^^^
 +Each keyspace in Cassandra has metrics responsible for tracking its state and 
performance.
 +
 +These metrics are the same as the ``Table Metrics`` above, only they are 
aggregated at the Keyspace level.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.keyspace.<MetricName>.<Keyspace>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Keyspace scope=<Keyspace> 
name=<MetricName>``
 +
 +ThreadPool Metrics
 +^^^^^^^^^^^^^^^^^^
 +
 +Cassandra splits work of a particular type into its own thread pool.  This 
provides back-pressure and asynchrony for
 +requests on a node.  It's important to monitor the state of these thread 
pools since they can tell you how saturated a
 +node is.
 +
 +The metric names are all appended with the specific ``ThreadPool`` name.  The 
thread pools are also categorized under a
 +specific type.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    
``org.apache.cassandra.metrics.ThreadPools.<MetricName>.<Path>.<ThreadPoolName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=ThreadPools scope=<ThreadPoolName> 
type=<Type> name=<MetricName>``
 +
 +===================== ============== ===========
 +Name                  Type           Description
 +===================== ============== ===========
 +ActiveTasks           Gauge<Integer> Number of tasks being actively worked on 
by this pool.
 +PendingTasks          Gauge<Integer> Number of tasks queued up on this pool.
 +CompletedTasks        Counter        Number of tasks completed.
 +TotalBlockedTasks     Counter        Number of tasks that were blocked due to 
queue saturation.
 +CurrentlyBlockedTask  Counter        Number of tasks that are currently 
blocked due to queue saturation but on retry will become unblocked.
 +MaxPoolSize           Gauge<Integer> The maximum number of threads in this 
pool.
 +===================== ============== ===========
 +
 +The following thread pools can be monitored.
 +
 +============================ ============== ===========
 +Name                         Type           Description
 +============================ ============== ===========
 +Native-Transport-Requests    transport      Handles client CQL requests
 +CounterMutationStage         request        Responsible for counter writes
 +ViewMutationStage            request        Responsible for materialized view 
writes
 +MutationStage                request        Responsible for all other writes
 +ReadRepairStage              request        ReadRepair happens on this thread 
pool
 +ReadStage                    request        Local reads run on this thread 
pool
 +RequestResponseStage         request        Coordinator requests to the 
cluster run on this thread pool
 +AntiEntropyStage             internal       Builds merkle tree for repairs
 +CacheCleanupExecutor         internal       Cache maintenance performed on 
this thread pool
 +CompactionExecutor           internal       Compactions are run on these 
threads
 +GossipStage                  internal       Handles gossip requests
 +HintsDispatcher              internal       Performs hinted handoff
 +InternalResponseStage        internal       Responsible for intra-cluster 
callbacks
 +MemtableFlushWriter          internal       Writes memtables to disk
 +MemtablePostFlush            internal       Cleans up commit log after 
memtable is written to disk
 +MemtableReclaimMemory        internal       Memtable recycling
 +MigrationStage               internal       Runs schema migrations
 +MiscStage                    internal       Miscellaneous tasks run here
 +PendingRangeCalculator       internal       Calculates token range
 +PerDiskMemtableFlushWriter_0 internal       Responsible for writing memtable contents to a specific disk (there is one of these per data disk, 0-N)
 +Sampler                      internal       Responsible for re-sampling the 
index summaries of SStables
 +SecondaryIndexManagement     internal       Performs updates to secondary 
indexes
 +ValidationExecutor           internal       Performs validation compaction or 
scrubbing
 +============================ ============== ===========
 +
 +.. |nbsp| unicode:: 0xA0 .. nonbreaking space
 +
 +Client Request Metrics
 +^^^^^^^^^^^^^^^^^^^^^^
 +
 +Client requests have their own set of metrics that encapsulate the work 
happening at coordinator level.
 +
 +Different types of client requests are broken down by ``RequestType``.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.ClientRequest.<MetricName>.<RequestType>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=ClientRequest scope=<RequestType> 
name=<MetricName>``
 +
 +
 +:RequestType: CASRead
 +:Description: Metrics related to transactional read requests.
 +:Metrics:
 +    ===================== ============== 
=============================================================
 +    Name                  Type           Description
 +    ===================== ============== 
=============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of transaction failures 
encountered.
 +    |nbsp|                Latency        Transaction read latency.
 +    Unavailables          Counter        Number of unavailable exceptions 
encountered.
 +    UnfinishedCommit      Counter        Number of transactions that were 
committed on read.
 +    ConditionNotMet       Counter        Number of transaction preconditions that did not match current values.
 +    ContentionHistogram   Histogram      How many contended reads were encountered.
 +    ===================== ============== 
=============================================================
 +
 +:RequestType: CASWrite
 +:Description: Metrics related to transactional write requests.
 +:Metrics:
 +    ===================== ============== 
=============================================================
 +    Name                  Type           Description
 +    ===================== ============== 
=============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of transaction failures 
encountered.
 +    |nbsp|                Latency        Transaction write latency.
 +    UnfinishedCommit      Counter        Number of transactions that were 
committed on write.
 +    ConditionNotMet       Counter        Number of transaction preconditions that did not match current values.
 +    ContentionHistogram   Histogram      How many contended writes were encountered.
 +    ===================== ============== 
=============================================================
 +
 +
 +:RequestType: Read
 +:Description: Metrics related to standard read requests.
 +:Metrics:
 +    ===================== ============== 
=============================================================
 +    Name                  Type           Description
 +    ===================== ============== 
=============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of read failures encountered.
 +    |nbsp|                Latency        Read latency.
 +    Unavailables          Counter        Number of unavailable exceptions 
encountered.
 +    ===================== ============== 
=============================================================
 +
 +:RequestType: RangeSlice
 +:Description: Metrics related to token range read requests.
 +:Metrics:
 +    ===================== ============== 
=============================================================
 +    Name                  Type           Description
 +    ===================== ============== 
=============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of range query failures 
encountered.
 +    |nbsp|                Latency        Range query latency.
 +    Unavailables          Counter        Number of unavailable exceptions 
encountered.
 +    ===================== ============== 
=============================================================
 +
 +:RequestType: Write
 +:Description: Metrics related to regular write requests.
 +:Metrics:
 +    ===================== ============== 
=============================================================
 +    Name                  Type           Description
 +    ===================== ============== 
=============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of write failures encountered.
 +    |nbsp|                Latency        Write latency.
 +    Unavailables          Counter        Number of unavailable exceptions 
encountered.
 +    ===================== ============== 
=============================================================
 +
 +
 +:RequestType: ViewWrite
 +:Description: Metrics related to materialized view writes.
 +:Metrics:
 +    ===================== ============== 
=============================================================
 +    Timeouts              Counter        Number of timeouts encountered.
 +    Failures              Counter        Number of transaction failures 
encountered.
 +    Unavailables          Counter        Number of unavailable exceptions 
encountered.
 +    ViewReplicasAttempted Counter        Total number of attempted view 
replica writes.
 +    ViewReplicasSuccess   Counter        Total number of succeeded view 
replica writes.
 +    ViewPendingMutations  Gauge<Long>    ViewReplicasAttempted - 
ViewReplicasSuccess.
 +    ViewWriteLatency      Timer          Time between when mutation is 
applied to base table and when CL.ONE is achieved on view.
 +    ===================== ============== 
=============================================================
 +
 +Cache Metrics
 +^^^^^^^^^^^^^
 +
 +Cassandra caches have metrics to track the effectiveness of the caches, though
 +the ``Table Metrics`` might be more useful.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Cache.<MetricName>.<CacheName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Cache scope=<CacheName> 
name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Capacity                   Gauge<Long>    Cache capacity in bytes.
 +Entries                    Gauge<Integer> Total number of cache entries.
 +FifteenMinuteCacheHitRate  Gauge<Double>  15m cache hit rate.
 +FiveMinuteCacheHitRate     Gauge<Double>  5m cache hit rate.
 +OneMinuteCacheHitRate      Gauge<Double>  1m cache hit rate.
 +HitRate                    Gauge<Double>  All time cache hit rate.
 +Hits                       Meter          Total number of cache hits.
 +Misses                     Meter          Total number of cache misses.
 +MissLatency                Timer          Latency of misses.
 +Requests                   Gauge<Long>    Total number of cache requests.
 +Size                       Gauge<Long>    Total size of occupied cache, in 
bytes.
 +========================== ============== ===========
 +
 +The following caches are covered:
 +
 +============================ ===========
 +Name                         Description
 +============================ ===========
 +CounterCache                 Keeps hot counters in memory for performance.
 +ChunkCache                   In process uncompressed page cache.
 +KeyCache                     Cache for partition to sstable offsets.
 +RowCache                     Cache for rows kept in memory.
 +============================ ===========
 +
 +.. NOTE::
 +    Misses and MissLatency are only defined for the ChunkCache
 +
 +CQL Metrics
 +^^^^^^^^^^^
 +
 +Metrics specific to CQL prepared statement caching.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.CQL.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=CQL name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +PreparedStatementsCount    Gauge<Integer> Number of cached prepared 
statements.
 +PreparedStatementsEvicted  Counter        Number of prepared statements 
evicted from the prepared statement cache.
 +PreparedStatementsExecuted Counter        Number of prepared statements 
executed.
 +RegularStatementsExecuted  Counter        Number of **non** prepared 
statements executed.
 +PreparedStatementsRatio    Gauge<Double>  Percentage of statements that are 
prepared vs unprepared.
 +========================== ============== ===========
 +
 +
 +DroppedMessage Metrics
 +^^^^^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to tracking dropped messages for different types of requests.
 +Dropped writes are stored and retried by ``Hinted Handoff``.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.DroppedMessages.<MetricName>.<Type>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=DroppedMetrics scope=<Type> 
name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +CrossNodeDroppedLatency    Timer          The dropped latency across nodes.
 +InternalDroppedLatency     Timer          The dropped latency within node.
 +Dropped                    Meter          Number of dropped messages.
 +========================== ============== ===========
 +
 +The different types of messages tracked are:
 +
 +============================ ===========
 +Name                         Description
 +============================ ===========
 +BATCH_STORE                  Batchlog write
 +BATCH_REMOVE                 Batchlog cleanup (after successfully applied)
 +COUNTER_MUTATION             Counter writes
 +HINT                         Hint replay
 +MUTATION                     Regular writes
 +READ                         Regular reads
 +READ_REPAIR                  Read repair
 +PAGED_SLICE                  Paged read
 +RANGE_SLICE                  Token range read
 +REQUEST_RESPONSE             RPC Callbacks
 +_TRACE                       Tracing writes
 +============================ ===========
 +
 +Streaming Metrics
 +^^^^^^^^^^^^^^^^^
 +
 +Metrics reported during ``Streaming`` operations, such as repair, bootstrap, 
rebuild.
 +
 +These metrics are specific to a peer endpoint, with the source node being the 
node you are pulling the metrics from.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Streaming.<MetricName>.<PeerIP>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Streaming scope=<PeerIP> 
name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +IncomingBytes              Counter        Number of bytes streamed to this 
node from the peer.
 +OutgoingBytes              Counter        Number of bytes streamed to the 
peer endpoint from this node.
 +========================== ============== ===========
 +
 +
 +Compaction Metrics
 +^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to ``Compaction`` work.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Compaction.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Compaction name=<MetricName>``
 +
 +========================== ======================================== 
===============================================
 +Name                       Type                                     
Description
 +========================== ======================================== 
===============================================
 +BytesCompacted             Counter                                  Total 
number of bytes compacted since server [re]start.
 +PendingTasks               Gauge<Integer>                           Estimated 
number of compactions remaining to perform.
 +CompletedTasks             Gauge<Long>                              Number of 
completed compactions since server [re]start.
 +TotalCompactionsCompleted  Meter                                    
Throughput of completed compactions since server [re]start.
 +PendingTasksByTableName    Gauge<Map<String, Map<String, Integer>>> Estimated 
number of compactions remaining to perform, grouped by keyspace and then table 
name. This info is also kept in ``Table Metrics``.
 +========================== ======================================== 
===============================================
 +
 +CommitLog Metrics
 +^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to the ``CommitLog``
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.CommitLog.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=CommitLog name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +CompletedTasks             Gauge<Long>    Total number of commit log messages 
written since [re]start.
 +PendingTasks               Gauge<Long>    Number of commit log messages 
written but yet to be fsync'd.
 +TotalCommitLogSize         Gauge<Long>    Current size, in bytes, used by all 
the commit log segments.
 +WaitingOnSegmentAllocation Timer          Time spent waiting for a 
CommitLogSegment to be allocated - under normal conditions this should be zero.
 +WaitingOnCommit            Timer          The time spent waiting on CL fsync; 
for Periodic this only occurs when the sync is lagging its sync interval.
 +========================== ============== ===========
 +
 +Storage Metrics
 +^^^^^^^^^^^^^^^
 +
 +Metrics specific to the storage engine.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Storage.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Storage name=<MetricName>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Exceptions                 Counter        Number of internal exceptions 
caught. Under normal conditions this should be zero.
 +Load                       Counter        Size, in bytes, of the on disk data 
size this node manages.
 +TotalHints                 Counter        Number of hint messages written to 
this node since [re]start. Includes one entry for each host to be hinted per 
hint.
 +TotalHintsInProgress       Counter        Number of hints currently attempting to be sent.
 +========================== ============== ===========
 +
 +HintedHandoff Metrics
 +^^^^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to Hinted Handoff.  There are also some metrics related to 
hints tracked in ``Storage Metrics``.
 +
 +These metrics include the peer endpoint **in the metric name**.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.HintedHandOffManager.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=HintedHandOffManager 
name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +Hints_created-<PeerIP>       Counter        Number of hints on disk for this 
peer.
 +Hints_not_stored-<PeerIP>    Counter        Number of hints not stored for 
this peer, due to being down past the configured hint window.
 +=========================== ============== ===========
 +
 +SSTable Index Metrics
 +^^^^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to the SSTable index metadata.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Index.<MetricName>.RowIndexEntry``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Index scope=RowIndexEntry 
name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +IndexedEntrySize            Histogram      Histogram of the on-heap size, in 
bytes, of the index across all SSTables.
 +IndexInfoCount              Histogram      Histogram of the number of on-heap 
index entries managed across all SSTables.
 +IndexInfoGets               Histogram      Histogram of the number of index 
seeks performed per SSTable.
 +=========================== ============== ===========
 +
 +BufferPool Metrics
 +^^^^^^^^^^^^^^^^^^
 +
 +Metrics specific to the internal recycled buffer pool Cassandra manages.  
This pool is meant to keep allocations and GC
 +lower by recycling on and off heap buffers.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.BufferPool.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=BufferPool name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +Size                        Gauge<Long>    Size, in bytes, of the managed buffer pool.
 +Misses                      Meter          The rate of misses in the pool. The higher this is, the more allocations incurred.
 +=========================== ============== ===========
 +
 +
 +Client Metrics
 +^^^^^^^^^^^^^^
 +
 +Metrics specific to client management.
 +
 +Reported name format:
 +
 +**Metric Name**
 +    ``org.apache.cassandra.metrics.Client.<MetricName>``
 +
 +**JMX MBean**
 +    ``org.apache.cassandra.metrics:type=Client name=<MetricName>``
 +
 +=========================== ============== ===========
 +Name                        Type           Description
 +=========================== ============== ===========
 +connectedNativeClients      Counter        Number of clients connected to 
this node's native protocol server
 +connectedThriftClients      Counter        Number of clients connected to 
this node's thrift protocol server
 +=========================== ============== ===========
 +
 +JVM Metrics
 +^^^^^^^^^^^
 +
 +JVM metrics such as memory and garbage collection statistics can either be 
accessed by connecting to the JVM using JMX or can be exported using `Metric 
Reporters`_.
 +
 +BufferPool
 +++++++++++
 +
 +**Metric Name**
 +    ``jvm.buffers.<direct|mapped>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.nio:type=BufferPool name=<direct|mapped>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Capacity                   Gauge<Long>    Estimated total capacity of the 
buffers in this pool
 +Count                      Gauge<Long>    Estimated number of buffers in the 
pool
 +Used                       Gauge<Long>    Estimated memory that the Java 
virtual machine is using for this buffer pool
 +========================== ============== ===========
 +
 +FileDescriptorRatio
 ++++++++++++++++++++
 +
 +**Metric Name**
 +    ``jvm.fd.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=OperatingSystem 
name=<OpenFileDescriptorCount|MaxFileDescriptorCount>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Usage                      Ratio          Ratio of used to total file 
descriptors
 +========================== ============== ===========
 +
 +GarbageCollector
 +++++++++++++++++
 +
 +**Metric Name**
 +    ``jvm.gc.<gc_type>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=GarbageCollector name=<gc_type>``
 +
 +========================== ============== ===========
 +Name                       Type           Description
 +========================== ============== ===========
 +Count                      Gauge<Long>    Total number of collections that 
have occurred
 +Time                       Gauge<Long>    Approximate accumulated collection 
elapsed time in milliseconds
 +========================== ============== ===========
 +
 +Memory
 +++++++
 +
 +**Metric Name**
 +    ``jvm.memory.<heap/non-heap/total>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=Memory``
 +
 +========================== ============== ===========
 +Committed                  Gauge<Long>    Amount of memory in bytes that is 
committed for the JVM to use
 +Init                       Gauge<Long>    Amount of memory in bytes that the 
JVM initially requests from the OS
 +Max                        Gauge<Long>    Maximum amount of memory in bytes 
that can be used for memory management
 +Usage                      Ratio          Ratio of used to maximum memory
 +Used                       Gauge<Long>    Amount of used memory in bytes
 +========================== ============== ===========
 +
 +MemoryPool
 +++++++++++
 +
 +**Metric Name**
 +    ``jvm.memory.pools.<memory_pool>.<MetricName>``
 +
 +**JMX MBean**
 +    ``java.lang:type=MemoryPool name=<memory_pool>``
 +
 +========================== ============== ===========
 +Committed                  Gauge<Long>    Amount of memory in bytes that is 
committed for the JVM to use
 +Init                       Gauge<Long>    Amount of memory in bytes that the 
JVM initially requests from the OS
 +Max                        Gauge<Long>    Maximum amount of memory in bytes 
that can be used for memory management
 +Usage                      Ratio          Ratio of used to maximum memory
 +Used                       Gauge<Long>    Amount of used memory in bytes
 +========================== ============== ===========
 +
 +JMX
 +^^^
 +
 +Any JMX based client can access metrics from cassandra.
 +
 +If you wish to access JMX metrics over http it's possible to download 
`Mx4jTool <http://mx4j.sourceforge.net/>`__ and
 +place ``mx4j-tools.jar`` into the classpath.  On startup you will see in the 
log::
 +
 +    HttpAdaptor version 3.0.2 started on port 8081
 +
 +To choose a different port (8081 is the default) or a different listen 
address (0.0.0.0 is not the default) edit
 +``conf/cassandra-env.sh`` and uncomment::
 +
 +    #MX4J_ADDRESS="-Dmx4jaddress=0.0.0.0"
 +
 +    #MX4J_PORT="-Dmx4jport=8081"
 +
 +
 +Metric Reporters
 +^^^^^^^^^^^^^^^^
 +
 +As mentioned at the top of this section on monitoring, the Cassandra metrics can
 +be exported to a number of monitoring systems via a number of `built in
 +<http://metrics.dropwizard.io/3.1.0/getting-started/#other-reporting>`__ and `third party
 +<http://metrics.dropwizard.io/3.1.0/manual/third-party/>`__ reporter plugins.
 +
 +The configuration of these plugins is managed by the `metrics reporter config 
project
 +<https://github.com/addthis/metrics-reporter-config>`__. There is a sample 
configuration file located at
 +``conf/metrics-reporter-config-sample.yaml``.
 +
 +Once configured, you simply start cassandra with the flag
 +``-Dcassandra.metricsReporterConfigFile=metrics-reporter-config.yaml``. The 
specified .yaml file plus any 3rd party
 +reporter jars must all be in Cassandra's classpath.
diff --cc src/java/org/apache/cassandra/metrics/TableMetrics.java
index a02539a,a551a78..5521bf7
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@@ -167,43 -152,18 +167,52 @@@ public class TableMetric
      public final static LatencyMetrics globalWriteLatency = new 
LatencyMetrics(globalFactory, globalAliasFactory, "Write");
      public final static LatencyMetrics globalRangeLatency = new 
LatencyMetrics(globalFactory, globalAliasFactory, "Range");
  
 +    public final static Gauge<Double> globalPercentRepaired = 
Metrics.register(globalFactory.createMetricName("PercentRepaired"),
 +            new Gauge<Double>()
 +    {
 +        public Double getValue()
 +        {
 +            double repaired = 0;
 +            double total = 0;
 +            for (String keyspace : Schema.instance.getNonSystemKeyspaces())
 +            {
 +                Keyspace k = Schema.instance.getKeyspaceInstance(keyspace);
 +                if 
(SchemaConstants.DISTRIBUTED_KEYSPACE_NAME.equals(k.getName()))
 +                    continue;
 +                if (k.getReplicationStrategy().getReplicationFactor() < 2)
 +                    continue;
 +
 +                for (ColumnFamilyStore cf : k.getColumnFamilyStores())
 +                {
 +                    if (!SecondaryIndexManager.isIndexColumnFamily(cf.name))
 +                    {
 +                        for (SSTableReader sstable : 
cf.getSSTables(SSTableSet.CANONICAL))
 +                        {
 +                            if (sstable.isRepaired())
 +                            {
 +                                repaired += sstable.uncompressedLength();
 +                            }
 +                            total += sstable.uncompressedLength();
 +                        }
 +                    }
 +                }
 +            }
 +            return total > 0 ? (repaired / total) * 100 : 100.0;
 +        }
 +    });
 +
      public final Meter readRepairRequests;
      public final Meter shortReadProtectionRequests;
-     public final Meter replicaSideFilteringProtectionRequests;
+     
+     public final Meter replicaFilteringProtectionRequests;
+     
+     /**
+      * This histogram records the maximum number of rows {@link 
org.apache.cassandra.service.ReplicaFilteringProtection}
+      * caches at a point in time per query. With no replica divergence, this 
is equivalent to the maximum number of
+      * cached rows in a single partition during a query. It can be helpful 
when choosing appropriate values for the
+      * replica_filtering_protection thresholds in cassandra.yaml. 
+      */
+     public final Histogram rfpRowsCachedPerQuery;
  
      public final Map<Sampler, TopKSampler<ByteBuffer>> samplers;
      /**
@@@ -820,34 -782,15 +830,41 @@@
          register(name, alias, tableMeter);
          return tableMeter;
      }
+     
+     private Histogram createHistogram(String name, boolean considerZeroes)
+     {
+         Histogram histogram = 
Metrics.histogram(factory.createMetricName(name), 
aliasFactory.createMetricName(name), considerZeroes);
+         register(name, name, histogram);
+         return histogram;
+     }
  
      /**
 +     * Computes the compression ratio for the specified SSTables
 +     *
 +     * @param sstables the SSTables
 +     * @return the compression ratio for the specified SSTables
 +     */
 +    private static Double computeCompressionRatio(Iterable<SSTableReader> 
sstables)
 +    {
 +        double compressedLengthSum = 0;
 +        double dataLengthSum = 0;
 +        for (SSTableReader sstable : sstables)
 +        {
 +            if (sstable.compression)
 +            {
 +                // We should not have any sstables which are in an open early mode, as the sstables
 +                // were selected using SSTableSet.CANONICAL.
 +                assert sstable.openReason != SSTableReader.OpenReason.EARLY;
 +
 +                CompressionMetadata compressionMetadata = 
sstable.getCompressionMetadata();
 +                compressedLengthSum += 
compressionMetadata.compressedFileLength;
 +                dataLengthSum += compressionMetadata.dataLength;
 +            }
 +        }
 +        return dataLengthSum != 0 ? compressedLengthSum / dataLengthSum : 
MetadataCollector.NO_COMPRESSION_RATIO;
 +    }
 +
 +    /**
      * Create a histogram-like interface that will register CF, keyspace and global level
      * histograms and forward any updates to all of them
       */
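
The rfpRowsCachedPerQuery histogram above is registered as a table-level metric, so it can be watched while tuning the thresholds it mentions. Below is a minimal JMX probe sketch; the metric name "ReplicaFilteringProtectionRowsCachedPerQuery" and the keyspace/table names (ks/t) are assumptions for illustration, not values taken from this commit:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class RfpHistogramProbe
    {
        public static void main(String[] args) throws Exception
        {
            // Connect to a local node's JMX port (7199 is the conventional default).
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                // Table metrics follow the org.apache.cassandra.metrics:type=Table naming pattern.
                ObjectName name = new ObjectName("org.apache.cassandra.metrics:type=Table,keyspace=ks," +
                                                 "scope=t,name=ReplicaFilteringProtectionRowsCachedPerQuery");
                System.out.println("Max RFP rows cached in a single query: " + mbs.getAttribute(name, "Max"));
            }
        }
    }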
diff --cc src/java/org/apache/cassandra/service/DataResolver.java
index 3a2d54d,1d0bb47..29fa003
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@@ -52,14 -49,13 +52,14 @@@ public class DataResolver extends ResponseResolver
          Boolean.getBoolean("cassandra.drop_oversized_readrepair_mutations");
  
      @VisibleForTesting
-     final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+     final List<AsyncOneResponse<?>> repairResults = Collections.synchronizedList(new ArrayList<>());
 -
 +    private final long queryStartNanoTime;
      private final boolean enforceStrictLiveness;
  
 -    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
 +    DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
      {
          super(keyspace, command, consistency, maxResponseCount);
 +        this.queryStartNanoTime = queryStartNanoTime;
          this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
      }
  
@@@ -185,7 -171,14 +185,15 @@@
          // We need separate contexts, as each context has its own counter
          ResolveContext firstPhaseContext = new ResolveContext(count);
          ResolveContext secondPhaseContext = new ResolveContext(count);
-         ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace, command, consistency, queryStartNanoTime, firstPhaseContext.sources);
+ 
+         ReplicaFilteringProtection rfp = new ReplicaFilteringProtection(keyspace,
+                                                                         command,
+                                                                         consistency,
++                                                                        queryStartNanoTime,
+                                                                         firstPhaseContext.sources,
+                                                                         DatabaseDescriptor.getCachedReplicaRowsWarnThreshold(),
+                                                                         DatabaseDescriptor.getCachedReplicaRowsFailThreshold());
+ 
          PartitionIterator firstPhasePartitions = resolveInternal(firstPhaseContext,
                                                                   rfp.mergeController(),
                                                                   i -> shortReadProtectedResponse(i, firstPhaseContext),
@@@ -630,8 -604,11 +631,12 @@@
          DataLimits.Counter singleResultCounter =
              command.limits().newCounter(command.nowInSec(), false, command.selectsFullPartition(), enforceStrictLiveness).onlyCount();
  
-         ShortReadPartitionsProtection protection =
-             new ShortReadPartitionsProtection(source, singleResultCounter, mergedResultCounter, queryStartNanoTime);
+         // The pre-fetch callback used here makes the initial round of responses for this replica collectable.
+         ShortReadPartitionsProtection protection = new ShortReadPartitionsProtection(context.sources[i],
+                                                                                      () -> responses.clearUnsafe(i),
+                                                                                      singleResultCounter,
 -                                                                                     context.mergedResultCounter);
++                                                                                     context.mergedResultCounter,
++                                                                                     queryStartNanoTime);
  
          /*
           * The order of extension and transformations is important here. Extending with more partitions has to happen
@@@ -676,17 -654,15 +682,19 @@@
  
          private boolean partitionsFetched; // whether we've seen any new partitions since iteration start or last moreContents() call
  
 +        private final long queryStartNanoTime;
 +
          private ShortReadPartitionsProtection(InetAddress source,
+                                               Runnable preFetchCallback,
                                                DataLimits.Counter singleResultCounter,
 -                                              DataLimits.Counter mergedResultCounter)
 +                                              DataLimits.Counter mergedResultCounter,
 +                                              long queryStartNanoTime)
          {
              this.source = source;
+             this.preFetchCallback = preFetchCallback;
              this.singleResultCounter = singleResultCounter;
              this.mergedResultCounter = mergedResultCounter;
 +            this.queryStartNanoTime = queryStartNanoTime;
          }
  
          @Override
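
The warn/fail thresholds passed to ReplicaFilteringProtection above are read from the new replica_filtering_protection section of cassandra.yaml (see the conf/cassandra.yaml and ReplicaFilteringProtectionOptions entries in the diffstat). A sketch of the shape of that section follows; the numbers here are illustrative, not necessarily the shipped defaults:

    # Guardrails on the number of rows replica filtering protection may cache
    # on the coordinator during a single query.
    replica_filtering_protection:
        # Past this point, log and emit a client warning (once per query).
        cached_replica_rows_warn_threshold: 2000
        # Past this point, abort the query with an OverloadedException.
        cached_replica_rows_fail_threshold: 32000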
diff --cc src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
index 428c603,0a57e66..f764439
--- a/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
+++ b/src/java/org/apache/cassandra/service/ReplicaFilteringProtection.java
@@@ -87,35 -94,36 +95,38 @@@ class ReplicaFilteringProtection
      private final InetAddress[] sources;
      private final TableMetrics tableMetrics;
  
-     /**
-      * Per-source primary keys of the rows that might be outdated so they need to be fetched.
-      * For outdated static rows we use an empty builder to signal it has to be queried.
-      */
-     private final List<SortedMap<DecoratedKey, BTreeSet.Builder<Clustering>>> rowsToFetch;
+     private final int cachedRowsWarnThreshold;
+     private final int cachedRowsFailThreshold;
 -    
++
+     /** Tracks whether we've already hit the warning threshold while evaluating a partition. */
+     private boolean hitWarningThreshold = false;
+ 
+     private int currentRowsCached = 0; // tracks the current number of cached rows
+     private int maxRowsCached = 0; // tracks the high watermark for the number of cached rows
  
      /**
-      * Per-source list of all the partitions seen by the merge listener, to be merged with the extra fetched rows.
+      * Per-source list of the pending partitions seen by the merge listener, to be merged with the extra fetched rows.
       */
-     private final List<List<PartitionBuilder>> originalPartitions;
+     private final List<Queue<PartitionBuilder>> originalPartitions;
  
      ReplicaFilteringProtection(Keyspace keyspace,
                                 ReadCommand command,
                                 ConsistencyLevel consistency,
 +                               long queryStartNanoTime,
-                                InetAddress[] sources)
+                                InetAddress[] sources,
+                                int cachedRowsWarnThreshold,
+                                int cachedRowsFailThreshold)
      {
          this.keyspace = keyspace;
          this.command = command;
          this.consistency = consistency;
 +        this.queryStartNanoTime = queryStartNanoTime;
          this.sources = sources;
-         this.rowsToFetch = new ArrayList<>(sources.length);
          this.originalPartitions = new ArrayList<>(sources.length);
  
-         for (InetAddress ignored : sources)
+         for (int i = 0; i < sources.length; i++)
          {
-             rowsToFetch.add(new TreeMap<>());
-             originalPartitions.add(new ArrayList<>());
+             originalPartitions.add(new ArrayDeque<>());
          }
  
          tableMetrics = ColumnFamilyStore.metricsFor(command.metadata().cfId);
@@@ -220,78 -161,119 +164,119 @@@
       */
      UnfilteredPartitionIterators.MergeListener mergeController()
      {
-         return (partitionKey, versions) -> {
+         return new UnfilteredPartitionIterators.MergeListener()
+         {
+             @Override
+             public void close()
+             {
+                 // If we hit the failure threshold before consuming a single partition, record the current rows cached.
+                 tableMetrics.rfpRowsCachedPerQuery.update(Math.max(currentRowsCached, maxRowsCached));
+             }
  
-             PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+             @Override
+             public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
+             {
+                 PartitionBuilder[] builders = new PartitionBuilder[sources.length];
+                 PartitionColumns columns = columns(versions);
+                 EncodingStats stats = EncodingStats.merge(versions, NULL_TO_NO_STATS);
 -
 +
+                 for (int i = 0; i < sources.length; i++)
+                     builders[i] = new PartitionBuilder(partitionKey, sources[i], columns, stats);
  
-             return new UnfilteredRowIterators.MergeListener()
-             {
-                 @Override
-                 public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                 return new UnfilteredRowIterators.MergeListener()
                  {
-                     // cache the deletion time versions to be able to regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].setDeletionTime(versions[i]);
-                 }
+                     @Override
+                     public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
+                     {
+                         // cache the deletion time versions to be able to regenerate the original row iterator
+                         for (int i = 0; i < versions.length; i++)
+                             builders[i].setDeletionTime(versions[i]);
+                     }
  
-                 @Override
-                 public Row onMergedRows(Row merged, Row[] versions)
-                 {
-                     // cache the row versions to be able to regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].addRow(versions[i]);
+                     @Override
+                     public Row onMergedRows(Row merged, Row[] versions)
+                     {
+                         // cache the row versions to be able to regenerate the original row iterator
+                         for (int i = 0; i < versions.length; i++)
+                             builders[i].addRow(versions[i]);
  
-                     if (merged.isEmpty())
-                         return merged;
+                         if (merged.isEmpty())
+                             return merged;
  
-                     boolean isPotentiallyOutdated = false;
-                     boolean isStatic = merged.isStatic();
-                     for (int i = 0; i < versions.length; i++)
-                     {
-                         Row version = versions[i];
-                         if (version == null || (isStatic && version.isEmpty()))
+                         boolean isPotentiallyOutdated = false;
+                         boolean isStatic = merged.isStatic();
+                         for (int i = 0; i < versions.length; i++)
                          {
-                             isPotentiallyOutdated = true;
-                             BTreeSet.Builder<Clustering> toFetch = getOrCreateToFetch(i, partitionKey);
-                             // Note that for static, we shouldn't add the clustering to the clustering set (the
-                             // ClusteringIndexNamesFilter we'll build from this later does not expect it), but the fact
-                             // we created a builder in the first place will act as a marker that the static row must be
-                             // fetched, even if no other rows are added for this partition.
-                             if (!isStatic)
-                                 toFetch.add(merged.clustering());
+                             Row version = versions[i];
+                             if (version == null || (isStatic && version.isEmpty()))
+                             {
+                                 isPotentiallyOutdated = true;
+                                 builders[i].addToFetch(merged);
+                             }
                          }
-                     }
  
-                     // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
-                     // an outdated result that is only present because other replica have filtered the up-to-date result
-                     // out), then we skip the row. In other words, the results of the initial merging of results by this
-                     // protection assume the worst case scenario where every row that might be outdated actually is.
-                     // This ensures that during this first phase (collecting additional row to fetch) we are guaranteed
-                     // to look at enough data to ultimately fulfill the query limit.
-                     return isPotentiallyOutdated ? null : merged;
-                 }
+                         // If the row is potentially outdated (because some replica didn't send anything and so it _may_ be
+                         // an outdated result that is only present because other replicas have filtered the up-to-date result
+                         // out), then we skip the row. In other words, the results of the initial merge performed by this
+                         // protection assume the worst-case scenario, where every row that might be outdated actually is.
+                         // This ensures that during this first phase (collecting additional rows to fetch) we are guaranteed
+                         // to look at enough data to ultimately fulfill the query limit.
+                         return isPotentiallyOutdated ? null : merged;
+                     }
  
-                 @Override
-                 public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
-                 {
-                     // cache the marker versions to be able to regenerate the original row iterator
-                     for (int i = 0; i < versions.length; i++)
-                         builders[i].addRangeTombstoneMarker(versions[i]);
-                 }
+                     @Override
+                     public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions)
+                     {
+                         // cache the marker versions to be able to regenerate the original row iterator
+                         for (int i = 0; i < versions.length; i++)
+                             builders[i].addRangeTombstoneMarker(versions[i]);
+                     }
  
-                 @Override
-                 public void close()
-                 {
-                     for (int i = 0; i < sources.length; i++)
-                         originalPartitions.get(i).add(builders[i]);
-                 }
-             };
+                     @Override
+                     public void close()
+                     {
+                         for (int i = 0; i < sources.length; i++)
+                             originalPartitions.get(i).add(builders[i]);
+                     }
+                 };
+             }
          };
      }
  
+     private void incrementCachedRows()
+     {
+         currentRowsCached++;
 -        
++
+         if (currentRowsCached == cachedRowsFailThreshold + 1)
+         {
+             String message = String.format("Replica filtering protection has cached over %d rows during query %s. " +
+                                            "(See 'cached_replica_rows_fail_threshold' in cassandra.yaml.)",
+                                            cachedRowsFailThreshold, command.toCQLString());
+ 
+             logger.error(message);
+             Tracing.trace(message);
+             throw new OverloadedException(message);
+         }
+         else if (currentRowsCached == cachedRowsWarnThreshold + 1 && !hitWarningThreshold)
+         {
+             hitWarningThreshold = true;
 -
++
+             String message = String.format("Replica filtering protection has cached over %d rows during query %s. " +
+                                            "(See 'cached_replica_rows_warn_threshold' in cassandra.yaml.)",
+                                            cachedRowsWarnThreshold, command.toCQLString());
+ 
+             ClientWarn.instance.warn(message);
+             oneMinuteLogger.warn(message);
+             Tracing.trace(message);
+         }
+     }
+ 
+     private void releaseCachedRows(int count)
+     {
+         maxRowsCached = Math.max(maxRowsCached, currentRowsCached);
+         currentRowsCached -= count;
+     }
+ 
      private static PartitionColumns columns(List<UnfilteredRowIterator> versions)
      {
          Columns statics = Columns.NONE;
@@@ -348,7 -322,15 +325,15 @@@
              @Override
              public boolean hasNext()
              {
-                 return iterator.hasNext();
+                 // If there are no cached partition builders for this source, advance the first phase iterator, which
+                 // will force the RFP merge listener to load at least the next protected partition. Note that this may
+                 // load more than one partition if any divergence between replicas is discovered by the merge listener.
+                 if (partitions.isEmpty())
+                 {
+                     PartitionIterators.consumeNext(merged);
+                 }
 -                
++
+                 return !partitions.isEmpty();
              }
  
              @Override
@@@ -464,5 -469,66 +472,66 @@@
                  }
              };
          }
+ 
+         private UnfilteredRowIterator protectedPartition()
+         {
+             UnfilteredRowIterator original = originalPartition();
+ 
+             if (toFetch != null)
+             {
+                 try (UnfilteredPartitionIterator partitions = fetchFromSource())
+                 {
+                     if (partitions.hasNext())
+                     {
+                         try (UnfilteredRowIterator fetchedRows = partitions.next())
+                         {
+                             return UnfilteredRowIterators.merge(Arrays.asList(original, fetchedRows), command.nowInSec());
+                         }
+                     }
+                 }
+             }
+ 
+             return original;
+         }
+ 
+         private UnfilteredPartitionIterator fetchFromSource()
+         {
+             assert toFetch != null;
+ 
+             NavigableSet<Clustering> clusterings = toFetch.build();
+             tableMetrics.replicaFilteringProtectionRequests.mark();
 -            
++
+             if (logger.isTraceEnabled())
+                 logger.trace("Requesting rows {} in partition {} from {} for replica filtering protection",
+                              clusterings, key, source);
 -
++
+             Tracing.trace("Requesting {} rows in partition {} from {} for replica filtering protection",
+                           clusterings.size(), key, source);
+ 
+             // build the read command, taking into account that we could be requesting only the static row
+             DataLimits limits = clusterings.isEmpty() ? DataLimits.cqlLimits(1) : DataLimits.NONE;
+             ClusteringIndexFilter filter = new ClusteringIndexNamesFilter(clusterings, command.isReversed());
+             SinglePartitionReadCommand cmd = SinglePartitionReadCommand.create(command.metadata(),
+                                                                                command.nowInSec(),
+                                                                                command.columnFilter(),
+                                                                                RowFilter.NONE,
+                                                                                limits,
+                                                                                key,
+                                                                                filter);
+             try
+             {
+                 return executeReadCommand(cmd, source);
+             }
+             catch (ReadTimeoutException e)
+             {
+                 int blockFor = consistency.blockFor(keyspace);
+                 throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+             }
+             catch (UnavailableException e)
+             {
+                 int blockFor = consistency.blockFor(keyspace);
+                 throw new UnavailableException(consistency, blockFor, blockFor - 1);
+             }
+         }
      }
  }
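
The second-phase hasNext() above leans on PartitionIterators.consumeNext(merged) to advance the first-phase merge one partition at a time, which is what lets the merge listener cache (and later release) rows incrementally. The body of consumeNext() is not part of this excerpt; below is a minimal sketch of the semantics it needs, stated as an assumption rather than the committed implementation:

    // Assumed semantics: pull exactly one partition through the merged iterator,
    // draining its rows so that attached merge listeners observe all of them.
    public static void consumeNext(PartitionIterator iterator)
    {
        if (iterator.hasNext())
        {
            try (RowIterator partition = iterator.next())
            {
                while (partition.hasNext())
                    partition.next();
            }
        }
    }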
diff --cc test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
index 329fa37,51a08e6..c449fdd
--- a/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Coordinator.java
@@@ -39,9 -39,10 +39,10 @@@ import org.apache.cassandra.distributed
  import org.apache.cassandra.distributed.api.QueryResults;
  import org.apache.cassandra.distributed.api.SimpleQueryResult;
  import org.apache.cassandra.service.ClientState;
+ import org.apache.cassandra.service.ClientWarn;
  import org.apache.cassandra.service.QueryState;
  import org.apache.cassandra.service.pager.QueryPager;
 -import org.apache.cassandra.transport.Server;
 +import org.apache.cassandra.transport.ProtocolVersion;
  import org.apache.cassandra.tracing.Tracing;
  import org.apache.cassandra.transport.messages.ResultMessage;
  import org.apache.cassandra.utils.ByteBufferUtil;
@@@ -98,9 -104,12 +104,13 @@@ public class Coordinator implements ICoordinator
                                                                   Integer.MAX_VALUE,
                                                                   null,
                                                                   null,
 -                                                                 Server.CURRENT_VERSION));
 +                                                                 ProtocolVersion.CURRENT),
 +                                             System.nanoTime());
  
+         // Collect warnings reported during the query.
+         if (res != null)
+             res.setWarnings(ClientWarn.instance.getWarnings());
+ 
          return RowUtil.toQueryResult(res);
      }
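
With the coordinator now forwarding ClientWarn output into the query result, an in-JVM dtest can assert that the warn threshold actually surfaces to clients. A hypothetical usage sketch follows (the query, schema, and lowered threshold are made up; only the warning plumbing comes from this commit):

    // In a distributed test, after setting cached_replica_rows_warn_threshold very low:
    SimpleQueryResult result = cluster.coordinator(1)
                                      .executeWithResult("SELECT * FROM ks.t WHERE v = 0 ALLOW FILTERING",
                                                         ConsistencyLevel.ALL);
    // The coordinator should have attached the RFP warning to the result.
    assertTrue(result.warnings().stream()
                     .anyMatch(w -> w.contains("cached_replica_rows_warn_threshold")));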
  

