Repository: cassandra
Updated Branches:
  refs/heads/trunk 8d32d9100 -> d38694afe


Add coordinator write metric per CF

patch by Sumanth Pasupuleti; reviewed by Jay Zhuang and jasobrown for 
CASSANDRA-14232


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d38694af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d38694af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d38694af

Branch: refs/heads/trunk
Commit: d38694afe209215f8fc562ca384b82766147eadf
Parents: 8d32d91
Author: Sumanth Pasupuleti <sumanth.pasupuleti...@gmail.com>
Authored: Tue Mar 6 15:57:01 2018 -0800
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Wed Mar 28 12:55:49 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   1 +
 doc/source/operating/metrics.rst                |   1 +
 .../apache/cassandra/metrics/TableMetrics.java  |   3 +
 .../apache/cassandra/service/StorageProxy.java  |  27 ++-
 .../cassandra/metrics/TableMetricsTest.java     | 176 +++++++++++++++++++
 6 files changed, 208 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index daafa0a..5763720 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add coordinator write metric per CF (CASSANDRA-14232)
  * Fix scheduling of speculative retry threshold recalculation 
(CASSANDRA-14338)
  * Add support for hybrid MIN(), MAX() speculative retry policies 
(CASSANDRA-14293)
  * Correct and clarify SSLFactory.getSslContext method and call sites 
(CASSANDRA-14314)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index bcac4ea..f8e3ca6 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -68,6 +68,7 @@ New features
      See nodetool and fqltool help text for more information.
    - SSTableDump now supports the -l option to output each partition as it's 
own json object
      See CASSANDRA-13848 for more detail
+   - Metric for coordinator writes per table has been added. See 
CASSANDRA-14232
 
 Upgrading
 ---------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/doc/source/operating/metrics.rst
----------------------------------------------------------------------
diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst
index 345fc3e..325395c 100644
--- a/doc/source/operating/metrics.rst
+++ b/doc/source/operating/metrics.rst
@@ -95,6 +95,7 @@ ReadLatency                             Latency        Local 
read latency for th
 RangeLatency                            Latency        Local range scan 
latency for this table.
 WriteLatency                            Latency        Local write latency for 
this table.
 CoordinatorReadLatency                  Timer          Coordinator read 
latency for this table.
+CoordinatorWriteLatency                 Timer          Coordinator write 
latency for this table.
 CoordinatorScanLatency                  Timer          Coordinator range scan 
latency for this table.
 PendingFlushes                          Counter        Estimated number of 
flush tasks pending for this table.
 BytesFlushed                            Counter        Total number of bytes 
flushed since server [re]start.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/src/java/org/apache/cassandra/metrics/TableMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java 
b/src/java/org/apache/cassandra/metrics/TableMetrics.java
index 7ce2f16..d8cb18e 100644
--- a/src/java/org/apache/cassandra/metrics/TableMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java
@@ -184,6 +184,7 @@ public class TableMetrics
 
     public final Timer coordinatorReadLatency;
     public final Timer coordinatorScanLatency;
+    public final Timer coordinatorWriteLatency;
 
     /** Time spent waiting for free memtable space, either on- or off-heap */
     public final Histogram waitingOnFreeMemtableSpace;
@@ -791,6 +792,7 @@ public class TableMetrics
         colUpdateTimeDeltaHistogram = 
createTableHistogram("ColUpdateTimeDeltaHistogram", 
cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false);
         coordinatorReadLatency = 
Metrics.timer(factory.createMetricName("CoordinatorReadLatency"));
         coordinatorScanLatency = 
Metrics.timer(factory.createMetricName("CoordinatorScanLatency"));
+        coordinatorWriteLatency = 
Metrics.timer(factory.createMetricName("CoordinatorWriteLatency"));
         waitingOnFreeMemtableSpace = 
Metrics.histogram(factory.createMetricName("WaitingOnFreeMemtableSpace"), 
false);
 
         // We do not want to capture view mutation specific metrics for a view
@@ -880,6 +882,7 @@ public class TableMetrics
         Metrics.remove(factory.createMetricName("KeyCacheHitRate"), 
aliasFactory.createMetricName("KeyCacheHitRate"));
         Metrics.remove(factory.createMetricName("CoordinatorReadLatency"), 
aliasFactory.createMetricName("CoordinatorReadLatency"));
         Metrics.remove(factory.createMetricName("CoordinatorScanLatency"), 
aliasFactory.createMetricName("CoordinatorScanLatency"));
+        Metrics.remove(factory.createMetricName("CoordinatorWriteLatency"), 
aliasFactory.createMetricName("CoordinatorWriteLatency"));
         Metrics.remove(factory.createMetricName("WaitingOnFreeMemtableSpace"), 
aliasFactory.createMetricName("WaitingOnFreeMemtableSpace"));
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index a47c4c6..bacc3a8 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -757,6 +757,7 @@ public class StorageProxy implements StorageProxyMBean
             long latency = System.nanoTime() - startTime;
             writeMetrics.addNano(latency);
             writeMetricsMap.get(consistency_level).addNano(latency);
+            updateCoordinatorWriteLatencyTableMetric(mutations, latency);
         }
     }
 
@@ -1030,7 +1031,7 @@ public class StorageProxy implements StorageProxyMBean
             long latency = System.nanoTime() - startTime;
             writeMetrics.addNano(latency);
             writeMetricsMap.get(consistency_level).addNano(latency);
