This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new c64ff69 Emit a metric for number of local read and write calls
c64ff69 is described below
commit c64ff69bd982e288fd9f19697cee01514ab5f838
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Fri Dec 3 21:19:18 2021 +0100
Emit a metric for number of local read and write calls
patch by Damien Stevenson; reviewed by Stefan Miklosovic and Brandon Williams for CASSANDRA-10023
---
CHANGES.txt | 1 +
.../cassandra/metrics/ClientRequestMetrics.java | 6 +
.../org/apache/cassandra/service/StorageProxy.java | 42 ++++
.../service/reads/AbstractReadExecutor.java | 10 +
.../metrics/ClientRequestMetricsTest.java | 221 +++++++++++++++++++++
5 files changed, 280 insertions(+)
diff --git a/CHANGES.txt b/CHANGES.txt
index d60f541..1d11bbc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Emit a metric for number of local read and write calls
* Add non-blocking mode for CDC writes (CASSANDRA-17001)
* Add guardrails framework (CASSANDRA-17147)
* Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index 19bc6d6..4080870 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -37,6 +37,8 @@ public class ClientRequestMetrics extends LatencyMetrics
public final Meter aborts;
public final Meter tombstoneAborts;
public final Meter readSizeAborts;
+ public final Meter localRequests;
+ public final Meter remoteRequests;
public ClientRequestMetrics(String scope)
{
@@ -48,6 +50,8 @@ public class ClientRequestMetrics extends LatencyMetrics
aborts = Metrics.meter(factory.createMetricName("Aborts"));
tombstoneAborts = Metrics.meter(factory.createMetricName("TombstoneAborts"));
readSizeAborts = Metrics.meter(factory.createMetricName("ReadSizeAborts"));
+ localRequests = Metrics.meter(factory.createMetricName("LocalRequests"));
+ remoteRequests = Metrics.meter(factory.createMetricName("RemoteRequests"));
}
public void markAbort(Throwable cause)
@@ -74,5 +78,7 @@ public class ClientRequestMetrics extends LatencyMetrics
Metrics.remove(factory.createMetricName("Aborts"));
Metrics.remove(factory.createMetricName("TombstoneAborts"));
Metrics.remove(factory.createMetricName("ReadSizeAborts"));
+ Metrics.remove(factory.createMetricName("LocalRequests"));
+ Metrics.remove(factory.createMetricName("RemoteRequests"));
}
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3883062..c3c0c88 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -643,10 +643,14 @@ public class StorageProxy implements StorageProxyMBean
{
PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
Message<Commit> message = Message.out(PAXOS_PREPARE_REQ, toPrepare);
+
+ boolean hasLocalRequest = false;
+
for (Replica replica: replicaPlan.contacts())
{
if (replica.isSelf())
{
+ hasLocalRequest = true;
PAXOS_PREPARE_REQ.stage.execute(() -> {
try
{
@@ -663,6 +667,12 @@ public class StorageProxy implements StorageProxyMBean
MessagingService.instance().sendWithCallback(message, replica.endpoint(), callback);
}
}
+
+ if (hasLocalRequest)
+ writeMetrics.localRequests.mark();
+ else
+ writeMetrics.remoteRequests.mark();
+
callback.await();
return callback;
}
@@ -795,6 +805,17 @@ public class StorageProxy implements StorageProxyMBean
});
}
+ private static boolean hasLocalMutation(IMutation mutation)
+ {
+ return canDoLocalRequest(StorageService.instance.getNaturalEndpointsWithPort(mutation.getKeyspaceName(),
+ mutation.key().getKey()));
+ }
+
+ private static boolean canDoLocalRequest(List<String> endpoints)
+ {
+ return endpoints.contains(FBUtilities.getBroadcastAddressAndPort().getHostAddressAndPort());
+ }
+
/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
@@ -820,6 +841,11 @@ public class StorageProxy implements StorageProxyMBean
{
for (IMutation mutation : mutations)
{
+ if (hasLocalMutation(mutation))
+ writeMetrics.localRequests.mark();
+ else
+ writeMetrics.remoteRequests.mark();
+
if (mutation instanceof CounterMutation)
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
else
@@ -973,6 +999,11 @@ public class StorageProxy implements StorageProxyMBean
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
{
+ if (hasLocalMutation(mutation))
+ writeMetrics.localRequests.mark();
+ else
+ writeMetrics.remoteRequests.mark();
+
String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
@@ -1139,6 +1170,11 @@ public class StorageProxy implements StorageProxyMBean
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
{
+ if (hasLocalMutation(mutation))
+ writeMetrics.localRequests.mark();
+ else
+ writeMetrics.remoteRequests.mark();
+
WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation,
consistency_level,
batchConsistencyLevel,
@@ -1976,6 +2012,12 @@ public class StorageProxy implements StorageProxyMBean
for (int i=0; i<cmdCount; i++)
{
reads[i] = AbstractReadExecutor.getReadExecutor(commands.get(i), consistencyLevel, queryStartNanoTime);
+
+ if (canDoLocalRequest(reads[i].getContactedReplicas())) {
+ readMetrics.localRequests.mark();
+ } else {
+ readMetrics.remoteRequests.mark();
+ }
}
// sends a data request to the closest replica, and a digest request to the others. If we have a speculating
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index fd1b372..13627c8 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -17,6 +17,9 @@
*/
package org.apache.cassandra.service.reads;
+import java.util.List;
+import java.util.stream.Collectors;
+
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -207,6 +210,13 @@ public abstract class AbstractReadExecutor
return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
}
+ public List<String> getContactedReplicas() {
+ return replicaPlan().contacts()
+ .stream()
+ .map(r -> r.endpoint().getHostAddress(true))
+ .collect(Collectors.toList());
+ }
+
/**
* Returns true if speculation should occur and if it should then block until it is time to
* send the speculative reads
diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java
new file mode 100644
index 0000000..e4f9646
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+import static com.datastax.driver.core.Cluster.*;
+import static org.junit.Assert.assertEquals;
+
+public class ClientRequestMetricsTest extends SchemaLoader
+{
+ private static EmbeddedCassandraService cassandra;
+
+ private static Cluster cluster;
+ private static Session session;
+
+ private static String KEYSPACE = "junit";
+ private static final String TABLE = "clientrequestsmetricstest";
+
+ private static PreparedStatement writePS;
+ private static PreparedStatement paxosPS;
+ private static PreparedStatement readPS;
+ private static PreparedStatement readRangePS;
+
+ private static final ClientRequestMetrics readMetrics = ClientRequestsMetricsHolder.readMetrics;
+ private static final ClientWriteRequestMetrics writeMetrics = ClientRequestsMetricsHolder.writeMetrics;
+
+ @BeforeClass
+ public static void setup() throws ConfigurationException, IOException
+ {
+ Schema.instance.clear();
+
+ cassandra = new EmbeddedCassandraService();
+ cassandra.start();
+
+ cluster = builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+ session = cluster.connect();
+
+ session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+ session.execute("USE " + KEYSPACE);
+ session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int, ord int, val text, PRIMARY KEY (id, ord));");
+
+ writePS = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, ord, val) VALUES (?, ?, ?);");
+ paxosPS = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, ord, val) VALUES (?, ?, ?) IF NOT EXISTS;");
+ readPS = session.prepare("SELECT * FROM " + KEYSPACE + '.' + TABLE + " WHERE id=?;");
+ readRangePS = session.prepare("SELECT * FROM " + KEYSPACE + '.' + TABLE + " WHERE id=? AND ord>=? AND ord <= ?;");
+ }
+
+ @Test
+ public void testWriteStatement()
+ {
+ ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+ ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+ executeWrite(1, 1, "aaaa");
+
+ assertEquals(1, writeMetricsContainer.compareLocalRequest());
+ assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+
+ assertEquals(0, readMetricsContainer.compareLocalRequest());
+ assertEquals(0, readMetricsContainer.compareRemoteRequest());
+ }
+
+ @Test
+ public void testPaxosStatement()
+ {
+ ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+ ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+ executePAXOS(2, 2, "aaaa");
+
+ assertEquals(1, readMetricsContainer.compareLocalRequest());
+ assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+ assertEquals(1, writeMetricsContainer.compareLocalRequest());
+ assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+ }
+
+ @Test
+ public void testBatchStatement()
+ {
+ ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+ ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+ executeBatch(10, 10);
+
+ assertEquals(0, readMetricsContainer.compareLocalRequest());
+ assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+ assertEquals(10, writeMetricsContainer.compareLocalRequest());
+ assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+ }
+
+ @Test
+ public void testReadStatement()
+ {
+ executeWrite(1, 1, "aaaa");
+
+ ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+ ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+ executeRead(1);
+
+ assertEquals(1, readMetricsContainer.compareLocalRequest());
+ assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+ assertEquals(0, writeMetricsContainer.compareLocalRequest());
+ assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+ }
+
+ @Test
+ public void testRangeStatement()
+ {
+ executeBatch(1, 100);
+
+ ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+ ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+ executeSlice(1, 0, 99);
+
+ assertEquals(1, readMetricsContainer.compareLocalRequest());
+ assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+ assertEquals(0, writeMetricsContainer.compareLocalRequest());
+ assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+ }
+
+
+ private static class ClientRequestMetricsContainer
+ {
+ private ClientRequestMetrics metrics;
+
+ private long localRequests;
+ private long remoteRequests;
+
+ public ClientRequestMetricsContainer(ClientRequestMetrics clientRequestMetrics)
+ {
+ metrics = clientRequestMetrics;
+ localRequests = metrics.localRequests.getCount();
+ remoteRequests = metrics.remoteRequests.getCount();
+ }
+
+ public long compareLocalRequest()
+ {
+ return metrics.localRequests.getCount() - localRequests;
+ }
+
+ public long compareRemoteRequest()
+ {
+ return metrics.remoteRequests.getCount() - remoteRequests;
+ }
+ }
+
+ private void executeWrite(int id, int ord, String val)
+ {
+ BoundStatement bs = writePS.bind(id, ord, val);
+ session.execute(bs);
+ }
+
+ private void executePAXOS(int id, int ord, String val)
+ {
+ BoundStatement bs = paxosPS.bind(id, ord, val);
+ session.execute(bs);
+ }
+
+ private void executeBatch(int distinctPartitions, int numClusteringKeys)
+ {
+ BatchStatement batch = new BatchStatement();
+
+ for (int i = 0; i < distinctPartitions; i++)
+ {
+ for (int y = 0; y < numClusteringKeys; y++)
+ {
+ batch.add(writePS.bind(i, y, "aaaaaaaa"));
+ }
+ }
+ session.execute(batch);
+ }
+
+ private void executeRead(int id)
+ {
+ BoundStatement bs = readPS.bind(id);
+ session.execute(bs);
+ }
+
+ private void executeSlice(int id, int start_range, int end_range)
+ {
+ BoundStatement bs = readRangePS.bind(id, start_range, end_range);
+ session.execute(bs);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]