This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 494c3c7bef Add AccordExecutorMetrics Also Introduce: -
Sharded/LogLinearDecayingHistogram Also Improve: - Do not take a reference to
CFK unless relevant Also Fix: - Sharded/LogLinearHistogram - ExecuteFlags
serialization bug in ReadData
494c3c7bef is described below
commit 494c3c7befeb291a0bf1d36dd9b958da032a1e43
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Tue Nov 11 10:54:04 2025 +0000
Add AccordExecutorMetrics
Also Introduce:
- Sharded/LogLinearDecayingHistogram
Also Improve:
- Do not take a reference to CFK unless relevant
Also Fix:
- Sharded/LogLinearHistogram (e.g. LogLinearSnapshot cumulative counts, mean, min/max and snapshot merging)
- ExecuteFlags serialization bug in ReadData
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-21017
---
modules/accord | 2 +-
.../cassandra/metrics/AccordCacheMetrics.java | 4 +-
.../metrics/AccordCoordinatorMetrics.java | 5 +-
.../cassandra/metrics/AccordExecutorMetrics.java | 61 ++++
.../cassandra/metrics/AccordReplicaMetrics.java | 123 +++++---
.../metrics/CassandraMetricsRegistry.java | 1 +
.../metrics/LogLinearDecayingHistograms.java | 283 +++++++++++++++++++
.../cassandra/metrics/LogLinearHistogram.java | 112 +++++---
.../metrics/ShardedDecayingHistograms.java | 159 +++++++++++
.../apache/cassandra/metrics/ShardedHistogram.java | 13 -
.../cassandra/metrics/ShardedLongGauges.java | 148 ++++++++++
.../cassandra/service/accord/AccordCache.java | 35 ++-
.../service/accord/AccordCommandStore.java | 2 +
.../cassandra/service/accord/AccordExecutor.java | 37 +++
.../accord/AccordJournalValueSerializers.java | 2 -
.../service/accord/AccordSafeCommandStore.java | 14 +
.../cassandra/service/accord/AccordTask.java | 48 +++-
.../accord/serializers/ReadDataSerializer.java | 19 +-
.../distributed/test/accord/AccordMetricsTest.java | 4 +-
.../metrics/LogLinearDecayingHistogramTest.java | 188 +++++++++++++
.../cassandra/metrics/LogLinearHistogramTest.java | 313 +++++++++++++++++++++
.../service/accord/AccordCommandTest.java | 1 -
22 files changed, 1452 insertions(+), 122 deletions(-)
diff --git a/modules/accord b/modules/accord
index de03d14e48..938ba19ada 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit de03d14e4821b669fecad5dcdb2de2dba3a9d926
+Subproject commit 938ba19adaf70bf9901fb2dc177dce313d7ee15c
diff --git a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
index 5285bd914e..10fbfd6621 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCacheMetrics.java
@@ -44,7 +44,7 @@ public class AccordCacheMetrics
public AccordCacheGlobalMetrics()
{
- DefaultNameFactory factory = new DefaultNameFactory("AccordCache");
+ DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE);
this.usedBytes =
Metrics.gauge(factory.createMetricName("UsedBytes"),
fromAccordService(sumExecutors(executor ->
executor.cacheUnsafe().weightedSize()), 0L));
this.unreferencedBytes =
Metrics.gauge(factory.createMetricName("UnreferencedBytes"),
fromAccordService(sumExecutors(executor ->
executor.cacheUnsafe().unreferencedBytes()), 0L));
}
@@ -89,7 +89,7 @@ public class AccordCacheMetrics
public AccordCacheMetrics(String subTypeName)
{
- DefaultNameFactory factory = new DefaultNameFactory("AccordCache",
subTypeName);
+ DefaultNameFactory factory = new DefaultNameFactory(ACCORD_CACHE,
subTypeName);
this.objectSize =
Metrics.shardedHistogram(factory.createMetricName("EntrySize"));
this.hits = Metrics.gauge(factory.createMetricName("Hits"),
hitRate::totalHits);
this.misses = Metrics.gauge(factory.createMetricName("Misses"),
hitRate::totalMisses);
diff --git
a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
index c4a37ebf2e..1500517bcb 100644
--- a/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordCoordinatorMetrics.java
@@ -42,6 +42,7 @@ public class AccordCoordinatorMetrics
{
public final static AccordCoordinatorMetrics readMetrics = new
AccordCoordinatorMetrics("ro");
public final static AccordCoordinatorMetrics writeMetrics = new
AccordCoordinatorMetrics("rw");
+ public final static AccordCoordinatorMetrics syncPointMetrics = new
AccordCoordinatorMetrics("rx");
public static final String ACCORD_COORDINATOR = "AccordCoordinator";
public static final String COORDINATOR_EPOCHS = "Epochs";
@@ -192,7 +193,7 @@ public class AccordCoordinatorMetrics
{
throw new RuntimeException(e);
}
- builder.append("]");
+ builder.append(']');
return builder.toString();
}
@@ -206,6 +207,8 @@ public class AccordCoordinatorMetrics
return writeMetrics;
else if (txnId.isSomeRead())
return readMetrics;
+ else if (txnId.isSyncPoint())
+ return syncPointMetrics;
}
return null;
}
diff --git a/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
new file mode 100644
index 0000000000..c916c3239a
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/AccordExecutorMetrics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import com.codahale.metrics.Gauge;
+import
org.apache.cassandra.metrics.ShardedDecayingHistograms.ShardedDecayingHistogram;
+import org.apache.cassandra.service.accord.AccordExecutor;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static org.apache.cassandra.service.accord.AccordExecutor.HISTOGRAMS;
+
+public class AccordExecutorMetrics
+{
+ public static final String ACCORD_EXECUTOR = "AccordExecutor";
+ public static final AccordExecutorMetrics INSTANCE = new
AccordExecutorMetrics();
+
+ public final ShardedLongGauges<AccordExecutor> gauges = new
ShardedLongGauges<>();
+
+ // latency
+ public final ShardedDecayingHistogram elapsedPreparingToRun =
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
+ public final ShardedDecayingHistogram elapsedWaitingToRun =
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
+ public final ShardedDecayingHistogram elapsedRunning =
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
+
+ // number of keys involved
+ public final ShardedDecayingHistogram keys = HISTOGRAMS.newHistogram(1 <<
12);
+
+ public final Gauge<Long> preparingToRun;
+ public final Gauge<Long> waitingToRun;
+ public final Gauge<Long> running;
+
+ public AccordExecutorMetrics()
+ {
+ DefaultNameFactory factory = new DefaultNameFactory(ACCORD_EXECUTOR);
+ Metrics.register(factory.createMetricName("ElapsedPreparingToRun"),
elapsedPreparingToRun);
+ Metrics.register(factory.createMetricName("ElapsedWaitingToRun"),
elapsedWaitingToRun);
+ Metrics.register(factory.createMetricName("ElapsedRunning"),
elapsedRunning);
+
+ Metrics.register(factory.createMetricName("Keys"), keys);
+ preparingToRun =
Metrics.register(factory.createMetricName("PreparingToRun"),
gauges.newGauge(AccordExecutor::unsafePreparingToRunCount, Long::sum));
+ waitingToRun =
Metrics.register(factory.createMetricName("WaitingToRun"),
gauges.newGauge(AccordExecutor::unsafeWaitingToRunCount, Long::sum));
+ running = Metrics.register(factory.createMetricName("Running"),
gauges.newGauge(AccordExecutor::unsafeRunningCount, Long::sum));
+ }
+}
diff --git a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
index 91973e4a95..8461789771 100644
--- a/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/AccordReplicaMetrics.java
@@ -25,63 +25,85 @@ import accord.api.ReplicaEventListener;
import accord.local.Command;
import accord.local.SafeCommandStore;
import accord.primitives.PartialDeps;
-import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import com.codahale.metrics.Counting;
-import com.codahale.metrics.Histogram;
-import com.codahale.metrics.Timer;
-import org.apache.cassandra.service.accord.api.AccordTimeService;
+import
org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+import
org.apache.cassandra.metrics.ShardedDecayingHistograms.DecayingHistogramsShard;
+import
org.apache.cassandra.metrics.ShardedDecayingHistograms.ShardedDecayingHistogram;
+import org.apache.cassandra.service.accord.AccordCommandStore;
+import org.apache.cassandra.service.accord.AccordSafeCommandStore;
import org.apache.cassandra.tracing.Tracing;
import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+import static org.apache.cassandra.service.accord.AccordExecutor.HISTOGRAMS;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
public class AccordReplicaMetrics
{
public final static AccordReplicaMetrics readMetrics = new
AccordReplicaMetrics("ro");
public final static AccordReplicaMetrics writeMetrics = new
AccordReplicaMetrics("rw");
+ public final static AccordReplicaMetrics syncPointMetrics = new
AccordReplicaMetrics("rx");
public static final String ACCORD_REPLICA = "AccordReplica";
public static final String REPLICA_STABLE_LATENCY = "StableLatency";
public static final String REPLICA_PREAPPLY_LATENCY = "PreApplyLatency";
public static final String REPLICA_APPLY_LATENCY = "ApplyLatency";
- public static final String REPLICA_APPLY_DURATION = "ApplyDuration";
public static final String REPLICA_DEPENDENCIES = "Dependencies";
+ static final class SubShard
+ {
+ final LogLinearDecayingHistogram stableLatency;
+ final LogLinearDecayingHistogram preapplyLatency;
+ final LogLinearDecayingHistogram applyLatency;
+ final LogLinearDecayingHistogram dependencies;
+
+ private SubShard(AccordReplicaMetrics metrics, DecayingHistogramsShard
shard)
+ {
+ this.stableLatency = metrics.stableLatency.forShard(shard);
+ this.preapplyLatency = metrics.preapplyLatency.forShard(shard);
+ this.applyLatency = metrics.applyLatency.forShard(shard);
+ this.dependencies = metrics.dependencies.forShard(shard);
+ }
+ }
+
+ public static final class Shard
+ {
+ final SubShard reads, writes, syncPoints;
+ public Shard(DecayingHistogramsShard shard)
+ {
+ reads = new SubShard(readMetrics, shard);
+ writes = new SubShard(writeMetrics, shard);
+ syncPoints = new SubShard(syncPointMetrics, shard);
+ }
+ }
+
/**
* The time between start on the coordinator and commit on this replica.
*/
- public final Timer stableLatency;
+ public final ShardedDecayingHistogram stableLatency =
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
/**
* The time between start on the coordinator and arrival of the result on
this replica.
*/
- public final Timer preapplyLatency;
+ public final ShardedDecayingHistogram preapplyLatency =
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
/**
* The time between start on the coordinator and application on this
replica.
*/
- public final Timer applyLatency;
-
- /**
- * TODO (expected): probably more interesting is latency from preapplied
to apply;
- * we already track local write latencies, whch this effectively
duplicates (but including queueing latencies)
- * Duration of applying changes.
- */
- public final Timer applyDuration;
+ public final ShardedDecayingHistogram applyLatency =
HISTOGRAMS.newHistogram(TimeUnit.SECONDS.toNanos(1L));
/**
* A histogram of the number of dependencies per transaction at this
replica.
*/
- public final Histogram dependencies;
+ public final ShardedDecayingHistogram dependencies =
HISTOGRAMS.newHistogram(1 << 12);
private AccordReplicaMetrics(String scope)
{
DefaultNameFactory replica = new DefaultNameFactory(ACCORD_REPLICA,
scope);
- stableLatency =
Metrics.timer(replica.createMetricName(REPLICA_STABLE_LATENCY));
- preapplyLatency =
Metrics.timer(replica.createMetricName(REPLICA_PREAPPLY_LATENCY));
- applyLatency =
Metrics.timer(replica.createMetricName(REPLICA_APPLY_LATENCY));
- applyDuration =
Metrics.timer(replica.createMetricName(REPLICA_APPLY_DURATION));
- dependencies =
Metrics.histogram(replica.createMetricName(REPLICA_DEPENDENCIES), true);
+ Metrics.register(replica.createMetricName(REPLICA_STABLE_LATENCY),
stableLatency);
+ Metrics.register(replica.createMetricName(REPLICA_PREAPPLY_LATENCY),
preapplyLatency);
+ Metrics.register(replica.createMetricName(REPLICA_APPLY_LATENCY),
applyLatency);
+ Metrics.register(replica.createMetricName(REPLICA_DEPENDENCIES),
dependencies);
}
@Override
@@ -106,64 +128,79 @@ public class AccordReplicaMetrics
{
throw new RuntimeException(e);
}
- builder.append("]");
+ builder.append(']');
return builder.toString();
}
public static class Listener implements ReplicaEventListener
{
- private AccordReplicaMetrics forTransaction(TxnId txnId)
+ private SubShard forTransaction(SafeCommandStore safeStore, TxnId
txnId)
{
if (txnId != null)
{
+ Shard shard = ((AccordCommandStore)
safeStore.commandStore()).executor().replicaMetrics;
if (txnId.isWrite())
- return writeMetrics;
+ return shard.writes;
else if (txnId.isSomeRead())
- return readMetrics;
+ return shard.reads;
+ else if (txnId.isSyncPoint())
+ return shard.syncPoints;
}
return null;
}
+ private static long unixNanos()
+ {
+ return currentTimeMillis() * 1_000_000;
+ }
+
+ private static long elapsed(TxnId txnId)
+ {
+ return elapsed(unixNanos(), txnId);
+ }
+
+ private static long elapsed(long unixNanos, TxnId txnId)
+ {
+ return Math.max(0, unixNanos - (txnId.hlc() * 1000));
+ }
+
+ private static LogLinearDecayingHistograms.Buffer
buffer(SafeCommandStore safeStore)
+ {
+ return ((AccordSafeCommandStore) safeStore).histogramBuffer();
+ }
+
@Override
public void onStable(SafeCommandStore safeStore, Command cmd)
{
Tracing.trace("Stable {} on {}", cmd.txnId(),
safeStore.commandStore());
- long now = AccordTimeService.nowMicros();
- AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+ SubShard metrics = forTransaction(safeStore, cmd.txnId());
if (metrics != null)
- {
- long trxTimestamp = cmd.txnId().hlc();
- metrics.stableLatency.update(now - trxTimestamp,
TimeUnit.MICROSECONDS);
- }
+ metrics.stableLatency.add(buffer(safeStore),
elapsed(cmd.txnId()));
}
@Override
public void onPreApplied(SafeCommandStore safeStore, Command cmd)
{
Tracing.trace("Preapplied {} on {}", cmd.txnId(),
safeStore.commandStore());
- long now = AccordTimeService.nowMicros();
- AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+ SubShard metrics = forTransaction(safeStore, cmd.txnId());
if (metrics != null)
{
- Timestamp trxTimestamp = cmd.txnId();
- metrics.preapplyLatency.update(now - trxTimestamp.hlc(),
TimeUnit.MICROSECONDS);
+ long elapsed = elapsed(cmd.txnId());
+ metrics.preapplyLatency.add(buffer(safeStore), elapsed);
PartialDeps deps = cmd.partialDeps();
- metrics.dependencies.update(deps != null ? deps.txnIdCount() :
0);
+ metrics.dependencies.add(buffer(safeStore), deps != null ?
deps.txnIdCount() : 0);
}
}
@Override
- public void onApplied(SafeCommandStore safeStore, Command cmd, long
applyStartedAt)
+ public void onApplied(SafeCommandStore safeStore, Command cmd)
{
Tracing.trace("Applied {} on {}", cmd.txnId(),
safeStore.commandStore());
- long now = AccordTimeService.nowMicros();
- AccordReplicaMetrics metrics = forTransaction(cmd.txnId());
+ SubShard metrics = forTransaction(safeStore, cmd.txnId());
if (metrics != null)
{
- Timestamp trxTimestamp = cmd.txnId();
- metrics.applyLatency.update(now - trxTimestamp.hlc(),
TimeUnit.MICROSECONDS);
- if (applyStartedAt > 0)
- metrics.applyDuration.update(now - applyStartedAt,
TimeUnit.MICROSECONDS);
+ long now = unixNanos();
+ metrics.applyLatency.add(buffer(safeStore), elapsed(now,
cmd.txnId()));
}
}
}
diff --git
a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
index 756144c84d..fc50797005 100644
--- a/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
+++ b/src/java/org/apache/cassandra/metrics/CassandraMetricsRegistry.java
@@ -118,6 +118,7 @@ public class CassandraMetricsRegistry extends MetricRegistry
.add(AccordCoordinatorMetrics.ACCORD_COORDINATOR)
.add(AccordCacheMetrics.ACCORD_CACHE)
.add(AccordReplicaMetrics.ACCORD_REPLICA)
+ .add(AccordExecutorMetrics.ACCORD_EXECUTOR)
.add(AccordSystemMetrics.ACCORD_SYSTEM)
.add(BatchMetrics.TYPE_NAME)
.add(BufferPoolMetrics.TYPE_NAME)
diff --git
a/src/java/org/apache/cassandra/metrics/LogLinearDecayingHistograms.java
b/src/java/org/apache/cassandra/metrics/LogLinearDecayingHistograms.java
new file mode 100644
index 0000000000..3d0279c0ca
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/LogLinearDecayingHistograms.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import accord.utils.Invariants;
+import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
+
+import static
org.apache.cassandra.metrics.DecayingEstimatedHistogramReservoir.MEAN_LIFETIME_IN_S;
+import static org.apache.cassandra.metrics.LogLinearHistogram.MAX_INDEX;
+import static org.apache.cassandra.metrics.LogLinearHistogram.index;
+import static org.apache.cassandra.metrics.LogLinearHistogram.invertIndex;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+/**
+ * A simple single-threaded decaying histogram with log-linear bucket
distribution.
+ * This has approximately the same accuracy as the lg 1.2 growth of
EstimatedHistogram, but is simpler and faster.
+ *
+ * Logically very similar to DecayingEstimatedHistogramReservoir, only due to
single-threaded behaviour this can be
+ * performed more simply and efficiently. We project forward single updates,
and periodically rebase in place.
+ * We also share the costly arithmetic across a group of histograms that share
mutual exclusivity.
+ *
+ * TODO (desired): improve performance and memory locality by using a small
buffer for collecting updates with e.g. 4 bits per counter,
+ */
+public class LogLinearDecayingHistograms
+{
+ public static class Buffer
+ {
+ static final long LARGE_VALUE = -1L >>> (1 + HISTOGRAM_INDEX_BITS);
+ private final LogLinearDecayingHistograms histograms;
+ private long[] buffer = new long[2];
+ private int bufferCount;
+
+ public Buffer(LogLinearDecayingHistograms histograms)
+ {
+ this.histograms = histograms;
+ }
+
+ private void add(long histogramIndex, long value)
+ {
+ Invariants.require(histogramIndex <= HISTOGRAM_INDEX_MASK);
+ Invariants.require(value >= 0);
+ if (value <= LARGE_VALUE) value <<= HISTOGRAM_INDEX_BITS;
+ else
+ {
+ value |= Long.MIN_VALUE;
+ value &= ~HISTOGRAM_INDEX_MASK;
+ }
+ value |= histogramIndex;
+ if (bufferCount == buffer.length)
+ buffer = Arrays.copyOf(buffer, bufferCount * 2);
+ buffer[bufferCount++] = value;
+ }
+
+ public void flush(long at)
+ {
+ for (int i = 0 ; i < bufferCount ; ++i)
+ {
+ long v = buffer[i];
+ int histogramIndex = (int) (v & HISTOGRAM_INDEX_MASK);
+ if (v < 0) v &= ~Long.MIN_VALUE & ~HISTOGRAM_INDEX_MASK;
+ else v >>>= HISTOGRAM_INDEX_BITS;
+ histograms.histograms.get(histogramIndex).increment(v, at);
+ }
+ bufferCount = 0;
+ }
+
+ public boolean isEmpty()
+ {
+ return bufferCount == 0;
+ }
+ }
+
+ public class LogLinearDecayingHistogram
+ {
+ final int histogramIndex;
+
+ private double[] buckets;
+ double totalCount;
+
+ private LogLinearDecayingHistogram(int histogramIndex, long
expectedMaxValue)
+ {
+ this.histogramIndex = histogramIndex;
+ buckets = new
double[LogLinearHistogram.bucketCount(expectedMaxValue)];
+ }
+
+ public void increment(long value)
+ {
+ increment(value, nanoTime());
+ }
+
+ public void increment(long value, long at)
+ {
+ int index = index(value);
+ if (bufferCount >= buffer.length)
+ {
+ Invariants.require(bufferCount == buffer.length);
+ flush();
+ Invariants.require(bufferCount == 0);
+ }
+ updateDecay(at);
+ long v = Double.doubleToRawLongBits(increment) & VALUE_MASK;
+ v |= histogramIndex | ((long)index << HISTOGRAM_INDEX_BITS);
+ buffer[bufferCount++] = v;
+ }
+
+ private double[] buckets(int withIndexAtLeast)
+ {
+ if (buckets.length <= withIndexAtLeast)
+ {
+ Invariants.require(withIndexAtLeast <= MAX_INDEX);
+ buckets = Arrays.copyOf(buckets, (withIndexAtLeast | 0x3) + 1);
+ }
+ return buckets;
+ }
+
+ public LogLinearSnapshot copyToSnapshot(long at)
+ {
+ LogLinearSnapshot snapshot = new LogLinearSnapshot(new
long[buckets.length], 0);
+ updateSnapshot(snapshot, at);
+ return snapshot;
+ }
+
+ public void updateSnapshot(LogLinearSnapshot snapshot, long at)
+ {
+ decay(at);
+
+ if (snapshot.raw.length < buckets.length)
+ snapshot.raw = Arrays.copyOf(snapshot.raw, buckets.length);
+
+ long[] raw = snapshot.raw;
+ long total = 0;
+ for (int i = 0 ; i < buckets.length ; ++i)
+ {
+ long v = Math.round(buckets[i]);
+ raw[i] += v;
+ total += v;
+ }
+
+ snapshot.totalCount += total;
+ snapshot.cumulative = null;
+ }
+
+ public void add(Buffer buffer, long value)
+ {
+ Invariants.require(buffer.histograms ==
LogLinearDecayingHistograms.this);
+ buffer.add(histogramIndex, value);
+ }
+ }
+
+ private static final long ANTI_DECAY_REFRESH_RATE =
TimeUnit.SECONDS.toNanos(1L);
+ private static final long DECAY_REFRESH_RATE = TimeUnit.HOURS.toNanos(1L);
+ private static final double DECAY_RATE = 1d/(TimeUnit.SECONDS.toNanos(1) *
MEAN_LIFETIME_IN_S);
+ private static final int BUCKET_INDEX_BITS = 8;
+ private static final int HISTOGRAM_INDEX_BITS = 5;
+ private static final long HISTOGRAM_INDEX_MASK = (1L <<
HISTOGRAM_INDEX_BITS) - 1;
+ private static final long BUCKET_INDEX_MASK = (1L << BUCKET_INDEX_BITS) -
1;
+ private static final long VALUE_MASK = ~(HISTOGRAM_INDEX_MASK |
(BUCKET_INDEX_MASK << HISTOGRAM_INDEX_BITS));
+ private static final int BUFFER_SIZE = 64;
+
+ static
+ {
+ Invariants.require((1 << BUCKET_INDEX_BITS) > MAX_INDEX);
+ }
+
+ private final long[] buffer = new long[BUFFER_SIZE];
+ private int bufferCount;
+ private long lastDecayedAt, lastAntiDecayedAt;
+ private double increment = 1d;
+
+ List<LogLinearDecayingHistogram> histograms = new ArrayList<>();
+
+ void updateDecay(long now)
+ {
+ long delta = now - lastAntiDecayedAt;
+ if (delta <= ANTI_DECAY_REFRESH_RATE)
+ return;
+
+ if (delta < DECAY_REFRESH_RATE) antiDecay(now);
+ else decay(now);
+ }
+
+ private void antiDecay(long now)
+ {
+ increment = Math.exp((now - lastDecayedAt) * DECAY_RATE);
+ }
+
+ private void flush()
+ {
+ for (int i = 0 ; i < bufferCount ; ++i)
+ {
+ long bits = buffer[i];
+ double increment = Double.longBitsToDouble(bits & VALUE_MASK);
+ int histogramIndex = (int) (bits & HISTOGRAM_INDEX_MASK);
+ int bucketIndex = (int)((bits >>> HISTOGRAM_INDEX_BITS) &
BUCKET_INDEX_MASK);
+ LogLinearDecayingHistogram histogram =
histograms.get(histogramIndex);
+ histogram.buckets(bucketIndex)[bucketIndex] += increment;
+ histogram.totalCount += increment;
+ }
+ bufferCount = 0;
+ }
+
+ private void decay(long now)
+ {
+ flush();
+
+ if (now <= lastDecayedAt)
+ return;
+
+ antiDecay(now);
+ double decay = 1d/increment;
+ increment = 1d;
+ for (LogLinearDecayingHistogram histogram : histograms)
+ {
+ if (histogram.totalCount <= 0d)
+ continue;
+
+ double[] buckets = histogram.buckets;
+ double total = 0d;
+ for (int i = 0 ; i < buckets.length ; ++i)
+ {
+ if (buckets[i] <= 0d)
+ continue;
+
+ double in = buckets[i];
+ double out = in * decay;
+ if (out < 0.5d)
+ out = 0d;
+ buckets[i] = out;
+ total += out;
+ }
+ histogram.totalCount = total;
+ }
+ lastDecayedAt = lastAntiDecayedAt = now;
+ }
+
+ public synchronized LogLinearDecayingHistogram allocate(long
expectedMaxValue)
+ {
+ Invariants.require(histograms.size() <= HISTOGRAM_INDEX_MASK);
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistogram(histograms.size(), expectedMaxValue);
+ histograms.add(histogram);
+ return histogram;
+ }
+
+ public synchronized LogLinearDecayingHistogram get(int histogramIndex)
+ {
+ return histograms.get(histogramIndex);
+ }
+
+ public static double[] bucketsWithScale(long maxScale)
+ {
+ return bucketsWithLength(1 + index(maxScale));
+ }
+
+ public static double[] bucketsWithLength(int length)
+ {
+ Invariants.require(length <= MAX_INDEX + 1);
+ double[] buckets = new double[length];
+ for (int i = 0 ; i < length ; ++i)
+ buckets[i] = invertIndex(i);
+ return buckets;
+ }
+}
diff --git a/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
b/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
index 43420cf7f6..e6be4f9017 100644
--- a/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/LogLinearHistogram.java
@@ -28,8 +28,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.primitives.Ints;
import accord.utils.Invariants;
+import accord.utils.SortedArrays;
import com.codahale.metrics.Snapshot;
+import static accord.utils.SortedArrays.Search.CEIL;
+
/**
* A simple single-threaded histogram with log-linear buckets.
* This has approximately the same accuracy as the lg 1.2 growth of
EstimatedHistogram, but is simpler and faster.
@@ -40,13 +43,13 @@ import com.codahale.metrics.Snapshot;
*/
public class LogLinearHistogram
{
- private static final int MAX_INDEX = 247;
+ public static final int MAX_INDEX = 247;
private long[] buckets;
long totalCount;
public LogLinearHistogram(long expectedMaxValue)
{
- buckets = new long[buckets(expectedMaxValue)];
+ buckets = new long[bucketCount(expectedMaxValue)];
}
public void increment(long value)
@@ -99,12 +102,12 @@ public class LogLinearHistogram
return buckets;
}
- private static int buckets(long maxValue)
+ static int bucketCount(long maxValue)
{
return 1 + (index(maxValue) | 0x3);
}
- private static int index(long value)
+ static int index(long value)
{
if (value < 4)
return (int) value;
@@ -113,7 +116,7 @@ public class LogLinearHistogram
return (log + 1) * 4 + linear;
}
- private static long invertIndex(int index)
+ static long invertIndex(int index)
{
if (index < 4)
return index;
@@ -141,7 +144,7 @@ public class LogLinearHistogram
public static LogLinearSnapshot emptyForMax(long maxValue)
{
- return new LogLinearSnapshot(buckets(maxValue));
+ return new LogLinearSnapshot(bucketCount(maxValue));
}
LogLinearSnapshot(int size)
@@ -163,33 +166,37 @@ public class LogLinearHistogram
cumulative = new long[raw.length];
long sum = 0;
for (int i = 0 ; i < cumulative.length ; ++i)
- cumulative[i] = sum += cumulative[i];
+ cumulative[i] = sum += raw[i];
}
return cumulative;
}
- private long get(long tot)
+ private double get(long tot)
{
- if (totalCount == 0)
+ if (totalCount == 0 || tot == 0)
return 0;
long[] cumulative = cumulative();
- int i = Arrays.binarySearch(cumulative, tot);
- if (i >= 0) while (i > 0 && cumulative[i-1] == tot) --i;
- else i = Math.max(0, -2 - i);
- long prevValue = invertIndex(i);
+ int i = SortedArrays.binarySearch(cumulative, 0,
cumulative.length, tot, CEIL);
+ if (i >= 0)
+ return invertIndex(i);
+
+ i = Math.max(0, -2 - i);
long prevCount = cumulative[i];
+ long nextCount = cumulative[i + 1];
+
+ long prevValue = invertIndex(i);
long nextValue = invertIndex(i + 1);
- long nextCount = i + 1 == cumulative.length ? totalCount :
cumulative[i + 1];
- long gap = tot - prevCount;
- // should we use double arithmetic here to avoid overflow?
- return prevValue + ((nextValue - prevValue) * gap) / (nextCount -
prevCount);
+
+ double granularity = (nextValue - prevValue) / (double)(nextCount
- prevCount);
+ double targetGap = tot - prevCount;
+ return prevValue + Math.round(targetGap * granularity);
}
@Override
public double getValue(double quantile)
{
- return get(Math.round(totalCount * quantile));
+ return get(Math.max(1L, (long)Math.ceil(totalCount * quantile)));
}
@Override
@@ -199,23 +206,47 @@ public class LogLinearHistogram
}
@Override
- public long getMax()
+ public double getMean()
{
- return get(totalCount);
+ if (totalCount == 0)
+ return 0.0D;
+
+ double sum = 0;
+ for (int i = 0; i < raw.length; i++)
+ {
+ if (raw[i] != 0)
+ sum += raw[i] * (double) invertIndex(i);
+ }
+ return sum / totalCount;
}
@Override
- public double getMean()
+ public long getMin()
{
- if (totalCount <= 1)
- return 0.0D;
- return get(totalCount / 2);
+ if (totalCount == 0)
+ return 0;
+
+ long[] cumulative = cumulative();
+ int i = SortedArrays.binarySearch(cumulative, 0,
cumulative.length, 1, CEIL);
+ if (i < 0)
+ {
+ i = Math.max(0, -2 - i);
+ if (cumulative[i] == 0)
+ ++i;
+ }
+ return invertIndex(i);
}
@Override
- public long getMin()
+ public long getMax()
{
- return get(1);
+ if (totalCount == 0)
+ return 0;
+
+ long[] cumulative = cumulative();
+ int i = SortedArrays.binarySearch(cumulative, 0,
cumulative.length, totalCount, CEIL);
+ if (i < 0) i = -2 - i;
+ return invertIndex(i + 1);
}
/**
@@ -230,23 +261,19 @@ public class LogLinearHistogram
public double getStdDev()
{
if (totalCount <= 1)
- {
return 0.0D;
- }
- else
- {
- double mean = this.getMean();
- double sum = 0.0D;
- for(int i = 0; i < raw.length; ++i)
- {
- long value = invertIndex(i);
- double diff = value - mean;
- sum += diff * diff * raw[i];
- }
+ double mean = this.getMean();
+ double sum = 0.0D;
- return Math.sqrt(sum / (totalCount - 1));
+ for(int i = 0; i < raw.length; ++i)
+ {
+ long value = invertIndex(i);
+ double diff = value - mean;
+ sum += diff * diff * raw[i];
}
+
+ return Math.sqrt(sum / (totalCount - 1));
}
@Override
@@ -274,13 +301,18 @@ public class LogLinearHistogram
return result;
}
+ public LogLinearSnapshot copyToSnapshot()
+ {
+ return new LogLinearSnapshot(buckets.clone(), totalCount);
+ }
+
public void updateSnapshot(LogLinearSnapshot snapshot)
{
if (snapshot.raw.length < buckets.length)
snapshot.raw = Arrays.copyOf(snapshot.raw, buckets.length);
long[] raw = snapshot.raw;
- for (int i = 0 ; i < raw.length ; ++i)
+ for (int i = 0 ; i < buckets.length ; ++i)
raw[i] = raw[i] + buckets[i];
snapshot.totalCount += totalCount;
snapshot.cumulative = null;
diff --git
a/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
new file mode 100644
index 0000000000..db91857b3c
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ShardedDecayingHistograms.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+
+import com.codahale.metrics.Snapshot;
+import
org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+import org.apache.cassandra.metrics.LogLinearHistogram.LogLinearSnapshot;
+import org.apache.cassandra.utils.Clock;
+
+import static
org.apache.cassandra.metrics.CassandraReservoir.BucketStrategy.log_linear;
+
+/**
+ * Registry of log-linear decaying histograms that are recorded into per-shard
+ * {@link LogLinearDecayingHistograms} (e.g. one shard per AccordExecutor), each guarded by its own
+ * {@link Lock}, and aggregated across all shards when read.
+ * <p>
+ * Reads are served from a cached aggregate. The aggregate is rebuilt when it is older than 15 seconds
+ * or does not yet cover every registered histogram, or explicitly via {@link #refresh()}.
+ */
+public class ShardedDecayingHistograms
+{
+    private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(15);
+
+    /**
+     * Metrics-facing view of one histogram: the same-indexed histogram of every shard, merged on read.
+     */
+    public class ShardedDecayingHistogram extends OverrideHistogram
+    {
+        final int histogramIndex;
+        final long initialMaxValue;
+
+        private ShardedDecayingHistogram(int histogramIndex, long initialMaxValue)
+        {
+            this.histogramIndex = histogramIndex;
+            this.initialMaxValue = initialMaxValue;
+        }
+
+        @Override
+        public CassandraReservoir.BucketStrategy bucketStrategy()
+        {
+            return log_linear;
+        }
+
+        @Override
+        public long[] bucketStarts(int length)
+        {
+            return LogLinearHistogram.bucketsWithLength(length);
+        }
+
+        @Override
+        public long getCount()
+        {
+            // maybeRefresh() synchronises on the enclosing registry, which is all the cached snapshot needs.
+            // Also locking this view only added a second monitor to the lock ordering.
+            return maybeRefresh().get(histogramIndex).totalCount;
+        }
+
+        /**
+         * Returns the cached aggregate snapshot. The same instance is handed to every caller until the
+         * next refresh, so callers should treat it as read-only.
+         */
+        @Override
+        public Snapshot getSnapshot()
+        {
+            return maybeRefresh().get(histogramIndex);
+        }
+
+        /**
+         * Returns this histogram's instance within {@code shard}, into which that shard records values.
+         * NOTE(review): presumably only to be used while holding the shard's lock, as refresh does when reading it.
+         */
+        public LogLinearDecayingHistogram forShard(DecayingHistogramsShard shard)
+        {
+            return shard.histograms.get(histogramIndex);
+        }
+    }
+
+    /**
+     * One shard's histograms, together with the lock under which they are read during aggregation.
+     */
+    public static class DecayingHistogramsShard
+    {
+        final Lock lock;
+        final LogLinearDecayingHistograms histograms;
+
+        DecayingHistogramsShard(Lock lock, LogLinearDecayingHistograms histograms)
+        {
+            this.lock = lock;
+            this.histograms = histograms;
+        }
+
+        /**
+         * Folds this shard's histograms into the corresponding entries of {@code update}, via each
+         * histogram's {@code updateSnapshot(snapshot, at)}, while holding the shard's lock.
+         */
+        void updateSnapshot(List<LogLinearSnapshot> update, long at)
+        {
+            lock.lock();
+            try
+            {
+                for (int i = 0 ; i < update.size() ; ++i)
+                    histograms.get(i).updateSnapshot(update.get(i), at);
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Exposes the underlying histograms without any locking; the caller is responsible for guarding access.
+         */
+        public LogLinearDecayingHistograms unsafeGetInternal()
+        {
+            return histograms;
+        }
+    }
+
+    final List<DecayingHistogramsShard> shards = new ArrayList<>();
+    final List<ShardedDecayingHistogram> histograms = new ArrayList<>();
+
+    /**
+     * Registers a new shard guarded by {@code guardedBy}, allocating an instance of every histogram registered so far.
+     */
+    public synchronized DecayingHistogramsShard newShard(Lock guardedBy)
+    {
+        DecayingHistogramsShard shard = new DecayingHistogramsShard(guardedBy, new LogLinearDecayingHistograms());
+        for (int i = 0 ; i < histograms.size() ; ++i)
+            shard.histograms.allocate(histograms.get(i).initialMaxValue);
+        shards.add(shard);
+        return shard;
+    }
+
+    /**
+     * Registers a new histogram, allocating an instance of it in every existing shard.
+     */
+    public synchronized ShardedDecayingHistogram newHistogram(long initialMaxValue)
+    {
+        ShardedDecayingHistogram histogram = new ShardedDecayingHistogram(histograms.size(), initialMaxValue);
+        for (int i = 0 ; i < shards.size() ; ++i)
+            shards.get(i).histograms.allocate(initialMaxValue);
+        histograms.add(histogram);
+        return histogram;
+    }
+
+    // Long.MIN_VALUE means no snapshot has been taken yet
+    private long snapshotAt = Long.MIN_VALUE;
+    private List<LogLinearSnapshot> snapshot = Collections.emptyList();
+
+    /** Rebuilds the cached aggregate now, regardless of its age. */
+    public synchronized void refresh()
+    {
+        refresh(Clock.Global.nanoTime());
+    }
+
+    private synchronized List<LogLinearSnapshot> refresh(long now)
+    {
+        List<LogLinearSnapshot> snapshot = new ArrayList<>(histograms.size());
+        for (ShardedDecayingHistogram histogram : histograms)
+            snapshot.add(LogLinearSnapshot.emptyForMax(histogram.initialMaxValue));
+        for (DecayingHistogramsShard shard : shards)
+            shard.updateSnapshot(snapshot, now);
+        this.snapshot = snapshot;
+        this.snapshotAt = now;
+        return this.snapshot;
+    }
+
+    private synchronized List<LogLinearSnapshot> maybeRefresh()
+    {
+        long now = Clock.Global.nanoTime();
+        if (isFresh(now))
+            return snapshot;
+
+        return refresh(now);
+    }
+
+    /**
+     * Whether a snapshot has been taken, covers every registered histogram, and is at most REFRESH_RATE old.
+     * It compares the difference of two nanoTime readings, as System.nanoTime requires, rather than computing
+     * {@code snapshotAt + REFRESH_RATE}, which can overflow. The caller must hold this monitor.
+     */
+    private boolean isFresh(long now)
+    {
+        return snapshotAt != Long.MIN_VALUE
+               && snapshot.size() == histograms.size()
+               && now - snapshotAt <= REFRESH_RATE;
+    }
+}
diff --git a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
index 1677edad81..66defdac56 100644
--- a/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
+++ b/src/java/org/apache/cassandra/metrics/ShardedHistogram.java
@@ -43,19 +43,6 @@ public class ShardedHistogram extends OverrideHistogram
this.histogram = histogram;
}
- long total()
- {
- lock.lock();
- try
- {
- return histogram.totalCount;
- }
- finally
- {
- lock.unlock();
- }
- }
-
public void updateSnapshot(LogLinearSnapshot snapshot)
{
lock.lock();
diff --git a/src/java/org/apache/cassandra/metrics/ShardedLongGauges.java
b/src/java/org/apache/cassandra/metrics/ShardedLongGauges.java
new file mode 100644
index 0000000000..d2721a94c6
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ShardedLongGauges.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.function.LongBinaryOperator;
+import java.util.function.ToLongFunction;
+
+import accord.utils.Invariants;
+import com.codahale.metrics.Gauge;
+import org.apache.cassandra.utils.Clock;
+
+/**
+ * Registry of long-valued gauges computed from per-shard state. Each gauge's {@code compute} function is
+ * applied to every registered shard (under that shard's {@link Lock}), and the results are combined with the
+ * gauge's {@code reduce} operator. Values are cached and recomputed when older than 15 seconds, when a gauge
+ * has been registered since the last computation, or explicitly via {@link #refresh()}.
+ *
+ * @param <S> type of the per-shard state the gauges read
+ */
+public class ShardedLongGauges<S>
+{
+    private static final long REFRESH_RATE = TimeUnit.SECONDS.toNanos(15);
+
+    /**
+     * A gauge reporting {@code reduce} folded over {@code compute(shard)} for each registered shard
+     * (0 when no shard is registered).
+     */
+    public class ShardedLongGauge implements Gauge<Long>
+    {
+        final int gaugeIndex;
+        final ToLongFunction<S> compute;
+        final LongBinaryOperator reduce;
+
+        private ShardedLongGauge(int gaugeIndex, ToLongFunction<S> compute, LongBinaryOperator reduce)
+        {
+            this.gaugeIndex = gaugeIndex;
+            this.compute = compute;
+            this.reduce = reduce;
+        }
+
+        @Override
+        public Long getValue()
+        {
+            return maybeRefresh()[gaugeIndex];
+        }
+    }
+
+    /**
+     * A registered shard's state together with the lock guarding it.
+     */
+    static class LongGaugeShard<T>
+    {
+        final Lock lock;
+        final T shard;
+
+        LongGaugeShard(Lock lock, T shard)
+        {
+            this.lock = lock;
+            this.shard = shard;
+        }
+
+        /**
+         * Overwrites {@code init[i]} with gauge i's value computed for this shard (used for the first shard).
+         */
+        public void init(long[] init, List<ShardedLongGauges<T>.ShardedLongGauge> gauges)
+        {
+            lock.lock();
+            try
+            {
+                for (int i = 0 ; i < init.length ; ++i)
+                {
+                    ShardedLongGauges<T>.ShardedLongGauge gauge = gauges.get(i);
+                    // same positional invariant as update(): result slot i belongs to gauge i
+                    Invariants.require(gauge.gaugeIndex == i);
+                    init[i] = gauge.compute.applyAsLong(shard);
+                }
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+
+        /**
+         * Combines gauge i's value computed for this shard into {@code update[i]} using the gauge's reduce operator.
+         */
+        public void update(long[] update, List<ShardedLongGauges<T>.ShardedLongGauge> gauges)
+        {
+            lock.lock();
+            try
+            {
+                for (int i = 0 ; i < update.length ; ++i)
+                {
+                    ShardedLongGauges<T>.ShardedLongGauge gauge = gauges.get(i);
+                    Invariants.require(gauge.gaugeIndex == i);
+                    update[i] = gauge.reduce.applyAsLong(update[i], gauge.compute.applyAsLong(shard));
+                }
+            }
+            finally
+            {
+                lock.unlock();
+            }
+        }
+    }
+
+    final List<LongGaugeShard<S>> shards = new ArrayList<>();
+    final List<ShardedLongGauge> gauges = new ArrayList<>();
+
+    /** Registers a shard whose state is read while holding {@code guardedBy}. */
+    public synchronized void newShard(Lock guardedBy, S shard)
+    {
+        shards.add(new LongGaugeShard<>(guardedBy, shard));
+    }
+
+    /** Registers a new gauge whose value is {@code reduce} folded over {@code compute} of each shard. */
+    public synchronized ShardedLongGauge newGauge(ToLongFunction<S> compute, LongBinaryOperator reduce)
+    {
+        ShardedLongGauge gauge = new ShardedLongGauge(gauges.size(), compute, reduce);
+        gauges.add(gauge);
+        return gauge;
+    }
+
+    // Long.MIN_VALUE means no snapshot has been cached yet
+    private long snapshotAt = Long.MIN_VALUE;
+    private long[] snapshot = new long[0];
+
+    /** Recomputes every gauge now, regardless of the cached values' age. */
+    public synchronized void refresh()
+    {
+        refresh(Clock.Global.nanoTime());
+    }
+
+    private synchronized long[] refresh(long now)
+    {
+        if (gauges.isEmpty())
+            return new long[0];
+
+        long[] snapshot = new long[gauges.size()];
+        // With no shards every gauge reads 0. This result is not cached, so a shard registered later
+        // is picked up on the next read.
+        if (shards.isEmpty())
+            return snapshot;
+
+        shards.get(0).init(snapshot, gauges);
+        for (int i = 1; i < shards.size() ; ++i)
+            shards.get(i).update(snapshot, gauges);
+        this.snapshot = snapshot;
+        this.snapshotAt = now;
+        return this.snapshot;
+    }
+
+    private synchronized long[] maybeRefresh()
+    {
+        long now = Clock.Global.nanoTime();
+        if (isFresh(now))
+            return snapshot;
+
+        return refresh(now);
+    }
+
+    /**
+     * Whether values have been cached, cover every registered gauge, and are at most REFRESH_RATE old.
+     * It compares the difference of two nanoTime readings, as System.nanoTime requires, rather than computing
+     * {@code snapshotAt + REFRESH_RATE}, which can overflow. The caller must hold this monitor.
+     */
+    private boolean isFresh(long now)
+    {
+        return snapshotAt != Long.MIN_VALUE
+               && snapshot.length == gauges.size()
+               && now - snapshotAt <= REFRESH_RATE;
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordCache.java
b/src/java/org/apache/cassandra/service/accord/AccordCache.java
index 0a1dd623f0..c17ccc879c 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCache.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCache.java
@@ -54,6 +54,7 @@ import accord.utils.UnhandledEnum;
import org.agrona.collections.Object2ObjectHashMap;
import org.apache.cassandra.cache.CacheSize;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
import org.apache.cassandra.exceptions.UnknownTableException;
import org.apache.cassandra.io.util.DataInputBuffer;
import org.apache.cassandra.metrics.AccordCacheMetrics;
@@ -62,11 +63,11 @@ import org.apache.cassandra.metrics.ShardedHitRate;
import org.apache.cassandra.service.accord.AccordCacheEntry.LoadExecutor;
import org.apache.cassandra.service.accord.AccordCacheEntry.Status;
import org.apache.cassandra.service.accord.events.CacheEvents;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
import org.apache.cassandra.service.accord.serializers.Version;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
import org.apache.cassandra.utils.ObjectSizes;
-import org.apache.cassandra.utils.btree.BTree;
import static accord.utils.Invariants.illegalState;
import static accord.utils.Invariants.require;
@@ -219,6 +220,7 @@ public class AccordCache implements CacheSize
{
require(node.references() == 0);
+        // TODO (desired): special-case an evict queue with a single element, as there is no point shrinking it first
if (shrinkingOn && node.tryShrink())
{
IntrusiveLinkedList<AccordCacheEntry<?,?>> queue;
@@ -1107,6 +1109,11 @@ public class AccordCache implements CacheSize
@Override
public Runnable save(AccordCommandStore commandStore, RoutingKey key,
@Nullable CommandsForKey value, @Nullable Object serialized)
{
+        // A non-null serialized value is presumably the shrunk form produced by fullShrink: two TxnIds
+        // (last, minUndecided) followed by the CFK bytes. Skip that prefix on a duplicate, so the cached
+        // buffer's position is left untouched.
+ if (serialized != null)
+ {
+ ByteBuffer bb = (ByteBuffer)serialized;
+ serialized = bb.duplicate().position(prefixBytes(bb));
+ }
return commandStore.saveCommandsForKey(key, value, serialized);
}
@@ -1125,19 +1132,35 @@ public class AccordCache implements CacheSize
@Override
public Object fullShrink(RoutingKey key, CommandsForKey value)
{
- if (value.isEmpty())
+ if (value.isEmpty() || value.isLoadingPruned())
return null;
- if (value.isLoadingPruned())
- return value;
+        // Shrunk layout: [last TxnId][minUndecided TxnId][CFK bytes without key]. The TxnId prefix lets AccordTask
+        // test relevance via summaryLoader.isRelevant(key, last, minUndecided) without inflating the CFK.
+        // NOTE(review): last is null when size() == 0 but !isEmpty(); confirm txnId serializedSize/serialize accept null.
+ TxnId last = value.size() == 0 ? null : value.get(value.size() -
1);
+ TxnId minUndecided = value.minUndecided();
+ int lastSize = (int) CommandSerializers.txnId.serializedSize(last);
+ int minUndecidedSize = (int)
CommandSerializers.txnId.serializedSize(minUndecided);
+        // Presumably the first argument reserves that many leading bytes for the prefix written below
+ ByteBuffer result = Serialize.toBytesWithoutKey(lastSize +
minUndecidedSize, value.maximalPrune());
+        // Temporarily restrict the limit to the prefix region while writing the two TxnIds at offsets 0 and lastSize
+ int limit = result.limit();
+ result.limit(lastSize + minUndecidedSize);
+ CommandSerializers.txnId.serialize(last, result,
ByteBufferAccessor.instance, 0);
+ CommandSerializers.txnId.serialize(minUndecided, result,
ByteBufferAccessor.instance, lastSize);
+ result.limit(limit);
+ return result;
+ }
- return Serialize.toBytesWithoutKey(value.maximalPrune());
+    // Byte length of the (last, minUndecided) TxnId prefix at the start of bb, read at explicit offsets
+ private static int prefixBytes(ByteBuffer bb)
+ {
+ int prefix = (int)
CommandSerializers.txnId.serializedSize(CommandSerializers.txnId.deserialize(bb,
0));
+ prefix +=
(int)CommandSerializers.txnId.serializedSize(CommandSerializers.txnId.deserialize(bb,
prefix));
+ return prefix;
}
@Override
public CommandsForKey inflate(AccordCommandStore commandStore,
RoutingKey key, Object shrunk)
{
- return Serialize.fromBytes(key, (ByteBuffer)shrunk);
+        // Skip the TxnId prefix written by fullShrink, on a duplicate so the cached buffer is not modified
+ ByteBuffer bb = ((ByteBuffer)shrunk).duplicate();
+ bb.position(prefixBytes(bb));
+ return Serialize.fromBytes(key, bb, false);
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 5c1df8f136..99b763de21 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -67,6 +67,7 @@ import accord.utils.async.AsyncChains;
import accord.utils.async.AsyncResults.CountingResult;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.schema.TableMetadata;
@@ -162,6 +163,7 @@ public class AccordCommandStore extends CommandStore
volatile SafeRedundantBefore safeRedundantBefore;
private AccordSafeCommandStore current;
+ LogLinearDecayingHistograms.Buffer metricsBuffer;
public AccordCommandStore(int id,
NodeCommandStoreService node,
diff --git a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
index 2fa8675563..c9731d101a 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordExecutor.java
@@ -69,6 +69,12 @@ import
org.apache.cassandra.concurrent.DebuggableTask.DebuggableTaskRunner;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.concurrent.Shutdownable;
import org.apache.cassandra.metrics.AccordCacheMetrics;
+import org.apache.cassandra.metrics.AccordExecutorMetrics;
+import org.apache.cassandra.metrics.AccordReplicaMetrics;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
+import
org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+import org.apache.cassandra.metrics.ShardedDecayingHistograms;
+import
org.apache.cassandra.metrics.ShardedDecayingHistograms.DecayingHistogramsShard;
import org.apache.cassandra.service.accord.AccordCacheEntry.LoadExecutor;
import org.apache.cassandra.service.accord.AccordCacheEntry.SaveExecutor;
import org.apache.cassandra.service.accord.AccordCacheEntry.UniqueSave;
@@ -98,6 +104,8 @@ import static
org.apache.cassandra.service.accord.AccordTask.State.WAITING_TO_RU
public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTask<?>, Boolean>, SaveExecutor, Shutdownable,
AbstractAsyncExecutor
{
private static final Logger logger =
LoggerFactory.getLogger(AccordExecutor.class);
+ public static final ShardedDecayingHistograms HISTOGRAMS = new
ShardedDecayingHistograms();
+
public interface AccordExecutorFactory
{
AccordExecutor get(int executorId, Mode mode, int threads,
IntFunction<String> name, Agent agent);
@@ -159,6 +167,13 @@ public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTa
private List<Condition> waitingForQuiescence;
private Queue<WaitForCompletion> waitingForCompletion;
+ final LogLinearDecayingHistograms histograms;
+ final LogLinearDecayingHistogram elapsedPreparingToRun;
+ final LogLinearDecayingHistogram elapsedWaitingToRun;
+ final LogLinearDecayingHistogram elapsedRunning;
+ final LogLinearDecayingHistogram keys;
+ public final AccordReplicaMetrics.Shard replicaMetrics;
+
private static class WaitForCompletion
{
final int position;
@@ -212,6 +227,14 @@ public abstract class AccordExecutor implements CacheSize,
LoadExecutor<AccordTa
registerJfrListener(executorId, commandsForKey, "CommandsForKey");
this.caches = new ExclusiveGlobalCaches(this, cache, commands,
commandsForKey);
+
+ DecayingHistogramsShard histogramsShard = HISTOGRAMS.newShard(lock);
+ this.histograms = histogramsShard.unsafeGetInternal();
+ this.elapsedPreparingToRun =
AccordExecutorMetrics.INSTANCE.elapsedPreparingToRun.forShard(histogramsShard);
+ this.elapsedWaitingToRun =
AccordExecutorMetrics.INSTANCE.elapsedWaitingToRun.forShard(histogramsShard);
+ this.elapsedRunning =
AccordExecutorMetrics.INSTANCE.elapsedRunning.forShard(histogramsShard);
+ this.keys =
AccordExecutorMetrics.INSTANCE.keys.forShard(histogramsShard);
+ this.replicaMetrics = new AccordReplicaMetrics.Shard(histogramsShard);
ScheduledExecutors.scheduledFastTasks.scheduleAtFixedRate(() -> {
executeDirectlyWithLock(cache::processNoEvictQueue);
}, 1L, 1L, TimeUnit.SECONDS);
@@ -1761,4 +1784,18 @@ public abstract class AccordExecutor implements
CacheSize, LoadExecutor<AccordTa
}
}
+ public int unsafePreparingToRunCount()
+ {
+ return waitingToLoad.size() + waitingToLoadRangeTxns.size() +
scanningRanges.size() + loading.size();
+ }
+
+ public int unsafeWaitingToRunCount()
+ {
+ return waitingToRun.size();
+ }
+
+ public int unsafeRunningCount()
+ {
+ return running.size();
+ }
}
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
index 99553db5cc..5d2a565932 100644
---
a/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/AccordJournalValueSerializers.java
@@ -37,8 +37,6 @@ import
org.apache.cassandra.service.accord.serializers.Version;
import static accord.local.CommandStores.RangesForEpoch;
// TODO (required): test with large collection values, and perhaps split out
some fields if they have a tendency to grow larger
-// TODO (required): alert on metadata size
-// TODO (required): versioning
public class AccordJournalValueSerializers
{
public interface FlyweightImage
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
index f3ecdc43fd..f9624d2354 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordSafeCommandStore.java
@@ -40,6 +40,8 @@ import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
import accord.primitives.Unseekables;
+import accord.utils.Invariants;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
import org.apache.cassandra.service.accord.AccordCommandStore.ExclusiveCaches;
import
org.apache.cassandra.service.accord.AccordCommandStore.SafeRedundantBefore;
import org.apache.cassandra.service.paxos.PaxosState;
@@ -207,6 +209,18 @@ public class AccordSafeCommandStore extends
AbstractSafeCommandStore<AccordSafeC
return commandStore.node();
}
+    /**
+     * Returns the current task's metrics buffer. On first use it attaches the command store's reusable buffer,
+     * creating that buffer against the executor's histograms if none exists yet. The buffer is required to be
+     * empty when attached; presumably the previous task flushed it (AccordTask flushes and detaches it on cleanup).
+     */
+    public LogLinearDecayingHistograms.Buffer histogramBuffer()
+    {
+        LogLinearDecayingHistograms.Buffer buffer = task.histogramBuffer;
+        if (buffer != null)
+            return buffer;
+
+        buffer = commandStore.metricsBuffer;
+        if (buffer == null)
+        {
+            buffer = new LogLinearDecayingHistograms.Buffer(commandStore.executor().histograms);
+            commandStore.metricsBuffer = buffer;
+        }
+        task.histogramBuffer = buffer;
+        Invariants.require(buffer.isEmpty());
+        return buffer;
+    }
+
private boolean visitForKey(Unseekables<?> keysOrRanges,
Predicate<CommandsForKey> forEach)
{
Map<RoutingKey, AccordSafeCommandsForKey> commandsForKey =
task.commandsForKey;
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTask.java
b/src/java/org/apache/cassandra/service/accord/AccordTask.java
index d2b87d25b1..2234ca00f4 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTask.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTask.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.service.accord;
+import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
@@ -59,12 +60,14 @@ import accord.utils.async.Cancellable;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.collections.ObjectHashSet;
import org.apache.cassandra.concurrent.DebuggableTask;
+import org.apache.cassandra.metrics.LogLinearDecayingHistograms;
import org.apache.cassandra.service.accord.AccordCacheEntry.Status;
import org.apache.cassandra.service.accord.AccordCommandStore.Caches;
import org.apache.cassandra.service.accord.AccordExecutor.SubmittableTask;
import org.apache.cassandra.service.accord.AccordExecutor.TaskQueue;
import
org.apache.cassandra.service.accord.AccordKeyspace.CommandsForKeyAccessor;
import org.apache.cassandra.service.accord.api.TokenKey;
+import org.apache.cassandra.service.accord.serializers.CommandSerializers;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.Clock;
import org.apache.cassandra.utils.Closeable;
@@ -201,6 +204,7 @@ public abstract class AccordTask<R> extends SubmittableTask
implements Function<
@Nullable Object2ObjectHashMap<TxnId, AccordSafeCommand> commands;
@Nullable Object2ObjectHashMap<RoutingKey, AccordSafeCommandsForKey>
commandsForKey;
@Nullable Object2ObjectHashMap<Object, AccordSafeState<?, ?>> loading;
+ LogLinearDecayingHistograms.Buffer histogramBuffer;
// TODO (desired): collection supporting faster deletes but still fast
poll (e.g. some ordered collection)
@Nullable ArrayDeque<AccordCacheEntry<?, ?>> waitingToLoad;
@Nullable RangeTxnScanner rangeScanner;
@@ -299,12 +303,20 @@ public abstract class AccordTask<R> extends
SubmittableTask implements Function<
Invariants.require(rangeScanner == null || rangeScanner.scanned);
Invariants.require(loading == null && waitingToLoad == null,
"WAITING_TO_RUN => no loading or waiting; found %s", this,
AccordTask::toDescription);
waitingToRunAt = nanoTime();
+            // Record the sample at the current time (waitingToRunAt). runningAt has not been assigned yet on
+            // this path, so it must not be used as the sample timestamp.
+            commandStore.executor().elapsedPreparingToRun.increment(waitingToRunAt - createdAt, waitingToRunAt);
}
else if (state == RUNNING)
{
runningAt = nanoTime();
+ if (waitingToRunAt == 0)
+ {
+ waitingToRunAt = runningAt;
+
commandStore.executor().elapsedPreparingToRun.increment(waitingToRunAt - 
createdAt, runningAt);
+ }
+ commandStore.executor().elapsedWaitingToRun.increment(runningAt - 
waitingToRunAt, runningAt);
+ commandStore.executor().keys.increment(commandsForKey == null ? 0 
: commandsForKey.size(), runningAt);
}
- else if (state.isExecuted())
+ else if (state.isExecuted() && completedAt == 0)
{
completedAt = nanoTime();
}
@@ -756,7 +768,17 @@ public abstract class AccordTask<R> extends
SubmittableTask implements Function<
{
if (state == FAILING)
state(FAILED);
+ Invariants.expect(state.isExecuted());
releaseResources(commandStore.cachesExclusive());
+ if (runningAt != 0)
+ {
+ commandStore.executor().elapsedRunning.increment(completedAt -
runningAt, completedAt);
+ }
+ if (histogramBuffer != null)
+ {
+ histogramBuffer.flush(completedAt);
+ histogramBuffer = null;
+ }
}
@Nullable
@@ -979,13 +1001,6 @@ public abstract class AccordTask<R> extends
SubmittableTask implements Function<
default: throw new AssertionError("Unhandled Status: " +
entry.status());
case WAITING_TO_LOAD:
case LOADING:
- if (scanned)
- // if we've finished scanning and not already taken a
reference we shouldn't need to witness (unless modified)
- return;
- ensureLoading().put(entry.key(),
commandsForKeyCache.acquire(entry));
- if (entry.status() == Status.WAITING_TO_LOAD)
- ensureWaitingToLoad().add(entry);
- entry.loadingOrWaiting().add(AccordTask.this);
return;
case MODIFIED:
@@ -995,6 +1010,23 @@ public abstract class AccordTask<R> extends
SubmittableTask implements Function<
case FAILED_TO_SAVE:
if (commandsForKey != null &&
commandsForKey.containsKey(entry.key()))
return;
+
+ Object v = entry.getOrShrunkExclusive();
+ if (v == null) return;
+ else if (v instanceof CommandsForKey)
+ {
+ if (!summaryLoader.isRelevant((CommandsForKey) v))
+ return;
+ }
+ else
+ {
+ TxnId last =
CommandSerializers.txnId.deserialize((ByteBuffer) v);
+ int position =
(int)CommandSerializers.txnId.serializedSize(last);
+ TxnId minUndecided =
CommandSerializers.txnId.deserialize((ByteBuffer) v, position);
+ if (!summaryLoader.isRelevant(entry.key(), last,
minUndecided))
+ return;
+ }
+
ensureCommandsForKey().putIfAbsent(entry.key(),
commandsForKeyCache.acquire(entry));
}
}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
index 34ab52fc2d..8c1bd7eca8 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/ReadDataSerializer.java
@@ -42,6 +42,7 @@ import accord.primitives.Ranges;
import accord.primitives.Timestamp;
import accord.primitives.TxnId;
import accord.primitives.Writes;
+import accord.utils.Invariants;
import accord.utils.UnhandledEnum;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.io.VersionedSerializer;
@@ -82,7 +83,7 @@ public class ReadDataSerializer implements
IVersionedSerializer<ReadData>
boolean hasExecuteAtEpoch = (read.executeAt() != null ?
read.executeAt() : read.txnId).epoch() != read.executeAtEpoch;
out.writeUnsignedVInt32(type.ordinal()
- | (read.flags.bits() << 3)
+ | shiftedExecuteFlags(read.flags)
| (hasTxn ? HAS_TXN : 0)
| (hasExecuteAt ? HAS_EXECUTE_AT : 0)
| (hasExecuteAtEpoch ? HAS_EXECUTE_AT_EPOCH :
0)
@@ -154,7 +155,7 @@ public class ReadDataSerializer implements
IVersionedSerializer<ReadData>
{
int tmp = in.readUnsignedVInt32();
type = TYPES[tmp & 0x7];
- flags = ExecuteFlags.get((tmp >>> 3) & 0x3);
+ flags = ExecuteFlags.get(unshiftedExecuteFlags(tmp));
hasTxn = (tmp & HAS_TXN) != 0;
hasExecuteAt = (tmp & HAS_EXECUTE_AT) != 0;
hasExecuteAtEpoch = (tmp & HAS_EXECUTE_AT_EPOCH) != 0;
@@ -219,6 +220,17 @@ public class ReadDataSerializer implements
IVersionedSerializer<ReadData>
}
}
+    /**
+     * Packs ExecuteFlags (at most 3 bits) into the ReadData header int. Flag bits 0-1 go to header bits 3-4
+     * (mask 0x18) and flag bit 2 goes to header bit 8 (0x100), presumably because header bits 5-7 carry the HAS_* flags.
+     */
+    private static int shiftedExecuteFlags(ExecuteFlags flags)
+    {
+        int bits = flags.bits();
+        Invariants.require(bits <= 0x7);
+        int lowBits = (bits & 0x3) << 3;
+        int highBits = (bits & ~0x3) << 6;
+        return lowBits | highBits;
+    }
+
+    /** Inverse of {@link #shiftedExecuteFlags}: recovers the flag bits from header bits 3-4 and 8. */
+    private static int unshiftedExecuteFlags(int flags)
+    {
+        int lowBits = (flags & 0x18) >> 3;
+        int highBits = (flags & 0x100) >> 6;
+        return highBits | lowBits;
+    }
+
@Override
public long serializedSize(ReadData read, Version version)
{
@@ -228,12 +240,13 @@ public class ReadDataSerializer implements
IVersionedSerializer<ReadData>
boolean hasExecuteAtEpoch = (read.executeAt() != null ?
read.executeAt() : read.txnId).epoch() != read.executeAtEpoch;
long size = VIntCoding.computeUnsignedVIntSize(type.ordinal()
- | (read.flags.bits() <<
3)
+ |
shiftedExecuteFlags(read.flags)
| (hasTxn ? HAS_TXN : 0)
| (hasExecuteAt ?
HAS_EXECUTE_AT : 0)
| (hasExecuteAtEpoch ?
HAS_EXECUTE_AT_EPOCH : 0))
+ CommandSerializers.txnId.serializedSize(read.txnId)
+ KeySerializers.participants.serializedSize(read.scope);
+
if (hasTxn)
size +=
CommandSerializers.partialTxn.serializedSize(read.partialTxn(), version);
if (hasExecuteAt)
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
index 5c78547432..ff47950f7a 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordMetricsTest.java
@@ -49,6 +49,7 @@ import org.apache.cassandra.metrics.DefaultNameFactory;
import org.apache.cassandra.metrics.RatioGaugeSet;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.accord.AccordExecutor;
import org.apache.cassandra.service.accord.AccordService;
import
org.apache.cassandra.service.accord.exceptions.AccordReadPreemptedException;
import
org.apache.cassandra.service.accord.exceptions.AccordWritePreemptedException;
@@ -61,7 +62,6 @@ import static java.lang.String.format;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
-
public class AccordMetricsTest extends AccordTestBase
{
private static final Logger logger =
LoggerFactory.getLogger(AccordMetricsTest.class);
@@ -332,7 +332,6 @@ public class AccordMetricsTest extends AccordTestBase
assertThat(metric.apply(AccordReplicaMetrics.REPLICA_STABLE_LATENCY)).isLessThanOrEqualTo(stable);
assertThat(metric.apply(AccordReplicaMetrics.REPLICA_PREAPPLY_LATENCY)).isEqualTo(executions);
assertThat(metric.apply(AccordReplicaMetrics.REPLICA_APPLY_LATENCY)).isEqualTo(applications);
-
assertThat(metric.apply(AccordReplicaMetrics.REPLICA_APPLY_DURATION)).isEqualTo(scope.equals("rw")
? applications : 0);
assertThat(metric.apply(AccordReplicaMetrics.REPLICA_DEPENDENCIES)).isEqualTo(executions);
// Verify that replica metrics are published to the appropriate
virtual table:
@@ -357,6 +356,7 @@ public class AccordMetricsTest extends AccordTestBase
Map<Integer, Map<String, Long>> metrics = new HashMap<>();
for (int i = 0; i < SHARED_CLUSTER.size(); i++)
{
+ SHARED_CLUSTER.get(i + 1).runOnInstance(() ->
AccordExecutor.HISTOGRAMS.refresh());
Map<String, Long> map = SHARED_CLUSTER.get(i +
1).metrics().getCounters(name ->
name.startsWith("org.apache.cassandra.metrics.Accord") ||
(name.startsWith("org.apache.cassandra.metrics.ClientRequest") &&
(name.endsWith("AccordRead") || name.endsWith("AccordWrite"))));
SHARED_CLUSTER.get(i + 1).metrics().getGauges(name ->
name.startsWith("org.apache.cassandra.metrics.Accord"))
.forEach((key, value) ->
map.put(key, ((Number)value).longValue()));
diff --git
a/test/unit/org/apache/cassandra/metrics/LogLinearDecayingHistogramTest.java
b/test/unit/org/apache/cassandra/metrics/LogLinearDecayingHistogramTest.java
new file mode 100644
index 0000000000..31ca6fbb54
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/LogLinearDecayingHistogramTest.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Snapshot;
+import
org.apache.cassandra.metrics.LogLinearDecayingHistograms.LogLinearDecayingHistogram;
+
+import static org.junit.Assert.assertEquals;
+
+public class LogLinearDecayingHistogramTest
+{
+ public static final Logger logger =
LoggerFactory.getLogger(LogLinearDecayingHistogramTest.class);
+
+ private static final double DOUBLE_ASSERT_DELTA = 0;
+ private static final long ONE_SECOND = TimeUnit.NANOSECONDS.toSeconds(1);
+ private static final long ONE_HOUR = TimeUnit.HOURS.toSeconds(1);
+ private static final long[] TIMES = new long[] { 0, ONE_SECOND - 1,
ONE_SECOND, ONE_HOUR - 1, ONE_HOUR };
+
+ @Test
+ public void testMinMax()
+ {
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ histogram.increment(16, time);
+ Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+ assertEquals(16, snapshot.getMin());
+ assertEquals(20, snapshot.getMax());
+ }
+ }
+
+ @Test
+ public void testMean()
+ {
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ for (int i = 0; i < 40; i++)
+ histogram.increment(0, time);
+ for (int i = 0; i < 20; i++)
+ histogram.increment(1, time);
+ for (int i = 0; i < 10; i++)
+ histogram.increment(2, time);
+ assertEquals(40/70d, histogram.copyToSnapshot(time + 1).getMean(),
0.1D);
+ }
+ }
+
+ @Test
+ public void testStdDev()
+ {
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ for (int i = 0; i < 20; i++)
+ histogram.increment(10, time);
+ for (int i = 0; i < 40; i++)
+ histogram.increment(20, time);
+ for (int i = 0; i < 20; i++)
+ histogram.increment(30, time);
+
+ Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+ assertEquals(20.0D, snapshot.getMean(), 2.0D);
+ assertEquals(7.07D, snapshot.getStdDev(), 2.0D);
+ }
+ }
+
+ @Test
+ public void testPercentile()
+ {
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ // percentile of empty histogram is 0
+ assertEquals(0D, histogram.copyToSnapshot(0).getValue(0.99),
DOUBLE_ASSERT_DELTA);
+ assertEquals(0D,
histogram.copyToSnapshot(ONE_SECOND).getValue(0.99), DOUBLE_ASSERT_DELTA);
+ assertEquals(0D,
histogram.copyToSnapshot(ONE_HOUR).getValue(0.99), DOUBLE_ASSERT_DELTA);
+ }
+
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ histogram.increment(1, time);
+ // percentile of a histogram with one element should be that
element
+ assertEquals(1D, histogram.copyToSnapshot(time +
1).getValue(0.99), DOUBLE_ASSERT_DELTA);
+ }
+
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ histogram.increment(1, time);
+ histogram.increment(10, time);
+ assertEquals(10D, histogram.copyToSnapshot(time +
1).getValue(0.99), DOUBLE_ASSERT_DELTA);
+ }
+
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+
+ histogram.increment(1, time);
+ histogram.increment(2, time);
+ histogram.increment(3, time);
+ histogram.increment(4, time);
+ histogram.increment(5, time);
+
+ Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+ assertEquals(1, snapshot.getValue(0.00), DOUBLE_ASSERT_DELTA);
+ assertEquals(3, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+ assertEquals(3, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA);
+ assertEquals(5, snapshot.getValue(1.00), DOUBLE_ASSERT_DELTA);
+ }
+
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+
+ for (int i = 11; i <= 20; i++)
+ histogram.increment(i, time);
+
+ // Right now the histogram looks like:
+ // 10 12 14 16 20
+ // 1 2 2 4 1
+ // %: 10 30 50 90 100
+ Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+ assertEquals(10, snapshot.getValue(0.01), DOUBLE_ASSERT_DELTA);
+ assertEquals(10, snapshot.getValue(0.10), DOUBLE_ASSERT_DELTA);
+
+ assertEquals(11, snapshot.getValue(0.11), DOUBLE_ASSERT_DELTA);
+ assertEquals(11, snapshot.getValue(0.20), DOUBLE_ASSERT_DELTA);
+ assertEquals(12, snapshot.getValue(0.21), DOUBLE_ASSERT_DELTA);
+ assertEquals(12, snapshot.getValue(0.30), DOUBLE_ASSERT_DELTA);
+
+ assertEquals(13, snapshot.getValue(0.31), DOUBLE_ASSERT_DELTA);
+ assertEquals(13, snapshot.getValue(0.40), DOUBLE_ASSERT_DELTA);
+ assertEquals(14, snapshot.getValue(0.41), DOUBLE_ASSERT_DELTA);
+ assertEquals(14, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+ assertEquals(15, snapshot.getValue(0.51), DOUBLE_ASSERT_DELTA);
+ assertEquals(15, snapshot.getValue(0.70), DOUBLE_ASSERT_DELTA);
+ assertEquals(16, snapshot.getValue(0.71), DOUBLE_ASSERT_DELTA);
+ assertEquals(16, snapshot.getValue(0.90), DOUBLE_ASSERT_DELTA);
+ assertEquals(20, snapshot.getValue(0.91), DOUBLE_ASSERT_DELTA);
+ }
+
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ histogram.increment(0, time);
+ histogram.increment(0, time);
+ histogram.increment(1, time);
+
+ Snapshot snapshot = histogram.copyToSnapshot(time + 1);
+ assertEquals(0, snapshot.getValue(0.5), DOUBLE_ASSERT_DELTA);
+ assertEquals(1, snapshot.getValue(0.99), DOUBLE_ASSERT_DELTA);
+ }
+ }
+
+ @Test
+ public void testSize()
+ {
+ for (long time : TIMES)
+ {
+ LogLinearDecayingHistogram histogram = new
LogLinearDecayingHistograms().allocate(1);
+ histogram.increment(42, time);
+ histogram.increment(42, time);
+ assertEquals(2, histogram.copyToSnapshot(time + 1).size());
+ }
+ }
+}
diff --git a/test/unit/org/apache/cassandra/metrics/LogLinearHistogramTest.java
b/test/unit/org/apache/cassandra/metrics/LogLinearHistogramTest.java
new file mode 100644
index 0000000000..c7954e2704
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/LogLinearHistogramTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.utils.RandomTestRunner;
+import com.codahale.metrics.Snapshot;
+import static org.junit.Assert.assertEquals;
+
+public class LogLinearHistogramTest
+{
+ public static final Logger logger =
LoggerFactory.getLogger(LogLinearHistogramTest.class);
+
+ private static final double DOUBLE_ASSERT_DELTA = 0;
+
+ @Test
+ public void testMinMax()
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+ histogram.increment(16);
+ Snapshot snapshot = histogram.copyToSnapshot();
+ assertEquals(16, snapshot.getMin());
+ assertEquals(20, snapshot.getMax());
+ }
+
+ @Test
+ public void testMean()
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+ for (int i = 0; i < 40; i++)
+ histogram.increment(0);
+ for (int i = 0; i < 20; i++)
+ histogram.increment(1);
+ for (int i = 0; i < 10; i++)
+ histogram.increment(2);
+ assertEquals(40/70d, histogram.copyToSnapshot().getMean(), 0.1D);
+ }
+
+ @Test
+ public void testStdDev()
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+ for (int i = 0; i < 20; i++)
+ histogram.increment(10);
+ for (int i = 0; i < 40; i++)
+ histogram.increment(20);
+ for (int i = 0; i < 20; i++)
+ histogram.increment(30);
+
+ Snapshot snapshot = histogram.copyToSnapshot();
+ assertEquals(20.0D, snapshot.getMean(), 2.0D);
+ assertEquals(7.07D, snapshot.getStdDev(), 2.0D);
+ }
+
+ @Test
+ public void testPercentile()
+ {
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+ // percentile of empty histogram is 0
+ assertEquals(0D, histogram.copyToSnapshot().getValue(0.99),
DOUBLE_ASSERT_DELTA);
+
+ histogram.increment(1);
+ // percentile of a histogram with one element should be that
element
+ assertEquals(1D, histogram.copyToSnapshot().getValue(0.99),
DOUBLE_ASSERT_DELTA);
+
+ histogram.increment(10);
+ assertEquals(10D, histogram.copyToSnapshot().getValue(0.99),
DOUBLE_ASSERT_DELTA);
+ }
+
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+
+ histogram.increment(1);
+ histogram.increment(2);
+ histogram.increment(3);
+ histogram.increment(4);
+ histogram.increment(5);
+
+ Snapshot snapshot = histogram.copyToSnapshot();
+ assertEquals(1, snapshot.getValue(0.00), DOUBLE_ASSERT_DELTA);
+ assertEquals(3, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+ assertEquals(3, snapshot.getValue(0.60), DOUBLE_ASSERT_DELTA);
+ assertEquals(5, snapshot.getValue(1.00), DOUBLE_ASSERT_DELTA);
+ }
+
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+
+ for (int i = 11; i <= 20; i++)
+ histogram.increment(i);
+
+ // Right now the histogram looks like:
+ // 10 12 14 16 20
+ // 1 2 2 4 1
+ // %: 10 30 50 90 100
+ Snapshot snapshot = histogram.copyToSnapshot();
+ assertEquals(10, snapshot.getValue(0.01), DOUBLE_ASSERT_DELTA);
+ assertEquals(10, snapshot.getValue(0.10), DOUBLE_ASSERT_DELTA);
+
+ assertEquals(11, snapshot.getValue(0.11), DOUBLE_ASSERT_DELTA);
+ assertEquals(11, snapshot.getValue(0.20), DOUBLE_ASSERT_DELTA);
+ assertEquals(12, snapshot.getValue(0.21), DOUBLE_ASSERT_DELTA);
+ assertEquals(12, snapshot.getValue(0.30), DOUBLE_ASSERT_DELTA);
+
+ assertEquals(13, snapshot.getValue(0.31), DOUBLE_ASSERT_DELTA);
+ assertEquals(13, snapshot.getValue(0.40), DOUBLE_ASSERT_DELTA);
+ assertEquals(14, snapshot.getValue(0.41), DOUBLE_ASSERT_DELTA);
+ assertEquals(14, snapshot.getValue(0.50), DOUBLE_ASSERT_DELTA);
+ assertEquals(15, snapshot.getValue(0.51), DOUBLE_ASSERT_DELTA);
+ assertEquals(15, snapshot.getValue(0.70), DOUBLE_ASSERT_DELTA);
+ assertEquals(16, snapshot.getValue(0.71), DOUBLE_ASSERT_DELTA);
+ assertEquals(16, snapshot.getValue(0.90), DOUBLE_ASSERT_DELTA);
+ assertEquals(20, snapshot.getValue(0.91), DOUBLE_ASSERT_DELTA);
+ }
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+ histogram.increment(0);
+ histogram.increment(0);
+ histogram.increment(1);
+
+ Snapshot snapshot = histogram.copyToSnapshot();
+ assertEquals(0, snapshot.getValue(0.5), DOUBLE_ASSERT_DELTA);
+ assertEquals(1, snapshot.getValue(0.99), DOUBLE_ASSERT_DELTA);
+ }
+ }
+
+ @Test
+ public void testSize()
+ {
+ LogLinearHistogram histogram = new LogLinearHistogram(1);
+ histogram.increment(42);
+ histogram.increment(42);
+ assertEquals(2, histogram.copyToSnapshot().size());
+ }
+
+ @Test
+ public void testRandomizedMinMaxProperties()
+ {
+ RandomTestRunner.test().check(random -> {
+ for (int trial = 0; trial < 100; trial++)
+ {
+ int maxValue = 500;
+ LogLinearHistogram histogram = new
LogLinearHistogram(maxValue);
+ List<Long> values = new ArrayList<>();
+
+ int numValues = 10 + random.nextInt(100);
+ for (int i = 0; i < numValues; i++)
+ {
+ long value = random.nextInt(maxValue);
+ histogram.increment(value);
+ values.add(value);
+ }
+
+ Snapshot snapshot = histogram.copyToSnapshot();
+ long expectedMin = Collections.min(values);
+ long expectedMax = Collections.max(values);
+
+ assertEquals("" + trial,
LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+ snapshot.getMin());
+ assertEquals(LogLinearHistogram.invertIndex(1 +
LogLinearHistogram.index(expectedMax)),
+ snapshot.getMax());
+ }
+ });
+ }
+
+ @Test
+ public void testRandomizedIncrementDecrementConsistency()
+ {
+ RandomTestRunner.test().check(random -> {
+ for (int trial = 0; trial < 50; trial++)
+ {
+ int maxValue = 500;
+ LogLinearHistogram histogram = new
LogLinearHistogram(maxValue);
+ List<Long> values = new ArrayList<>();
+
+ // Add random values
+ int numValues = 20 + random.nextInt(80);
+ for (int i = 0; i < numValues; i++)
+ {
+ long value = random.nextInt(5000);
+ histogram.increment(value);
+ values.add(value);
+ }
+
+ Snapshot beforeSnapshot = histogram.copyToSnapshot();
+
+ // Remove and re-add some values
+ int numToModify = 1 + random.nextInt(Math.min(10,
values.size()));
+ for (int i = 0; i < numToModify; i++)
+ {
+ int idx = random.nextInt(values.size());
+ long oldValue = values.get(idx);
+ long newValue = random.nextInt(5000);
+
+ histogram.decrement(oldValue);
+ histogram.increment(newValue);
+ values.set(idx, newValue);
+ }
+
+ Snapshot afterSnapshot = histogram.copyToSnapshot();
+
+ assertEquals(beforeSnapshot.size(), afterSnapshot.size());
+
+ long expectedMin = Collections.min(values);
+ long expectedMax = Collections.max(values);
+
assertEquals(LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+ afterSnapshot.getMin());
+ assertEquals(LogLinearHistogram.invertIndex(1 +
LogLinearHistogram.index(expectedMax)),
+ afterSnapshot.getMax());
+ }
+ });
+ }
+
+ @Test
+ public void testRandomizedReplaceConsistency()
+ {
+ RandomTestRunner.test().check(random -> {
+ for (int trial = 0; trial < 100; trial++)
+ {
+ int maxValue = 500;
+ LogLinearHistogram histogram = new
LogLinearHistogram(maxValue);
+ List<Long> values = new ArrayList<>();
+
+ int numValues = 20 + random.nextInt(80);
+ for (int i = 0; i < numValues; i++)
+ {
+ long value = random.nextInt(maxValue);
+ histogram.increment(value);
+ values.add(value);
+ }
+
+ Snapshot beforeSnapshot = histogram.copyToSnapshot();
+ int numToReplace = 1 + random.nextInt(Math.min(10,
values.size()));
+ for (int i = 0; i < numToReplace; i++)
+ {
+ int idx = random.nextInt(values.size());
+ long oldValue = values.get(idx);
+ long newValue = random.nextInt(maxValue);
+
+ histogram.replace(oldValue, newValue);
+ values.set(idx, newValue);
+ }
+
+ Snapshot afterSnapshot = histogram.copyToSnapshot();
+ assertEquals(beforeSnapshot.size(), afterSnapshot.size());
+
+ long expectedMin = Collections.min(values);
+ long expectedMax = Collections.max(values);
+
assertEquals(LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+ afterSnapshot.getMin());
+ assertEquals(LogLinearHistogram.invertIndex(1 +
LogLinearHistogram.index(expectedMax)),
+ afterSnapshot.getMax());
+ }
+ });
+ }
+
+ @Test
+ public void testRandomizedAutoGrowth()
+ {
+ RandomTestRunner.test().check(random -> {
+ for (int trial = 0; trial < 20; trial++)
+ {
+ int maxValue = 500;
+ LogLinearHistogram histogram = new
LogLinearHistogram(maxValue);
+ List<Long> values = new ArrayList<>();
+
+ int numValues = 50;
+ for (int i = 0; i < numValues; i++)
+ {
+ long maxRange = (long) Math.pow(2, i / 5.0);
+ long value = random.nextInt((int) Math.min(maxRange,
Integer.MAX_VALUE));
+ histogram.increment(value);
+ values.add(value);
+ }
+
+ Snapshot snapshot = histogram.copyToSnapshot();
+
+ assertEquals(numValues, snapshot.size());
+
+ long expectedMin = Collections.min(values);
+ long expectedMax = Collections.max(values);
+
assertEquals(LogLinearHistogram.invertIndex(LogLinearHistogram.index(expectedMin)),
+ snapshot.getMin());
+ assertEquals(LogLinearHistogram.invertIndex(1 +
LogLinearHistogram.index(expectedMax)),
+ snapshot.getMax());
+ }
+ });
+ }
+}
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
index ac322403d1..f43ddb66e4 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java
@@ -69,7 +69,6 @@ import static
org.apache.cassandra.service.accord.AccordTestUtils.txnId;
public class AccordCommandTest
{
-
static final AtomicLong clock = new AtomicLong(0);
private static final Node.Id ID1 = new Node.Id(1);
private static final Node.Id ID2 = new Node.Id(2);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]