-
+            updateCoordinatorWriteLatencyTableMetric(mutations, latency);
         }
     }
 
@@ -1039,6 +1040,30 @@ public class StorageProxy implements StorageProxyMBean
         return replica.equals(FBUtilities.getBroadcastAddressAndPort());
     }
 
+    private static void updateCoordinatorWriteLatencyTableMetric(Collection<? 
extends IMutation> mutations, long latency)
+    {
+        if (null == mutations)
+        {
+            return;
+        }
+
+        try
+        {
+            //TODO: Avoid giving same latency number for each CF in each 
mutation in a given set of mutations
+            //We could potentially pass a callback into performWrite. And add 
callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints)
+            //However, Trade off between write metric per CF accuracy vs 
performance hit due to callbacks. Similar issue exists with 
CoordinatorReadLatency metric.
+            mutations.forEach(mutation -> {
+                mutation.getTableIds().forEach(tableId -> {
+                    
Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId).metric.coordinatorWriteLatency.update(latency,
 TimeUnit.NANOSECONDS);
+                });
+            });
+        }
+        catch (Exception ex)
+        {
+            logger.warn("Exception occurred updating coordinatorWriteLatency 
metric", ex);
+        }
+    }
+
     private static void syncWriteToBatchlog(Collection<Mutation> mutations, 
Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java 
b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
new file mode 100644
index 0000000..a3ae956
--- /dev/null
+++ b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.io.IOException;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Session;
+import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.service.EmbeddedCassandraService;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+@RunWith(OrderedJUnit4ClassRunner.class)
+public class TableMetricsTest extends SchemaLoader
+{
+
+    private static Session session;
+
+    private static final String KEYSPACE = "junit";
+    private static final String TABLE = "tablemetricstest";
+    private static final String COUNTER_TABLE = "tablemetricscountertest";
+
+    @BeforeClass()
+    public static void setup() throws ConfigurationException, IOException
+    {
+        Schema.instance.clear();
+
+        EmbeddedCassandraService cassandra = new EmbeddedCassandraService();
+        cassandra.start();
+
+        Cluster cluster = 
Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
+        session = cluster.connect();
+
+        session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH 
replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };", 
KEYSPACE));
+        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id_c 
counter, id int, val text, PRIMARY KEY(id, val));", KEYSPACE, COUNTER_TABLE));
+    }
+
+    private ColumnFamilyStore recreateTable()
+    {
+        session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, 
TABLE));
+        session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id 
int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, TABLE));
+        return ColumnFamilyStore.getIfExists(KEYSPACE, TABLE);
+    }
+
+    private void executeBatch(boolean isLogged, int distinctPartitions, int 
statementsPerPartition)
+    {
+        BatchStatement.Type batchType;
+        PreparedStatement ps = session.prepare(String.format("INSERT INTO 
%s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, TABLE));
+
+        if (isLogged)
+        {
+            batchType = BatchStatement.Type.LOGGED;
+        }
+        else
+        {
+            batchType = BatchStatement.Type.UNLOGGED;
+        }
+
+        BatchStatement batch = new BatchStatement(batchType);
+
+        for (int i=0; i<distinctPartitions; i++)
+        {
+            for (int j=0; j<statementsPerPartition; j++)
+            {
+                batch.add(ps.bind(i, j + "a", "b"));
+            }
+        }
+
+        session.execute(batch);
+    }
+
+    @Test
+    public void testRegularStatementsExecuted()
+    {
+        ColumnFamilyStore cfs = recreateTable();
+        assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+
+        for (int i = 0; i < 10; i++)
+        {
+            session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) 
VALUES (%d, '%s', '%s')", KEYSPACE, TABLE, i, "val" + i, "val" + i));
+        }
+
+        assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+    }
+
+    @Test
+    public void testPreparedStatementsExecuted()
+    {
+        ColumnFamilyStore cfs = recreateTable();
+        PreparedStatement metricsStatement = 
session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, 
?)", KEYSPACE, TABLE));
+
+        assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+
+        for (int i = 0; i < 10; i++)
+        {
+            session.execute(metricsStatement.bind(i, "val" + i, "val" + i));
+        }
+
+        assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+    }
+
+    @Test
+    public void testLoggedPartitionsPerBatch()
+    {
+        ColumnFamilyStore cfs = recreateTable();
+        assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+
+        executeBatch(true, 10, 2);
+        assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount());
+
+        executeBatch(true, 20, 2);
+        assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+    }
+
+    @Test
+    public void testUnloggedPartitionsPerBatch()
+    {
+        ColumnFamilyStore cfs = recreateTable();
+        assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+
+        executeBatch(false, 5, 3);
+        assertEquals(5, cfs.metric.coordinatorWriteLatency.getCount());
+
+        executeBatch(false, 25, 2);
+        assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+
+    }
+
+    @Test
+    public void testCounterStatement()
+    {
+        ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, 
COUNTER_TABLE);
+        assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0);
+        session.execute(String.format("UPDATE %s.%s SET id_c = id_c + 1 WHERE 
id = 1 AND val = 'val1'", KEYSPACE, COUNTER_TABLE));
+        assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount());
+        assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0);
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to