This is an automated email from the ASF dual-hosted git repository.

smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c64ff69  Emit a metric for number of local read and write calls
c64ff69 is described below

commit c64ff69bd982e288fd9f19697cee01514ab5f838
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Fri Dec 3 21:19:18 2021 +0100

    Emit a metric for number of local read and write calls
    
    patch by Damien Stevenson; reviewed by Stefan Miklosovic and Brandon Williams for CASSANDRA-10023
---
 CHANGES.txt                                        |   1 +
 .../cassandra/metrics/ClientRequestMetrics.java    |   6 +
 .../org/apache/cassandra/service/StorageProxy.java |  42 ++++
 .../service/reads/AbstractReadExecutor.java        |  10 +
 .../metrics/ClientRequestMetricsTest.java          | 221 +++++++++++++++++++++
 5 files changed, 280 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index d60f541..1d11bbc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.1
+ * Emit a metric for number of local read and write calls (CASSANDRA-10023)
  * Add non-blocking mode for CDC writes (CASSANDRA-17001)
  * Add guardrails framework (CASSANDRA-17147)
  * Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
index 19bc6d6..4080870 100644
--- a/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestMetrics.java
@@ -37,6 +37,8 @@ public class ClientRequestMetrics extends LatencyMetrics
     public final Meter aborts;
     public final Meter tombstoneAborts;
     public final Meter readSizeAborts;
+    public final Meter localRequests;
+    public final Meter remoteRequests;
 
     public ClientRequestMetrics(String scope)
     {
@@ -48,6 +50,8 @@ public class ClientRequestMetrics extends LatencyMetrics
         aborts = Metrics.meter(factory.createMetricName("Aborts"));
         tombstoneAborts = 
Metrics.meter(factory.createMetricName("TombstoneAborts"));
         readSizeAborts = 
Metrics.meter(factory.createMetricName("ReadSizeAborts"));
+        localRequests = 
Metrics.meter(factory.createMetricName("LocalRequests"));
+        remoteRequests = 
Metrics.meter(factory.createMetricName("RemoteRequests"));
     }
 
     public void markAbort(Throwable cause)
@@ -74,5 +78,7 @@ public class ClientRequestMetrics extends LatencyMetrics
         Metrics.remove(factory.createMetricName("Aborts"));
         Metrics.remove(factory.createMetricName("TombstoneAborts"));
         Metrics.remove(factory.createMetricName("ReadSizeAborts"));
+        Metrics.remove(factory.createMetricName("LocalRequests"));
+        Metrics.remove(factory.createMetricName("RemoteRequests"));
     }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 3883062..c3c0c88 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -643,10 +643,14 @@ public class StorageProxy implements StorageProxyMBean
     {
         PrepareCallback callback = new 
PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), 
replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), 
queryStartNanoTime);
         Message<Commit> message = Message.out(PAXOS_PREPARE_REQ, toPrepare);
