This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4892331eb4 Coordinator level metrics for read response and mutation 
row and column counts
4892331eb4 is described below

commit 4892331eb49f72d2e18432a2af56538d11c7c5d1
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Apr 27 15:08:11 2022 -0500

    Coordinator level metrics for read response and mutation row and column 
counts
    
    patch by Caleb Rackliffe; reviewed by Berenguer Blasi for CASSANDRA-18155
---
 CHANGES.txt                                        |   1 +
 src/java/org/apache/cassandra/config/Config.java   |   2 +
 .../cassandra/config/DatabaseDescriptor.java       |  10 +
 .../restrictions/ClusteringColumnRestrictions.java |   1 -
 .../cql3/restrictions/RestrictionSet.java          |  10 +-
 .../cql3/restrictions/RestrictionSetWrapper.java   |  12 +
 .../cassandra/cql3/restrictions/Restrictions.java  |  10 +
 .../cql3/restrictions/SingleRestriction.java       |   8 +
 .../cql3/restrictions/StatementRestrictions.java   |  48 ++
 .../cassandra/cql3/statements/BatchStatement.java  |   2 +
 .../cql3/statements/ModificationStatement.java     |   7 +
 .../cassandra/cql3/statements/SelectStatement.java |  36 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |  56 ++
 .../metrics/ClientRequestSizeMetrics.java          |  91 +++
 .../org/apache/cassandra/service/StorageProxy.java |  20 +-
 .../cassandra/service/StorageProxyMBean.java       |   3 +
 .../org/apache/cassandra/service/paxos/Paxos.java  |   4 +
 .../ClientRequestRowAndColumnMetricsTest.java      | 625 +++++++++++++++++++++
 18 files changed, 929 insertions(+), 17 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 2fc4f017b4..f48dc48bc6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Coordinator level metrics for read response and mutation row and column 
counts (CASSANDRA-18155)
  * Add CQL functions for dynamic data masking (CASSANDRA-17941)
  * Print friendly error when nodetool attempts to connect to uninitialized 
server (CASSANDRA-11537)
  * Use G1GC by default, and update default G1GC settings (CASSANDRA-18027)
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index c2992e6e4b..3aff5fe956 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -1064,6 +1064,8 @@ public class Config
 
     public volatile boolean sstable_read_rate_persistence_enabled = false;
 
+    public volatile boolean client_request_size_metrics_enabled = true;
+
     public volatile int max_top_size_partition_count = 10;
     public volatile int max_top_tombstone_partition_count = 10;
     public volatile DataStorageSpec.LongBytesBound min_tracked_partition_size 
= new DataStorageSpec.LongBytesBound("1MiB");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af11e89b74..1cf36a064b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -4572,4 +4572,14 @@ public class DatabaseDescriptor
             conf.sstable_read_rate_persistence_enabled = enabled;
         }
     }
+
+    public static boolean getClientRequestSizeMetricsEnabled()
+    {
+        return conf.client_request_size_metrics_enabled;
+    }
+
+    public static void setClientRequestSizeMetricsEnabled(boolean enabled)
+    {
+        conf.client_request_size_metrics_enabled = enabled;
+    }
 }
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
 
b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
index c1d0c52ca7..146ec20557 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
@@ -228,5 +228,4 @@ final class ClusteringColumnRestrictions extends 
RestrictionSetWrapper
     {
         return restriction.isContains() || restriction.isLIKE() || index != 
restriction.getFirstColumn().position();
     }
-
 }
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java 
b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 7a5d5b964b..344498ce0e 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -105,6 +105,15 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
         return new ArrayList<>(restrictions.keySet());
     }
 
+    /**
+     * @return a direct reference to the key set from {@link #restrictions} 
with no defenseive copying
+     */
+    @Override
+    public Collection<ColumnMetadata> getColumnDefinitions()
+    {
+        return restrictions.keySet();
+    }
+
     @Override
     public void addFunctionsTo(List<Function> functions)
     {
@@ -332,7 +341,6 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     /**
      * {@code Iterator} decorator that removes duplicates in an ordered one.
      *
-     * @param iterator the decorated iterator
      * @param <E> the iterator element type.
      */
     private static final class DistinctIterator<E> extends AbstractIterator<E>
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java 
b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index 9803adc459..8c89bca314 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.restrictions;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
@@ -57,6 +58,17 @@ class RestrictionSetWrapper implements Restrictions
         return restrictions.getColumnDefs();
     }
 
