Repository: cassandra Updated Branches: refs/heads/trunk 735bd65d2 -> be01d8eb8
Fix AssertionError while flushing memtable due to materialized views incorrectly inserting empty rows Patch by thobbs; reviewed by tjake for CASSANDRA-10614 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2998bb3c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2998bb3c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2998bb3c Branch: refs/heads/trunk Commit: 2998bb3cc45cf5e7225d0cb1ee7cfbd13e3edb57 Parents: 7fd2eaa Author: Tyler Hobbs <[email protected]> Authored: Wed Oct 28 14:33:10 2015 -0500 Committer: T Jake Luciani <[email protected]> Committed: Fri Nov 6 14:32:51 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/cassandra/db/view/TemporalRow.java | 31 +++++++++++++++ src/java/org/apache/cassandra/db/view/View.java | 24 ++++++++--- .../cassandra/cql3/ViewFilteringTest.java | 14 +++++++ .../org/apache/cassandra/cql3/ViewTest.java | 42 ++++++++++++++++++++ 5 files changed, 108 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2998bb3c/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 53e1e4a..3662e6e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,6 @@ 3.0 + * Fix AssertionError while flushing memtable due to materialized views + incorrectly inserting empty rows (CASSANDRA-10614) * Store UDA initcond as CQL literal in the schema table, instead of a blob (CASSANDRA-10650) * Don't use -1 for the position of partition key in schema (CASSANDRA-10491) * Fix distinct queries in mixed version cluster (CASSANDRA-10573) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2998bb3c/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java index d876b00..01da6e7 100644 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java @@ -243,6 +243,16 @@ public class TemporalRow row.addColumnValue(column, path, newCell.timestamp, newCell.ttl, newCell.localDeletionTime, newCell.value, newCell.isNew); } + + @Override + public String toString() + { + return MoreObjects.toStringHelper(this) + .add("numSet", numSet) + .add("existingCell", existingCell) + .add("newCell", newCell) + .toString(); + } } } @@ -286,6 +296,22 @@ public class TemporalRow } @Override + public String toString() + { + return MoreObjects.toStringHelper(this) + .add("table", baseCfs.keyspace.getName() + "." + baseCfs.getTableName()) + .add("basePartitionKey", ByteBufferUtil.bytesToHex(basePartitionKey)) + .add("startRow", startRow.toString(baseCfs.metadata)) + .add("startIsNew", startIsNew) + .add("nowInSec", nowInSec) + .add("viewClusteringTtl", viewClusteringTtl) + .add("viewClusteringTimestamp", viewClusteringTimestamp) + .add("viewClusteringLocalDeletionTime", viewClusteringLocalDeletionTime) + .add("columnValues", columnValues) + .toString(); + } + + @Override public boolean equals(Object o) { if (this == o) return true; @@ -459,6 +485,11 @@ public class TemporalRow return builder; } + public Clustering baseClustering() + { + return startRow.clustering(); + } + static class Set implements Iterable<TemporalRow> { private final ColumnFamilyStore baseCfs; http://git-wip-us.apache.org/repos/asf/cassandra/blob/2998bb3c/src/java/org/apache/cassandra/db/view/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/View.java b/src/java/org/apache/cassandra/db/view/View.java index 9d5f472..9388288 100644 --- a/src/java/org/apache/cassandra/db/view/View.java +++ 
b/src/java/org/apache/cassandra/db/view/View.java @@ -85,7 +85,7 @@ public class View private Columns columns; - private final boolean viewHasAllPrimaryKeys; + private final boolean viewPKIncludesOnlyBasePKColumns; private final boolean includeAllColumns; private ViewBuilder builder; @@ -104,7 +104,7 @@ public class View name = definition.viewName; includeAllColumns = definition.includeAllColumns; - viewHasAllPrimaryKeys = updateDefinition(definition); + viewPKIncludesOnlyBasePKColumns = updateDefinition(definition); this.rawSelect = definition.select; } @@ -210,7 +210,7 @@ public class View if (!selectQuery.selectsClustering(partition.partitionKey(), row.clustering())) continue; - if (includeAllColumns || viewHasAllPrimaryKeys || !row.deletion().isLive()) + if (includeAllColumns || !row.deletion().isLive()) return true; if (row.primaryKeyLivenessInfo().isLive(FBUtilities.nowInSeconds())) @@ -313,7 +313,7 @@ public class View private PartitionUpdate createRangeTombstoneForRow(TemporalRow temporalRow) { // Primary Key and Clustering columns do not generate tombstones - if (viewHasAllPrimaryKeys) + if (viewPKIncludesOnlyBasePKColumns) return null; boolean hasUpdate = false; @@ -375,7 +375,14 @@ public class View } } - return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, regularBuilder.build()); + Row row = regularBuilder.build(); + + // although we check for empty rows in updateAppliesToView(), if there are any good rows in the PartitionUpdate, + // all rows in the partition will be processed, and we need to handle empty/non-live rows here (CASSANDRA-10614) + if (row.isEmpty()) + return null; + + return PartitionUpdate.singleRowUpdate(viewCfm, partitionKey, row); } /** @@ -650,9 +657,16 @@ public class View if (!updateAffectsView(partition)) return null; + ReadQuery selectQuery = getReadQuery(); Collection<Mutation> mutations = null; for (TemporalRow temporalRow : rowSet) { + // In updateAffectsView, we check the partition to see if there is at least one row 
that matches the + // filters and is live. If there is more than one row in the partition, we need to re-check each one + // individually. + if (partition.rowCount() != 1 && !selectQuery.selectsClustering(partition.partitionKey(), temporalRow.baseClustering())) + continue; + // If we are building, there is no need to check for partition tombstones; those values will not be present // in the partition data if (!isBuilding) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2998bb3c/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java index 2d789c3..245ceb7 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewFilteringTest.java @@ -1078,6 +1078,20 @@ public class ViewFilteringTest extends CQLTester row(0, 1, 1, 0) ); + // insert a partition with one matching and one non-matching row using a batch (CASSANDRA-10614) + String tableName = KEYSPACE + "." 
+ currentTable(); + execute("BEGIN BATCH " + + "INSERT INTO " + tableName + " (a, b, c, d) VALUES (?, ?, ?, ?); " + + "INSERT INTO " + tableName + " (a, b, c, d) VALUES (?, ?, ?, ?); " + + "APPLY BATCH", + 4, 4, 0, 0, + 4, 4, 1, 1); + assertRowsIgnoringOrder(execute("SELECT a, b, c, d FROM mv_test" + i), + row(0, 0, 1, 0), + row(0, 1, 1, 0), + row(4, 4, 1, 1) + ); + dropView("mv_test" + i); dropTable("DROP TABLE %s"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2998bb3c/test/unit/org/apache/cassandra/cql3/ViewTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/ViewTest.java b/test/unit/org/apache/cassandra/cql3/ViewTest.java index 2353167..ff1ba2f 100644 --- a/test/unit/org/apache/cassandra/cql3/ViewTest.java +++ b/test/unit/org/apache/cassandra/cql3/ViewTest.java @@ -38,6 +38,7 @@ import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Schema; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.serializers.SimpleDateSerializer; @@ -811,6 +812,47 @@ public class ViewTest extends CQLTester } @Test + public void testIgnoreUpdate() throws Throwable + { + // regression test for CASSANDRA-10614 + + createTable("CREATE TABLE %s (" + + "a int, " + + "b int, " + + "c int, " + + "d int, " + + "PRIMARY KEY (a, b))"); + + execute("USE " + keyspace()); + executeNet(protocolVersion, "USE " + keyspace()); + + createView("mv", "CREATE MATERIALIZED VIEW %s AS SELECT a, b, c FROM %%s WHERE a IS NOT NULL AND b IS NOT NULL PRIMARY KEY (b, a)"); + + updateView("INSERT INTO %s (a, b, c) VALUES (?, ?, ?)", 0, 0, 0); + assertRows(execute("SELECT a, b, c from mv WHERE b = ?", 0), row(0, 0, 0)); + + updateView("UPDATE %s SET d = ? WHERE a = ? 
AND b = ?", 0, 0, 0); + assertRows(execute("SELECT a, b, c from mv WHERE b = ?", 0), row(0, 0, 0)); + + // Note: errors here may result in the test hanging when the memtables are flushed as part of the table drop, + // because empty rows in the memtable will cause the flush to fail. This will result in a test timeout that + // should not be ignored. + String table = KEYSPACE + "." + currentTable(); + updateView("BEGIN BATCH " + + "INSERT INTO " + table + " (a, b, c, d) VALUES (?, ?, ?, ?); " + // should be accepted + "UPDATE " + table + " SET d = ? WHERE a = ? AND b = ?; " + // should be ignored + "APPLY BATCH", + 0, 0, 0, 0, + 1, 0, 1); + assertRows(execute("SELECT a, b, c from mv WHERE b = ?", 0), row(0, 0, 0)); + assertEmpty(execute("SELECT a, b, c from mv WHERE b = ?", 1)); + + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore("mv"); + cfs.forceBlockingFlush(); + Assert.assertEquals(1, cfs.getLiveSSTables().size()); + } + + @Test public void testTwoTablesOneView() throws Throwable { execute("USE " + keyspace());
