Repository: cassandra
Updated Branches:
  refs/heads/cassandra-3.11 bb7e522b4 -> 9562b9b69
  refs/heads/trunk 26e025804 -> 9c6f87c35


Properly evict pstmts from prepared statements cache

patch by Robert Stupp; reviewed by Benjamin Lerer for CASSANDRA-13641


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9562b9b6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9562b9b6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9562b9b6

Branch: refs/heads/cassandra-3.11
Commit: 9562b9b69e08b84ec1e8e431a846548fa8a83b44
Parents: bb7e522
Author: Robert Stupp <sn...@snazy.de>
Authored: Wed Jun 28 21:15:03 2017 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Wed Jun 28 21:15:03 2017 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/cql3/QueryProcessor.java   |   9 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   6 ++
 test/conf/cassandra.yaml                        |   1 +
 .../cassandra/cql3/PstmtPersistenceTest.java    | 108 ++++++++++++++-----
 5 files changed, 100 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 4297a15..88aa1ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.11.1
+ * Properly evict pstmts from prepared statements cache (CASSANDRA-13641)
 Merged from 3.0:
  * Fix secondary index queries on COMPACT tables (CASSANDRA-13627)
  * Nodetool listsnapshots output is missing a newline, if there are no snapshots (CASSANDRA-13568)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index f5ce7e4..0e0ba3c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -88,6 +88,7 @@ public class QueryProcessor implements QueryHandler
                              .listener((md5Digest, prepared) -> {
                                  metrics.preparedStatementsEvicted.inc();
                                  lastMinuteEvictionsCount.incrementAndGet();
+                                 SystemKeyspace.removePreparedStatement(md5Digest);
                              }).build();
 
         thriftPreparedStatements = new ConcurrentLinkedHashMap.Builder<Integer, ParsedStatement.Prepared>()
@@ -162,11 +163,17 @@ public class QueryProcessor implements QueryHandler
         logger.info("Preloaded {} prepared statements", count);
     }
 
+    /**
+     * Clears the prepared statement cache.
+     * @param memoryOnly {@code true} if only the in memory caches must be cleared, {@code false} otherwise.
+     */
     @VisibleForTesting
-    public static void clearPrepraredStatements()
+    public static void clearPreparedStatements(boolean memoryOnly)
     {
         preparedStatements.clear();
         thriftPreparedStatements.clear();
+        if (!memoryOnly)
+            SystemKeyspace.resetPreparedStatements();
     }
 
     private static QueryState internalQueryState()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 82c9752..6c45329 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1488,6 +1488,12 @@ public final class SystemKeyspace
                         key.byteBuffer());
     }
 