+    @Override
+    public Collection<ColumnMetadata> getColumnDefinitions()
+    {
+        return restrictions.getColumnDefinitions();
+    }
+
+    public RestrictionSet getRestrictionSet()
+    {
+        return restrictions;
+    }
+
     public void addFunctionsTo(List<Function> functions)
     {
         restrictions.addFunctionsTo(functions);
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index 77e0dd92fe..0ad7530e76 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.cql3.restrictions;
 
+import java.util.Collection;
 import java.util.Set;
 
 import org.apache.cassandra.schema.ColumnMetadata;
@@ -34,6 +35,15 @@ public interface Restrictions extends Restriction
      */
     Set<Restriction> getRestrictions(ColumnMetadata columnDef);
 
+    /**
+     * This method exists in addition to {@link #getColumnDefs()} in case 
implementations want to
+     * provide columns definitions that are not strictly in position order.
+     */
+    default Collection<ColumnMetadata> getColumnDefinitions()
+    {
+        return getColumnDefs();
+    }
+
     /**
      * Checks if this <code>Restrictions</code> is empty or not.
      *
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
index 42b0b4ece7..989d5dabdf 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
@@ -51,6 +51,14 @@ public interface SingleRestriction extends Restriction
         return false;
     }
 
+    /**
+     * @return <code>true</code> if this restriction is based on equality 
comparison rather than a range or negation
+     */
+    default boolean isEqualityBased()
+    {
+        return isEQ() || isIN() || isContains();
+    }
+
     public default boolean isNotNull()
     {
         return false;
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 371bc589c3..b74d8adc43 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -390,6 +390,54 @@ public final class StatementRestrictions
                            .anyMatch(p -> ((SingleRestriction) p).isEQ());
     }
 
+    /**
+     * This method determines whether a specified column is restricted on 
equality or something equivalent, like IN.
+     * It can be used in conjunction with the columns selected by a query to 
determine which of those columns is 
+     * already bound by the client (and from its perspective, not retrieved by 
the database).
+     *
+     * @param column a column from the same table these restrictions are 
against
+     *
+     * @return <code>true</code> if the given column is restricted on equality
+     */
+    public boolean isEqualityRestricted(ColumnMetadata column)
+    {
+        if (column.kind == ColumnMetadata.Kind.PARTITION_KEY)
+        {
+            if (partitionKeyRestrictions.hasOnlyEqualityRestrictions())
+                for (ColumnMetadata restricted : 
partitionKeyRestrictions.getColumnDefinitions())
+                    if (restricted.name.equals(column.name))
+                        return true;
+        }
+        else if (column.kind == ColumnMetadata.Kind.CLUSTERING)
+        {
+            if (hasClusteringColumnsRestrictions())
+            {
+                for (SingleRestriction restriction : 
clusteringColumnsRestrictions.getRestrictionSet())
+                {
+                    if (restriction.isEqualityBased())
+                    {
+                        if (restriction.isMultiColumn())
+                        {
+                            for (ColumnMetadata restricted : 
restriction.getColumnDefs())
+                                if (restricted.name.equals(column.name))
+                                    return true;
+                        }
+                        else if 
(restriction.getFirstColumn().name.equals(column.name))
+                            return true;
+                    }
+                }
+            }
+        }
+        else if (hasNonPrimaryKeyRestrictions())
+        {
+            for (SingleRestriction restriction : nonPrimaryKeyRestrictions)
+                if (restriction.getFirstColumn().name.equals(column.name) && 
restriction.isEqualityBased())
+                    return true;
+        }
+
+        return false;
+    }
+
     /**
      * Returns the <code>Restrictions</code> for the specified type of columns.
      *
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ab761ad9c1..b034db4482 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.exceptions.*;
 import org.apache.cassandra.metrics.BatchMetrics;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
 import org.apache.cassandra.service.*;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.messages.ResultMessage;
@@ -442,6 +443,7 @@ public class BatchStatement implements CQLStatement
 
         boolean mutateAtomic = (isLogged() && mutations.size() > 1);
         StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, 
queryStartNanoTime);
+        ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
     }
 
     private void updatePartitionsPerBatchMetrics(int updatedPartitions)
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c69cf4945a..c40109c607 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.ViewMetadata;
 import org.apache.cassandra.cql3.*;
@@ -51,6 +52,7 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.service.StorageProxy;
@@ -513,8 +515,13 @@ public abstract class ModificationStatement implements 
CQLStatement.SingleKeyspa
                          options.getNowInSeconds(queryState),
                          queryStartNanoTime);
         if (!mutations.isEmpty())
+        {
             StorageProxy.mutateWithTriggers(mutations, cl, false, 
queryStartNanoTime);
 
+            if (!SchemaConstants.isSystemKeyspace(metadata.keyspace))
+                
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
+        }
+
         return null;
     }
 
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 75c12f065e..774fb3b77a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
 import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
@@ -260,21 +261,30 @@ public class SelectStatement implements 
CQLStatement.SingleKeyspaceCqlStatement
 
         if (options.isReadThresholdsEnabled())
             query.trackWarnings();
+        ResultMessage.Rows rows;
 
         if (aggregationSpec == null && (pageSize <= 0 || 
(query.limits().count() <= pageSize)))
-            return execute(query, options, state.getClientState(), selectors, 
nowInSec, userLimit, null, queryStartNanoTime);
-
-        QueryPager pager = getPager(query, options);
-
-        return execute(state,
-                       Pager.forDistributedQuery(pager, cl, 
state.getClientState()),
-                       options,
-                       selectors,
-                       pageSize,
-                       nowInSec,
-                       userLimit,
-                       aggregationSpec,
-                       queryStartNanoTime);
+        {
+            rows = execute(query, options, state.getClientState(), selectors, 
nowInSec, userLimit, null, queryStartNanoTime);
+        }
+        else
+        {
+            QueryPager pager = getPager(query, options);
+
+            rows = execute(state,
+                           Pager.forDistributedQuery(pager, cl, 
state.getClientState()),
+                           options,
+                           selectors,
+                           pageSize,
+                           nowInSec,
+                           userLimit,
+                           aggregationSpec,
+                           queryStartNanoTime);
+        }
+        if (!SchemaConstants.isSystemKeyspace(table.keyspace))
+            ClientRequestSizeMetrics.recordReadResponseMetrics(rows, 
restrictions, selection);
+
+        return rows;
     }
 
 
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 9f854c670a..7f2ca7d465 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -466,6 +466,62 @@ public class PartitionUpdate extends AbstractBTreePartition
         return marks;
     }
 
+    /**
+     *
+     * @return the estimated number of rows affected by this mutation 
+     */
+    public int affectedRowCount()
+    {
+        // If there is a partition-level deletion, we intend to delete at 
least one row.
+        if (!partitionLevelDeletion().isLive())
+            return 1;
+
+        int count = 0;
+
+        // Each range delete should correspond to at least one intended row 
deletion.
+        if (deletionInfo().hasRanges())
+            count += deletionInfo().rangeCount();
+
+        count += rowCount();
+
+        if (!staticRow().isEmpty())
+            count++;
+
+        return count;
+    }
+
+    /**
+     *
+     * @return the estimated total number of columns that either have live 
data or are covered by a delete
+     */
+    public int affectedColumnCount()
+    {
+        // If there is a partition-level deletion, we intend to delete at 
least the columns of one row.
+        if (!partitionLevelDeletion().isLive())
+            return metadata().regularAndStaticColumns().size();
+
+        int count = 0;
+
+        // Each range delete should correspond to at least one intended row 
deletion, and with it, its regular columns.
+        if (deletionInfo().hasRanges())
+            count += deletionInfo().rangeCount() * 
metadata().regularColumns().size();
+
+        for (Row row : this)
+        {
+            if (row.deletion().isLive())
+                // If the row is live, this will include simple tombstones as 
well as cells w/ actual data. 
+                count += row.columnCount();
+            else
+                // We have a row deletion, so account for the columns that 
might be deleted.
+                count += metadata().regularColumns().size();
+        }
+
+        if (!staticRow().isEmpty())
+            count += staticRow().columnCount();
+
+        return count;
+    }
+
     private static void addMarksForRow(Row row, List<CounterMark> marks)
     {
         for (Cell<?> cell : row.cells())
diff --git 
a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java 
b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
new file mode 100644
index 0000000000..4def87ea16
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.Collection;
+
+import com.codahale.metrics.Counter;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class ClientRequestSizeMetrics
+{
+    private static final String TYPE = "ClientRequestSize";
+
+    public static final Counter totalColumnsRead = 
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ColumnsRead", null));
+    public static final Counter totalRowsRead = 
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "RowsRead", null));
+    public static final Counter totalColumnsWritten = 
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ColumnsWritten", 
null));
+    public static final Counter totalRowsWritten = 
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "RowsWritten", null));
+
+    public static void recordReadResponseMetrics(ResultMessage.Rows rows, 
StatementRestrictions restrictions, Selection selection)
+    {
+        if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
+            return;
+
+        int rowCount = rows.result.size();
+        ClientRequestSizeMetrics.totalRowsRead.inc(rowCount);
+        
+        int nonRestrictedColumns = selection.getColumns().size();
+        
+        for (ColumnMetadata column : selection.getColumns())
+            if (restrictions.isEqualityRestricted(column))
+                nonRestrictedColumns--;
+            
+        long columnCount = (long) rowCount * nonRestrictedColumns;
+        ClientRequestSizeMetrics.totalColumnsRead.inc(columnCount);
+    }
+
+    public static void recordRowAndColumnCountMetrics(Collection<? extends 
IMutation> mutations)
+    {
+        if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
+            return;
+
+        int rowCount = 0;
+        int columnCount = 0;
+
+        for (IMutation mutation : mutations)
+        {
+            for (PartitionUpdate update : mutation.getPartitionUpdates())
+            {
+                columnCount += update.affectedColumnCount();
+                rowCount += update.affectedRowCount();
+            }
+        }
+
+        ClientRequestSizeMetrics.totalColumnsWritten.inc(columnCount);
+        ClientRequestSizeMetrics.totalRowsWritten.inc(rowCount);
+    }
+
+    public static void recordRowAndColumnCountMetrics(PartitionUpdate update)
+    {
+        if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
+            return;
+
+        
ClientRequestSizeMetrics.totalColumnsWritten.inc(update.affectedColumnCount());
+        
ClientRequestSizeMetrics.totalRowsWritten.inc(update.affectedRowCount());
+    }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 52039c1d9f..0aafa9539f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -108,6 +108,7 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.metrics.CASClientRequestMetrics;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
 import org.apache.cassandra.metrics.DenylistMetrics;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.metrics.StorageMetrics;
