Repository: cassandra Updated Branches: refs/heads/trunk 503148b3d -> 5051c0f6e
Support Restricting non-PK Cols in MV Select Statements Patch by Jochen Niebuhr; reviewed by Tyler Hobbs for CASSANDRA-10368 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5051c0f6 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5051c0f6 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5051c0f6 Branch: refs/heads/trunk Commit: 5051c0f6eb3f984600600c9577d6b5ece9038c74 Parents: 503148b Author: Jochen Niebuhr <jnieb...@cfire.de> Authored: Tue Jul 26 13:09:35 2016 -0500 Committer: Tyler Hobbs <tylerlho...@gmail.com> Committed: Wed Jul 27 18:32:41 2016 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../cql3/statements/CreateViewStatement.java | 8 - .../cassandra/cql3/ViewFilteringTest.java | 320 +++++++++++++++++++ 3 files changed, 322 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 04b2a3f..7ad965e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.10 + * Support filtering on non-PRIMARY KEY columns in the CREATE + MATERIALIZED VIEW statement's WHERE clause (CASSANDRA-10368) * Unify STDOUT and SYSTEMLOG logback format (CASSANDRA-12004) * COPY FROM should raise error for non-existing input files (CASSANDRA-12174) * Faster write path (CASSANDRA-12269) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java index 5e14e72..8fe9f7e 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java @@ -203,14 +203,6 @@ public class CreateViewStatement extends SchemaAlteringStatement if (!prepared.boundNames.isEmpty()) throw new InvalidRequestException("Cannot use query parameters in CREATE MATERIALIZED VIEW statements"); - if (!restrictions.nonPKRestrictedColumns(false).isEmpty()) - { - throw new InvalidRequestException(String.format( - "Non-primary key columns cannot be restricted in the SELECT statement used for materialized view " + - "creation (got restrictions on: %s)", - restrictions.nonPKRestrictedColumns(false).stream().map(def -> def.name.toString()).collect(Collectors.joining(", ")))); - } - String whereClauseText = View.relationsToWhereClause(whereClause.relations); Set<ColumnIdentifier> basePrimaryKeyCols = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5051c0f6/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java index 245ceb7..12cb673 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java @@ -28,7 +28,15 @@ import org.junit.Test; import com.datastax.driver.core.exceptions.InvalidQueryException; import junit.framework.Assert; +import org.apache.cassandra.concurrent.SEPExecutor; +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.utils.FBUtilities; public class ViewFilteringTest extends CQLTester { @@ -61,6 +69,16 @@ public class ViewFilteringTest extends CQLTester views.add(name); } + private void updateView(String query, Object... params) throws Throwable + { + executeNet(protocolVersion, query, params); + while (!(((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getPendingTasks() == 0 + && ((SEPExecutor) StageManager.getStage(Stage.VIEW_MUTATION)).getActiveCount() == 0)) + { + Thread.sleep(1); + } + } + private void dropView(String name) throws Throwable { executeNet(protocolVersion, "DROP MATERIALIZED VIEW " + name); @@ -1303,4 +1321,306 @@ public class ViewFilteringTest extends CQLTester executeNet(protocolVersion, "ALTER TABLE %s RENAME inetval TO foo"); assert !execute("SELECT * FROM mv_test").isEmpty(); } + + @Test + public void testMVCreationWithNonPrimaryRestrictions() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + try { + createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE b IS NOT NULL AND c IS NOT NULL AND d = 1 PRIMARY KEY (a, b, c)"); + dropView("mv_test"); + } catch(Exception e) { + throw new RuntimeException("MV creation with non primary column restrictions failed.", e); + } + + dropTable("DROP TABLE %s"); + } + + @Test + public void testNonPrimaryRestrictions() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, PRIMARY KEY (a, b))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 0, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 0, 1, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 0, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 0, 1, 1, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 0, 0, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 0, 1, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 0, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 1, 1, 0); + + // only accept rows where c = 1 + createView("mv_test", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (a, b, c)"); + + while (!SystemKeyspace.isViewBuilt(keyspace(), "mv_test")) + Thread.sleep(10); + + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 0), + row(1, 1, 1, 0) + ); + + // insert new rows that do not match the filter + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 2, 0, 0, 0); + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 2, 1, 2, 0); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 0), + row(1, 1, 1, 0) + ); + + // insert new row that does match the filter + execute("INSERT INTO %s (a, b, c, d) VALUES (?, ?, ?, ?)", 1, 2, 1, 0); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 0), + row(1, 1, 1, 0), + row(1, 2, 1, 0) + ); + + // update rows that don't match the filter + execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 2, 2, 0); + execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 1, 2, 1); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 0), + row(1, 1, 1, 0), + row(1, 2, 1, 0) + ); + + // update a row that does match the filter + execute("UPDATE %s SET d = ? WHERE a = ? AND b = ?", 1, 1, 0); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 1), + row(1, 1, 1, 0), + row(1, 2, 1, 0) + ); + + // delete rows that don't match the filter + execute("DELETE FROM %s WHERE a = ? AND b = ?", 2, 0); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 1), + row(1, 1, 1, 0), + row(1, 2, 1, 0) + ); + + // delete a row that does match the filter + execute("DELETE FROM %s WHERE a = ? AND b = ?", 1, 2); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(1, 0, 1, 1), + row(1, 1, 1, 0) + ); + + // delete a partition that matches the filter + execute("DELETE FROM %s WHERE a = ?", 1); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test"), + row(0, 0, 1, 0), + row(0, 1, 1, 0) + ); + + dropView("mv_test"); + dropTable("DROP TABLE %s"); + } + + @Test + public void complexRestrictedTimestampUpdateTestWithFlush() throws Throwable + { + complexRestrictedTimestampUpdateTest(true); + } + + @Test + public void complexRestrictedTimestampUpdateTestWithoutFlush() throws Throwable + { + complexRestrictedTimestampUpdateTest(false); + } + + public void complexRestrictedTimestampUpdateTest(boolean flush) throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, d int, e int, PRIMARY KEY (a, b))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + Keyspace ks = Keyspace.open(keyspace()); + + createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (c, a, b)"); + ks.getColumnFamilyStore("mv").disableAutoCompaction(); + + //Set initial values TS=0, matching the restriction and verify view + executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d) VALUES (0, 0, 1, 0) USING TIMESTAMP 0"); + assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0)); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + //update c's timestamp TS=2 + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0); + assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0)); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + //change c's value and TS=3, tombstones c=1 and adds c=0 record + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? and b = ? ", 0, 0, 0); + assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 0, 0, 0)); + + if(flush) + { + ks.getColumnFamilyStore("mv").forceMajorCompaction(); + FBUtilities.waitOnFutures(ks.flush()); + } + + //change c's value back to 1 with TS=4, check we can see d + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE a = ? and b = ? ", 1, 0, 0); + if (flush) + { + ks.getColumnFamilyStore("mv").forceMajorCompaction(); + FBUtilities.waitOnFutures(ks.flush()); + } + + assertRows(execute("SELECT d, e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, null)); + + + //Add e value @ TS=1 + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 1 SET e = ? WHERE a = ? and b = ? ", 1, 0, 0); + assertRows(execute("SELECT d, e from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(0, 1)); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + + //Change d value @ TS=2 + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET d = ? WHERE a = ? and b = ? ", 2, 0, 0); + assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(2)); + + if (flush) + FBUtilities.waitOnFutures(ks.flush()); + + + //Change d value @ TS=3 + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? and b = ? ", 1, 0, 0); + assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row(1)); + + + //Tombstone c + executeNet(protocolVersion, "DELETE FROM %s WHERE a = ? and b = ?", 0, 0); + assertRows(execute("SELECT d from mv")); + + //Add back without D + executeNet(protocolVersion, "INSERT INTO %s (a, b, c) VALUES (0, 0, 1)"); + + //Make sure D doesn't pop back in. + assertRows(execute("SELECT d from mv WHERE c = ? and a = ? and b = ?", 1, 0, 0), row((Object) null)); + + + //New partition + // insert a row with timestamp 0 + executeNet(protocolVersion, "INSERT INTO %s (a, b, c, d, e) VALUES (?, ?, ?, ?, ?) USING TIMESTAMP 0", 1, 0, 1, 0, 0); + + // overwrite pk and e with timestamp 1, but don't overwrite d + executeNet(protocolVersion, "INSERT INTO %s (a, b, c, e) VALUES (?, ?, ?, ?) USING TIMESTAMP 1", 1, 0, 1, 0); + + // delete with timestamp 0 (which should only delete d) + executeNet(protocolVersion, "DELETE FROM %s USING TIMESTAMP 0 WHERE a = ? AND b = ?", 1, 0); + assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0), + row(1, 0, 1, null, 0) + ); + + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 2 SET c = ? WHERE a = ? AND b = ?", 1, 1, 1); + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE a = ? AND b = ?", 1, 1, 0); + assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0), + row(1, 0, 1, null, 0) + ); + + executeNet(protocolVersion, "UPDATE %s USING TIMESTAMP 3 SET d = ? WHERE a = ? AND b = ?", 0, 1, 0); + assertRows(execute("SELECT a, b, c, d, e from mv WHERE c = ? and a = ? and b = ?", 1, 1, 0), + row(1, 0, 1, 0, 0) + ); + } + + @Test + public void testRestrictedRegularColumnTimestampUpdates() throws Throwable + { + // Regression test for CASSANDRA-10910 + + createTable("CREATE TABLE %s (" + + "k int PRIMARY KEY, " + + "c int, " + + "val int)"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv_rctstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE k IS NOT NULL AND c IS NOT NULL AND c = 1 PRIMARY KEY (k,c)"); + + updateView("UPDATE %s SET c = ?, val = ? WHERE k = ?", 0, 0, 0); + updateView("UPDATE %s SET val = ? WHERE k = ?", 1, 0); + updateView("UPDATE %s SET c = ? WHERE k = ?", 1, 0); + assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(1, 0, 1)); + + updateView("TRUNCATE %s"); + + updateView("UPDATE %s USING TIMESTAMP 1 SET c = ?, val = ? WHERE k = ?", 0, 0, 0); + updateView("UPDATE %s USING TIMESTAMP 3 SET c = ? WHERE k = ?", 1, 0); + updateView("UPDATE %s USING TIMESTAMP 2 SET val = ? WHERE k = ?", 1, 0); + updateView("UPDATE %s USING TIMESTAMP 4 SET c = ? WHERE k = ?", 1, 0); + updateView("UPDATE %s USING TIMESTAMP 3 SET val = ? WHERE k = ?", 2, 0); + assertRows(execute("SELECT c, k, val FROM mv_rctstest"), row(1, 0, 2)); + } + + @Test + public void testOldTimestampsWithRestrictions() throws Throwable + { + createTable("CREATE TABLE %s (" + + "k int, " + + "c int, " + + "val text, " + "" + + "PRIMARY KEY(k, c))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv_tstest", "CREATE MATERIALIZED VIEW %s AS SELECT * FROM %%s WHERE val IS NOT NULL AND k IS NOT NULL AND c IS NOT NULL AND val = 'baz' PRIMARY KEY (val,k,c)"); + + for (int i = 0; i < 100; i++) + updateView("INSERT into %s (k,c,val)VALUES(?,?,?)", 0, i % 2, "baz"); + + Keyspace.open(keyspace()).getColumnFamilyStore(currentTable()).forceBlockingFlush(); + + Assert.assertEquals(2, execute("select * from %s").size()); + Assert.assertEquals(2, execute("select * from mv_tstest").size()); + + assertRows(execute("SELECT val from %s where k = 0 and c = 0"), row("baz")); + assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0), row(1)); + + //Make sure an old TS does nothing + updateView("UPDATE %s USING TIMESTAMP 100 SET val = ? where k = ? AND c = ?", "bar", 0, 1); + assertRows(execute("SELECT val from %s where k = 0 and c = 1"), row("baz")); + assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0), row(1)); + assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "bar")); + + //Latest TS + updateView("UPDATE %s SET val = ? where k = ? AND c = ?", "bar", 0, 1); + assertRows(execute("SELECT val from %s where k = 0 and c = 1"), row("bar")); + assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "bar")); + assertRows(execute("SELECT c from mv_tstest where k = 0 and val = ?", "baz"), row(0)); + } }