+    public static void resetPreparedStatements()
+    {
+        ColumnFamilyStore availableRanges = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getColumnFamilyStore(PREPARED_STATEMENTS);
+        availableRanges.truncateBlocking();
+    }
+
     public static List<Pair<String, String>> loadPreparedStatements()
     {
         String query = String.format("SELECT logged_keyspace, query_string FROM %s.%s", SchemaConstants.SYSTEM_KEYSPACE_NAME, PREPARED_STATEMENTS);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index cf02634..96ca9a0 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -44,3 +44,4 @@ row_cache_class_name: org.apache.cassandra.cache.OHCProvider
 row_cache_size_in_mb: 16
 enable_user_defined_functions: true
 enable_scripted_user_defined_functions: true
+prepared_statements_cache_size_mb: 1

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9562b9b6/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 380dbda..e7adc8e 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
+import org.junit.Before;
 import org.junit.Test;
 
 import junit.framework.Assert;
@@ -36,16 +37,23 @@ import org.apache.cassandra.service.QueryState;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.MD5Digest;
 
+import static org.junit.Assert.*;
+
 public class PstmtPersistenceTest extends CQLTester
 {
+    @Before
+    public void setUp()
+    {
+        QueryProcessor.clearPreparedStatements(false);
+    }
+ 
     @Test
     public void testCachedPreparedStatements() throws Throwable
     {
         // need this for pstmt execution/validation tests
         requireNetwork();
 
-        int rows = QueryProcessor.executeOnceInternal("SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).size();
-        Assert.assertEquals(0, rows);
+        assertEquals(0, numberOfStatementsOnDisk());
 
         execute("CREATE KEYSPACE IF NOT EXISTS foo WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}");
         execute("CREATE TABLE foo.bar (key text PRIMARY KEY, val int)");
@@ -56,30 +64,27 @@ public class PstmtPersistenceTest extends CQLTester
 
         List<MD5Digest> stmtIds = new ArrayList<>();
         // #0
-        stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + SchemaConstants.SCHEMA_KEYSPACE_NAME + '.' + SchemaKeyspace.TABLES + " WHERE keyspace_name = ?", clientState, false).statementId);
+        stmtIds.add(prepareStatement("SELECT * FROM %s WHERE keyspace_name = ?", SchemaConstants.SCHEMA_KEYSPACE_NAME, SchemaKeyspace.TABLES, clientState));
         // #1
-        stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+        stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
         // #2
-        stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+        stmtIds.add(prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState));
         clientState.setKeyspace("foo");
         // #3
-        stmtIds.add(QueryProcessor.prepare("SELECT * FROM " + KEYSPACE + '.' + currentTable() + " WHERE pk = ?", clientState, false).statementId);
+        stmtIds.add(prepareStatement("SELECT * FROM %s WHERE pk = ?", clientState));
         // #4
-        stmtIds.add(QueryProcessor.prepare("SELECT * FROM foo.bar WHERE key = ?", clientState, false).statementId);
+        stmtIds.add(prepareStatement("SELECT * FROM %S WHERE key = ?", "foo", "bar", clientState));
 
-        Assert.assertEquals(5, stmtIds.size());
-        Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
-
-        String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
+        assertEquals(5, stmtIds.size());
+        assertEquals(5, QueryProcessor.preparedStatementsCount());
 
-        rows = QueryProcessor.executeOnceInternal(queryAll).size();
-        Assert.assertEquals(5, rows);
+        Assert.assertEquals(5, numberOfStatementsOnDisk());
 
         QueryHandler handler = ClientState.getCQLQueryHandler();
         validatePstmts(stmtIds, handler);
 
         // clear prepared statements cache
-        QueryProcessor.clearPrepraredStatements();
+        QueryProcessor.clearPreparedStatements(true);
         Assert.assertEquals(0, QueryProcessor.preparedStatementsCount());
         for (MD5Digest stmtId : stmtIds)
             Assert.assertNull(handler.getPrepared(stmtId));
@@ -88,7 +93,9 @@ public class PstmtPersistenceTest extends CQLTester
         QueryProcessor.preloadPreparedStatement();
         validatePstmts(stmtIds, handler);
 
+
         // validate that the prepared statements are in the system table
+        String queryAll = "SELECT * FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS;
         for (UntypedResultSet.Row row : QueryProcessor.executeOnceInternal(queryAll))
         {
             MD5Digest digest = MD5Digest.wrap(ByteBufferUtil.getArray(row.getBytes("prepared_id")));
@@ -97,22 +104,19 @@ public class PstmtPersistenceTest extends CQLTester
         }
 
         // add anther prepared statement and sync it to table
-        QueryProcessor.prepare("SELECT * FROM bar WHERE key = ?", clientState, false);
-        Assert.assertEquals(6, QueryProcessor.preparedStatementsCount());
-        rows = QueryProcessor.executeOnceInternal(queryAll).size();
-        Assert.assertEquals(6, rows);
+        prepareStatement("SELECT * FROM %s WHERE key = ?", "foo", "bar", clientState);
+        assertEquals(6, numberOfStatementsInMemory());
+        assertEquals(6, numberOfStatementsOnDisk());
 
         // drop a keyspace (prepared statements are removed - syncPreparedStatements() remove should the rows, too)
         execute("DROP KEYSPACE foo");
-        Assert.assertEquals(3, QueryProcessor.preparedStatementsCount());
-        rows = QueryProcessor.executeOnceInternal(queryAll).size();
-        Assert.assertEquals(3, rows);
-
+        assertEquals(3, numberOfStatementsInMemory());
+        assertEquals(3, numberOfStatementsOnDisk());
     }
 
     private void validatePstmts(List<MD5Digest> stmtIds, QueryHandler handler)
     {
-        Assert.assertEquals(5, QueryProcessor.preparedStatementsCount());
+        assertEquals(5, QueryProcessor.preparedStatementsCount());
         QueryOptions optionsStr = QueryOptions.forInternalCalls(Collections.singletonList(UTF8Type.instance.fromString("foobar")));
         QueryOptions optionsInt = QueryOptions.forInternalCalls(Collections.singletonList(Int32Type.instance.decompose(42)));
         validatePstmt(handler, stmtIds.get(0), optionsStr);
@@ -125,7 +129,63 @@ public class PstmtPersistenceTest extends CQLTester
     private static void validatePstmt(QueryHandler handler, MD5Digest stmtId, QueryOptions options)
     {
         ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
-        Assert.assertNotNull(prepared);
+        assertNotNull(prepared);
         handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
     }
+
+    @Test
+    public void testPstmtInvalidation() throws Throwable
+    {
+        ClientState clientState = ClientState.forInternalCalls();
+
+        createTable("CREATE TABLE %s (key int primary key, val int)");
+
+        for (int cnt = 1; cnt < 10000; cnt++)
+        {
+            prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt, clientState);
+
+            if (numberOfEvictedStatements() > 0)
+            {
+                assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+                // prepare a more statements to trigger more evictions
+                for (int cnt2 = 1; cnt2 < 10; cnt2++)
+                    prepareStatement("INSERT INTO %s (key, val) VALUES (?, ?) USING TIMESTAMP " + cnt2, clientState);
+
+                // each new prepared statement should have caused an eviction
+                assertEquals("eviction count didn't increase by the expected number", numberOfEvictedStatements(), 10);
+                assertEquals("Number of statements in table and in cache don't match", numberOfStatementsInMemory(), numberOfStatementsOnDisk());
+
+                return;
+            }
+        }
+
+        fail("Prepared statement eviction does not work");
+    }
+
+    private long numberOfStatementsOnDisk() throws Throwable
+    {
+        UntypedResultSet.Row row = execute("SELECT COUNT(*) FROM " + SchemaConstants.SYSTEM_KEYSPACE_NAME + '.' + SystemKeyspace.PREPARED_STATEMENTS).one();
+        return row.getLong("count");
+    }
+
+    private long numberOfStatementsInMemory()
+    {
+        return QueryProcessor.preparedStatementsCount();
+    }
+
+    private long numberOfEvictedStatements()
+    {
+        return QueryProcessor.metrics.preparedStatementsEvicted.getCount();
+    }
+
+    private MD5Digest prepareStatement(String stmt, ClientState clientState)
+    {
+        return prepareStatement(stmt, keyspace(), currentTable(), clientState);
+    }
+
+    private MD5Digest prepareStatement(String stmt, String keyspace, String table, ClientState clientState)
+    {
+        return QueryProcessor.prepare(String.format(stmt, keyspace + "." + table), clientState, false).statementId;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to