@@ -362,6 +363,9 @@ public class StorageProxy implements StorageProxyMBean
                 // Create the desired updates
                 PartitionUpdate updates = request.makeUpdates(current, 
clientState, ballot);
 
+                // Update the metrics before triggers potentially add 
mutations.
+                
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(updates);
+
                 long size = updates.dataSize();
                 casWriteMetrics.mutationSize.update(size);
                 
writeMetricsForLevel(consistencyForPaxos).mutationSize.update(size);
@@ -1714,8 +1718,8 @@ public class StorageProxy implements StorageProxyMBean
             // we build this ONLY to perform the sufficiency check that 
happens on construction
             ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, 
ReplicaPlans.writeAll);
 
-            // This host isn't a replica, so mark the request as being remote. 
If this host is a 
-            // replica, applyCounterMutationOnCoordinator() in the branch 
above will call performWrite(), and 
+            // This host isn't a replica, so mark the request as being remote. 
If this host is a
+            // replica, applyCounterMutationOnCoordinator() in the branch 
above will call performWrite(), and
             // there we'll mark a local request against the metrics.
             writeMetrics.remoteRequests.mark();
 
@@ -3207,4 +3211,16 @@ public class StorageProxy implements StorageProxyMBean
     {
         DatabaseDescriptor.setSStableReadRatePersistenceEnabled(enabled);
     }