+
+        boolean hasLocalRequest = false;
+
         for (Replica replica: replicaPlan.contacts())
         {
             if (replica.isSelf())
             {
+                hasLocalRequest = true;
                 PAXOS_PREPARE_REQ.stage.execute(() -> {
                     try
                     {
@@ -663,6 +667,12 @@ public class StorageProxy implements StorageProxyMBean
                 MessagingService.instance().sendWithCallback(message, 
replica.endpoint(), callback);
             }
         }
+
+        if (hasLocalRequest)
+            writeMetrics.localRequests.mark();
+        else
+            writeMetrics.remoteRequests.mark();
+
         callback.await();
         return callback;
     }
@@ -795,6 +805,17 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
+    /**
+     * Tells whether the natural endpoints of the given mutation include this node.
+     *
+     * @param mutation the mutation whose keyspace name and partition key are used for the endpoint lookup
+     * @return the result of {@code canDoLocalRequest} applied to the endpoints that {@code StorageService}
+     *         reports for the mutation's keyspace and key
+     */
+    private static boolean hasLocalMutation(IMutation mutation)
+    {
+        List<String> naturalEndpoints = StorageService.instance.getNaturalEndpointsWithPort(mutation.getKeyspaceName(),
+                                                                                            mutation.key().getKey());
+        return canDoLocalRequest(naturalEndpoints);
+    }
+
+    /**
+     * Checks whether this node's broadcast address and port appears in the given endpoint list.
+     *
+     * @param endpoints endpoint strings to search; each is compared against the string returned by
+     *                  {@code getHostAddressAndPort()} on the local broadcast address
+     * @return {@code true} if the list contains the local broadcast address string
+     */
+    private static boolean canDoLocalRequest(List<String> endpoints)
+    {
+        String localAddressAndPort = FBUtilities.getBroadcastAddressAndPort().getHostAddressAndPort();
+        return endpoints.contains(localAddressAndPort);
+    }
+
     /**
      * Use this method to have these Mutations applied
      * across all replicas. This method will take care
@@ -820,6 +841,11 @@ public class StorageProxy implements StorageProxyMBean
         {
             for (IMutation mutation : mutations)
             {
+                if (hasLocalMutation(mutation))
+                    writeMetrics.localRequests.mark();
+                else
+                    writeMetrics.remoteRequests.mark();
+
                 if (mutation instanceof CounterMutation)
                     
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, 
queryStartNanoTime));
                 else
@@ -973,6 +999,11 @@ public class StorageProxy implements StorageProxyMBean
                 // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
                 for (Mutation mutation : mutations)
                 {
+                    if (hasLocalMutation(mutation))
+                        writeMetrics.localRequests.mark();
+                    else
+                        writeMetrics.remoteRequests.mark();
+
                     String keyspaceName = mutation.getKeyspaceName();
                     Token tk = mutation.key().getToken();
                     AbstractReplicationStrategy replicationStrategy = 
Keyspace.open(keyspaceName).getReplicationStrategy();
@@ -1139,6 +1170,11 @@ public class StorageProxy implements StorageProxyMBean
             // add a handler for each mutation - includes checking 
availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
             {
+                if (hasLocalMutation(mutation))
+                    writeMetrics.localRequests.mark();
+                else
+                    writeMetrics.remoteRequests.mark();
+
                 WriteResponseHandlerWrapper wrapper = 
wrapBatchResponseHandler(mutation,
                                                                                
consistency_level,
                                                                                
batchConsistencyLevel,
@@ -1976,6 +2012,12 @@ public class StorageProxy implements StorageProxyMBean
         for (int i=0; i<cmdCount; i++)
         {
             reads[i] = AbstractReadExecutor.getReadExecutor(commands.get(i), 
consistencyLevel, queryStartNanoTime);
+
+            if (canDoLocalRequest(reads[i].getContactedReplicas())) {
+                readMetrics.localRequests.mark();
+            } else {
+                readMetrics.remoteRequests.mark();
+            }
         }
 
         // sends a data request to the closest replica, and a digest request 
to the others. If we have a speculating
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index fd1b372..13627c8 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -17,6 +17,9 @@
  */
 package org.apache.cassandra.service.reads;
 
+import java.util.List;
+import java.util.stream.Collectors;
+
 import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -207,6 +210,13 @@ public abstract class AbstractReadExecutor
             return new SpeculatingReadExecutor(cfs, command, replicaPlan, 
queryStartNanoTime);
     }
 
+    /**
+     * Lists the endpoints of the replicas contacted by this executor's replica plan.
+     *
+     * @return one string per contact, produced by calling {@code getHostAddress(true)} on the replica's endpoint
+     */
+    public List<String> getContactedReplicas()
+    {
+        return replicaPlan().contacts()
+                            .stream()
+                            .map(replica -> replica.endpoint().getHostAddress(true))
+                            .collect(Collectors.toList());
+    }
+
     /**
      *  Returns true if speculation should occur and if it should then block 
until it is time to
      *  send the speculative reads
diff --git a/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java b/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java
new file mode 100644
index 0000000..e4f9646
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+import static com.datastax.driver.core.Cluster.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Checks that client requests executed through an embedded Cassandra service mark the
+ * {@code localRequests} meters of the read and write client request metrics, and that the
+ * {@code remoteRequests} meters are left untouched.
+ * <p>
+ * The test keyspace is created with {@code SimpleStrategy} and a replication factor of 1.
+ */
+public class ClientRequestMetricsTest extends SchemaLoader
+{
+    private static EmbeddedCassandraService cassandra;
+
+    private static Cluster cluster;
+    private static Session session;
+
+    private static final String KEYSPACE = "junit";
+    private static final String TABLE = "clientrequestsmetricstest";
+
+    private static PreparedStatement writePS;
+    private static PreparedStatement paxosPS;
+    private static PreparedStatement readPS;
+    private static PreparedStatement readRangePS;
+
+    private static final ClientRequestMetrics readMetrics = ClientRequestsMetricsHolder.readMetrics;
+    private static final ClientWriteRequestMetrics writeMetrics = ClientRequestsMetricsHolder.writeMetrics;
+
+    /**
+     * Starts the embedded service, connects a driver session to it, creates the test schema
+     * and prepares the statements shared by all tests.
+     */
+    @BeforeClass
+    public static void setup() throws ConfigurationException, IOException
+    {
+        Schema.instance.clear();
+
+        cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        cluster = builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
+        session.execute("USE " + KEYSPACE);
+        session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int, ord int, val text, PRIMARY KEY (id, ord));");
+
+        writePS = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, ord, val) VALUES (?, ?, ?);");
+        paxosPS = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, ord, val) VALUES (?, ?, ?) IF NOT EXISTS;");
+        readPS = session.prepare("SELECT * FROM " + KEYSPACE + '.' + TABLE + " WHERE id=?;");
+        readRangePS = session.prepare("SELECT * FROM " + KEYSPACE + '.' + TABLE + " WHERE id=? AND ord>=? AND ord <= ?;");
+    }
+
+    /**
+     * Closes the driver cluster opened in {@link #setup()} so that it does not leak past this test class.
+     */
+    @AfterClass
+    public static void tearDown()
+    {
+        // setup() may have failed before the cluster was built
+        if (cluster != null)
+            cluster.close();
+    }
+
+    /** A plain insert is expected to mark exactly one local write and no reads. */
+    @Test
+    public void testWriteStatement()
+    {
+        ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+        ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+        executeWrite(1, 1, "aaaa");
+
+        assertEquals(1, writeMetricsContainer.compareLocalRequest());
+        assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+
+        assertEquals(0, readMetricsContainer.compareLocalRequest());
+        assertEquals(0, readMetricsContainer.compareRemoteRequest());
+    }
+
+    /** A conditional ({@code IF NOT EXISTS}) insert is expected to mark one local read and one local write. */
+    @Test
+    public void testPaxosStatement()
+    {
+        ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+        ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+        executePAXOS(2, 2, "aaaa");
+
+        assertEquals(1, readMetricsContainer.compareLocalRequest());
+        assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+        assertEquals(1, writeMetricsContainer.compareLocalRequest());
+        assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+    }
+
+    /**
+     * A batch of 100 inserts spread over 10 partitions is expected to mark 10 local writes,
+     * i.e. the expected count equals the number of distinct partitions in the batch.
+     */
+    @Test
+    public void testBatchStatement()
+    {
+        ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+        ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+        executeBatch(10, 10);
+
+        assertEquals(0, readMetricsContainer.compareLocalRequest());
+        assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+        assertEquals(10, writeMetricsContainer.compareLocalRequest());
+        assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+    }
+
+    /** A single-partition select is expected to mark exactly one local read and no writes. */
+    @Test
+    public void testReadStatement()
+    {
+        // The write happens before the snapshots are taken, so it is not counted below.
+        executeWrite(1, 1, "aaaa");
+
+        ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+        ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+        executeRead(1);
+
+        assertEquals(1, readMetricsContainer.compareLocalRequest());
+        assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+        assertEquals(0, writeMetricsContainer.compareLocalRequest());
+        assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+    }
+
+    /** A clustering-range select within one partition is expected to mark exactly one local read and no writes. */
+    @Test
+    public void testRangeStatement()
+    {
+        // NOTE(review): this batch populates partition id 0 while the slice below reads partition id 1;
+        // the assertions only look at the meters, not at the rows returned.
+        executeBatch(1, 100);
+
+        ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
+        ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);
+
+        executeSlice(1, 0, 99);
+
+        assertEquals(1, readMetricsContainer.compareLocalRequest());
+        assertEquals(0, readMetricsContainer.compareRemoteRequest());
+
+        assertEquals(0, writeMetricsContainer.compareLocalRequest());
+        assertEquals(0, writeMetricsContainer.compareRemoteRequest());
+    }
+
+
+    /**
+     * Snapshot of the local/remote request counts of a metrics object taken at construction time,
+     * used to compute how far the meters advanced afterwards.
+     */
+    private static class ClientRequestMetricsContainer
+    {
+        private final ClientRequestMetrics metrics;
+
+        private final long localRequests;
+        private final long remoteRequests;
+
+        public ClientRequestMetricsContainer(ClientRequestMetrics clientRequestMetrics)
+        {
+            metrics = clientRequestMetrics;
+            localRequests = metrics.localRequests.getCount();
+            remoteRequests = metrics.remoteRequests.getCount();
+        }
+
+        /** @return the number of local requests marked since this container was created */
+        public long compareLocalRequest()
+        {
+            return metrics.localRequests.getCount() - localRequests;
+        }
+
+        /** @return the number of remote requests marked since this container was created */
+        public long compareRemoteRequest()
+        {
+            return metrics.remoteRequests.getCount() - remoteRequests;
+        }
+    }
+
+    /** Executes the prepared plain insert for one row. */
+    private void executeWrite(int id, int ord, String val)
+    {
+        BoundStatement bs = writePS.bind(id, ord, val);
+        session.execute(bs);
+    }
+
+    /** Executes the prepared {@code IF NOT EXISTS} insert for one row. */
+    private void executePAXOS(int id, int ord, String val)
+    {
+        BoundStatement bs = paxosPS.bind(id, ord, val);
+        session.execute(bs);
+    }
+
+    /**
+     * Executes one batch containing {@code distinctPartitions * numClusteringKeys} inserts,
+     * with partition ids and clustering keys both counted up from 0.
+     */
+    private void executeBatch(int distinctPartitions, int numClusteringKeys)
+    {
+        BatchStatement batch = new BatchStatement();
+
+        for (int i = 0; i < distinctPartitions; i++)
+        {
+            for (int y = 0; y < numClusteringKeys; y++)
+            {
+                batch.add(writePS.bind(i, y, "aaaaaaaa"));
+            }
+        }
+        session.execute(batch);
+    }
+
+    /** Executes the prepared single-partition select. */
+    private void executeRead(int id)
+    {
+        BoundStatement bs = readPS.bind(id);
+        session.execute(bs);
+    }
+
+    /** Executes the prepared select over the inclusive clustering range {@code [startRange, endRange]}. */
+    private void executeSlice(int id, int startRange, int endRange)
+    {
+        BoundStatement bs = readRangePS.bind(id, startRange, endRange);
+        session.execute(bs);
+    }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to