This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4892331eb4 Coordinator level metrics for read response and mutation
row and column counts
4892331eb4 is described below
commit 4892331eb49f72d2e18432a2af56538d11c7c5d1
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Apr 27 15:08:11 2022 -0500
Coordinator level metrics for read response and mutation row and column
counts
patch by Caleb Rackliffe; reviewed by Berenguer Blasi for CASSANDRA-18155
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/config/Config.java | 2 +
.../cassandra/config/DatabaseDescriptor.java | 10 +
.../restrictions/ClusteringColumnRestrictions.java | 1 -
.../cql3/restrictions/RestrictionSet.java | 10 +-
.../cql3/restrictions/RestrictionSetWrapper.java | 12 +
.../cassandra/cql3/restrictions/Restrictions.java | 10 +
.../cql3/restrictions/SingleRestriction.java | 8 +
.../cql3/restrictions/StatementRestrictions.java | 48 ++
.../cassandra/cql3/statements/BatchStatement.java | 2 +
.../cql3/statements/ModificationStatement.java | 7 +
.../cassandra/cql3/statements/SelectStatement.java | 36 +-
.../cassandra/db/partitions/PartitionUpdate.java | 56 ++
.../metrics/ClientRequestSizeMetrics.java | 91 +++
.../org/apache/cassandra/service/StorageProxy.java | 20 +-
.../cassandra/service/StorageProxyMBean.java | 3 +
.../org/apache/cassandra/service/paxos/Paxos.java | 4 +
.../ClientRequestRowAndColumnMetricsTest.java | 625 +++++++++++++++++++++
18 files changed, 929 insertions(+), 17 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 2fc4f017b4..f48dc48bc6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Coordinator level metrics for read response and mutation row and column
counts (CASSANDRA-18155)
* Add CQL functions for dynamic data masking (CASSANDRA-17941)
* Print friendly error when nodetool attempts to connect to uninitialized
server (CASSANDRA-11537)
* Use G1GC by default, and update default G1GC settings (CASSANDRA-18027)
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index c2992e6e4b..3aff5fe956 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -1064,6 +1064,8 @@ public class Config
public volatile boolean sstable_read_rate_persistence_enabled = false;
+ public volatile boolean client_request_size_metrics_enabled = true;
+
public volatile int max_top_size_partition_count = 10;
public volatile int max_top_tombstone_partition_count = 10;
public volatile DataStorageSpec.LongBytesBound min_tracked_partition_size
= new DataStorageSpec.LongBytesBound("1MiB");
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index af11e89b74..1cf36a064b 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -4572,4 +4572,14 @@ public class DatabaseDescriptor
conf.sstable_read_rate_persistence_enabled = enabled;
}
}
+
+ public static boolean getClientRequestSizeMetricsEnabled()
+ {
+ return conf.client_request_size_metrics_enabled;
+ }
+
+ public static void setClientRequestSizeMetricsEnabled(boolean enabled)
+ {
+ conf.client_request_size_metrics_enabled = enabled;
+ }
}
diff --git
a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
index c1d0c52ca7..146ec20557 100644
---
a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
+++
b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
@@ -228,5 +228,4 @@ final class ClusteringColumnRestrictions extends
RestrictionSetWrapper
{
return restriction.isContains() || restriction.isLIKE() || index !=
restriction.getFirstColumn().position();
}
-
}
diff --git
a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 7a5d5b964b..344498ce0e 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -105,6 +105,15 @@ final class RestrictionSet implements Restrictions,
Iterable<SingleRestriction>
return new ArrayList<>(restrictions.keySet());
}
+ /**
+ * @return a direct reference to the key set from {@link #restrictions}
with no defensive copying
+ */
+ @Override
+ public Collection<ColumnMetadata> getColumnDefinitions()
+ {
+ return restrictions.keySet();
+ }
+
@Override
public void addFunctionsTo(List<Function> functions)
{
@@ -332,7 +341,6 @@ final class RestrictionSet implements Restrictions,
Iterable<SingleRestriction>
/**
* {@code Iterator} decorator that removes duplicates in an ordered one.
*
- * @param iterator the decorated iterator
* @param <E> the iterator element type.
*/
private static final class DistinctIterator<E> extends AbstractIterator<E>
diff --git
a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index 9803adc459..8c89bca314 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3.restrictions;
+import java.util.Collection;
import java.util.List;
import java.util.Set;
@@ -57,6 +58,17 @@ class RestrictionSetWrapper implements Restrictions
return restrictions.getColumnDefs();
}
+ @Override
+ public Collection<ColumnMetadata> getColumnDefinitions()
+ {
+ return restrictions.getColumnDefinitions();
+ }
+
+ public RestrictionSet getRestrictionSet()
+ {
+ return restrictions;
+ }
+
public void addFunctionsTo(List<Function> functions)
{
restrictions.addFunctionsTo(functions);
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
index 77e0dd92fe..0ad7530e76 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restrictions.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.cql3.restrictions;
+import java.util.Collection;
import java.util.Set;
import org.apache.cassandra.schema.ColumnMetadata;
@@ -34,6 +35,15 @@ public interface Restrictions extends Restriction
*/
Set<Restriction> getRestrictions(ColumnMetadata columnDef);
+ /**
+ * This method exists in addition to {@link #getColumnDefs()} in case
implementations want to
+ * provide columns definitions that are not strictly in position order.
+ */
+ default Collection<ColumnMetadata> getColumnDefinitions()
+ {
+ return getColumnDefs();
+ }
+
/**
* Checks if this <code>Restrictions</code> is empty or not.
*
diff --git
a/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
b/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
index 42b0b4ece7..989d5dabdf 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleRestriction.java
@@ -51,6 +51,14 @@ public interface SingleRestriction extends Restriction
return false;
}
+ /**
+ * @return <code>true</code> if this restriction is based on equality
comparison rather than a range or negation
+ */
+ default boolean isEqualityBased()
+ {
+ return isEQ() || isIN() || isContains();
+ }
+
public default boolean isNotNull()
{
return false;
diff --git
a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 371bc589c3..b74d8adc43 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -390,6 +390,54 @@ public final class StatementRestrictions
.anyMatch(p -> ((SingleRestriction) p).isEQ());
}
+ /**
+ * This method determines whether a specified column is restricted on
equality or something equivalent, like IN.
+ * It can be used in conjunction with the columns selected by a query to
determine which of those columns is
+ * already bound by the client (and from its perspective, not retrieved by
the database).
+ *
+ * @param column a column from the same table these restrictions are
against
+ *
+ * @return <code>true</code> if the given column is restricted on equality
+ */
+ public boolean isEqualityRestricted(ColumnMetadata column)
+ {
+ if (column.kind == ColumnMetadata.Kind.PARTITION_KEY)
+ {
+ if (partitionKeyRestrictions.hasOnlyEqualityRestrictions())
+ for (ColumnMetadata restricted :
partitionKeyRestrictions.getColumnDefinitions())
+ if (restricted.name.equals(column.name))
+ return true;
+ }
+ else if (column.kind == ColumnMetadata.Kind.CLUSTERING)
+ {
+ if (hasClusteringColumnsRestrictions())
+ {
+ for (SingleRestriction restriction :
clusteringColumnsRestrictions.getRestrictionSet())
+ {
+ if (restriction.isEqualityBased())
+ {
+ if (restriction.isMultiColumn())
+ {
+ for (ColumnMetadata restricted :
restriction.getColumnDefs())
+ if (restricted.name.equals(column.name))
+ return true;
+ }
+ else if
(restriction.getFirstColumn().name.equals(column.name))
+ return true;
+ }
+ }
+ }
+ }
+ else if (hasNonPrimaryKeyRestrictions())
+ {
+ for (SingleRestriction restriction : nonPrimaryKeyRestrictions)
+ if (restriction.getFirstColumn().name.equals(column.name) &&
restriction.isEqualityBased())
+ return true;
+ }
+
+ return false;
+ }
+
/**
* Returns the <code>Restrictions</code> for the specified type of columns.
*
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index ab761ad9c1..b034db4482 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.metrics.BatchMetrics;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
import org.apache.cassandra.service.*;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.transport.messages.ResultMessage;
@@ -442,6 +443,7 @@ public class BatchStatement implements CQLStatement
boolean mutateAtomic = (isLogged() && mutations.size() > 1);
StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic,
queryStartNanoTime);
+ ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
}
private void updatePartitionsPerBatchMetrics(int updatedPartitions)
diff --git
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index c69cf4945a..c40109c607 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.schema.ColumnMetadata;
import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.SchemaConstants;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.ViewMetadata;
import org.apache.cassandra.cql3.*;
@@ -51,6 +52,7 @@ import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
import org.apache.cassandra.service.ClientState;
import org.apache.cassandra.service.QueryState;
import org.apache.cassandra.service.StorageProxy;
@@ -513,8 +515,13 @@ public abstract class ModificationStatement implements
CQLStatement.SingleKeyspa
options.getNowInSeconds(queryState),
queryStartNanoTime);
if (!mutations.isEmpty())
+ {
StorageProxy.mutateWithTriggers(mutations, cl, false,
queryStartNanoTime);
+ if (!SchemaConstants.isSystemKeyspace(metadata.keyspace))
+
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(mutations);
+ }
+
return null;
}
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 75c12f065e..774fb3b77a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -60,6 +60,7 @@ import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.view.View;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.exceptions.*;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
import org.apache.cassandra.index.IndexRegistry;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.service.ClientState;
@@ -260,21 +261,30 @@ public class SelectStatement implements
CQLStatement.SingleKeyspaceCqlStatement
if (options.isReadThresholdsEnabled())
query.trackWarnings();
+ ResultMessage.Rows rows;
if (aggregationSpec == null && (pageSize <= 0 ||
(query.limits().count() <= pageSize)))
- return execute(query, options, state.getClientState(), selectors,
nowInSec, userLimit, null, queryStartNanoTime);
-
- QueryPager pager = getPager(query, options);
-
- return execute(state,
- Pager.forDistributedQuery(pager, cl,
state.getClientState()),
- options,
- selectors,
- pageSize,
- nowInSec,
- userLimit,
- aggregationSpec,
- queryStartNanoTime);
+ {
+ rows = execute(query, options, state.getClientState(), selectors,
nowInSec, userLimit, null, queryStartNanoTime);
+ }
+ else
+ {
+ QueryPager pager = getPager(query, options);
+
+ rows = execute(state,
+ Pager.forDistributedQuery(pager, cl,
state.getClientState()),
+ options,
+ selectors,
+ pageSize,
+ nowInSec,
+ userLimit,
+ aggregationSpec,
+ queryStartNanoTime);
+ }
+ if (!SchemaConstants.isSystemKeyspace(table.keyspace))
+ ClientRequestSizeMetrics.recordReadResponseMetrics(rows,
restrictions, selection);
+
+ return rows;
}
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 9f854c670a..7f2ca7d465 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -466,6 +466,62 @@ public class PartitionUpdate extends AbstractBTreePartition
return marks;
}
+ /**
+ *
+ * @return the estimated number of rows affected by this mutation
+ */
+ public int affectedRowCount()
+ {
+ // If there is a partition-level deletion, we intend to delete at
least one row.
+ if (!partitionLevelDeletion().isLive())
+ return 1;
+
+ int count = 0;
+
+ // Each range delete should correspond to at least one intended row
deletion.
+ if (deletionInfo().hasRanges())
+ count += deletionInfo().rangeCount();
+
+ count += rowCount();
+
+ if (!staticRow().isEmpty())
+ count++;
+
+ return count;
+ }
+
+ /**
+ *
+ * @return the estimated total number of columns that either have live
data or are covered by a delete
+ */
+ public int affectedColumnCount()
+ {
+ // If there is a partition-level deletion, we intend to delete at
least the columns of one row.
+ if (!partitionLevelDeletion().isLive())
+ return metadata().regularAndStaticColumns().size();
+
+ int count = 0;
+
+ // Each range delete should correspond to at least one intended row
deletion, and with it, its regular columns.
+ if (deletionInfo().hasRanges())
+ count += deletionInfo().rangeCount() *
metadata().regularColumns().size();
+
+ for (Row row : this)
+ {
+ if (row.deletion().isLive())
+ // If the row is live, this will include simple tombstones as
well as cells w/ actual data.
+ count += row.columnCount();
+ else
+ // We have a row deletion, so account for the columns that
might be deleted.
+ count += metadata().regularColumns().size();
+ }
+
+ if (!staticRow().isEmpty())
+ count += staticRow().columnCount();
+
+ return count;
+ }
+
private static void addMarksForRow(Row row, List<CounterMark> marks)
{
for (Cell<?> cell : row.cells())
diff --git
a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
new file mode 100644
index 0000000000..4def87ea16
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.util.Collection;
+
+import com.codahale.metrics.Counter;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.restrictions.StatementRestrictions;
+import org.apache.cassandra.cql3.selection.Selection;
+import org.apache.cassandra.db.IMutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.schema.ColumnMetadata;
+import org.apache.cassandra.transport.messages.ResultMessage;
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+public class ClientRequestSizeMetrics
+{
+ private static final String TYPE = "ClientRequestSize";
+
+ public static final Counter totalColumnsRead =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ColumnsRead", null));
+ public static final Counter totalRowsRead =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "RowsRead", null));
+ public static final Counter totalColumnsWritten =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "ColumnsWritten",
null));
+ public static final Counter totalRowsWritten =
Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "RowsWritten", null));
+
+ public static void recordReadResponseMetrics(ResultMessage.Rows rows,
StatementRestrictions restrictions, Selection selection)
+ {
+ if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
+ return;
+
+ int rowCount = rows.result.size();
+ ClientRequestSizeMetrics.totalRowsRead.inc(rowCount);
+
+ int nonRestrictedColumns = selection.getColumns().size();
+
+ for (ColumnMetadata column : selection.getColumns())
+ if (restrictions.isEqualityRestricted(column))
+ nonRestrictedColumns--;
+
+ long columnCount = (long) rowCount * nonRestrictedColumns;
+ ClientRequestSizeMetrics.totalColumnsRead.inc(columnCount);
+ }
+
+ public static void recordRowAndColumnCountMetrics(Collection<? extends
IMutation> mutations)
+ {
+ if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
+ return;
+
+ int rowCount = 0;
+ int columnCount = 0;
+
+ for (IMutation mutation : mutations)
+ {
+ for (PartitionUpdate update : mutation.getPartitionUpdates())
+ {
+ columnCount += update.affectedColumnCount();
+ rowCount += update.affectedRowCount();
+ }
+ }
+
+ ClientRequestSizeMetrics.totalColumnsWritten.inc(columnCount);
+ ClientRequestSizeMetrics.totalRowsWritten.inc(rowCount);
+ }
+
+ public static void recordRowAndColumnCountMetrics(PartitionUpdate update)
+ {
+ if (!DatabaseDescriptor.getClientRequestSizeMetricsEnabled())
+ return;
+
+
ClientRequestSizeMetrics.totalColumnsWritten.inc(update.affectedColumnCount());
+
ClientRequestSizeMetrics.totalRowsWritten.inc(update.affectedRowCount());
+ }
+}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 52039c1d9f..0aafa9539f 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -108,6 +108,7 @@ import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.locator.Replicas;
import org.apache.cassandra.metrics.CASClientRequestMetrics;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
import org.apache.cassandra.metrics.DenylistMetrics;
import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.metrics.StorageMetrics;
@@ -362,6 +363,9 @@ public class StorageProxy implements StorageProxyMBean
// Create the desired updates
PartitionUpdate updates = request.makeUpdates(current,
clientState, ballot);
+ // Update the metrics before triggers potentially add
mutations.
+
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(updates);
+
long size = updates.dataSize();
casWriteMetrics.mutationSize.update(size);
writeMetricsForLevel(consistencyForPaxos).mutationSize.update(size);
@@ -1714,8 +1718,8 @@ public class StorageProxy implements StorageProxyMBean
// we build this ONLY to perform the sufficiency check that
happens on construction
ReplicaPlans.forWrite(keyspace, cm.consistency(), tk,
ReplicaPlans.writeAll);
- // This host isn't a replica, so mark the request as being remote.
If this host is a
- // replica, applyCounterMutationOnCoordinator() in the branch
above will call performWrite(), and
+ // This host isn't a replica, so mark the request as being remote.
If this host is a
+ // replica, applyCounterMutationOnCoordinator() in the branch
above will call performWrite(), and
// there we'll mark a local request against the metrics.
writeMetrics.remoteRequests.mark();
@@ -3207,4 +3211,16 @@ public class StorageProxy implements StorageProxyMBean
{
DatabaseDescriptor.setSStableReadRatePersistenceEnabled(enabled);
}
+
+ @Override
+ public boolean getClientRequestSizeMetricsEnabled()
+ {
+ return DatabaseDescriptor.getClientRequestSizeMetricsEnabled();
+ }
+
+ @Override
+ public void setClientRequestSizeMetricsEnabled(boolean enabled)
+ {
+ DatabaseDescriptor.setClientRequestSizeMetricsEnabled(enabled);
+ }
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
index 4a3adfd5bd..4ba41d6340 100644
--- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java
@@ -141,4 +141,7 @@ public interface StorageProxyMBean
boolean getSStableReadRatePersistenceEnabled();
void setSStableReadRatePersistenceEnabled(boolean enabled);
+
+ boolean getClientRequestSizeMetricsEnabled();
+ void setClientRequestSizeMetricsEnabled(boolean enabled);
}
diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java
b/src/java/org/apache/cassandra/service/paxos/Paxos.java
index 5bdb75c78f..c8bf3b9579 100644
--- a/src/java/org/apache/cassandra/service/paxos/Paxos.java
+++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.locator.Replica;
import org.apache.cassandra.locator.ReplicaLayout;
import org.apache.cassandra.locator.ReplicaLayout.ForTokenWrite;
import org.apache.cassandra.locator.ReplicaPlan.ForRead;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -701,6 +702,9 @@ public class Paxos
// TODO "turn null updates into delete?" - what does this
TODO even mean?
PartitionUpdate updates = request.makeUpdates(current,
clientState, begin.ballot);
+ // Update the metrics before triggers potentially add
mutations.
+
ClientRequestSizeMetrics.recordRowAndColumnCountMetrics(updates);
+
// Apply triggers to cas updates. A consideration here is
that
// triggers emit Mutations, and so a given trigger
implementation
// may generate mutations for partitions other than the
one this
diff --git
a/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
new file mode 100644
index 0000000000..cb4b38897f
--- /dev/null
+++
b/test/unit/org/apache/cassandra/metrics/ClientRequestRowAndColumnMetricsTest.java
@@ -0,0 +1,625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.metrics;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.config.Config;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.statements.BatchStatement;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.paxos.Paxos;
+import org.apache.cassandra.transport.SimpleClient;
+import org.apache.cassandra.transport.messages.BatchMessage;
+import org.apache.cassandra.transport.messages.QueryMessage;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.apache.cassandra.transport.ProtocolVersion.CURRENT;
+
+public class ClientRequestRowAndColumnMetricsTest extends CQLTester
+{
+ @BeforeClass
+ public static void setup()
+ {
+ requireNetwork();
+ }
+
+ @Before
+ public void clearMetrics()
+ {
+
ClientRequestSizeMetrics.totalRowsRead.dec(ClientRequestSizeMetrics.totalRowsRead.getCount());
+
ClientRequestSizeMetrics.totalColumnsRead.dec(ClientRequestSizeMetrics.totalColumnsRead.getCount());
+
ClientRequestSizeMetrics.totalRowsWritten.dec(ClientRequestSizeMetrics.totalRowsWritten.getCount());
+
ClientRequestSizeMetrics.totalColumnsWritten.dec(ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+ StorageProxy.instance.setClientRequestSizeMetricsEnabled(true);
+ }
+
+ @Test
+ public void shouldRecordReadMetricsForMultiRowPartitionSelection() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1");
+
+ assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key is provided by the client in the request, so we
don't consider those columns as read.
+ assertEquals(4, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ @Test
+ public void shouldRecordReadMetricsWithOnlyPartitionKeyInSelect() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+ executeNet(CURRENT, "SELECT pk FROM %s WHERE pk = 1");
+
+ assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key is provided by the client in the request, so we
don't consider that column read.
+ assertEquals(0, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ @Test
+ public void shouldRecordReadMetricsWithOnlyClusteringKeyInSelect() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+ executeNet(CURRENT, "SELECT ck FROM %s WHERE pk = 1");
+
+ assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key is provided by the client in the request, so we
don't consider that column read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ @Test
+ public void shouldNotRecordReadMetricsWhenDisabled() throws Throwable
+ {
+ StorageProxy.instance.setClientRequestSizeMetricsEnabled(false);
+
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1");
+
+ assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ assertEquals(0, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ @Test
+ public void shouldRecordReadMetricsWithSingleRowSelection() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck = 1");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // Both the partition key and clustering key are provided by the
client in the request.
+ assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ @Test
+ public void shouldRecordReadMetricsWithSliceRestriction() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck > 0");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key is selected, but the restriction over the
clustering key is a slice.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ @Test
+ public void shouldRecordReadMetricsWithINRestrictionSinglePartition()
throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck IN (0, 1)");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key and clustering key are both selected.
+ assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // With pk IN over two partitions, each returned row contributes its unrestricted
+ // clustering key and regular column: 2 rows x 2 columns = 4 columns read.
+ @Test
+ public void shouldRecordReadMetricsWithINRestrictionMultiplePartitions()
throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 3)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (4, 5, 6)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk IN (1, 4)");
+
+ assertEquals(2, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key is selected, but there is no clustering
restriction.
+ assertEquals(4, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // A multi-column tuple restriction (ck2, ck3) = (3, 4) binds every clustering element,
+ // so only the regular column v is counted as read.
+ @Test
+ public void shouldRecordReadMetricsForMultiColumnClusteringRestriction()
throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, ck3 int, v
int, PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1,
2, 3, 4, 6)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck1 = 2 AND
(ck2, ck3) = (3, 4)");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The entire primary key is selected, so only one value is actually
read.
+ assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // Only the final clustering element (ck3) is a slice; ck1 and ck2 are bound by the
+ // client, so columns read = ck3 + v = 2.
+ @Test
+ public void shouldRecordReadMetricsForClusteringSlice() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, ck3 int, v
int, PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1,
2, 3, 4, 6)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND ck1 = 2 AND ck2
= 3 AND ck3 >= 4");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The last clustering key element isn't bound, so count it as being
read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // A token() restriction does not bind the partition key to a literal value, so pk
+ // counts as read in addition to the sliced ck3 and the regular column v (= 3).
+ @Test
+ public void shouldRecordReadMetricsForTokenAndClusteringSlice() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck1 int, ck2 int, ck3 int, v
int, PRIMARY KEY (pk, ck1, ck2, ck3))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck1, ck2, ck3, v) VALUES (1,
2, 3, 4, 6)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE token(pk) = token(1) AND
ck1 = 2 AND ck2 = 3 AND ck3 >= 4 ALLOW FILTERING");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // Last clustering is a slice, and the partition key is restricted on
token, so count them as read.
+ assertEquals(3, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // A plain single-row INSERT with one regular column records one row and one column written.
+ @Test
+ public void shouldRecordWriteMetricsForSingleValueRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(1,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // When the feature flag is switched off via StorageProxy, no write metrics are recorded.
+ // NOTE(review): assumes a test fixture re-enables the flag between tests - confirm in setup/teardown.
+ @Test
+ public void shouldNotRecordWriteMetricsWhenDisabled() throws Throwable
+ {
+ StorageProxy.instance.setClientRequestSizeMetricsEnabled(false);
+
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+
+ assertEquals(0, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(0,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // Inserting three regular columns into one row records one row and three columns written.
+ @Test
+ public void shouldRecordWriteMetricsForMultiValueRow() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3
int)");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, v1, v2, v3) VALUES (1, 2, 3,
4)");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // A logged batch of two single-row inserts (two columns each) is counted as
+ // two rows and four columns written in total.
+ @Test
+ public void shouldRecordWriteMetricsForBatch() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+
+ String first = String.format("INSERT INTO %s.%s (pk, v1, v2)
VALUES (1, 10, 100)", KEYSPACE, currentTable());
+ String second = String.format("INSERT INTO %s.%s (pk, v1, v2)
VALUES (2, 20, 200)", KEYSPACE, currentTable());
+
+ List<List<ByteBuffer>> values =
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+ BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED,
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
+ client.execute(batch);
+
+ // The metrics should reflect the batch as a single write
operation with multiple rows and columns.
+ assertEquals(2,
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(4,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+ }
+
+ // Deleting three individual cells is still one row written, with all three
+ // deleted columns counted as columns written.
+ @Test
+ public void shouldRecordWriteMetricsForCellDeletes() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3
int)");
+
+ executeNet(CURRENT, "DELETE v1, v2, v3 FROM %s WHERE pk = 1");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // Explicit nulls are tombstone writes, so they count the same as non-null
+ // values: one row and three columns written.
+ @Test
+ public void shouldRecordWriteMetricsForCellNulls() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3
int)");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, v1, v2, v3) VALUES (1, null,
null, null)");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // The static column v0 lands on the partition's static row, so this single insert
+ // counts as two rows written (regular + static) and three columns (v0, v1, v2).
+ @Test
+ public void shouldRecordWriteMetricsForSingleStaticInsert() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int,
v2 int, PRIMARY KEY (pk, ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v0, v1, v2) VALUES (0, 1,
2, 3, 4)");
+
+ assertEquals(2, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // Two batched inserts into the same partition share a single static row, so the
+ // static column v0 is only counted once across the batch.
+ @Test
+ public void shouldRecordWriteMetricsForBatchedStaticInserts() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int,
v2 int, PRIMARY KEY (pk, ck))");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+
+ String first = String.format("INSERT INTO %s.%s (pk, ck, v0, v1,
v2) VALUES (0, 1, 2, 3, 4)", KEYSPACE, currentTable());
+ String second = String.format("INSERT INTO %s.%s (pk, ck, v0, v1,
v2) VALUES (0, 2, 3, 5, 6)", KEYSPACE, currentTable());
+
+ List<List<ByteBuffer>> values =
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+ BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED,
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
+ client.execute(batch);
+
+ // Two normal rows and the single static row:
+ assertEquals(3,
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ // Two normal columns per insert, and then one column for the
static row:
+ assertEquals(5,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+ }
+
+ // A full-row delete counts one row written and both regular columns (the static
+ // column is excluded because the delete targets a specific clustering row).
+ @Test
+ public void shouldRecordWriteMetricsForRowDelete() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int,
v2 int, PRIMARY KEY (pk, ck))");
+
+ executeNet(CURRENT, "DELETE FROM %s WHERE pk = 1 AND ck = 1");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ // The columns metric should account for all regular columns.
+ assertEquals(2,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // A clustering-range delete can't know how many rows it covers at coordination
+ // time, so the metric records the lower bound of one row.
+ @Test
+ public void shouldRecordWriteMetricsForRangeDelete() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int,
v2 int, PRIMARY KEY (pk, ck))");
+
+ executeNet(CURRENT, "DELETE FROM %s WHERE pk = 1 AND ck > 1");
+
+ // The range delete is intended to delete at least one row, but that
is only a lower bound.
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ // The columns metric should account for all regular columns.
+ assertEquals(2,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // A whole-partition delete records the lower bound of one row, but counts every
+ // column in the table - regular and static - as written.
+ @Test
+ public void shouldRecordWriteMetricsForPartitionDelete() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v0 int static, v1 int,
v2 int, PRIMARY KEY (pk, ck))");
+
+ executeNet(CURRENT, "DELETE FROM %s WHERE pk = 1");
+
+ // A partition deletion intends to delete at least one row.
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ // If we delete one row, we intended to delete all its regular and
static columns.
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+
+ // An insert and a range delete batched against the same partition are counted
+ // separately: 2 rows, and 2 columns each (insert values + regular columns of the delete).
+ @Test
+ public void shouldRecordWriteMetricsForIntraRowBatch() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v1 int, v2 int, PRIMARY
KEY (pk, ck))");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+
+ String first = String.format("INSERT INTO %s.%s (pk, ck, v1, v2)
VALUES (1, 2, 3, 4)", KEYSPACE, currentTable());
+ String second = String.format("DELETE FROM %s.%s WHERE pk = 1 AND
ck > 1", KEYSPACE, currentTable());
+
+ List<List<ByteBuffer>> values =
ImmutableList.of(Collections.emptyList(), Collections.emptyList());
+ BatchMessage batch = new BatchMessage(BatchStatement.Type.LOGGED,
ImmutableList.of(first, second), values, QueryOptions.DEFAULT);
+ client.execute(batch);
+
+ // Both operations affect the same row, but writes and deletes are
distinct.
+ assertEquals(2,
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(4,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+ }
+ }
+
+ // Runs the shared IF NOT EXISTS scenario under the Paxos v1 implementation.
+ @Test
+ public void shouldRecordWriteMetricsForIfNotExistsV1() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+ shouldRecordWriteMetricsForIfNotExists();
+ }
+
+ // Runs the shared IF NOT EXISTS scenario under the Paxos v2 implementation.
+ @Test
+ public void shouldRecordWriteMetricsForIfNotExistsV2() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+ shouldRecordWriteMetricsForIfNotExists();
+ }
+
+ // Shared body for the V1/V2 variants above (deliberately not annotated @Test):
+ // a successful LWT insert counts as a normal write, and the internal Paxos read
+ // must not leak into the read metrics.
+ public void shouldRecordWriteMetricsForIfNotExists() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int, v3
int)");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+ client.execute(new QueryMessage(String.format("INSERT INTO %s.%s
(pk, v1, v2, v3) VALUES (1, 2, 3, 4) IF NOT EXISTS", KEYSPACE, currentTable()),
QueryOptions.DEFAULT));
+
+ assertEquals(1,
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+ // We read internally, but don't reflect that in our read metrics.
+ assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ assertEquals(0,
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+ }
+
+ // Runs the shared CAS scenario under the Paxos v1 implementation.
+ @Test
+ public void shouldRecordWriteMetricsForCASV1() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+ shouldRecordWriteMetricsForCAS();
+ }
+
+ // Runs the shared CAS scenario under the Paxos v2 implementation.
+ @Test
+ public void shouldRecordWriteMetricsForCASV2() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+ shouldRecordWriteMetricsForCAS();
+ }
+
+ // Shared body for the V1/V2 variants above (deliberately not annotated @Test):
+ // a successful conditional UPDATE adds its single written column (2 -> 3 total),
+ // and the internal Paxos read must not appear in the read metrics.
+ public void shouldRecordWriteMetricsForCAS() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+ client.execute(new QueryMessage(String.format("INSERT INTO %s.%s
(pk, v1, v2) VALUES (1, 2, 3)", KEYSPACE, currentTable()),
QueryOptions.DEFAULT));
+
+ assertEquals(1,
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(2,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+ client.execute(new QueryMessage(String.format("UPDATE %s.%s SET v2
= 4 WHERE pk = 1 IF v1 = 2", KEYSPACE, currentTable()), QueryOptions.DEFAULT));
+
+ assertEquals(3,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+ // We read internally, but don't reflect that in our read metrics.
+ assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ assertEquals(0,
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+ }
+
+ // Runs the shared failed-CAS scenario under the Paxos v1 implementation.
+ @Test
+ public void shouldNotRecordWriteMetricsForFailedCASV1() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+ shouldNotRecordWriteMetricsForFailedCAS();
+ }
+
+ // Runs the shared failed-CAS scenario under the Paxos v2 implementation.
+ @Test
+ public void shouldNotRecordWriteMetricsForFailedCASV2() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+ shouldNotRecordWriteMetricsForFailedCAS();
+ }
+
+ // Shared body for the V1/V2 variants above (deliberately not annotated @Test):
+ // a conditional UPDATE whose IF clause fails (v1 = 4 is false) must leave both
+ // the write and read metrics untouched.
+ public void shouldNotRecordWriteMetricsForFailedCAS() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int PRIMARY KEY, v1 int, v2 int)");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+ client.execute(new QueryMessage(String.format("INSERT INTO %s.%s
(pk, v1, v2) VALUES (1, 2, 3)", KEYSPACE, currentTable()),
QueryOptions.DEFAULT));
+
+ assertEquals(1,
ClientRequestSizeMetrics.totalRowsWritten.getCount());
+ assertEquals(2,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+ client.execute(new QueryMessage(String.format("UPDATE %s.%s SET v2
= 4 WHERE pk = 1 IF v1 = 4", KEYSPACE, currentTable()), QueryOptions.DEFAULT));
+
+ // We didn't actually write anything, so don't reflect a write
against the metrics.
+ assertEquals(2,
ClientRequestSizeMetrics.totalColumnsWritten.getCount());
+
+ // Don't reflect in our read metrics the result returned to the
client by the failed CAS write.
+ assertEquals(0, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ assertEquals(0,
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+ }
+
+ // Runs the shared SERIAL-read scenario under the Paxos v1 implementation.
+ @Test
+ public void shouldRecordReadMetricsOnSerialReadV1() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v1);
+ shouldRecordReadMetricsOnSerialRead();
+ }
+
+ // Runs the shared SERIAL-read scenario under the Paxos v2 implementation.
+ @Test
+ public void shouldRecordReadMetricsOnSerialReadV2() throws Exception
+ {
+ Paxos.setPaxosVariant(Config.PaxosVariant.v2);
+ shouldRecordReadMetricsOnSerialRead();
+ }
+
+ // Shared body for the V1/V2 variants above (deliberately not annotated @Test):
+ // a SELECT at SERIAL consistency is still a client read, so it is counted
+ // exactly like a regular read.
+ public void shouldRecordReadMetricsOnSerialRead() throws Exception
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ try (SimpleClient client = new
SimpleClient(nativeAddr.getHostAddress(), nativePort, CURRENT))
+ {
+ client.connect(false);
+ client.execute(new QueryMessage(String.format("INSERT INTO %s.%s
(pk, ck, v) VALUES (1, 1, 1)", KEYSPACE, currentTable()),
QueryOptions.DEFAULT));
+
+ QueryMessage query = new QueryMessage(String.format("SELECT * FROM
%s.%s WHERE pk = 1 AND ck = 1", KEYSPACE, currentTable()),
+
QueryOptions.forInternalCalls(ConsistencyLevel.SERIAL,
Collections.emptyList()));
+ client.execute(query);
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+
+ // Both the partition key and clustering key are provided by the
client in the request.
+ assertEquals(1,
ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+ }
+
+ // A secondary-index query with no partition restriction: the indexed value is
+ // client-supplied, so the returned pk and ck count as the 2 columns read.
+ @Test
+ public void shouldRecordReadMetricsForGlobalIndexQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+ createIndex("CREATE INDEX on %s (v)");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE v = 1");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The index search term is provided by the client in the request, so
we don't consider that column read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // An index query additionally restricted by partition key: both pk and the
+ // indexed value are client-supplied, leaving only ck as the 1 column read.
+ @Test
+ public void shouldRecordReadMetricsForPartitionRestrictedIndexQuery()
throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+ createIndex("CREATE INDEX on %s (v)");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE pk = 1 AND v = 1");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The partition key and index search term are provided by the client,
so we don't consider those columns read.
+ assertEquals(1, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // An index over the clustering key itself: ck is client-supplied via the index
+ // term, so pk and v are the 2 columns counted as read.
+ @Test
+ public void shouldRecordReadMetricsForClusteringKeyIndexQuery() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+ createIndex("CREATE INDEX on %s (ck)");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE ck = 2");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The index search term is provided by the client in the request, so
we don't consider that column read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // ALLOW FILTERING with an equality term: the filtered value is client-supplied,
+ // so only pk and ck (2 columns) count as read for the matching row.
+ @Test
+ public void shouldRecordReadMetricsForFilteringQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE v = 1 ALLOW FILTERING");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The filtering term is provided by the client in the request, so we
don't consider that column read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // Filtering with a range (v > 1) does not bind v to a client value, so all
+ // three columns of the matching row count as read.
+ @Test
+ public void shouldRecordReadMetricsForRangeFilteringQuery() throws
Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE v > 1 ALLOW FILTERING");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The value column is restricted over a range, not bound to a
particular value.
+ assertEquals(3, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // A single-element IN filter behaves like equality: v is client-supplied, so
+ // only pk and ck (2 columns) count as read.
+ @Test
+ public void shouldRecordReadMetricsForINFilteringQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v int, PRIMARY KEY (pk,
ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, 1)");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, 2)");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE v IN (1) ALLOW FILTERING");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The filtering term is provided by the client in the request, so we
don't consider that column read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+
+ // A CONTAINS filter on a collection column is treated like a client-supplied
+ // search term, so only pk and ck (2 columns) count as read.
+ @Test
+ public void shouldRecordReadMetricsForContainsQuery() throws Throwable
+ {
+ createTable("CREATE TABLE %s (pk int, ck int, v set<int>, PRIMARY KEY
(pk, ck))");
+
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (1, 1, {1, 2,
3} )");
+ executeNet(CURRENT, "INSERT INTO %s (pk, ck, v) VALUES (2, 2, {4, 5,
6})");
+ executeNet(CURRENT, "SELECT * FROM %s WHERE v CONTAINS 1 ALLOW
FILTERING");
+
+ assertEquals(1, ClientRequestSizeMetrics.totalRowsRead.getCount());
+ // The filtering term is provided by the client in the request, so we
don't consider that column read.
+ assertEquals(2, ClientRequestSizeMetrics.totalColumnsRead.getCount());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]