This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 494c3c7bef Add AccordExecutorMetrics Also Introduce:  - Sharded/LogLinearDecayingHistogram Also Improve:  - Do not take a reference to CFK unless relevant Also Fix:  - Sharded/LogLinearHistogram  - ExecuteFlags serialization bug in ReadData
494c3c7bef is described below

commit 494c3c7befeb291a0bf1d36dd9b958da032a1e43
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 11 10:54:04 2025 +0000

    Add AccordExecutorMetrics
    Also Introduce:
     - Sharded/LogLinearDecayingHistogram
    Also Improve:
     - Do not take a reference to CFK unless relevant
    Also Fix:
     - Sharded/LogLinearHistogram
     - ExecuteFlags serialization bug in ReadData
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21017
---
 modules/accord                                     |   2 +-
 .../cassandra/metrics/AccordCacheMetrics.java      |   4 +-
 .../metrics/AccordCoordinatorMetrics.java          |   5 +-
 .../cassandra/metrics/AccordExecutorMetrics.java   |  61 ++++
 .../cassandra/metrics/AccordReplicaMetrics.java    | 123 +++++---
 .../metrics/CassandraMetricsRegistry.java          |   1 +
 .../metrics/LogLinearDecayingHistograms.java       | 283 +++++++++++++++++++
 .../cassandra/metrics/LogLinearHistogram.java      | 112 +++++---
 .../metrics/ShardedDecayingHistograms.java         | 159 +++++++++++
 .../apache/cassandra/metrics/ShardedHistogram.java |  13 -
 .../cassandra/metrics/ShardedLongGauges.java       | 148 ++++++++++
 .../cassandra/service/accord/AccordCache.java      |  35 ++-
 .../service/accord/AccordCommandStore.java         |   2 +
 .../cassandra/service/accord/AccordExecutor.java   |  37 +++
 .../accord/AccordJournalValueSerializers.java      |   2 -
 .../service/accord/AccordSafeCommandStore.java     |  14 +
 .../cassandra/service/accord/AccordTask.java       |  48 +++-
 .../accord/serializers/ReadDataSerializer.java     |  19 +-
 .../distributed/test/accord/AccordMetricsTest.java |   4 +-
 .../metrics/LogLinearDecayingHistogramTest.java    | 188 +++++++++++++
 .../cassandra/metrics/LogLinearHistogramTest.java  | 313 +++++++++++++++++++++
 .../service/accord/AccordCommandTest.java          |   1 -
 22 files changed, 1452 insertions(+), 122 deletions(-)

diff --git a/modules/accord b/modules/accord
index de03d14e48..938ba19ada 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit de03d14e4821b669fecad5dcdb2de2dba3a9d926
+Subproject commit 938ba19adaf70bf9901fb2dc177dce313d7ee15c
diff --git a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
index 5285bd914e..10fbfd6621 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
@@ -44,7 +44,7 @@ public class AccordCacheMetrics
 
         public AccordCacheGlobalMetrics()
         {
-            DefaultNameFactory factory = new DefaultNameFactory("AccordCache");
+            DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE);
             this.usedBytes = Metrics.gauge(factory.createMetricName("UsedBytes"), fromAccordService(sumExecutors(executor -> executor.cacheUnsafe().weightedSize()), 0L));
             this.unreferencedBytes = Metrics.gauge(factory.createMetricName("UnreferencedBytes"), fromAccordService(sumExecutors(executor -> executor.cacheUnsafe().unreferencedBytes()), 0L));
         }
@@ -89,7 +89,7 @@ public class AccordCacheMetrics
 
     public AccordCacheMetrics(String subTypeName)
     {
-        DefaultNameFactory factory = new DefaultNameFactory("AccordCache", 
subTypeName);
+        DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE, 
subTypeName);
         this.objectSize = 
Metrics.shardedHistogram(factory.createMetricName("EntrySize"));
         this.hits = Metrics.gauge(factory.createMetricName("Hits"), 
hitRate::totalHits);
         this.misses = Metrics.gauge(factory.createMetricName("Misses"), 
hitRate::totalMisses);
diff --git 
a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java 
b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
index c4a37ebf2e..1500517bcb 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
@@ -42,6 +42,7 @@ public class AccordCoordinatorMetrics
 {
     public final static AccordCoordinatorMetrics readMetrics = new AccordCoordinatorMetrics("ro");
     public final static AccordCoordinatorMetrics writeMetrics = new AccordCoordinatorMetrics("rw");
+    public final static AccordCoordinatorMetrics syncPointMetrics = new AccordCoordinatorMetrics("rx");
 
     public static final String ACCORD_COORDINATOR = "AccordCoordinator";
     public static final String COORDINATOR_EPOCHS = "Epochs";
@@ -192,7 +193,7 @@ public class AccordCoordinatorMetrics
         {
             throw new RuntimeException(e);
         }
-        builder.append("]");
+        builder.append(']');
         return builder.toString();
     }
 
@@ -206,6 +207,8 @@ public class AccordCoordinatorMetrics
                     return writeMetrics;
                 else if (txnId.isSomeRead())
                     return readMetrics;
+                else if (txnId.isSyncPoint())
+                    return syncPointMetrics;
             }
             return null;
         }
diff --git a/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
new file mode 100644
index 0000000000..c916c3239a
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Gauge;
+import org.apache.cassandra.metrics.ShardedDecayingHistograms.ShardedDecayingHistogram;
+import org.apache.cassandra.service.accord.AccordExecutor;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static org.apache.cassandra.service.accord.AccordExecutor.HISTOGRAMS;
+
+public class AccordExecutorMetrics
+{
+    public static final String ACCORD_EXECUTOR = "AccordExecutor";
+    public static final AccordExecutorMetrics INSTANCE = new AccordExecutorMetrics();
+
+    public final ShardedLongGauges<AccordExecutor> gauges = new ShardedLongGauges<>();
+
+    // latency
+    public final ShardedDecayingHistogram elapsedPreparingToRun = HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
+    public final ShardedDecayingHistogram elapsedWaitingToRun = HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
+    public final ShardedDecayingHistogram elapsedRunning = HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
+
+    // number of keys involved
+    public final ShardedDecayingHistogram keys = HISTOGRAMS.newHistogram(1 << 12);
+
+    public final Gauge<Long> preparingToRun;
+    public final Gauge<Long> waitingToRun;
+    public final Gauge<Long> running;
+
+    public AccordExecutorMetrics()
+    {
+        DefaultNameFactory factory = new DefaultNameFactory(ACCORD_EXECUTOR);
+        Metrics.register(factory.createMetricName("ElapsedPreparingToRun"), 
elapsedPreparingToRun);
+        Metrics.register(factory.createMetricName("ElapsedWaitingToRun"), 
elapsedWaitingToRun);
+        Metrics.register(factory.createMetricName("ElapsedRunning"), 
elapsedRunning);
+
+        Metrics.register(factory.createMetricName("Keys"), keys);
+        preparingToRun = 
Metrics.register(factory.createMetricName("PreparingToRun"), 
gauges.newGauge(AccordExecutor::unsafePreparingToRunCount, Long::sum));
+        waitingToRun = 
Metrics.register(factory.createMetricName("WaitingToRun"), 
gauges.newGauge(AccordExecutor::unsafeWaitingToRunCount, Long::sum));
+        running = Metrics.register(factory.createMetricName("Running"), 
gauges.newGauge(AccordExecutor::unsafeRunningCount, Long::sum));
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
index 91973e4a95..8461789771 100644
--- a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
@@ -25,63 +25,85 @@ import accord.api.ReplicaEventListener;
 import accord.local.Command;
 import accord.local.SafeCommandStore;
 import accord.primitives.PartialDeps;
-import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import com.codahale.metrics.Counting;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import org.apache.cassandra.service.accord.api.AccordTimeService;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+import org.apache.cassandra.metrics.ShardedDecayingHistograms.DecayingHistogramsShard;
+import org.apache.cassandra.metrics.ShardedDecayingHistograms.ShardedDecayingHistogram;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordSafeCommandStore;
 import org.apache.cassandra.tracing.Tracing;
 
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static org.apache.cassandra.service.accord.AccordExecutor.HISTOGRAMS;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 
 public class AccordReplicaMetrics
 {
     public final static AccordReplicaMetrics readMetrics = new AccordReplicaMetrics("ro");
     public final static AccordReplicaMetrics writeMetrics = new AccordReplicaMetrics("rw");
+    public final static AccordReplicaMetrics syncPointMetrics = new AccordReplicaMetrics("rx");
 
     public static final String ACCORD_REPLICA = "AccordReplica";
     public static final String REPLICA_STABLE_LATENCY = "StableLatency";
     public static final String REPLICA_PREAPPLY_LATENCY = "PreApplyLatency";
     public static final String REPLICA_APPLY_LATENCY = "ApplyLatency";
-    public static final String REPLICA_APPLY_DURATION = "ApplyDuration";
     public static final String REPLICA_DEPENDENCIES = "Dependencies";
 
+    static final class SubShard
+    {
+        final LogLinearDecayingHistogram stableLatency;
+        final LogLinearDecayingHistogram preapplyLatency;
+        final LogLinearDecayingHistogram applyLatency;
+        final LogLinearDecayingHistogram dependencies;
+
+        private SubShard(AccordReplicaMetrics metrics, DecayingHistogramsShard shard)
+        {
+            this.stableLatency = metrics.stableLatency.forShard(shard);
+            this.preapplyLatency = metrics.preapplyLatency.forShard(shard);
+            this.applyLatency = metrics.applyLatency.forShard(shard);
+            this.dependencies = metrics.dependencies.forShard(shard);
+        }
+    }
+
+    public static final class Shard
+    {
+        final SubShard reads, writes, syncPoints;
+        public Shard(DecayingHistogramsShard shard)
+        {
+            reads = new SubShard(readMetrics, shard);
+            writes = new SubShard(writeMetrics, shard);
+            syncPoints = new SubShard(syncPointMetrics, shard);
+        }
+    }
+
     /**
      * The time between start on the coordinator and commit on this replica.
      */
-    public final Timer stableLatency;
+    public final ShardedDecayingHistogram stableLatency = 
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
 
     /**
     * The time between start on the coordinator and arrival of the result on this replica.
      */
-    public final Timer preapplyLatency;
+    public final ShardedDecayingHistogram preapplyLatency = HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
 
     /**
     * The time between start on the coordinator and application on this replica.
      */
-    public final Timer applyLatency;
-
-    /**
-     * TODO (expected): probably more interesting is latency from preapplied to apply;
-     *  we already track local write latencies, whch this effectively duplicates (but including queueing latencies)
-     * Duration of applying changes.
-     */
-    public final Timer applyDuration;
+    public final ShardedDecayingHistogram applyLatency = HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
 
     /**
     * A histogram of the number of dependencies per transaction at this replica.
      */
-    public final Histogram dependencies;
+    public final ShardedDecayingHistogram dependencies = HISTOGRAMS.newHistogram(1 << 12);
 
     private AccordReplicaMetrics(String scope)
     {
         DefaultNameFactory replica = new DefaultNameFactory(ACCORD_REPLICA, scope);
-        stableLatency = Metrics.timer(replica.createMetricName(REPLICA_STABLE_LATENCY));
-        preapplyLatency = Metrics.timer(replica.createMetricName(REPLICA_PREAPPLY_LATENCY));
-        applyLatency = Metrics.timer(replica.createMetricName(REPLICA_APPLY_LATENCY));
-        applyDuration = Metrics.timer(replica.createMetricName(REPLICA_APPLY_DURATION));
-        dependencies = Metrics.histogram(replica.createMetricName(REPLICA_DEPENDENCIES), true);
+        Metrics.register(replica.createMetricName(REPLICA_STABLE_LATENCY), stableLatency);
+        Metrics.register(replica.createMetricName(REPLICA_PREAPPLY_LATENCY), preapplyLatency);
+        Metrics.register(replica.createMetricName(REPLICA_APPLY_LATENCY), applyLatency);
+        Metrics.register(replica.createMetricName(REPLICA_DEPENDENCIES), dependencies);
     }
 
     @Override
@@ -106,64 +128,79 @@ public class AccordReplicaMetrics
         {
             throw new RuntimeException(e);
         }
-        builder.append("]");
+        builder.append(']');
         return builder.toString();
     }
 
     public static class Listener implements ReplicaEventListener
     {
-        private AccordReplicaMetrics forTransaction(TxnId txnId)
+        private SubShard forTransaction(SafeCommandStore safeStore, TxnId txnId)
         {
             if (txnId != null)
             {
+                Shard shard = ((AccordCommandStore) safeStore.commandStore()).executor().replicaMetrics;
                 if (txnId.isWrite())
-                    return writeMetrics;
+                    return shard.writes;
                 else if (txnId.isSomeRead())
-                    return readMetrics;
+                    return shard.reads;
+                else if (txnId.isSyncPoint())
+                    return shard.syncPoints;
             }
             return null;
         }
 
+        private static long unixNanos()
+        {
+            return currentTimeMillis() * 1_000_000;
+        }
+
+        private static long elapsed(TxnId txnId)
+        {
+            return elapsed(unixNanos(), txnId);
+        }
+
+        private static long elapsed(long unixNanos, TxnId txnId)
+        {
+            return Math.max(0, unixNanos - (txnId.hlc() * 1000));
+        }
+
+        private static LogLinearDecayingHistograms.Buffer buffer(SafeCommandStore safeStore)
+        {
+            return ((AccordSafeCommandStore) safeStore).histogramBuffer();
+        }
+
         @Override
         public void onStable(SafeCommandStore safeStore, Command cmd)
         {
             Tracing.trace("Stable {} on {}", cmd.txnId(), 
safeStore.commandStore());
-            long now = AccordTimeService.nowMicros();
-            AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+            SubShard metrics = forTransaction(safeStore, cmd.txnId());
             if (metrics != null)
-            {
-                long trxTimestamp = cmd.txnId().hlc();
-                metrics.stableLatency.update(now - trxTimestamp, TimeUnit.MICROSECONDS);
-            }
+                metrics.stableLatency.add(buffer(safeStore), elapsed(cmd.txnId()));
         }
 
         @Override
         public void onPreApplied(SafeCommandStore safeStore, Command cmd)
         {
             Tracing.trace("Preapplied {} on {}", cmd.txnId(), 
safeStore.commandStore());
-            long now = AccordTimeService.nowMicros();
-            AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+            SubShard metrics = forTransaction(safeStore, cmd.txnId());
             if (metrics != null)
             {
-                Timestamp trxTimestamp = cmd.txnId();
-                metrics.preapplyLatency.update(now - trxTimestamp.hlc(), TimeUnit.MICROSECONDS);
+                long elapsed = elapsed(cmd.txnId());
+                metrics.preapplyLatency.add(buffer(safeStore), elapsed);
                 PartialDeps deps = cmd.partialDeps();
-                metrics.dependencies.update(deps != null ? deps.txnIdCount() : 0);
+                metrics.dependencies.add(buffer(safeStore), deps != null ? deps.txnIdCount() : 0);
             }
         }
 
         @Override
-        public void onApplied(SafeCommandStore safeStore, Command cmd, long applyStartedAt)
+        public void onApplied(SafeCommandStore safeStore, Command cmd)
         {
             Tracing.trace("Applied {} on {}", cmd.txnId(), 
safeStore.commandStore());
-            long now = AccordTimeService.nowMicros();
-            AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+            SubShard metrics = forTransaction(safeStore, cmd.txnId());
             if (metrics != null)
             {
-                Timestamp trxTimestamp = cmd.txnId();
-                metrics.applyLatency.update(now - trxTimestamp.hlc(), TimeUnit.MICROSECONDS);
-                if (applyStartedAt > 0)
-                    metrics.applyDuration.update(now - applyStartedAt, TimeUnit.MICROSECONDS);
+                long now = unixNanos();
+                metrics.applyLatency.add(buffer(safeStore), elapsed(now, cmd.txnId()));
             }
         }
     }
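[For context: both latency paths above derive elapsed time from the transaction's HLC, which encodes wall-clock microseconds since the epoch. A minimal sketch of the conversion, assuming a txnId in scope (not part of this commit):

    long hlcMicros = txnId.hlc();                                  // wall-clock microseconds since epoch
    long nowNanos = currentTimeMillis() * 1_000_000L;              // millisecond clock widened to nanoseconds
    long elapsedNanos = Math.max(0, nowNanos - hlcMicros * 1000);  // clamped, since the two clocks may disagree

The Math.max(0, ...) clamp mirrors elapsed(long, TxnId) above.]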
diff --git a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 756144c84d..fc50797005 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -118,6 +118,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
                                    .add(AccordCoordinatorMetrics.ACCORD_COORDINATOR)
                                    .add(AccordCacheMetrics.ACCORD_CACHE)
                                    .add(AccordReplicaMetrics.ACCORD_REPLICA)
+                                   .add(AccordExecutorMetrics.ACCORD_EXECUTOR)
                                    .add(AccordSystemMetrics.ACCORD_SYSTEM)
                                    .add(BatchMetrics.TYPE_NAME)
                                    .add(BufferPoolMetrics.TYPE_NAME)
diff --git a/src/java/org/apache/cassandra/metrics/LogLinearDecayingHistograms.java b/src/java/org/apache/cassandra/metrics/LogLinearDecayingHistograms.java
new file mode 100644
index 0000000000..3d0279c0ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/LogLinearDecayingHistograms.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
+
+import static org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir.MEAN_LIFETIME_IN_S;
+import static org.apache.cassandra.metrics.LogLinearHistogram.MAX_INDEX;
+import static org.apache.cassandra.metrics.LogLinearHistogram.index;
+import static org.apache.cassandra.metrics.LogLinearHistogram.invertIndex;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+/**
+ * A simple single-threaded decaying histogram with log-linear bucket distribution.
+ * This has approximately the same accuracy as the lg 1.2 growth of EstimatedHistogram, but is simpler and faster.
+ *
+ * Logically very similar to DecayingEstimatedHistogramReservoir, only due to single-threaded behaviour this can be
+ * performed more simply and efficiently. We project forward single updates, and periodically rebase in place.
+ * We also share the costly arithmetic across a group of histograms that share mutual exclusivity.
+ *
+ * TODO (desired): improve performance and memory locality by using a small buffer for collecting updates with e.g. 4 bits per counter,
+ */
+public class LogLinearDecayingHistograms
+{
+    public static class Buffer
+    {
+        static final long LARGE_VALUE = -1L >>> (1 + HISTOGRAM_INDEX_BITS);
+        private final LogLinearDecayingHistograms histograms;
+        private long[] buffer = new long[2];
+        private int bufferCount;
+
+        public Buffer(LogLinearDecayingHistograms histograms)
+        {
+            this.histograms = histograms;
+        }
+
+        private void add(long histogramIndex, long value)
+        {
+            Invariants.require(histogramIndex <= HISTOGRAM_INDEX_MASK);
+            Invariants.require(value >= 0);
+            if (value <= LARGE_VALUE) value <<= HISTOGRAM_INDEX_BITS;
+            else
+            {
+                value |= Long.MIN_VALUE;
+                value &= ~HISTOGRAM_INDEX_MASK;
+            }
+            value |= histogramIndex;
+            if (bufferCount == buffer.length)
+                buffer = Arrays.copyOf(buffer, bufferCount * 2);
+            buffer[bufferCount++] = value;
+        }
+
+        public void flush(long at)
+        {
+            for (int i = 0 ; i < bufferCount ; ++i)
+            {
+                long v = buffer[i];
+                int histogramIndex = (int) (v & HISTOGRAM_INDEX_MASK);
+                if (v < 0) v &= ~Long.MIN_VALUE & ~HISTOGRAM_INDEX_MASK;
+                else v >>>= HISTOGRAM_INDEX_BITS;
+                histograms.histograms.get(histogramIndex).increment(v, at);
+            }
+            bufferCount = 0;
+        }
+
+        public boolean isEmpty()
+        {
+            return bufferCount == 0;
+        }
+    }
+
+    public class LogLinearDecayingHistogram
+    {
+        final int histogramIndex;
+
+        private double[] buckets;
+        double totalCount;
+
+        private LogLinearDecayingHistogram(int histogramIndex, long expectedMaxValue)
+        {
+            this.histogramIndex = histogramIndex;
+            buckets = new double[LogLinearHistogram.bucketCount(expectedMaxValue)];
+        }
+
+        public void increment(long value)
+        {
+            increment(value, nanoTime());
+        }
+
+        public void increment(long value, long at)
+        {
+            int index = index(value);
+            if (bufferCount >= buffer.length)
+            {
+                Invariants.require(bufferCount == buffer.length);
+                flush();
+                Invariants.require(bufferCount == 0);
+            }
+            updateDecay(at);
+            long v = Double.doubleToRawLongBits(increment) & VALUE_MASK;
+            v |= histogramIndex | ((long)index << HISTOGRAM_INDEX_BITS);
+            buffer[bufferCount++] = v;
+        }
+
+        private double[] buckets(int withIndexAtLeast)
+        {
+            if (buckets.length <= withIndexAtLeast)
+            {
+                Invariants.require(withIndexAtLeast <= MAX_INDEX);
+                buckets = Arrays.copyOf(buckets, (withIndexAtLeast | 0x3) + 1);
+            }
+            return buckets;
+        }
+
+        public LogLinearSnapshot copyToSnapshot(long at)
+        {
+            LogLinearSnapshot snapshot = new LogLinearSnapshot(new long[buckets.length], 0);
+            updateSnapshot(snapshot, at);
+            return snapshot;
+        }
+
+        public void updateSnapshot(LogLinearSnapshot snapshot, long at)
+        {
+            decay(at);
+
+            if (snapshot.raw.length < buckets.length)
+                snapshot.raw = Arrays.copyOf(snapshot.raw, buckets.length);
+
+            long[] raw = snapshot.raw;
+            long total = 0;
+            for (int i = 0 ; i < buckets.length ; ++i)
+            {
+                long v = Math.round(buckets[i]);
+                raw[i] += v;
+                total += v;
+            }
+
+            snapshot.totalCount += total;
+            snapshot.cumulative = null;
+        }
+
+        public void add(Buffer buffer, long value)
+        {
+            Invariants.require(buffer.histograms == LogLinearDecayingHistograms.this);
+            buffer.add(histogramIndex, value);
+        }
+    }
+
+    private static final long ANTI_DECAY_REFRESH_RATE = TimeUnit.SECONDS.toNanos(1L);
+    private static final long DECAY_REFRESH_RATE = TimeUnit.HOURS.toNanos(1L);
+    private static final double DECAY_RATE = 1d/(TimeUnit.SECONDS.toNanos(1) * MEAN_LIFETIME_IN_S);
+    private static final int BUCKET_INDEX_BITS = 8;
+    private static final int HISTOGRAM_INDEX_BITS = 5;
+    private static final long HISTOGRAM_INDEX_MASK = (1L << HISTOGRAM_INDEX_BITS) - 1;
+    private static final long BUCKET_INDEX_MASK = (1L << BUCKET_INDEX_BITS) - 1;
+    private static final long VALUE_MASK = ~(HISTOGRAM_INDEX_MASK | (BUCKET_INDEX_MASK << HISTOGRAM_INDEX_BITS));
+    private static final int BUFFER_SIZE = 64;
+
+    static
+    {
+        Invariants.require((1 << BUCKET_INDEX_BITS) > MAX_INDEX);
+    }
+
+    private final long[] buffer = new long[BUFFER_SIZE];
+    private int bufferCount;
+    private long lastDecayedAt, lastAntiDecayedAt;
+    private double increment = 1d;
+
+    List<LogLinearDecayingHistogram> histograms = new ArrayList<>();
+
+    void updateDecay(long now)
+    {
+        long delta = now - lastAntiDecayedAt;
+        if (delta <= ANTI_DECAY_REFRESH_RATE)
+            return;
+
+        if (delta < DECAY_REFRESH_RATE) antiDecay(now);
+        else decay(now);
+    }
+
+    private void antiDecay(long now)
+    {
+        increment = Math.exp((now - lastDecayedAt) * DECAY_RATE);
+    }
+
+    private void flush()
+    {
+        for (int i = 0 ; i < bufferCount ; ++i)
+        {
+            long bits = buffer[i];
+            double increment = Double.longBitsToDouble(bits & VALUE_MASK);
+            int histogramIndex = (int) (bits & HISTOGRAM_INDEX_MASK);
+            int bucketIndex = (int)((bits >>> HISTOGRAM_INDEX_BITS) & BUCKET_INDEX_MASK);
+            LogLinearDecayingHistogram histogram = histograms.get(histogramIndex);
+            histogram.buckets(bucketIndex)[bucketIndex] += increment;
+            histogram.totalCount += increment;
+        }
+        bufferCount = 0;
+    }
+
+    private void decay(long now)
+    {
+        flush();
+
+        if (now <= lastDecayedAt)
+            return;
+
+        antiDecay(now);
+        double decay = 1d/increment;
+        increment = 1d;
+        for (LogLinearDecayingHistogram histogram : histograms)
+        {
+            if (histogram.totalCount <= 0d)
+                continue;
+
+            double[] buckets = histogram.buckets;
+            double total = 0d;
+            for (int i = 0 ; i < buckets.length ; ++i)
+            {
+                if (buckets[i] <= 0d)
+                    continue;
+
+                double in = buckets[i];
+                double out = in * decay;
+                if (out < 0.5d)
+                    out = 0d;
+                buckets[i] = out;
+                total += out;
+            }
+            histogram.totalCount = total;
+        }
+        lastDecayedAt = lastAntiDecayedAt = now;
+    }
+
+    public synchronized LogLinearDecayingHistogram allocate(long expectedMaxValue)
+    {
+        Invariants.require(histograms.size() <= HISTOGRAM_INDEX_MASK);
+        LogLinearDecayingHistogram histogram = new LogLinearDecayingHistogram(histograms.size(), expectedMaxValue);
+        histograms.add(histogram);
+        return histogram;
+    }
+
+    public synchronized LogLinearDecayingHistogram get(int histogramIndex)
+    {
+        return histograms.get(histogramIndex);
+    }
+
+    public static double[] bucketsWithScale(long maxScale)
+    {
+        return bucketsWithLength(1 + index(maxScale));
+    }
+
+    public static double[] bucketsWithLength(int length)
+    {
+        Invariants.require(length <= MAX_INDEX + 1);
+        double[] buckets = new double[length];
+        for (int i = 0 ; i < length ; ++i)
+            buckets[i] = invertIndex(i);
+        return buckets;
+    }
+}
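[A minimal usage sketch of the class above, assuming a single caller thread (in the tree, all updates to a group happen under the owning executor's lock):

    LogLinearDecayingHistograms group = new LogLinearDecayingHistograms();
    LogLinearDecayingHistograms.LogLinearDecayingHistogram latency = group.allocate(TimeUnit.SECONDS.toNanos(1L));

    latency.increment(250_000L);                                     // record 250us, expressed in nanoseconds
    LogLinearSnapshot snapshot = latency.copyToSnapshot(nanoTime()); // flushes the shared buffer and applies decay first]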
diff --git a/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java b/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
index 43420cf7f6..e6be4f9017 100644
--- a/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
@@ -28,8 +28,11 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 
 import accord.utils.Invariants;
+import accord.utils.SortedArrays;
 import com.codahale.metrics.Snapshot;
 
+import static accord.utils.SortedArrays.Search.CEIL;
+
 /**
  * A simple single-threaded histogram with log-linear buckets.
  * This has approximately the same accuracy as the lg 1.2 growth of EstimatedHistogram, but is simpler and faster.
@@ -40,13 +43,13 @@ import com.codahale.metrics.Snapshot;
  */
 public class LogLinearHistogram
 {
-    private static final int MAX_INDEX = 247;
+    public static final int MAX_INDEX = 247;
     private long[] buckets;
     long totalCount;
 
     public LogLinearHistogram(long expectedMaxValue)
     {
-        buckets = new long[buckets(expectedMaxValue)];
+        buckets = new long[bucketCount(expectedMaxValue)];
     }
 
     public void increment(long value)
@@ -99,12 +102,12 @@ public class LogLinearHistogram
         return buckets;
     }
 
-    private static int buckets(long maxValue)
+    static int bucketCount(long maxValue)
     {
         return 1 + (index(maxValue) | 0x3);
     }
 
-    private static int index(long value)
+    static int index(long value)
     {
         if (value < 4)
             return (int) value;
@@ -113,7 +116,7 @@ public class LogLinearHistogram
         return (log + 1) * 4 + linear;
     }
 
-    private static long invertIndex(int index)
+    static long invertIndex(int index)
     {
         if (index < 4)
             return index;
@@ -141,7 +144,7 @@ public class LogLinearHistogram
 
         public static LogLinearSnapshot emptyForMax(long maxValue)
         {
-            return new LogLinearSnapshot(buckets(maxValue));
+            return new LogLinearSnapshot(bucketCount(maxValue));
         }
 
         LogLinearSnapshot(int size)
@@ -163,33 +166,37 @@ public class LogLinearHistogram
                 cumulative = new long[raw.length];
                 long sum = 0;
                 for (int i = 0 ; i < cumulative.length ; ++i)
-                    cumulative[i] = sum += cumulative[i];
+                    cumulative[i] = sum += raw[i];
             }
             return cumulative;
         }
 
-        private long get(long tot)
+        private double get(long tot)
         {
-            if (totalCount == 0)
+            if (totalCount == 0 || tot == 0)
                 return 0;
 
             long[] cumulative = cumulative();
-            int i = Arrays.binarySearch(cumulative, tot);
-            if (i >= 0) while (i > 0 && cumulative[i-1] == tot) --i;
-            else i = Math.max(0, -2 - i);
-            long prevValue = invertIndex(i);
+            int i = SortedArrays.binarySearch(cumulative, 0, cumulative.length, tot, CEIL);
+            if (i >= 0)
+                return invertIndex(i);
+
+            i = Math.max(0, -2 - i);
             long prevCount = cumulative[i];
+            long nextCount = cumulative[i + 1];
+
+            long prevValue = invertIndex(i);
             long nextValue = invertIndex(i + 1);
-            long nextCount = i + 1 == cumulative.length ? totalCount : cumulative[i + 1];
-            long gap = tot - prevCount;
-            // should we use double arithmetic here to avoid overflow?
-            return prevValue + ((nextValue - prevValue) * gap) / (nextCount - prevCount);
+
+            double granularity = (nextValue - prevValue) / (double)(nextCount - prevCount);
+            double targetGap = tot - prevCount;
+            return prevValue + Math.round(targetGap * granularity);
         }
 
         @Override
         public double getValue(double quantile)
         {
-            return get(Math.round(totalCount * quantile));
+            return get(Math.max(1L, (long)Math.ceil(totalCount * quantile)));
         }
 
         @Override
@@ -199,23 +206,47 @@ public class LogLinearHistogram
         }
 
         @Override
-        public long getMax()
+        public double getMean()
         {
-            return get(totalCount);
+            if (totalCount == 0)
+                return 0.0D;
+
+            double sum = 0;
+            for (int i = 0; i < raw.length; i++)
+            {
+                if (raw[i] != 0)
+                    sum += raw[i] * (double) invertIndex(i);
+            }
+            return sum / totalCount;
         }
 
         @Override
-        public double getMean()
+        public long getMin()
         {
-            if (totalCount <= 1)
-                return 0.0D;
-            return get(totalCount / 2);
+            if (totalCount == 0)
+                return 0;
+
+            long[] cumulative = cumulative();
+            int i = SortedArrays.binarySearch(cumulative, 0, cumulative.length, 1, CEIL);
+            if (i < 0)
+            {
+                i = Math.max(0, -2 - i);
+                if (cumulative[i] == 0)
+                    ++i;
+            }
+            return invertIndex(i);
         }
 
         @Override
-        public long getMin()
+        public long getMax()
         {
-            return get(1);
+            if (totalCount == 0)
+                return 0;
+
+            long[] cumulative = cumulative();
+            int i = SortedArrays.binarySearch(cumulative, 0, cumulative.length, totalCount, CEIL);
+            if (i < 0) i = -2 - i;
+            return invertIndex(i + 1);
         }
 
         /**
@@ -230,23 +261,19 @@ public class LogLinearHistogram
         public double getStdDev()
         {
             if (totalCount <= 1)
-            {
                 return 0.0D;
-            }
-            else
-            {
-                double mean = this.getMean();
-                double sum = 0.0D;
 
-                for(int i = 0; i < raw.length; ++i)
-                {
-                    long value = invertIndex(i);
-                    double diff = value - mean;
-                    sum += diff * diff * raw[i];
-                }
+            double mean = this.getMean();
+            double sum = 0.0D;
 
-                return Math.sqrt(sum / (totalCount - 1));
+            for(int i = 0; i < raw.length; ++i)
+            {
+                long value = invertIndex(i);
+                double diff = value - mean;
+                sum += diff * diff * raw[i];
             }
+
+            return Math.sqrt(sum / (totalCount - 1));
         }
 
         @Override
@@ -274,13 +301,18 @@ public class LogLinearHistogram
         return result;
     }
 
+    public LogLinearSnapshot copyToSnapshot()
+    {
+        return new LogLinearSnapshot(buckets.clone(), totalCount);
+    }
+
     public void updateSnapshot(LogLinearSnapshot snapshot)
     {
         if (snapshot.raw.length < buckets.length)
             snapshot.raw = Arrays.copyOf(snapshot.raw, buckets.length);
 
         long[] raw = snapshot.raw;
-        for (int i = 0 ; i < raw.length ; ++i)
+        for (int i = 0 ; i < buckets.length ; ++i)
             raw[i] = raw[i] + buckets[i];
         snapshot.totalCount += totalCount;
         snapshot.cumulative = null;
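[The index arithmetic elided from the hunks above follows a log-linear scheme: values below 4 index directly, and each later power-of-two range splits into 4 linear sub-buckets. A reconstruction consistent with the visible "(log + 1) * 4 + linear" formula and with MAX_INDEX = 247; this is an illustration, not the committed code:

    static int index(long value)
    {
        if (value < 4)
            return (int) value;
        int log = 61 - Long.numberOfLeadingZeros(value); // floor(log2(value)) - 2
        int linear = (int) ((value >>> log) & 0x3);      // the two bits below the leading bit
        return (log + 1) * 4 + linear;                   // e.g. 4 -> 4, 8 -> 8, 15 -> 11
    }

Each bucket then under-reports a value by at most 25%, comparable to the lg 1.2 growth the javadoc cites.]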
diff --git a/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
new file mode 100644
index 0000000000..db91857b3c
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
+import org.apache.cassandra.utils.Clock;
+
+import static org.apache.cassandra.metrics.CassandraReservoir.BucketStrategy.log_linear;
+
+public class ShardedDecayingHistograms
+{
+    private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(15);
+
+    public class ShardedDecayingHistogram extends OverrideHistogram
+    {
+        final int histogramIndex;
+        final long initialMaxValue;
+
+        private ShardedDecayingHistogram(int histogramIndex, long initialMaxValue)
+        {
+            this.histogramIndex = histogramIndex;
+            this.initialMaxValue = initialMaxValue;
+        }
+
+        @Override
+        public CassandraReservoir.BucketStrategy bucketStrategy()
+        {
+            return log_linear;
+        }
+
+        @Override
+        public long[] bucketStarts(int length)
+        {
+            return LogLinearHistogram.bucketsWithLength(length);
+        }
+
+        @Override
+        public synchronized long getCount()
+        {
+            return maybeRefresh().get(histogramIndex).totalCount;
+        }
+
+        @Override
+        public Snapshot getSnapshot()
+        {
+            return maybeRefresh().get(histogramIndex);
+        }
+
+        public LogLinearDecayingHistogram forShard(DecayingHistogramsShard shard)
+        {
+            return shard.histograms.get(histogramIndex);
+        }
+    }
+
+    public static class DecayingHistogramsShard
+    {
+        final Lock lock;
+        final LogLinearDecayingHistograms histograms;
+
+        DecayingHistogramsShard(Lock lock, LogLinearDecayingHistograms histograms)
+        {
+            this.lock = lock;
+            this.histograms = histograms;
+        }
+
+        void updateSnapshot(List<LogLinearSnapshot> update, long at)
+        {
+            lock.lock();
+            try
+            {
+                for (int i = 0 ; i < update.size() ; ++i)
+                    histograms.get(i).updateSnapshot(update.get(i), at);
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+
+        public LogLinearDecayingHistograms unsafeGetInternal()
+        {
+            return histograms;
+        }
+    }
+
+    final List<DecayingHistogramsShard> shards = new ArrayList<>();
+    final List<ShardedDecayingHistogram> histograms = new ArrayList<>();
+
+    public synchronized DecayingHistogramsShard newShard(Lock guardedBy)
+    {
+        DecayingHistogramsShard shard = new DecayingHistogramsShard(guardedBy, new LogLinearDecayingHistograms());
+        for (int i = 0 ; i < histograms.size() ; ++i)
+            shard.histograms.allocate(histograms.get(i).initialMaxValue);
+        shards.add(shard);
+        return shard;
+    }
+
+    public synchronized ShardedDecayingHistogram newHistogram(long initialMaxValue)
+    {
+        ShardedDecayingHistogram histogram = new ShardedDecayingHistogram(histograms.size(), initialMaxValue);
+        for (int i = 0 ; i < shards.size() ; ++i)
+            shards.get(i).histograms.allocate(initialMaxValue);
+        histograms.add(histogram);
+        return histogram;
+    }
+
+    private long snapshotAt = Long.MIN_VALUE;
+    private List<LogLinearSnapshot> snapshot = Collections.emptyList();
+
+    public synchronized void refresh()
+    {
+        refresh(Clock.Global.nanoTime());
+    }
+
+    private synchronized List<LogLinearSnapshot> refresh(long now)
+    {
+        List<LogLinearSnapshot> snapshot = new ArrayList<>(histograms.size());
+        for (ShardedDecayingHistogram histogram : histograms)
+            snapshot.add(LogLinearSnapshot.emptyForMax(histogram.initialMaxValue));
+        for (DecayingHistogramsShard shard : shards)
+            shard.updateSnapshot(snapshot, now);
+        this.snapshot = snapshot;
+        this.snapshotAt = now;
+        return this.snapshot;
+    }
+
+    private synchronized List<LogLinearSnapshot> maybeRefresh()
+    {
+        long now = Clock.Global.nanoTime();
+        if (snapshot.size() == histograms.size() && snapshotAt + REFRESH_RATE >= now)
+            return snapshot;
+
+        return refresh(now);
+    }
+}
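[How the pieces fit together, sketched with an assumed lock (this mirrors the wiring AccordExecutor's constructor does later in this patch):

    ShardedDecayingHistograms histograms = new ShardedDecayingHistograms();
    ShardedDecayingHistogram elapsed = histograms.newHistogram(TimeUnit.SECONDS.toNanos(1L)); // registry-facing view

    Lock lock = new ReentrantLock();                             // in the tree, the executor's own lock
    DecayingHistogramsShard shard = histograms.newShard(lock);
    LogLinearDecayingHistogram local = elapsed.forShard(shard);  // per-shard view, updated under lock

Reads via getSnapshot() merge every shard under its lock, and are cached for up to 15 seconds (REFRESH_RATE).]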
diff --git a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
index 1677edad81..66defdac56 100644
--- a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
@@ -43,19 +43,6 @@ public class ShardedHistogram extends OverrideHistogram
             this.histogram = histogram;
         }
 
-        long total()
-        {
-            lock.lock();
-            try
-            {
-                return histogram.totalCount;
-            }
-            finally
-            {
-                lock.unlock();
-            }
-        }
-
         public void updateSnapshot(LogLinearSnapshot snapshot)
         {
             lock.lock();
diff --git a/src/java/org/apache/cassandra/metrics/ShardedLongGauges.java b/src/java/org/apache/cassandra/metrics/ShardedLongGauges.java
new file mode 100644
index 0000000000..d2721a94c6
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ShardedLongGauges.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.function.LongBinaryOperator;
+import java.util.function.ToLongFunction;
+
+import accord.utils.Invariants;
+import com.codahale.metrics.Gauge;
+import org.apache.cassandra.utils.Clock;
+
+public class ShardedLongGauges<S>
+{
+    private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(15);
+
+    public class ShardedLongGauge implements Gauge<Long>
+    {
+        final int gaugeIndex;
+        final ToLongFunction<S> compute;
+        final LongBinaryOperator reduce;
+
+        private ShardedLongGauge(int gaugeIndex, ToLongFunction<S> compute, LongBinaryOperator reduce)
+        {
+            this.gaugeIndex = gaugeIndex;
+            this.compute = compute;
+            this.reduce = reduce;
+        }
+
+        @Override
+        public Long getValue()
+        {
+            return maybeRefresh()[gaugeIndex];
+        }
+    }
+
+    static class LongGaugeShard<T>
+    {
+        final Lock lock;
+        final T shard;
+
+        LongGaugeShard(Lock lock, T shard)
+        {
+            this.lock = lock;
+            this.shard = shard;
+        }
+
+        public void init(long[] init, List<ShardedLongGauges<T>.ShardedLongGauge> gauges)
+        {
+            lock.lock();
+            try
+            {
+                for (int i = 0 ; i < init.length ; ++i)
+                    init[i] = gauges.get(i).compute.applyAsLong(shard);
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+
+        public void update(long[] update, List<ShardedLongGauges<T>.ShardedLongGauge> gauges)
+        {
+            lock.lock();
+            try
+            {
+                for (int i = 0 ; i < update.length ; ++i)
+                {
+                    ShardedLongGauges<T>.ShardedLongGauge gauge = gauges.get(i);
+                    Invariants.require(gauge.gaugeIndex == i);
+                    update[i] = gauge.reduce.applyAsLong(update[i], gauge.compute.applyAsLong(shard));
+                }
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+    }
+
+    final List<LongGaugeShard<S>> shards = new ArrayList<>();
+    final List<ShardedLongGauge> gauges = new ArrayList<>();
+
+    public synchronized void newShard(Lock guardedBy, S shard)
+    {
+        shards.add(new LongGaugeShard<>(guardedBy, shard));
+    }
+
+    public synchronized ShardedLongGauge newGauge(ToLongFunction<S> compute, LongBinaryOperator reduce)
+    {
+        ShardedLongGauge gauge = new ShardedLongGauge(gauges.size(), compute, reduce);
+        gauges.add(gauge);
+        return gauge;
+    }
+
+    private long snapshotAt = Long.MIN_VALUE;
+    private long[] snapshot = new long[0];
+
+    public synchronized void refresh()
+    {
+        refresh(Clock.Global.nanoTime());
+    }
+
+    private synchronized long[] refresh(long now)
+    {
+        if (gauges.isEmpty())
+            return new long[0];
+
+        long[] snapshot = new long[gauges.size()];
+        if (shards.isEmpty())
+            return snapshot;
+
+        shards.get(0).init(snapshot, gauges);
+        for (int i = 1; i < shards.size() ; ++i)
+            shards.get(i).update(snapshot, gauges);
+        this.snapshot = snapshot;
+        this.snapshotAt = now;
+        return this.snapshot;
+    }
+
+    private synchronized long[] maybeRefresh()
+    {
+        long now = Clock.Global.nanoTime();
+        if (snapshot.length == gauges.size() && snapshotAt + REFRESH_RATE >= now)
+            return snapshot;
+
+        return refresh(now);
+    }
+}
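[The gauge analogue, as AccordExecutorMetrics uses it above; executor and executorLock are assumed stand-ins here:

    ShardedLongGauges<AccordExecutor> gauges = new ShardedLongGauges<>();
    Gauge<Long> running = gauges.newGauge(AccordExecutor::unsafeRunningCount, Long::sum);
    gauges.newShard(executorLock, executor);  // one shard per executor, guarded by its lock
    Long value = running.getValue();          // computed under each shard's lock, reduced with Long::sum, cached for 15s]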
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index 0a1dd623f0..c17ccc879c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -54,6 +54,7 @@ import accord.utils.UnhandledEnum;
 import org.agrona.collections.Object2ObjectHashMap;
 import org.apache.cassandra.cache.CacheSize;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
 import org.apache.cassandra.exceptions.UnknownTableException;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.metrics.AccordCacheMetrics;
@@ -62,11 +63,11 @@ import org.apache.cassandra.metrics.ShardedHitRate;
 import org.apache.cassandra.service.accord.AccordCacheEntry.LoadExecutor;
 import org.apache.cassandra.service.accord.AccordCacheEntry.Status;
 import org.apache.cassandra.service.accord.events.CacheEvents;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import org.apache.cassandra.service.accord.serializers.Version;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
 import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.btree.BTree;
 
 import static accord.utils.Invariants.illegalState;
 import static accord.utils.Invariants.require;
@@ -219,6 +220,7 @@ public class AccordCache implements CacheSize
     {
         require(node.references() == 0);
 
+        // TODO (desired): special-case evict queue with 1 element as no point shrinking first
         if (shrinkingOn && node.tryShrink())
         {
             IntrusiveLinkedList<AccordCacheEntry<?,?>> queue;
@@ -1107,6 +1109,11 @@ public class AccordCache implements CacheSize
         @Override
         public Runnable save(AccordCommandStore commandStore, RoutingKey key, @Nullable CommandsForKey value, @Nullable Object serialized)
         {
+            if (serialized != null)
+            {
+                ByteBuffer bb = (ByteBuffer)serialized;
+                serialized = bb.duplicate().position(prefixBytes(bb));
+            }
             return commandStore.saveCommandsForKey(key, value, serialized);
         }
 
@@ -1125,19 +1132,35 @@ public class AccordCache implements CacheSize
         @Override
         public Object fullShrink(RoutingKey key, CommandsForKey value)
         {
-            if (value.isEmpty())
+            if (value.isEmpty() || value.isLoadingPruned())
                 return null;
 
-            if (value.isLoadingPruned())
-                return value;
+            TxnId last = value.size() == 0 ? null : value.get(value.size() - 1);
+            TxnId minUndecided = value.minUndecided();
+            int lastSize = (int) CommandSerializers.txnId.serializedSize(last);
+            int minUndecidedSize = (int) CommandSerializers.txnId.serializedSize(minUndecided);
+            ByteBuffer result = Serialize.toBytesWithoutKey(lastSize + minUndecidedSize, value.maximalPrune());
+            int limit = result.limit();
+            result.limit(lastSize + minUndecidedSize);
+            CommandSerializers.txnId.serialize(last, result, ByteBufferAccessor.instance, 0);
+            CommandSerializers.txnId.serialize(minUndecided, result, ByteBufferAccessor.instance, lastSize);
+            result.limit(limit);
+            return result;
+        }
 
-            return Serialize.toBytesWithoutKey(value.maximalPrune());
+        private static int prefixBytes(ByteBuffer bb)
+        {
+            int prefix = (int) CommandSerializers.txnId.serializedSize(CommandSerializers.txnId.deserialize(bb, 0));
+            prefix += (int)CommandSerializers.txnId.serializedSize(CommandSerializers.txnId.deserialize(bb, prefix));
+            return prefix;
         }
 
         @Override
         public CommandsForKey inflate(AccordCommandStore commandStore, RoutingKey key, Object shrunk)
         {
-            return Serialize.fromBytes(key, (ByteBuffer)shrunk);
+            ByteBuffer bb = ((ByteBuffer)shrunk).duplicate();
+            bb.position(prefixBytes(bb));
+            return Serialize.fromBytes(key, bb, false);
         }
 
         @Override
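[As a reading aid, the shrunk CommandsForKey layout implied by fullShrink and prefixBytes above, reconstructed from the code rather than documented in the commit:

    [ last TxnId | minUndecided TxnId | pruned CommandsForKey body ]

prefixBytes(bb) measures the two leading TxnIds so that save can persist, and inflate can deserialize, only the body that follows, while the prefix keeps those covering TxnIds readable without inflating the rest.]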
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 5c1df8f136..99b763de21 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -67,6 +67,7 @@ import accord.utils.async.AsyncChains;
 import accord.utils.async.AsyncResults.CountingResult;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
@@ -162,6 +163,7 @@ public class AccordCommandStore extends CommandStore
     volatile SafeRedundantBefore safeRedundantBefore;
 
     private AccordSafeCommandStore current;
+    LogLinearDecayingHistograms.Buffer metricsBuffer;
 
     public AccordCommandStore(int id,
                               NodeCommandStoreService node,
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 2fa8675563..c9731d101a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -69,6 +69,12 @@ import org.apache.cassandra.concurrent.DebuggableTask.DebuggableTaskRunner;
 import org.apache.cassandra.concurrent.ScheduledExecutors;
 import org.apache.cassandra.concurrent.Shutdownable;
 import org.apache.cassandra.metrics.AccordCacheMetrics;
+import org.apache.cassandra.metrics.AccordExecutorMetrics;
+import org.apache.cassandra.metrics.AccordReplicaMetrics;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+import org.apache.cassandra.metrics.ShardedDecayingHistograms;
+import org.apache.cassandra.metrics.ShardedDecayingHistograms.DecayingHistogramsShard;
 import org.apache.cassandra.service.accord.AccordCacheEntry.LoadExecutor;
 import org.apache.cassandra.service.accord.AccordCacheEntry.SaveExecutor;
 import org.apache.cassandra.service.accord.AccordCacheEntry.UniqueSave;
@@ -98,6 +104,8 @@ import static org.apache.cassandra.service.accord.AccordTask.State.WAITING_TO_RU
 public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable, AbstractAsyncExecutor
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordExecutor.class);
+    public static final ShardedDecayingHistograms HISTOGRAMS = new ShardedDecayingHistograms();
+
     public interface AccordExecutorFactory
     {
         AccordExecutor get(int executorId, Mode mode, int threads, IntFunction<String> name, Agent agent);
@@ -159,6 +167,13 @@ public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTa
     private List<Condition> waitingForQuiescence;
     private Queue<WaitForCompletion> waitingForCompletion;
 
+    final LogLinearDecayingHistograms histograms;
+    final LogLinearDecayingHistogram elapsedPreparingToRun;
+    final LogLinearDecayingHistogram elapsedWaitingToRun;
+    final LogLinearDecayingHistogram elapsedRunning;
+    final LogLinearDecayingHistogram keys;
+    public final AccordReplicaMetrics.Shard replicaMetrics;
+
     private static class WaitForCompletion
     {
         final int position;
@@ -212,6 +227,14 @@ public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTa
         registerJfrListener(executorId, commandsForKey, "CommandsForKey");

         this.caches = new ExclusiveGlobalCaches(this, cache, commands, commandsForKey);
+
+        DecayingHistogramsShard histogramsShard = HISTOGRAMS.newShard(lock);
+        this.histograms = histogramsShard.unsafeGetInternal();
+        this.elapsedPreparingToRun = AccordExecutorMetrics.INSTANCE.elapsedPreparingToRun.forShard(histogramsShard);
+        this.elapsedWaitingToRun = AccordExecutorMetrics.INSTANCE.elapsedWaitingToRun.forShard(histogramsShard);
+        this.elapsedRunning = AccordExecutorMetrics.INSTANCE.elapsedRunning.forShard(histogramsShard);
+        this.keys = AccordExecutorMetrics.INSTANCE.keys.forShard(histogramsShard);
+        this.replicaMetrics = new AccordReplicaMetrics.Shard(histogramsShard);
         ScheduledExecutors.scheduledFastTasks.scheduleAtFixedRate(() -> {
             executeDirectlyWithLock(cache::processNoEvictQueue);
         }, 1L, 1L, TimeUnit.SECONDS);
@@ -1761,4 +1784,18 @@ public abstract class AccordExecutor implements CacheSize, LoadExecutor<AccordTa
         }
     }

+    public int unsafePreparingToRunCount()
+    {
+        return waitingToLoad.size() + waitingToLoadRangeTxns.size() + scanningRanges.size() + loading.size();
+    }
+    }
+
+    public int unsafeWaitingToRunCount()
+    {
+        return waitingToRun.size();
+    }
+
+    public int unsafeRunningCount()
+    {
+        return running.size();
+    }
 }
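
The AccordExecutor changes above split a task's lifetime into three consecutive phases, each feeding one decaying histogram: created -> waiting-to-run (loading and scanning), waiting-to-run -> running (queued behind the executor), and running -> completed (execution); the unsafe*Count() methods expose the matching point-in-time queue depths. A minimal sketch of the stamping scheme, with a stand-in Histogram interface rather than the real LogLinearDecayingHistogram:

    // Sketch only: field and histogram names mirror the patch, but Histogram
    // is a stand-in, not the real LogLinearDecayingHistogram.
    final class TaskTimings
    {
        interface Histogram { void increment(long value, long nowNanos); }

        final Histogram preparingToRun, waitingToRun, running;
        long createdAt = System.nanoTime();
        long waitingToRunAt, runningAt, completedAt;

        TaskTimings(Histogram preparingToRun, Histogram waitingToRun, Histogram running)
        {
            this.preparingToRun = preparingToRun;
            this.waitingToRun = waitingToRun;
            this.running = running;
        }

        void onWaitingToRun(long now)
        {
            waitingToRunAt = now;
            preparingToRun.increment(now - createdAt, now);    // load/scan phase done
        }

        void onRunning(long now)
        {
            runningAt = now;
            if (waitingToRunAt == 0)                           // task skipped the queue
                onWaitingToRun(now);
            waitingToRun.increment(now - waitingToRunAt, now); // time spent queued
        }

        void onCompleted(long now)
        {
            if (completedAt == 0)                              // stamp only once
            {
                completedAt = now;
                running.increment(now - runningAt, now);       // execution phase
            }
        }
    }
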
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 99553db5cc..5d2a565932 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -37,8 +37,6 @@ import org.apache.cassandra.service.accord.serializers.Version;
 import static accord.local.CommandStores.RangesForEpoch;
 
 // TODO (required): test with large collection values, and perhaps split out some fields if they have a tendency to grow larger
-// TODO (required): alert on metadata size
-// TODO (required): versioning
 public class AccordJournalValueSerializers
 {
     public interface FlyweightImage
diff --git a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index f3ecdc43fd..f9624d2354 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -40,6 +40,8 @@ import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.primitives.Unseekables;
+import accord.utils.Invariants;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
 import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches;
 import org.apache.cassandra.service.accord.AccordCommandStore.SafeRedundantBefore;
 import org.apache.cassandra.service.paxos.PaxosState;
@@ -207,6 +209,18 @@ public class AccordSafeCommandStore extends AbstractSafeCommandStore<AccordSafeC
         return commandStore.node();
     }
 
+    public LogLinearDecayingHistograms.Buffer histogramBuffer()
+    {
+        if (task.histogramBuffer == null)
+        {
+            task.histogramBuffer = commandStore.metricsBuffer;
+            if (task.histogramBuffer == null)
+                task.histogramBuffer = commandStore.metricsBuffer = new LogLinearDecayingHistograms.Buffer(commandStore.executor().histograms);
+            Invariants.require(task.histogramBuffer.isEmpty());
+        }
+        return task.histogramBuffer;
+    }
+
     private boolean visitForKey(Unseekables<?> keysOrRanges, Predicate<CommandsForKey> forEach)
     {
         Map<RoutingKey, AccordSafeCommandsForKey> commandsForKey = task.commandsForKey;
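
The buffer handed out by histogramBuffer() follows a simple borrow-and-flush contract: the command store owns at most one Buffer, the in-flight task borrows it for its lifetime, and task cleanup flushes it into the shared histograms so the next task starts empty (hence the isEmpty() invariant above). A compact sketch of that contract, with Buffer standing in for the real LogLinearDecayingHistograms.Buffer:

    // Sketch of the borrow/flush lifecycle; Buffer is a stand-in for
    // LogLinearDecayingHistograms.Buffer.
    final class BufferLifecycle
    {
        static final class Buffer
        {
            private long pending;
            boolean isEmpty()       { return pending == 0; }
            void record(long value) { pending++; }
            void flush(long now)    { pending = 0; } // would fold into the shared histograms
        }

        private Buffer storeBuffer; // owned by the command store, reused across tasks
        private Buffer taskBuffer;  // borrowed by the currently executing task

        Buffer borrow()
        {
            if (taskBuffer == null)
            {
                if (storeBuffer == null)
                    storeBuffer = new Buffer();
                taskBuffer = storeBuffer;
                assert taskBuffer.isEmpty(); // the previous task must have flushed
            }
            return taskBuffer;
        }

        void onTaskCleanup(long now)
        {
            if (taskBuffer != null)
            {
                taskBuffer.flush(now);
                taskBuffer = null;
            }
        }
    }
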
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index d2b87d25b1..2234ca00f4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.service.accord;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -59,12 +60,14 @@ import accord.utils.async.Cancellable;
 import org.agrona.collections.Object2ObjectHashMap;
 import org.agrona.collections.ObjectHashSet;
 import org.apache.cassandra.concurrent.DebuggableTask;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
 import org.apache.cassandra.service.accord.AccordCacheEntry.Status;
 import org.apache.cassandra.service.accord.AccordCommandStore.Caches;
 import org.apache.cassandra.service.accord.AccordExecutor.SubmittableTask;
 import org.apache.cassandra.service.accord.AccordExecutor.TaskQueue;
 import org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
 import org.apache.cassandra.service.accord.api.TokenKey;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.Clock;
 import org.apache.cassandra.utils.Closeable;
@@ -201,6 +204,7 @@ public abstract class AccordTask<R> extends SubmittableTask implements Function<
     @Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands;
     @Nullable Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey> commandsForKey;
     @Nullable Object2ObjectHashMap<Object, AccordSafeState<?, ?>> loading;
+    LogLinearDecayingHistograms.Buffer histogramBuffer;
     // TODO (desired): collection supporting faster deletes but still fast poll (e.g. some ordered collection)
     @Nullable ArrayDeque<AccordCacheEntry<?, ?>> waitingToLoad;
     @Nullable RangeTxnScanner rangeScanner;
@@ -299,12 +303,20 @@ public abstract class AccordTask<R> extends SubmittableTask implements Function<
             Invariants.require(rangeScanner == null || rangeScanner.scanned);
             Invariants.require(loading == null && waitingToLoad == null, "WAITING_TO_RUN => no loading or waiting; found %s", this, AccordTask::toDescription);
             waitingToRunAt = nanoTime();
+            commandStore.executor().elapsedPreparingToRun.increment(waitingToRunAt - createdAt, runningAt);
         }
         else if (state == RUNNING)
         {
             runningAt = nanoTime();
+            if (waitingToRunAt == 0)
+            {
+                waitingToRunAt = runningAt;
+                commandStore.executor().elapsedPreparingToRun.increment(waitingToRunAt - createdAt, runningAt);
+            }
+            commandStore.executor().elapsedWaitingToRun.increment(runningAt - waitingToRunAt, runningAt);
+            commandStore.executor().keys.increment(commandsForKey == null ? 0 : commandsForKey.size(), runningAt);
         }
-        else if (state.isExecuted())
+        else if (state.isExecuted() && completedAt == 0)
         {
             completedAt = nanoTime();
         }
@@ -756,7 +768,17 @@ public abstract class AccordTask<R> extends SubmittableTask implements Function<
     {
         if (state == FAILING)
             state(FAILED);
+        Invariants.expect(state.isExecuted());
         releaseResources(commandStore.cachesExclusive());
+        if (runningAt != 0)
+        {
+            commandStore.executor().elapsedRunning.increment(completedAt - runningAt, completedAt);
+        }
+        if (histogramBuffer != null)
+        {
+            histogramBuffer.flush(completedAt);
+            histogramBuffer = null;
+        }
     }
 
     @Nullable
@@ -979,13 +1001,6 @@ public abstract class AccordTask<R> extends SubmittableTask implements Function<
                 default: throw new AssertionError("Unhandled Status: " + entry.status());
                 case WAITING_TO_LOAD:
                 case LOADING:
-                    if (scanned)
-                        // if we've finished scanning and not already taken a reference we shouldn't need to witness (unless modified)
-                        return;
-                    ensureLoading().put(entry.key(), commandsForKeyCache.acquire(entry));
-                    if (entry.status() == Status.WAITING_TO_LOAD)
-                        ensureWaitingToLoad().add(entry);
-                    entry.loadingOrWaiting().add(AccordTask.this);
                     return;
 
                 case MODIFIED:
@@ -995,6 +1010,23 @@ public abstract class AccordTask<R> extends 
SubmittableTask implements Function<
                 case FAILED_TO_SAVE:
                     if (commandsForKey != null && commandsForKey.containsKey(entry.key()))
                         return;
+
+                    Object v = entry.getOrShrunkExclusive();
+                    if (v == null) return;
+                    else if (v instanceof CommandsForKey)
+                    {
+                        if (!summaryLoader.isRelevant((CommandsForKey) v))
+                            return;
+                    }
+                    else
+                    {
+                        TxnId last = CommandSerializers.txnId.deserialize((ByteBuffer) v);
+                        int position = (int)CommandSerializers.txnId.serializedSize(last);
+                        TxnId minUndecided = CommandSerializers.txnId.deserialize((ByteBuffer) v, position);
+                        if (!summaryLoader.isRelevant(entry.key(), last, minUndecided))
+                            return;
+                    }
+
                     ensureCommandsForKey().putIfAbsent(entry.key(), commandsForKeyCache.acquire(entry));
             }
         }
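
The hunk above implements the "do not take a reference to CFK unless relevant" improvement: instead of unconditionally acquiring a cache reference for a saving entry, the visitor first checks relevance. If the value is still a live CommandsForKey it is tested directly; if only the shrunk (serialized) form remains, the two leading TxnIds (last, then minUndecided) are decoded from the front of the buffer so relevance can be decided without inflating the object. A rough sketch of that decision, with TxnIdSerializer and SummaryLoader as stand-ins for CommandSerializers.txnId and the real summary loader:

    // Sketch only; the serializer and predicate interfaces are stand-ins.
    import java.nio.ByteBuffer;

    final class ShrunkRelevance
    {
        interface TxnIdSerializer<T>
        {
            T deserialize(ByteBuffer in, int position);
            int serializedSize(T txnId);
        }

        interface SummaryLoader<T>
        {
            boolean isRelevantFull(Object commandsForKey);
            boolean isRelevantSummary(T last, T minUndecided);
        }

        static <T> boolean shouldAcquire(Object value, TxnIdSerializer<T> serializer, SummaryLoader<T> loader)
        {
            if (value == null)
                return false;                        // nothing cached, nothing to pin
            if (!(value instanceof ByteBuffer))
                return loader.isRelevantFull(value); // live CommandsForKey: test directly

            // shrunk form: peek the two leading TxnIds without full deserialization
            ByteBuffer shrunk = (ByteBuffer) value;
            T last = serializer.deserialize(shrunk, 0);
            T minUndecided = serializer.deserialize(shrunk, serializer.serializedSize(last));
            return loader.isRelevantSummary(last, minUndecided);
        }
    }
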
diff --git a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
index 34ab52fc2d..8c1bd7eca8 100644
--- a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
+++ b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
@@ -42,6 +42,7 @@ import accord.primitives.Ranges;
 import accord.primitives.Timestamp;
 import accord.primitives.TxnId;
 import accord.primitives.Writes;
+import accord.utils.Invariants;
 import accord.utils.UnhandledEnum;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.VersionedSerializer;
@@ -82,7 +83,7 @@ public class ReadDataSerializer implements IVersionedSerializer<ReadData>
         boolean hasExecuteAtEpoch = (read.executeAt() != null ? read.executeAt() : read.txnId).epoch() != read.executeAtEpoch;
 
         out.writeUnsignedVInt32(type.ordinal()
-                                | (read.flags.bits() << 3)
+                                | shiftedExecuteFlags(read.flags)
                                 | (hasTxn ? HAS_TXN : 0)
                                 | (hasExecuteAt ? HAS_EXECUTE_AT : 0)
                                 | (hasExecuteAtEpoch ? HAS_EXECUTE_AT_EPOCH : 0)
@@ -154,7 +155,7 @@ public class ReadDataSerializer implements IVersionedSerializer<ReadData>
         {
             int tmp = in.readUnsignedVInt32();
             type = TYPES[tmp & 0x7];
-            flags = ExecuteFlags.get((tmp >>> 3) & 0x3);
+            flags = ExecuteFlags.get(unshiftedExecuteFlags(tmp));
             hasTxn = (tmp & HAS_TXN) != 0;
             hasExecuteAt = (tmp & HAS_EXECUTE_AT) != 0;
             hasExecuteAtEpoch = (tmp & HAS_EXECUTE_AT_EPOCH) != 0;
@@ -219,6 +220,17 @@ public class ReadDataSerializer implements 
IVersionedSerializer<ReadData>
         }
     }
 
+    private static int shiftedExecuteFlags(ExecuteFlags flags)
+    {
+        Invariants.require(flags.bits() <= 0x7);
+        return ((flags.bits() & 0x3) << 3) | ((flags.bits() & ~0x3) << 6);
+    }
+
+    private static int unshiftedExecuteFlags(int flags)
+    {
+        return ((flags & 0x18) >> 3) | ((flags & 0x100) >> 6);
+    }
+
     @Override
     public long serializedSize(ReadData read, Version version)
     {
@@ -228,12 +240,13 @@ public class ReadDataSerializer implements IVersionedSerializer<ReadData>
         boolean hasExecuteAtEpoch = (read.executeAt() != null ? read.executeAt() : read.txnId).epoch() != read.executeAtEpoch;

         long size = VIntCoding.computeUnsignedVIntSize(type.ordinal()
-                                                       | (read.flags.bits() << 3)
+                                                       | shiftedExecuteFlags(read.flags)
                                                        | (hasTxn ? HAS_TXN : 0)
                                                        | (hasExecuteAt ? HAS_EXECUTE_AT : 0)
                                                        | (hasExecuteAtEpoch ? HAS_EXECUTE_AT_EPOCH : 0))
                     + CommandSerializers.txnId.serializedSize(read.txnId)
                     + KeySerializers.participants.serializedSize(read.scope);
+
         if (hasTxn)
             size += CommandSerializers.partialTxn.serializedSize(read.partialTxn(), version);
         if (hasExecuteAt)
         if (hasExecuteAt)
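
For context on the ExecuteFlags serialization bug fixed above: the header vint keeps the message type in bits 0-2, and the old encoding shifted all of flags.bits() into bits 3 and up, colliding with the HAS_* markers once ExecuteFlags grew to a third bit (the exact HAS_* positions are inferred from the masks and not spelled out in this hunk). The new packing keeps the two low flag bits at bits 3-4 and relocates the third to bit 8. A standalone round-trip check using the masks from the patch:

    // Round-trip check of the new flag packing; shift/unshift copy the patch.
    final class ExecuteFlagsPackingCheck
    {
        static int shift(int flagBits)
        {
            assert flagBits <= 0x7;
            return ((flagBits & 0x3) << 3) | ((flagBits & ~0x3) << 6); // flag bit 2 -> header bit 8
        }

        static int unshift(int header)
        {
            return ((header & 0x18) >> 3) | ((header & 0x100) >> 6);
        }

        public static void main(String[] args)
        {
            for (int bits = 0; bits <= 0x7; bits++)
                if (unshift(shift(bits)) != bits)
                    throw new AssertionError("round-trip failed for " + bits);
            System.out.println("all 3-bit flag values survive the header round-trip");
        }
    }
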
diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
index 5c78547432..ff47950f7a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.metrics.DefaultNameFactory;
 import org.apache.cassandra.metrics.RatioGaugeSet;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.accord.AccordExecutor;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.service.accord.exceptions.AccordReadPreemptedException;
 import org.apache.cassandra.service.accord.exceptions.AccordWritePreemptedException;
@@ -61,7 +62,6 @@ import static java.lang.String.format;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.fail;
 
-
 public class AccordMetricsTest extends AccordTestBase
 {
     private static final Logger logger = LoggerFactory.getLogger(AccordMetricsTest.class);
@@ -332,7 +332,6 @@ public class AccordMetricsTest extends AccordTestBase
         assertThat(metric.apply(AccordReplicaMetrics.REPLICA_STABLE_LATENCY)).isLessThanOrEqualTo(stable);
         assertThat(metric.apply(AccordReplicaMetrics.REPLICA_PREAPPLY_LATENCY)).isEqualTo(executions);
         assertThat(metric.apply(AccordReplicaMetrics.REPLICA_APPLY_LATENCY)).isEqualTo(applications);
-        assertThat(metric.apply(AccordReplicaMetrics.REPLICA_APPLY_DURATION)).isEqualTo(scope.equals("rw") ? applications : 0);
         assertThat(metric.apply(AccordReplicaMetrics.REPLICA_DEPENDENCIES)).isEqualTo(executions);

         // Verify that replica metrics are published to the appropriate virtual table:
@@ -357,6 +356,7 @@ public class AccordMetricsTest extends AccordTestBase
         Map<Integer, Map<String, Long>> metrics = new HashMap<>();
         for (int i = 0; i < SHARED_CLUSTER.size(); i++)
         {
+            SHARED_CLUSTER.get(i + 1).runOnInstance(() -> AccordExecutor.HISTOGRAMS.refresh());
             Map<String, Long> map = SHARED_CLUSTER.get(i + 1).metrics().getCounters(name -> name.startsWith("org.apache.cassandra.metrics.Accord") || (name.startsWith("org.apache.cassandra.metrics.ClientRequest") && (name.endsWith("AccordRead") || name.endsWith("AccordWrite"))));
             SHARED_CLUSTER.get(i + 1).metrics().getGauges(name -> name.startsWith("org.apache.cassandra.metrics.Accord"))
                                                .forEach((key, value) -> map.put(key, ((Number)value).longValue()));
diff --git a/test/unit/org/apache/cassandra/metrics/LogLinearDecayingHistogramTest.java b/test/unit/org/apache/cassandra/metrics/LogLinearDecayingHistogramTest.java
new file mode 100644
index 0000000000..31ca6fbb54
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/LogLinearDecayingHistogramTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Snapshot;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+
+import static org.junit.Assert.assertEquals;
+
+public class LogLinearDecayingHistogramTest
+{
+    public static final Logger logger = LoggerFactory.getLogger(LogLinearDecayingHistogramTest.class);
+
+    private static final double DOUBLE_ASSERT_DELTA = 0;
+    private static final long ONE_SECOND = TimeUnit.SECONDS.toSeconds(1);
+    private static final long ONE_HOUR = TimeUnit.HOURS.toSeconds(1);
+    private static final long[] TIMES = new long[] { 0, ONE_SECOND - 1, ONE_SECOND, ONE_HOUR - 1, ONE_HOUR };
+
+    @Test
+    public void testMinMax()
+    {
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            histogram.increment(16, time);
+            Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+            assertEquals(16, snapshot.getMin());
+            assertEquals(20, snapshot.getMax());
+        }
+    }
+
+    @Test
+    public void testMean()
+    {
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            for (int i = 0; i < 40; i++)
+                histogram.increment(0, time);
+            for (int i = 0; i < 20; i++)
+                histogram.increment(1, time);
+            for (int i = 0; i < 10; i++)
+                histogram.increment(2, time);
+            assertEquals(40/70d, histogram.copyToSnapshot(time + 1).getMean(), 0.1D);
+        }
+    }
+
+    @Test
+    public void testStdDev()
+    {
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            for (int i = 0; i < 20; i++)
+                histogram.increment(10, time);
+            for (int i = 0; i < 40; i++)
+                histogram.increment(20, time);
+            for (int i = 0; i < 20; i++)
+                histogram.increment(30, time);
+
+            Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+            assertEquals(20.0D, snapshot.getMean(), 2.0D);
+            assertEquals(7.07D, snapshot.getStdDev(), 2.0D);
+        }
+    }
+
+    @Test
+    public void testPercentile()
+    {
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            // percentile of empty histogram is 0
+            assertEquals(0D, histogram.copyToSnapshot(0).getValue(0.99), DOUBLE_ASSERT_DELTA);
+            assertEquals(0D, histogram.copyToSnapshot(ONE_SECOND).getValue(0.99), DOUBLE_ASSERT_DELTA);
+            assertEquals(0D, histogram.copyToSnapshot(ONE_HOUR).getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            histogram.increment(1, time);
+            // percentile of a histogram with one element should be that element
+            assertEquals(1D, histogram.copyToSnapshot(time + 1).getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            histogram.increment(1, time);
+            histogram.increment(10, time);
+            assertEquals(10D, histogram.copyToSnapshot(time + 1).getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+
+            histogram.increment(1, time);
+            histogram.increment(2, time);
+            histogram.increment(3, time);
+            histogram.increment(4, time);
+            histogram.increment(5, time);
+
+            Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+            assertEquals(1, snapshot.getValue(0.00), DOUBLE_ASSERT_DELTA);
+            assertEquals(3, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+            assertEquals(3, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA);
+            assertEquals(5, snapshot.getValue(1.00), DOUBLE_ASSERT_DELTA);
+        }
+
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+
+            for (int i = 11; i <= 20; i++)
+                histogram.increment(i, time);
+
+            // Right now the histogram looks like:
+            //     10   12   14   16   20
+            //      1    2    2    4    1
+            // %:  10   30   50   90  100
+            Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+            assertEquals(10, snapshot.getValue(0.01), DOUBLE_ASSERT_DELTA);
+            assertEquals(10, snapshot.getValue(0.10), DOUBLE_ASSERT_DELTA);
+
+            assertEquals(11, snapshot.getValue(0.11), DOUBLE_ASSERT_DELTA);
+            assertEquals(11, snapshot.getValue(0.20), DOUBLE_ASSERT_DELTA);
+            assertEquals(12, snapshot.getValue(0.21), DOUBLE_ASSERT_DELTA);
+            assertEquals(12, snapshot.getValue(0.30), DOUBLE_ASSERT_DELTA);
+
+            assertEquals(13, snapshot.getValue(0.31), DOUBLE_ASSERT_DELTA);
+            assertEquals(13, snapshot.getValue(0.40), DOUBLE_ASSERT_DELTA);
+            assertEquals(14, snapshot.getValue(0.41), DOUBLE_ASSERT_DELTA);
+            assertEquals(14, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+            assertEquals(15, snapshot.getValue(0.51), DOUBLE_ASSERT_DELTA);
+            assertEquals(15, snapshot.getValue(0.70), DOUBLE_ASSERT_DELTA);
+            assertEquals(16, snapshot.getValue(0.71), DOUBLE_ASSERT_DELTA);
+            assertEquals(16, snapshot.getValue(0.90), DOUBLE_ASSERT_DELTA);
+            assertEquals(20, snapshot.getValue(0.91), DOUBLE_ASSERT_DELTA);
+        }
+
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            histogram.increment(0, time);
+            histogram.increment(0, time);
+            histogram.increment(1, time);
+
+            Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+            assertEquals(0, snapshot.getValue(0.5), DOUBLE_ASSERT_DELTA);
+            assertEquals(1, snapshot.getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+    }
+
+    @Test
+    public void testSize()
+    {
+        for (long time : TIMES)
+        {
+            LogLinearDecayingHistogram histogram = new LogLinearDecayingHistograms().allocate(1);
+            histogram.increment(42, time);
+            histogram.increment(42, time);
+            assertEquals(2, histogram.copyToSnapshot(time + 1).size());
+        }
+    }
+}
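
The expected values in these tests encode the bucketing scheme: each power-of-two range is split into four linear sub-buckets (steps of 2 between 8 and 16, steps of 4 between 16 and 32, and so on), which is why the ten values 11..20 land on the bucket floors 10, 12, 14, 16, 20, and why increment(16) reports min 16 and max 20 (the next bucket boundary). A sketch of index/invertIndex consistent with those expectations; the real LogLinearHistogram may differ in details such as growth or bounds handling:

    // Bucketing sketch matching the expectations above; not the actual
    // LogLinearHistogram.index/invertIndex implementation.
    final class LogLinearBuckets
    {
        static final int SUB_BUCKETS = 4; // four linear sub-buckets per power of two

        static int index(long value)
        {
            if (value < SUB_BUCKETS)
                return (int) value;                           // small values are exact
            int octave = 63 - Long.numberOfLeadingZeros(value);
            int sub = (int) ((value >>> (octave - 2)) & 0x3); // position within the octave
            return (octave - 1) * SUB_BUCKETS + sub;
        }

        static long invertIndex(int index) // lower bound of the bucket
        {
            if (index < SUB_BUCKETS)
                return index;
            int octave = index / SUB_BUCKETS + 1;
            int sub = index % SUB_BUCKETS;
            return (4L + sub) << (octave - 2);
        }

        public static void main(String[] args)
        {
            for (long v = 11; v <= 20; v++) // prints 10 12 12 14 14 16 16 16 16 20
                System.out.print(invertIndex(index(v)) + " ");
            System.out.println();
            System.out.println("min=" + invertIndex(index(16)) + " max=" + invertIndex(1 + index(16)));
        }
    }
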
diff --git a/test/unit/org/apache/cassandra/metrics/LogLinearHistogramTest.java b/test/unit/org/apache/cassandra/metrics/LogLinearHistogramTest.java
new file mode 100644
index 0000000000..c7954e2704
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/LogLinearHistogramTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.RandomTestRunner;
+import com.codahale.metrics.Snapshot;
+import static org.junit.Assert.assertEquals;
+
+public class LogLinearHistogramTest
+{
+    public static final Logger logger = LoggerFactory.getLogger(LogLinearHistogramTest.class);
+
+    private static final double DOUBLE_ASSERT_DELTA = 0;
+
+    @Test
+    public void testMinMax()
+    {
+        LogLinearHistogram histogram = new LogLinearHistogram(1);
+        histogram.increment(16);
+        Snapshot snapshot = histogram.copyToSnapshot();
+        assertEquals(16, snapshot.getMin());
+        assertEquals(20, snapshot.getMax());
+    }
+
+    @Test
+    public void testMean()
+    {
+        LogLinearHistogram histogram = new LogLinearHistogram(1);
+        for (int i = 0; i < 40; i++)
+            histogram.increment(0);
+        for (int i = 0; i < 20; i++)
+            histogram.increment(1);
+        for (int i = 0; i < 10; i++)
+            histogram.increment(2);
+        assertEquals(40/70d, histogram.copyToSnapshot().getMean(), 0.1D);
+    }
+
+    @Test
+    public void testStdDev()
+    {
+        LogLinearHistogram histogram = new LogLinearHistogram(1);
+        for (int i = 0; i < 20; i++)
+            histogram.increment(10);
+        for (int i = 0; i < 40; i++)
+            histogram.increment(20);
+        for (int i = 0; i < 20; i++)
+            histogram.increment(30);
+
+        Snapshot snapshot = histogram.copyToSnapshot();
+        assertEquals(20.0D, snapshot.getMean(), 2.0D);
+        assertEquals(7.07D, snapshot.getStdDev(), 2.0D);
+    }
+
+    @Test
+    public void testPercentile()
+    {
+        {
+            LogLinearHistogram histogram = new LogLinearHistogram(1);
+            // percentile of empty histogram is 0
+            assertEquals(0D, histogram.copyToSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+
+            histogram.increment(1);
+            // percentile of a histogram with one element should be that element
+            assertEquals(1D, histogram.copyToSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+
+            histogram.increment(10);
+            assertEquals(10D, histogram.copyToSnapshot().getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+
+        {
+            LogLinearHistogram histogram = new LogLinearHistogram(1);
+
+            histogram.increment(1);
+            histogram.increment(2);
+            histogram.increment(3);
+            histogram.increment(4);
+            histogram.increment(5);
+
+            Snapshot snapshot = histogram.copyToSnapshot();
+            assertEquals(1, snapshot.getValue(0.00), DOUBLE_ASSERT_DELTA);
+            assertEquals(3, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+            assertEquals(3, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA);
+            assertEquals(5, snapshot.getValue(1.00), DOUBLE_ASSERT_DELTA);
+        }
+
+        {
+            LogLinearHistogram histogram = new LogLinearHistogram(1);
+
+            for (int i = 11; i <= 20; i++)
+                histogram.increment(i);
+
+            // Right now the histogram looks like:
+            //     10   12   14   16   20
+            //      1    2    2    4    1
+            // %:  10   30   50   90  100
+            Snapshot snapshot = histogram.copyToSnapshot();
+            assertEquals(10, snapshot.getValue(0.01), DOUBLE_ASSERT_DELTA);
+            assertEquals(10, snapshot.getValue(0.10), DOUBLE_ASSERT_DELTA);
+
+            assertEquals(11, snapshot.getValue(0.11), DOUBLE_ASSERT_DELTA);
+            assertEquals(11, snapshot.getValue(0.20), DOUBLE_ASSERT_DELTA);
+            assertEquals(12, snapshot.getValue(0.21), DOUBLE_ASSERT_DELTA);
+            assertEquals(12, snapshot.getValue(0.30), DOUBLE_ASSERT_DELTA);
+
+            assertEquals(13, snapshot.getValue(0.31), DOUBLE_ASSERT_DELTA);
+            assertEquals(13, snapshot.getValue(0.40), DOUBLE_ASSERT_DELTA);
+            assertEquals(14, snapshot.getValue(0.41), DOUBLE_ASSERT_DELTA);
+            assertEquals(14, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+            assertEquals(15, snapshot.getValue(0.51), DOUBLE_ASSERT_DELTA);
+            assertEquals(15, snapshot.getValue(0.70), DOUBLE_ASSERT_DELTA);
+            assertEquals(16, snapshot.getValue(0.71), DOUBLE_ASSERT_DELTA);
+            assertEquals(16, snapshot.getValue(0.90), DOUBLE_ASSERT_DELTA);
+            assertEquals(20, snapshot.getValue(0.91), DOUBLE_ASSERT_DELTA);
+        }
+        {
+            LogLinearHistogram histogram = new LogLinearHistogram(1);
+            histogram.increment(0);
+            histogram.increment(0);
+            histogram.increment(1);
+
+            Snapshot snapshot = histogram.copyToSnapshot();
+            assertEquals(0, snapshot.getValue(0.5), DOUBLE_ASSERT_DELTA);
+            assertEquals(1, snapshot.getValue(0.99), DOUBLE_ASSERT_DELTA);
+        }
+    }
+
+    @Test
+    public void testSize()
+    {
+        LogLinearHistogram histogram = new LogLinearHistogram(1);
+        histogram.increment(42);
+        histogram.increment(42);
+        assertEquals(2, histogram.copyToSnapshot().size());
+    }
+
+    @Test
+    public void testRandomizedMinMaxProperties()
+    {
+        RandomTestRunner.test().check(random -> {
+            for (int trial = 0; trial < 100; trial++)
+            {
+                int maxValue = 500;
+                LogLinearHistogram histogram = new LogLinearHistogram(maxValue);
+                List<Long> values = new ArrayList<>();
+
+                int numValues = 10 + random.nextInt(100);
+                for (int i = 0; i < numValues; i++)
+                {
+                    long value = random.nextInt(maxValue);
+                    histogram.increment(value);
+                    values.add(value);
+                }
+
+                Snapshot snapshot = histogram.copyToSnapshot();
+                long expectedMin = Collections.min(values);
+                long expectedMax = Collections.max(values);
+
+                assertEquals("" + trial, LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+                             snapshot.getMin());
+                assertEquals(LogLinearHistogram.invertIndex(1 + LogLinearHistogram.index(expectedMax)),
+                             snapshot.getMax());
+            }
+        });
+    }
+
+    @Test
+    public void testRandomizedIncrementDecrementConsistency()
+    {
+        RandomTestRunner.test().check(random -> {
+            for (int trial = 0; trial < 50; trial++)
+            {
+                int maxValue = 500;
+                LogLinearHistogram histogram = new LogLinearHistogram(maxValue);
+                List<Long> values = new ArrayList<>();
+
+                // Add random values
+                int numValues = 20 + random.nextInt(80);
+                for (int i = 0; i < numValues; i++)
+                {
+                    long value = random.nextInt(5000);
+                    histogram.increment(value);
+                    values.add(value);
+                }
+
+                Snapshot beforeSnapshot = histogram.copyToSnapshot();
+
+                // Remove and re-add some values
+                int numToModify = 1 + random.nextInt(Math.min(10, values.size()));
+                for (int i = 0; i < numToModify; i++)
+                {
+                    int idx = random.nextInt(values.size());
+                    long oldValue = values.get(idx);
+                    long newValue = random.nextInt(5000);
+
+                    histogram.decrement(oldValue);
+                    histogram.increment(newValue);
+                    values.set(idx, newValue);
+                }
+
+                Snapshot afterSnapshot = histogram.copyToSnapshot();
+
+                assertEquals(beforeSnapshot.size(), afterSnapshot.size());
+
+                long expectedMin = Collections.min(values);
+                long expectedMax = Collections.max(values);
+                assertEquals(LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+                             afterSnapshot.getMin());
+                assertEquals(LogLinearHistogram.invertIndex(1 + LogLinearHistogram.index(expectedMax)),
+                             afterSnapshot.getMax());
+            }
+        });
+    }
+
+    @Test
+    public void testRandomizedReplaceConsistency()
+    {
+        RandomTestRunner.test().check(random -> {
+            for (int trial = 0; trial < 100; trial++)
+            {
+                int maxValue = 500;
+                LogLinearHistogram histogram = new LogLinearHistogram(maxValue);
+                List<Long> values = new ArrayList<>();
+
+                int numValues = 20 + random.nextInt(80);
+                for (int i = 0; i < numValues; i++)
+                {
+                    long value = random.nextInt(maxValue);
+                    histogram.increment(value);
+                    values.add(value);
+                }
+
+                Snapshot beforeSnapshot = histogram.copyToSnapshot();
+                int numToReplace = 1 + random.nextInt(Math.min(10, values.size()));
+                for (int i = 0; i < numToReplace; i++)
+                {
+                    int idx = random.nextInt(values.size());
+                    long oldValue = values.get(idx);
+                    long newValue = random.nextInt(maxValue);
+
+                    histogram.replace(oldValue, newValue);
+                    values.set(idx, newValue);
+                }
+
+                Snapshot afterSnapshot = histogram.copyToSnapshot();
+                assertEquals(beforeSnapshot.size(), afterSnapshot.size());
+
+                long expectedMin = Collections.min(values);
+                long expectedMax = Collections.max(values);
+                assertEquals(LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+                             afterSnapshot.getMin());
+                assertEquals(LogLinearHistogram.invertIndex(1 + LogLinearHistogram.index(expectedMax)),
+                             afterSnapshot.getMax());
+            }
+        });
+    }
+
+    @Test
+    public void testRandomizedAutoGrowth()
+    {
+        RandomTestRunner.test().check(random -> {
+            for (int trial = 0; trial < 20; trial++)
+            {
+                int maxValue = 500;
+                LogLinearHistogram histogram = new LogLinearHistogram(maxValue);
+                List<Long> values = new ArrayList<>();
+
+                int numValues = 50;
+                for (int i = 0; i < numValues; i++)
+                {
+                    long maxRange = (long) Math.pow(2, i / 5.0);
+                    long value = random.nextInt((int) Math.min(maxRange, Integer.MAX_VALUE));
+                    histogram.increment(value);
+                    values.add(value);
+                }
+
+                Snapshot snapshot = histogram.copyToSnapshot();
+
+                assertEquals(numValues, snapshot.size());
+
+                long expectedMin = Collections.min(values);
+                long expectedMax = Collections.max(values);
+                assertEquals(LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+                             snapshot.getMin());
+                assertEquals(LogLinearHistogram.invertIndex(1 + LogLinearHistogram.index(expectedMax)),
+                             snapshot.getMax());
+            }
+        });
+    }
+}
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index ac322403d1..f43ddb66e4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -69,7 +69,6 @@ import static org.apache.cassandra.service.accord.AccordTestUtils.txnId;
 
 public class AccordCommandTest
 {
-
     static final AtomicLong clock = new AtomicLong(0);
     private static final Node.Id ID1 = new Node.Id(1);
     private static final Node.Id ID2 = new Node.Id(2);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

