Repository: cassandra Updated Branches: refs/heads/trunk 5712e6e2b -> 997cb663e
Create a system table to expose prepared statements patch by Robert Stupp; reviewed by Sylvain Lebresne for CASSANDRA-8831 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/997cb663 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/997cb663 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/997cb663 Branch: refs/heads/trunk Commit: 997cb663e8c8f164873515f81bb779e435aead6d Parents: 5712e6e Author: Robert Stupp <[email protected]> Authored: Mon Jul 11 21:13:48 2016 +1000 Committer: Robert Stupp <[email protected]> Committed: Mon Jul 11 21:13:48 2016 +1000 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 15 +++ .../apache/cassandra/cql3/QueryProcessor.java | 98 ++++++++++---- .../org/apache/cassandra/db/SystemKeyspace.java | 38 ++++++ .../cassandra/service/CassandraDaemon.java | 4 + .../org/apache/cassandra/utils/MD5Digest.java | 6 + .../cassandra/cql3/PstmtPersistenceTest.java | 134 +++++++++++++++++++ 7 files changed, 271 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 92c6edb..287121a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * Create a system table to expose prepared statements (CASSANDRA-8831) * Reuse DataOutputBuffer from ColumnIndex (CASSANDRA-11970) * Remove DatabaseDescriptor dependency from SegmentedFile (CASSANDRA-11580) * Add supplied username to authentication error messages (CASSANDRA-12076) http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/NEWS.txt ---------------------------------------------------------------------- diff --git a/NEWS.txt b/NEWS.txt index 7418f3a..52eee1a 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -14,6 +14,21 @@ 
restore snapshots created with the previous major version using the provided 'sstableupgrade' tool. +3.10 +==== + +New features +------------ + - Prepared statements are now persisted in the table prepared_statements in + the system keyspace. Upon startup, this table is used to preload all + previously prepared statements - i.e. in many cases clients do not need to + re-prepare statements against restarted nodes. + +Upgrading +--------- + - Nothing specific to 3.10 but please see previous versions upgrading section, + especially if you are upgrading from 2.2. + 3.8 === http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/src/java/org/apache/cassandra/cql3/QueryProcessor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java index 222204b..4e7323e 100644 --- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java +++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java @@ -33,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; -import com.googlecode.concurrentlinkedhashmap.EntryWeigher; import org.antlr.runtime.*; import org.apache.cassandra.concurrent.ScheduledExecutors; import org.apache.cassandra.config.DatabaseDescriptor; @@ -140,6 +139,33 @@ public class QueryProcessor implements QueryHandler } } + public static void preloadPreparedStatement() + { + ClientState clientState = ClientState.forInternalCalls(); + int count = 0; + for (Pair<String, String> useKeyspaceAndCQL : SystemKeyspace.loadPreparedStatements()) + { + try + { + clientState.setKeyspace(useKeyspaceAndCQL.left); + prepare(useKeyspaceAndCQL.right, clientState, false); + count++; + } + catch (RequestValidationException e) + { + logger.warn("prepared statement recreation error: {}", useKeyspaceAndCQL.right, e); + } + } + logger.info("Preloaded {} 
prepared statements", count); + } + + @VisibleForTesting + public static void clearPrepraredStatements() + { + preparedStatements.clear(); + thriftPreparedStatements.clear(); + } + private static QueryState internalQueryState() { return InternalStateInstance.INSTANCE.queryState; @@ -446,6 +472,7 @@ public class QueryProcessor implements QueryHandler queryString.substring(0, 200))); MD5Digest statementId = computeId(queryString, keyspace); preparedStatements.put(statementId, prepared); + SystemKeyspace.writePreparedStatement(keyspace, statementId, queryString); return new ResultMessage.Prepared(statementId, prepared); } } @@ -555,14 +582,50 @@ public class QueryProcessor implements QueryHandler private static class MigrationSubscriber extends MigrationListener { - private void removeInvalidPreparedStatements(String ksName, String cfName) + private static void removeInvalidPreparedStatements(String ksName, String cfName) { removeInvalidPreparedStatements(internalStatements.values().iterator(), ksName, cfName); - removeInvalidPreparedStatements(preparedStatements.values().iterator(), ksName, cfName); + removeInvalidPersistentPreparedStatements(preparedStatements.entrySet().iterator(), ksName, cfName); removeInvalidPreparedStatements(thriftPreparedStatements.values().iterator(), ksName, cfName); } - private void removeInvalidPreparedStatements(Iterator<ParsedStatement.Prepared> iterator, String ksName, String cfName) + private static void removeInvalidPreparedStatementsForFunction(String ksName, String functionName) + { + Predicate<Function> matchesFunction = f -> ksName.equals(f.name().keyspace) && functionName.equals(f.name().name); + + for (Iterator<Map.Entry<MD5Digest, ParsedStatement.Prepared>> iter = preparedStatements.entrySet().iterator(); + iter.hasNext();) + { + Map.Entry<MD5Digest, ParsedStatement.Prepared> pstmt = iter.next(); + if (Iterables.any(pstmt.getValue().statement.getFunctions(), matchesFunction)) + { + 
SystemKeyspace.removePreparedStatement(pstmt.getKey()); + iter.remove(); + } + } + + + Iterators.removeIf(internalStatements.values().iterator(), + statement -> Iterables.any(statement.statement.getFunctions(), matchesFunction)); + + Iterators.removeIf(thriftPreparedStatements.values().iterator(), + statement -> Iterables.any(statement.statement.getFunctions(), matchesFunction)); + } + + private static void removeInvalidPersistentPreparedStatements(Iterator<Map.Entry<MD5Digest, ParsedStatement.Prepared>> iterator, + String ksName, String cfName) + { + while (iterator.hasNext()) + { + Map.Entry<MD5Digest, ParsedStatement.Prepared> entry = iterator.next(); + if (shouldInvalidate(ksName, cfName, entry.getValue().statement)) { + SystemKeyspace.removePreparedStatement(entry.getKey()); + iterator.remove(); + } + } + } + + private static void removeInvalidPreparedStatements(Iterator<ParsedStatement.Prepared> iterator, String ksName, String cfName) { while (iterator.hasNext()) { @@ -571,7 +634,7 @@ public class QueryProcessor implements QueryHandler } } - private boolean shouldInvalidate(String ksName, String cfName, CQLStatement statement) + private static boolean shouldInvalidate(String ksName, String cfName, CQLStatement statement) { String statementKsName; String statementCfName; @@ -621,7 +684,7 @@ public class QueryProcessor implements QueryHandler // in case there are other overloads, we have to remove all overloads since argument type // matching may change (due to type casting) if (Schema.instance.getKSMetaData(ksName).functions.get(new FunctionName(ksName, functionName)).size() > 1) - removeAllInvalidPreparedStatementsForFunction(ksName, functionName); + removeInvalidPreparedStatementsForFunction(ksName, functionName); } public void onUpdateColumnFamily(String ksName, String cfName, boolean affectsStatements) @@ -637,7 +700,7 @@ public class QueryProcessor implements QueryHandler // the new definition is picked (the function is resolved at preparation time). 
// TODO: if the function has multiple overload, we could invalidate only the statement refering to the overload // that was updated. This requires a few changes however and probably doesn't matter much in practice. - removeAllInvalidPreparedStatementsForFunction(ksName, functionName); + removeInvalidPreparedStatementsForFunction(ksName, functionName); } public void onUpdateAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) @@ -646,7 +709,7 @@ public class QueryProcessor implements QueryHandler // the new definition is picked (the function is resolved at preparation time). // TODO: if the function has multiple overload, we could invalidate only the statement refering to the overload // that was updated. This requires a few changes however and probably doesn't matter much in practice. - removeAllInvalidPreparedStatementsForFunction(ksName, aggregateName); + removeInvalidPreparedStatementsForFunction(ksName, aggregateName); } public void onDropKeyspace(String ksName) @@ -663,27 +726,12 @@ public class QueryProcessor implements QueryHandler public void onDropFunction(String ksName, String functionName, List<AbstractType<?>> argTypes) { - removeAllInvalidPreparedStatementsForFunction(ksName, functionName); + removeInvalidPreparedStatementsForFunction(ksName, functionName); } public void onDropAggregate(String ksName, String aggregateName, List<AbstractType<?>> argTypes) { - removeAllInvalidPreparedStatementsForFunction(ksName, aggregateName); - } - - private static void removeAllInvalidPreparedStatementsForFunction(String ksName, String functionName) - { - removeInvalidPreparedStatementsForFunction(internalStatements.values().iterator(), ksName, functionName); - removeInvalidPreparedStatementsForFunction(preparedStatements.values().iterator(), ksName, functionName); - removeInvalidPreparedStatementsForFunction(thriftPreparedStatements.values().iterator(), ksName, functionName); - } - - private static void 
removeInvalidPreparedStatementsForFunction(Iterator<ParsedStatement.Prepared> statements, - final String ksName, - final String functionName) - { - Predicate<Function> matchesFunction = f -> ksName.equals(f.name().keyspace) && functionName.equals(f.name().name); - Iterators.removeIf(statements, statement -> Iterables.any(statement.statement.getFunctions(), matchesFunction)); + removeInvalidPreparedStatementsForFunction(ksName, aggregateName); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 584279d..120125f 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -97,6 +97,7 @@ public final class SystemKeyspace public static final String AVAILABLE_RANGES = "available_ranges"; public static final String VIEWS_BUILDS_IN_PROGRESS = "views_builds_in_progress"; public static final String BUILT_VIEWS = "built_views"; + public static final String PREPARED_STATEMENTS = "prepared_statements"; @Deprecated public static final String LEGACY_HINTS = "hints"; @Deprecated public static final String LEGACY_BATCHLOG = "batchlog"; @@ -265,6 +266,15 @@ public final class SystemKeyspace + "status_replicated boolean," + "PRIMARY KEY ((keyspace_name), view_name))"); + private static final CFMetaData PreparedStatements = + compile(PREPARED_STATEMENTS, + "prepared statements", + "CREATE TABLE %s (" + + "prepared_id blob," + + "logged_keyspace text," + + "query_string text," + + "PRIMARY KEY ((prepared_id)))"); + @Deprecated public static final CFMetaData LegacyHints = compile(LEGACY_HINTS, @@ -435,6 +445,7 @@ public final class SystemKeyspace BuiltViews, LegacyHints, LegacyBatchlog, + PreparedStatements, LegacyKeyspaces, LegacyColumnfamilies, 
LegacyColumns, @@ -1412,4 +1423,31 @@ public final class SystemKeyspace } } + public static void writePreparedStatement(String loggedKeyspace, MD5Digest key, String cql) + { + executeInternal(String.format("INSERT INTO %s.%s" + + " (logged_keyspace, prepared_id, query_string) VALUES (?, ?, ?)", + NAME, PREPARED_STATEMENTS), + loggedKeyspace, key.byteBuffer(), cql); + logger.debug("stored prepared statement for logged keyspace '{}': '{}'", loggedKeyspace, cql); + } + + public static void removePreparedStatement(MD5Digest key) + { + executeInternal(String.format("DELETE FROM %s.%s" + + " WHERE prepared_id = ?", + NAME, PREPARED_STATEMENTS), + key.byteBuffer()); + } + + public static List<Pair<String, String>> loadPreparedStatements() + { + String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", NAME, PREPARED_STATEMENTS); + UntypedResultSet resultSet = executeOnceInternal(query); + List<Pair<String, String>> r = new ArrayList<>(); + for (UntypedResultSet.Row row : resultSet) + r.add(Pair.create(row.has("logged_keyspace") ? 
row.getString("logged_keyspace") : null, + row.getString("query_string"))); + return r; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 2d21bff..0f82974 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -48,6 +48,7 @@ import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.Schema; import org.apache.cassandra.cql3.functions.ThreadAwareSecurityManager; +import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.exceptions.ConfigurationException; @@ -344,6 +345,9 @@ public class CassandraDaemon SystemKeyspace.finishStartup(); + // Prepared statements + QueryProcessor.preloadPreparedStatement(); + // Metrics String metricsReporterConfigFile = System.getProperty("cassandra.metricsReporterConfigFile"); if (metricsReporterConfigFile != null) http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/src/java/org/apache/cassandra/utils/MD5Digest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/utils/MD5Digest.java b/src/java/org/apache/cassandra/utils/MD5Digest.java index 2feb09e..4e736dc 100644 --- a/src/java/org/apache/cassandra/utils/MD5Digest.java +++ b/src/java/org/apache/cassandra/utils/MD5Digest.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.utils; +import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -55,6 +56,11 @@ public class MD5Digest return 
compute(toHash.getBytes(StandardCharsets.UTF_8)); } + public ByteBuffer byteBuffer() + { + return ByteBuffer.wrap(bytes); + } + @Override public final int hashCode() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/997cb663/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java new file mode 100644 index 0000000..4ddb797 --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.cql3; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.junit.Test; + +import junit.framework.Assert; +import org.apache.cassandra.cql3.statements.ParsedStatement; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.IntegerType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MD5Digest; + +public class PstmtPersistenceTest extends CQLTester +{ + @Test + public void testCachedPreparedStatements() throws Throwable + { + // need this for pstmt execution/validation tests + requireNetwork(); + + int rows = QueryProcessor.executeOnceInternal("SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).size(); + Assert.assertEquals(0, rows); + + execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}"); + execute("CREATE TABLE foo.bar (key text PRIMARY KEY, val int)"); + + ClientState clientState = ClientState.forExternalCalls(InetSocketAddress.createUnresolved("127.0.0.1", 1234)); + + createTable("CREATE TABLE %s (pk int PRIMARY KEY, val text)"); + + List<MD5Digest> stmtIds = new ArrayList<>(); + // #0 + stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + SchemaKeyspace.NAME + '.' + SchemaKeyspace.TABLES + " WHERE keyspace_name = ?", clientState, false).statementId); + // #1 + stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' 
+ currentTable() + " WHERE pk = ?", clientState, false).statementId); + // #2 + stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId); + clientState.setKeyspace("foo"); + // #3 + stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId); + // #4 + stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId); + + Assert.assertEquals(5, stmtIds.size()); + Assert.assertEquals(5, QueryProcessor.preparedStatementsCount()); + + String queryAll = "SELECT * FROM " + SystemKeyspace.NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS; + + rows = QueryProcessor.executeOnceInternal(queryAll).size(); + Assert.assertEquals(5, rows); + + QueryHandler handler = ClientState.getCQLQueryHandler(); + validatePstmts(stmtIds, handler); + + // clear prepared statements cache + QueryProcessor.clearPrepraredStatements(); + Assert.assertEquals(0, QueryProcessor.preparedStatementsCount()); + for (MD5Digest stmtId : stmtIds) + Assert.assertNull(handler.getPrepared(stmtId)); + + // load prepared statements and validate that these still execute fine + QueryProcessor.preloadPreparedStatement(); + validatePstmts(stmtIds, handler); + + // validate that the prepared statements are in the system table + for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(queryAll)) + { + MD5Digest digest = MD5Digest.wrap(ByteBufferUtil.getArray(row.getBytes("prepared_id"))); + ParsedStatement.Prepared prepared = QueryProcessor.instance.getPrepared(digest); + Assert.assertNotNull(prepared); + } + + // add anther prepared statement and sync it to table + QueryProcessor.prepare("SELECT * FROM bar WHERE key = ?", clientState, false); + Assert.assertEquals(6, QueryProcessor.preparedStatementsCount()); + rows = QueryProcessor.executeOnceInternal(queryAll).size(); + Assert.assertEquals(6, rows); + + // drop a keyspace (prepared 
statements are removed - syncPreparedStatements() should remove the rows, too) + execute("DROP KEYSPACE foo"); + Assert.assertEquals(3, QueryProcessor.preparedStatementsCount()); + rows = QueryProcessor.executeOnceInternal(queryAll).size(); + Assert.assertEquals(3, rows); + + } + + private void validatePstmts(List<MD5Digest> stmtIds, QueryHandler handler) + { + Assert.assertEquals(5, QueryProcessor.preparedStatementsCount()); + QueryOptions optionsStr = QueryOptions.forInternalCalls(Collections.singletonList(UTF8Type.instance.fromString("foobar"))); + QueryOptions optionsInt = QueryOptions.forInternalCalls(Collections.singletonList(Int32Type.instance.decompose(42))); + validatePstmt(handler, stmtIds.get(0), optionsStr); + validatePstmt(handler, stmtIds.get(1), optionsInt); + validatePstmt(handler, stmtIds.get(2), optionsStr); + validatePstmt(handler, stmtIds.get(3), optionsInt); + validatePstmt(handler, stmtIds.get(4), optionsStr); + } + + private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options) + { + ParsedStatement.Prepared prepared = handler.getPrepared(stmtId); + Assert.assertNotNull(prepared); + handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap()); + } +}