+
+    @Override
+    public boolean getClientRequestSizeMetricsEnabled()
+    {
+        return DatabaseDescriptor.getClientRequestSizeMetricsEnabled();
+    }
+
+    @Override
+    public void setClientRequestSizeMetricsEnabled(boolean enabled)
+    {
+        DatabaseDescriptor.setClientRequestSizeMetricsEnabled(enabled);
+    }
 }
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java 
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 4a3adfd5bd..4ba41d6340 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -141,4 +141,7 @@ public interface StorageProxyMBean
 
     boolean getSStableReadRatePersistenceEnabled();
     void setSStableReadRatePersistenceEnabled(boolean enabled);
+
+    boolean getClientRequestSizeMetricsEnabled();
+    void setClientRequestSizeMetricsEnabled(boolean enabled);
 }
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java 
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 5bdb75c78f..c8bf3b9579 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.ReplicaLayout.ForTokenWrite;
 import org.apache.cassandra.locator.ReplicaPlan.ForRead;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -701,6 +702,9 @@ public class Paxos
                     // TODO "turn null updates into delete?" - what does this 
TODO even mean?
                     PartitionUpdate updates = request.makeUpdates(current, 
clientState, begin.ballot);
 
+                    // Update the metrics before triggers potentially add 
mutations.
+                    
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(updates);
+
                     // Apply triggers to cas updates. A consideration here is 
