Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 dbcc4eae6 -> 42b91d445
Add metrics to track usage of PreparedStatements patch by tjake; reviewed by carl for (CASSANDRA-7719) Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/42b91d44 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/42b91d44 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/42b91d44 Branch: refs/heads/cassandra-2.1 Commit: 42b91d4453a3f7ac698c9a871c5d75fcbf8a578f Parents: dbcc4ea Author: Jake Luciani <j...@apache.org> Authored: Wed Sep 10 16:18:32 2014 -0400 Committer: Jake Luciani <j...@apache.org> Committed: Wed Sep 10 16:18:32 2014 -0400 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cql3/QueryProcessor.java | 67 +++++++---- .../cassandra/metrics/CqlStatementMetrics.java | 53 +++++++++ .../apache/cassandra/service/ClientState.java | 2 +- .../apache/cassandra/cql3/CqlMetricsTest.java | 110 +++++++++++++++++++ 5 files changed, 213 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 60fd4c9..177ac76 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * Add metrics for tracking PreparedStatement use (CASSANDRA-7719) * (cqlsh) tab-completion for triggers (CASSANDRA-7824) * (cqlsh): Support for query paging (CASSANDRA-7514) * (cqlsh): Show progress of COPY operations (CASSANDRA-7789) http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index f2bf305..99972a2 100644 --- 
a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -22,11 +22,14 @@ import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentHashMap; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Ints; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import com.googlecode.concurrentlinkedhashmap.EntryWeigher; +import com.googlecode.concurrentlinkedhashmap.EvictionListener; import org.antlr.runtime.*; +import org.apache.cassandra.metrics.CqlStatementMetrics; import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,7 +59,6 @@ public class QueryProcessor implements QueryHandler private static final Logger logger = LoggerFactory.getLogger(QueryProcessor.class); private static final MemoryMeter meter = new MemoryMeter().withGuessing(MemoryMeter.Guess.FALLBACK_BEST); private static final long MAX_CACHE_PREPARED_MEMORY = Runtime.getRuntime().maxMemory() / 256; - private static final int MAX_CACHE_PREPARED_COUNT = 10000; private static EntryWeigher<MD5Digest, ParsedStatement.Prepared> cqlMemoryUsageWeigher = new EntryWeigher<MD5Digest, ParsedStatement.Prepared>() { @@ -83,15 +85,34 @@ public class QueryProcessor implements QueryHandler // bother with expiration on those. 
private static final ConcurrentMap<String, ParsedStatement.Prepared> internalStatements = new ConcurrentHashMap<>(); + @VisibleForTesting + public static final CqlStatementMetrics metrics = new CqlStatementMetrics(); + static { preparedStatements = new ConcurrentLinkedHashMap.Builder<MD5Digest, ParsedStatement.Prepared>() .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY) .weigher(cqlMemoryUsageWeigher) - .build(); + .listener(new EvictionListener<MD5Digest, ParsedStatement.Prepared>() + { + @Override + public void onEviction(MD5Digest md5Digest, ParsedStatement.Prepared prepared) + { + metrics.activePreparedStatements.dec(); + } + }).build(); + thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>() .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY) .weigher(thriftMemoryUsageWeigher) + .listener(new EvictionListener<Integer, CQLStatement>() + { + @Override + public void onEviction(Integer integer, CQLStatement cqlStatement) + { + metrics.activePreparedStatements.dec(); + } + }) .build(); } @@ -174,7 +195,7 @@ public class QueryProcessor implements QueryHandler Cell.MAX_NAME_LENGTH)); } - public static ResultMessage processStatement(CQLStatement statement, + private static ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException @@ -203,6 +224,9 @@ public class QueryProcessor implements QueryHandler if (prepared.getBoundTerms() != options.getValues().size()) throw new InvalidRequestException("Invalid amount of bind variables"); + if (!queryState.getClientState().isInternal) + metrics.executedUnprepared.inc(); + return processStatement(prepared, queryState, options); } @@ -370,24 +394,28 @@ public class QueryProcessor implements QueryHandler throw new InvalidRequestException(String.format("Prepared statement of size %d bytes is larger than allowed maximum of %d bytes.", statementSize, MAX_CACHE_PREPARED_MEMORY)); - - if 
(forThrift) + try { - int statementId = toHash.hashCode(); - thriftPreparedStatements.put(statementId, prepared.statement); - logger.trace(String.format("Stored prepared statement #%d with %d bind markers", - statementId, - prepared.statement.getBoundTerms())); - return ResultMessage.Prepared.forThrift(statementId, prepared.boundNames); - } - else + if (forThrift) + { + int statementId = toHash.hashCode(); + thriftPreparedStatements.put(statementId, prepared.statement); + logger.trace("Stored prepared statement #{} with {} bind markers", + statementId, + prepared.statement.getBoundTerms()); + return ResultMessage.Prepared.forThrift(statementId, prepared.boundNames); + } else + { + MD5Digest statementId = MD5Digest.compute(toHash); + preparedStatements.put(statementId, prepared); + logger.trace("Stored prepared statement #{} with {} bind markers", + statementId, + prepared.statement.getBoundTerms()); + return new ResultMessage.Prepared(statementId, prepared); + } + } finally { - MD5Digest statementId = MD5Digest.compute(toHash); - preparedStatements.put(statementId, prepared); - logger.trace(String.format("Stored prepared statement %s with %d bind markers", - statementId, - prepared.statement.getBoundTerms())); - return new ResultMessage.Prepared(statementId, prepared); + metrics.activePreparedStatements.inc(); } } @@ -410,6 +438,7 @@ public class QueryProcessor implements QueryHandler logger.trace("[{}] '{}'", i+1, variables.get(i)); } + metrics.executedPrepared.inc(); return processStatement(statement, queryState, options); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java b/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java new file mode 100644 index 0000000..02b4ad0 --- /dev/null +++ 
b/src/java/org/apache/cassandra/metrics/CqlStatementMetrics.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.metrics; + +import com.yammer.metrics.Metrics; +import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Gauge; +import com.yammer.metrics.util.RatioGauge; + + +public class CqlStatementMetrics +{ + private final MetricNameFactory factory = new DefaultNameFactory("CqlStatement"); + public final Counter activePreparedStatements = Metrics.newCounter(factory.createMetricName("ActivePreparedStatements")); + public final Counter executedPrepared = Metrics.newCounter(factory.createMetricName("ExecutedPrepared")); + public final Counter executedUnprepared = Metrics.newCounter(factory.createMetricName("ExecutedUnPrepared")); + + public final Gauge<Double> preparedRatio = Metrics.newGauge(factory.createMetricName("PreparedUnpreparedRatio"), new RatioGauge() + { + protected double getNumerator() + { + long num = executedPrepared.count(); + return num == 0 ? 1 : num; + } + + protected double getDenominator() + { + long den = executedUnprepared.count(); + return den == 0 ? 
1 : den; + } + }); + + public void reset() + { + executedPrepared.clear(); + executedUnprepared.clear(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index c0396cb..725690e 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -103,7 +103,7 @@ public class ClientState // isInternal is used to mark ClientState as used by some internal component // that should have an ability to modify system keyspace. - private final boolean isInternal; + public final boolean isInternal; // The remote address of the client - null for internal clients. private final SocketAddress remoteAddress; http://git-wip-us.apache.org/repos/asf/cassandra/blob/42b91d44/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java b/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java new file mode 100644 index 0000000..dc9c6a4 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/CqlMetricsTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cql3; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.EmbeddedCassandraService; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import java.io.IOException; + + +@RunWith(OrderedJUnit4ClassRunner.class) +public class CqlMetricsTest +{ + + private static EmbeddedCassandraService cassandra; + + private static Cluster cluster; + private static Session session; + private static PreparedStatement metricsStatement; + + @BeforeClass() + public static void setup() throws ConfigurationException, IOException + { + cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + session = cluster.connect(); + + session.execute("drop keyspace if exists junit;"); + session.execute("create keyspace junit WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };"); + session.execute("CREATE TABLE junit.metricstest (\n" + + " id int PRIMARY KEY,\n" + + " val text\n" + + ");"); + } + + @Test + public void testActivePreparedStatements() + { + assert QueryProcessor.metrics.activePreparedStatements.count() == 0; + + metricsStatement = 
session.prepare("insert into junit.metricstest(id, val)values(?,?)"); + + assert QueryProcessor.metrics.activePreparedStatements.count() == 1; + } + + @Test + public void testExecutedPrepared() + { + QueryProcessor.metrics.reset(); + + assert QueryProcessor.metrics.activePreparedStatements.count() == 1; + assert QueryProcessor.metrics.executedPrepared.count() == 0; + assert QueryProcessor.metrics.executedUnprepared.count() == 0; + assert QueryProcessor.metrics.preparedRatio.value() == 1.0; + + for (int i = 0; i < 10; i++) + { + session.execute(metricsStatement.bind(i, "val"+i)); + } + + assert QueryProcessor.metrics.executedPrepared.count() == 10; + assert QueryProcessor.metrics.executedUnprepared.count() == 0; + assert QueryProcessor.metrics.preparedRatio.value() == 10d/1d; + + } + + @Test + public void testExecutedUnPrepared() + { + QueryProcessor.metrics.reset(); + + assert QueryProcessor.metrics.activePreparedStatements.count() == 1; + assert QueryProcessor.metrics.executedPrepared.count() == 0; + assert QueryProcessor.metrics.executedUnprepared.count() == 0; + + for (int i = 0; i < 10; i++) + { + session.execute(String.format("insert into junit.metricstest(id, val)values(%d,'%s')",i, "val"+1)); + } + + assert QueryProcessor.metrics.executedPrepared.count() == 0; + assert QueryProcessor.metrics.executedUnprepared.count() == 10; + assert QueryProcessor.metrics.preparedRatio.value() == 1d/10d; + } +}