Repository: cassandra Updated Branches: refs/heads/trunk 2e8ba0364 -> 5a249f26c
Don't re-parse already prepared statements Patch by Benedict Elliot Smith and Tyler Hobbs; reviewed by Aleksey Yeschenko for CASSANDRA-7923 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/24c181ff Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/24c181ff Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/24c181ff Branch: refs/heads/trunk Commit: 24c181ff754d629094f5210f5a53bcad1d0aa81d Parents: ed1681a Author: Tyler Hobbs <[email protected]> Authored: Tue Oct 7 17:59:43 2014 -0500 Committer: Tyler Hobbs <[email protected]> Committed: Tue Oct 7 17:59:43 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/QueryHandler.java | 2 +- .../apache/cassandra/cql3/QueryProcessor.java | 70 ++++++++++++++------ .../cassandra/thrift/CassandraServer.java | 9 +-- 4 files changed, 57 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/24c181ff/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0595767..b6299c7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.1.1 + * Avoid re-parsing already prepared statements (CASSANDRA-7923) * Fix some Thrift slice deletions and updates of COMPACT STORAGE tables with some clustering columns omitted (CASSANDRA-7990) * Fix filtering for CONTAINS on sets (CASSANDRA-8033) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24c181ff/src/java/org/apache/cassandra/cql3/QueryHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java index 2f28812..d42d90e 100644 --- a/src/java/org/apache/cassandra/cql3/QueryHandler.java +++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java @@ -30,7 +30,7 @@ public interface QueryHandler public ResultMessage process(String query, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException; public ResultMessage.Prepared prepare(String query, QueryState state) throws RequestValidationException; public ParsedStatement.Prepared getPrepared(MD5Digest id); - public CQLStatement getPreparedForThrift(Integer id); + public ParsedStatement.Prepared getPreparedForThrift(Integer id); public ResultMessage processPrepared(CQLStatement statement, QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException; public ResultMessage processBatch(BatchStatement statement, QueryState state, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24c181ff/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 0c75642..2818358 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -68,17 +68,17 @@ public class QueryProcessor implements QueryHandler } }; - private static EntryWeigher<Integer, CQLStatement> thriftMemoryUsageWeigher = new EntryWeigher<Integer, CQLStatement>() + private static EntryWeigher<Integer, ParsedStatement.Prepared> thriftMemoryUsageWeigher = new EntryWeigher<Integer, ParsedStatement.Prepared>() { @Override - public int weightOf(Integer key, CQLStatement value) + public int weightOf(Integer key, ParsedStatement.Prepared value) { - return Ints.checkedCast(measure(key) + measure(value)); + return Ints.checkedCast(measure(key) + measure(value.statement) + measure(value.boundNames)); } }; private static final ConcurrentLinkedHashMap<MD5Digest, ParsedStatement.Prepared> preparedStatements; - private static final ConcurrentLinkedHashMap<Integer, CQLStatement> thriftPreparedStatements; + private static final ConcurrentLinkedHashMap<Integer, ParsedStatement.Prepared> thriftPreparedStatements; // A map for prepared statements used internally (which we don't want to mix with user statement, in particular we don't // bother with expiration on those. @@ -104,12 +104,12 @@ public class QueryProcessor implements QueryHandler } }).build(); - thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, CQLStatement>() + thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>() .maximumWeightedCapacity(MAX_CACHE_PREPARED_MEMORY) .weigher(thriftMemoryUsageWeigher) - .listener(new EvictionListener<Integer, CQLStatement>() + .listener(new EvictionListener<Integer, ParsedStatement.Prepared>() { - public void onEviction(Integer integer, CQLStatement cqlStatement) + public void onEviction(Integer integer, ParsedStatement.Prepared prepared) { metrics.preparedStatementsEvicted.inc(); lastMinuteEvictionsCount.incrementAndGet(); @@ -172,7 +172,7 @@ public class QueryProcessor implements QueryHandler return preparedStatements.get(id); } - public CQLStatement getPreparedForThrift(Integer id) + public ParsedStatement.Prepared getPreparedForThrift(Integer id) { return thriftPreparedStatements.get(id); } @@ -392,6 +392,10 @@ public class QueryProcessor implements QueryHandler public static ResultMessage.Prepared prepare(String queryString, ClientState clientState, boolean forThrift) throws RequestValidationException { + ResultMessage.Prepared existing = getStoredPreparedStatement(queryString, clientState.getRawKeyspace(), forThrift); + if (existing != null) + return existing; + ParsedStatement.Prepared prepared = getStatement(queryString, clientState); int boundTerms = prepared.statement.getBoundTerms(); if (boundTerms > FBUtilities.MAX_UNSIGNED_SHORT) @@ -401,12 +405,40 @@ public class QueryProcessor implements QueryHandler return storePreparedStatement(queryString, clientState.getRawKeyspace(), prepared, forThrift); } + private static MD5Digest computeId(String queryString, String keyspace) + { + String toHash = keyspace == null ? queryString : keyspace + queryString; + return MD5Digest.compute(toHash); + } + + private static Integer computeThriftId(String queryString, String keyspace) + { + String toHash = keyspace == null ? queryString : keyspace + queryString; + return toHash.hashCode(); + } + + private static ResultMessage.Prepared getStoredPreparedStatement(String queryString, String keyspace, boolean forThrift) + throws InvalidRequestException + { + if (forThrift) + { + Integer thriftStatementId = computeThriftId(queryString, keyspace); + ParsedStatement.Prepared existing = thriftPreparedStatements.get(thriftStatementId); + return existing == null ? null : ResultMessage.Prepared.forThrift(thriftStatementId, existing.boundNames); + } + else + { + MD5Digest statementId = computeId(queryString, keyspace); + ParsedStatement.Prepared existing = preparedStatements.get(statementId); + return existing == null ? null : new ResultMessage.Prepared(statementId, existing); + } + } + private static ResultMessage.Prepared storePreparedStatement(String queryString, String keyspace, ParsedStatement.Prepared prepared, boolean forThrift) throws InvalidRequestException { // Concatenate the current keyspace so we don't mix prepared statements between keyspace (#5352). // (if the keyspace is null, queryString has to have a fully-qualified keyspace so it's fine. - String toHash = keyspace == null ? queryString : keyspace + queryString; long statementSize = measure(prepared.statement); // don't execute the statement if it's bigger than the allowed threshold if (statementSize > MAX_CACHE_PREPARED_MEMORY) @@ -415,13 +447,13 @@ public class QueryProcessor implements QueryHandler MAX_CACHE_PREPARED_MEMORY)); if (forThrift) { - int statementId = toHash.hashCode(); - thriftPreparedStatements.put(statementId, prepared.statement); + Integer statementId = computeThriftId(queryString, keyspace); + thriftPreparedStatements.put(statementId, prepared); return ResultMessage.Prepared.forThrift(statementId, prepared.boundNames); } else { - MD5Digest statementId = MD5Digest.compute(toHash); + MD5Digest statementId = computeId(queryString, keyspace); preparedStatements.put(statementId, prepared); return new ResultMessage.Prepared(statementId, prepared); } @@ -521,19 +553,17 @@ public class QueryProcessor implements QueryHandler { private void removeInvalidPreparedStatements(String ksName, String cfName) { - Iterator<ParsedStatement.Prepared> iterator = preparedStatements.values().iterator(); + removeInvalidPreparedStatements(preparedStatements.values().iterator(), ksName, cfName); + removeInvalidPreparedStatements(thriftPreparedStatements.values().iterator(), ksName, cfName); + } + + private void removeInvalidPreparedStatements(Iterator<ParsedStatement.Prepared> iterator, String ksName, String cfName) + { while (iterator.hasNext()) { if (shouldInvalidate(ksName, cfName, iterator.next().statement)) iterator.remove(); } - - Iterator<CQLStatement> thriftIterator = thriftPreparedStatements.values().iterator(); - while (thriftIterator.hasNext()) - { - if (shouldInvalidate(ksName, cfName, thriftIterator.next())) - thriftIterator.remove(); - } } private boolean shouldInvalidate(String ksName, String cfName, CQLStatement statement) http://git-wip-us.apache.org/repos/asf/cassandra/blob/24c181ff/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index a4f25b8..2e76ee4 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -48,6 +48,7 @@ import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql.CQLStatement; import org.apache.cassandra.cql.QueryProcessor; import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.statements.ParsedStatement; import org.apache.cassandra.db.*; import org.apache.cassandra.db.composites.*; import org.apache.cassandra.db.context.CounterContext; @@ -2164,16 +2165,16 @@ public class CassandraServer implements Cassandra.Iface try { ThriftClientState cState = state(); - org.apache.cassandra.cql3.CQLStatement statement = cState.getCQLQueryHandler().getPreparedForThrift(itemId); + ParsedStatement.Prepared prepared = cState.getCQLQueryHandler().getPreparedForThrift(itemId); - if (statement == null) + if (prepared == null) throw new InvalidRequestException(String.format("Prepared query with ID %d not found" + " (either the query was not prepared on this host (maybe the host has been restarted?)" + " or you have prepared too many queries and it has been evicted from the internal cache)", itemId)); - logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundTerms()); + logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, prepared.statement.getBoundTerms()); - return cState.getCQLQueryHandler().processPrepared(statement, + return cState.getCQLQueryHandler().processPrepared(prepared.statement, cState.getQueryState(), QueryOptions.fromProtocolV2(ThriftConversion.fromThrift(cLevel), bindVariables)).toThriftResult(); }