that
                     // triggers emit Mutations, and so a given trigger 
implementation
                     // may generate mutations for partitions other than the 
one this
diff --git 
a/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
 
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
new file mode 100644
index 0000000000..cb4b38897f
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.BatchMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
+
+public class ClientRequestRowAndColumnMetricsTest extends CQLTester
+{
+    @BeforeClass
+    public static void setup()
+    {
+        requireNetwork();
+    }
+
+    @Before
+    public void clearMetrics()
+    {
+        
ClientRequestSizeMetrics.totalRowsRead.dec(ClientRequestSizeMetrics.totalRowsRead.getCount());
+        
ClientRequestSizeMetrics.totalColumnsRead.dec(ClientRequestSizeMetrics.totalColumnsRead.getCount());
+        
ClientRequestSizeMetrics.totalRowsWritten.dec(ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        
ClientRequestSizeMetrics.totalColumnsWritten.dec(ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+        StorageProxy.instance.setClientRequestSizeMetricsEnabled(true);
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForMultiRowPartitionSelection() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1");
+
+        assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key is provided by the client in the request, so we 
don't consider those columns as read.
+        assertEquals(4, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsWithOnlyPartitionKeyInSelect() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+        executeNet(CURRENT, "SELECT pk FROM %s WHERE pk = 1");
+
+        assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key is provided by the client in the request, so we 
don't consider that column read.
+        assertEquals(0, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsWithOnlyClusteringKeyInSelect() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+        executeNet(CURRENT, "SELECT ck FROM %s WHERE pk = 1");
+
+        assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key is provided by the client in the request, so we 
don't consider that column read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldNotRecordReadMetricsWhenDisabled() throws Throwable
+    {
+        StorageProxy.instance.setClientRequestSizeMetricsEnabled(false);
+
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1");
+
+        assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        assertEquals(0, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsWithSingleRowSelection() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck = 1");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // Both the partition key and clustering key are provided by the 
client in the request.
+        assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsWithSliceRestriction() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck > 0");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key is selected, but the restriction over the 
clustering key is a slice.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsWithINRestrictionSinglePartition() 
throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck IN (0, 1)");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key and clustering key are both selected.
+        assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsWithINRestrictionMultiplePartitions() 
throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 3)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (4, 5, 6)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk IN (1, 4)");
+
+        assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key is selected, but there is no clustering 
restriction.
+        assertEquals(4, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForMultiColumnClusteringRestriction() 
throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, ck3 int, v 
int, PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1, 
2, 3, 4, 6)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck1 = 2 AND 
(ck2, ck3) = (3, 4)");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The entire primary key is selected, so only one value is actually 
read.
+        assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForClusteringSlice() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, ck3 int, v 
int, PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1, 
2, 3, 4, 6)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck1 = 2 AND ck2 
= 3 AND ck3 >= 4");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The last clustering key element isn't bound, so count it as being 
read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForTokenAndClusteringSlice() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, ck3 int, v 
int, PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1, 
2, 3, 4, 6)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE token(pk) = token(1) AND 
ck1 = 2 AND ck2 = 3 AND ck3 >= 4 ALLOW FILTERING");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // Last clustering is a slice, and the partition key is restricted on 
token, so count them as read.
+        assertEquals(3, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForSingleValueRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        assertEquals(1, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldNotRecordWriteMetricsWhenDisabled() throws Throwable
+    {
+        StorageProxy.instance.setClientRequestSizeMetricsEnabled(false);
+
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+
+        assertEquals(0, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        assertEquals(0, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForMultiValueRow() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3 
int)");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, v1, v2, v3) VALUES (1, 2, 3, 
4)");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForBatch() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+
+            String first = String.format("INSERT INTO %s.%s (pk, v1, v2) 
VALUES (1, 10, 100)", KEYSPACE, currentTable());
+            String second = String.format("INSERT INTO %s.%s (pk, v1, v2) 
VALUES (2, 20, 200)", KEYSPACE, currentTable());
+
+            List<List<ByteBuffer>> values = 
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+            BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED, 
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
+            client.execute(batch);
+
+            // The metrics should reflect the batch as a single write 
operation with multiple rows and columns.
+            assertEquals(2, 
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+            assertEquals(4, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+        }
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForCellDeletes() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3 
int)");
+
+        executeNet(CURRENT, "DELETE v1, v2, v3 FROM %s WHERE pk = 1");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForCellNulls() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3 
int)");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, v1, v2, v3) VALUES (1, null, 
null, null)");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForSingleStaticInsert() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int, 
v2 int, PRIMARY KEY (pk, ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v0, v1, v2) VALUES (0, 1, 
2, 3, 4)");
+
+        assertEquals(2, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForBatchedStaticInserts() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int, 
v2 int, PRIMARY KEY (pk, ck))");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+
+            String first = String.format("INSERT INTO %s.%s (pk, ck, v0, v1, 
v2) VALUES (0, 1, 2, 3, 4)", KEYSPACE, currentTable());
+            String second = String.format("INSERT INTO %s.%s (pk, ck, v0, v1, 
v2) VALUES (0, 2, 3, 5, 6)", KEYSPACE, currentTable());
+
+            List<List<ByteBuffer>> values = 
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+            BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED, 
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
+            client.execute(batch);
+
+            // Two normal rows and the single static row:
+            assertEquals(3, 
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+            // Two normal columns per insert, and then one columns for the 
static row:
+            assertEquals(5, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+        }
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForRowDelete() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int, 
v2 int, PRIMARY KEY (pk, ck))");
+
+        executeNet(CURRENT, "DELETE FROM %s WHERE pk = 1 AND ck = 1");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        // The columns metric should account for all regular columns.
+        assertEquals(2, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForRangeDelete() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int, 
v2 int, PRIMARY KEY (pk, ck))");
+
+        executeNet(CURRENT, "DELETE FROM %s WHERE pk = 1 AND ck > 1");
+
+        // The range delete is intended to delete at least one row, but that 
is only a lower bound.
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        // The columns metric should account for all regular columns.
+        assertEquals(2, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForPartitionDelete() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int, 
v2 int, PRIMARY KEY (pk, ck))");
+
+        executeNet(CURRENT, "DELETE FROM %s WHERE pk = 1");
+
+        // A partition deletion intends to delete at least one row.
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+        // If we delete one row, we intended to delete all its regular and 
static columns.
+        assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForIntraRowBatch() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v1 int, v2 int, PRIMARY 
KEY (pk, ck))");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+
+            String first = String.format("INSERT INTO %s.%s (pk, ck, v1, v2) 
VALUES (1, 2, 3, 4)", KEYSPACE, currentTable());
+            String second = String.format("DELETE FROM %s.%s WHERE pk = 1 AND 
ck > 1", KEYSPACE, currentTable());
+
+            List<List<ByteBuffer>> values = 
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+            BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED, 
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
+            client.execute(batch);
+
+            // Both operations affect the same row, but writes and deletes are 
distinct.
+            assertEquals(2, 
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+            assertEquals(4, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+        }
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForIfNotExistsV1() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+        shouldRecordWriteMetricsForIfNotExists();
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForIfNotExistsV2() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+        shouldRecordWriteMetricsForIfNotExists();
+    }
+
+    public void shouldRecordWriteMetricsForIfNotExists() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3 
int)");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+            client.execute(new QueryMessage(String.format("INSERT INTO %s.%s 
(pk, v1, v2, v3) VALUES (1, 2, 3, 4) IF NOT EXISTS", KEYSPACE, currentTable()), 
QueryOptions.DEFAULT));
+
+            assertEquals(1, 
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+            assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+            // We read internally, but don't reflect that in our read metrics.
+            assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+            assertEquals(0, 
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+        }
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForCASV1() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+        shouldRecordWriteMetricsForCAS();
+    }
+
+    @Test
+    public void shouldRecordWriteMetricsForCASV2() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+        shouldRecordWriteMetricsForCAS();
+    }
+
+    public void shouldRecordWriteMetricsForCAS() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+            client.execute(new QueryMessage(String.format("INSERT INTO %s.%s 
(pk, v1, v2) VALUES (1, 2, 3)", KEYSPACE, currentTable()), 
QueryOptions.DEFAULT));
+
+            assertEquals(1, 
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+            assertEquals(2, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+            client.execute(new QueryMessage(String.format("UPDATE %s.%s SET v2 
= 4 WHERE pk = 1 IF v1 = 2", KEYSPACE, currentTable()), QueryOptions.DEFAULT));
+
+            assertEquals(3, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+            // We read internally, but don't reflect that in our read metrics.
+            assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+            assertEquals(0, 
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+        }
+    }
+
+    @Test
+    public void shouldNotRecordWriteMetricsForFailedCASV1() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+        shouldNotRecordWriteMetricsForFailedCAS();
+    }
+
+    @Test
+    public void shouldNotRecordWriteMetricsForFailedCASV2() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+        shouldNotRecordWriteMetricsForFailedCAS();
+    }
+
+    public void shouldNotRecordWriteMetricsForFailedCAS() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+            client.execute(new QueryMessage(String.format("INSERT INTO %s.%s 
(pk, v1, v2) VALUES (1, 2, 3)", KEYSPACE, currentTable()), 
QueryOptions.DEFAULT));
+
+            assertEquals(1, 
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+            assertEquals(2, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+            client.execute(new QueryMessage(String.format("UPDATE %s.%s SET v2 
= 4 WHERE pk = 1 IF v1 = 4", KEYSPACE, currentTable()), QueryOptions.DEFAULT));
+
+            // We didn't actually write anything, so don't reflect a write 
against the metrics.
+            assertEquals(2, 
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+            // Don't reflect in our read metrics the result returned to the 
client by the failed CAS write. 
+            assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+            assertEquals(0, 
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+        }
+    }
+
+    @Test
+    public void shouldRecordReadMetricsOnSerialReadV1() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+        shouldRecordReadMetricsOnSerialRead();
+    }
+
+    @Test
+    public void shouldRecordReadMetricsOnSerialReadV2() throws Exception
+    {
+        Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+        shouldRecordReadMetricsOnSerialRead();
+    }
+
+    public void shouldRecordReadMetricsOnSerialRead() throws Exception
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        try (SimpleClient client = new 
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+        {
+            client.connect(false);
+            client.execute(new QueryMessage(String.format("INSERT INTO %s.%s 
(pk, ck, v) VALUES (1, 1, 1)", KEYSPACE, currentTable()), 
QueryOptions.DEFAULT));
+
+            QueryMessage query = new QueryMessage(String.format("SELECT * FROM 
%s.%s WHERE pk = 1 AND ck = 1", KEYSPACE, currentTable()),
+                                                  
QueryOptions.forInternalCalls(ConsistencyLevel.SERIAL, 
Collections.emptyList()));
+            client.execute(query);
+
+            assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+
+            // Both the partition key and clustering key are provided by the 
client in the request.
+            assertEquals(1, 
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+        }
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForGlobalIndexQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+        createIndex("CREATE INDEX on %s (v)");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE v = 1");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The index search term is provided by the client in the request, so 
we don't consider that column read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForPartitionRestrictedIndexQuery() 
throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+        createIndex("CREATE INDEX on %s (v)");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND v = 1");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The partition key and index search term are provided by the client, 
so we don't consider those columns read.
+        assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForClusteringKeyIndexQuery() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+        createIndex("CREATE INDEX on %s (ck)");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE ck = 2");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The index search term is provided by the client in the request, so 
we don't consider that column read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForFilteringQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE v = 1 ALLOW FILTERING");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The filtering term is provided by the client in the request, so we 
don't consider that column read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForRangeFilteringQuery() throws 
Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE v > 1 ALLOW FILTERING");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The value column is restricted over a range, not bound to a 
particular value.
+        assertEquals(3, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForINFilteringQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk, 
ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE v IN (1) ALLOW FILTERING");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The filtering term is provided by the client in the request, so we 
don't consider that column read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+
+    @Test
+    public void shouldRecordReadMetricsForContainsQuery() throws Throwable
+    {
+        createTable("CREATE TABLE %s (pk int, ck int, v set<int>, PRIMARY KEY 
(pk, ck))");
+
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, {1, 2, 
3} )");
+        executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, {4, 5, 
6})");
+        executeNet(CURRENT, "SELECT * FROM %s WHERE v CONTAINS 1 ALLOW 
FILTERING");
+
+        assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+        // The filtering term is provided by the client in the request, so we 
don't consider that column read.
+        assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to