Range deletes in a CAS batch are ignored Patch by Jeff Jirsa; Reviewed by Jay Zhuang, Sylvain Lebresne for CASSANDRA-13655
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/433f24cb Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/433f24cb Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/433f24cb Branch: refs/heads/trunk Commit: 433f24cb04dbcf74029a918ee73155f78d5f8111 Parents: ae88fd6 Author: Jeff Jirsa <jji...@apple.com> Authored: Mon Sep 11 09:35:01 2017 -0700 Committer: Jeff Jirsa <jji...@apple.com> Committed: Mon Sep 11 09:35:01 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cql3/statements/BatchStatement.java | 36 ++++-- .../cql3/statements/CQL3CasRequest.java | 31 ++++++ .../cql3/statements/ModificationStatement.java | 15 ++- .../org/apache/cassandra/cql3/BatchTests.java | 51 +++++++-- .../cql3/validation/operations/BatchTest.java | 111 +++++++++++++++++++ 6 files changed, 219 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index f4360be..76d155e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Range deletes in a CAS batch are ignored (CASSANDRA-13655) * Change repair midpoint logging for tiny ranges (CASSANDRA-13603) * Better handle corrupt final commitlog segment (CASSANDRA-11995) * StreamingHistogram is not thread safe (CASSANDRA-13756) http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 76a6460..cd9358c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -417,18 +417,36 @@ public class BatchStatement implements CQLStatement "IN on the clustering key columns is not supported with conditional %s", statement.type.isUpdate()? "updates" : "deletions"); - Clustering clustering = Iterables.getOnlyElement(statement.createClustering(statementOptions)); + if (statement.hasSlices()) + { + // All of the conditions require meaningful Clustering, not Slices + assert !statement.hasConditions(); + + Slices slices = statement.createSlices(statementOptions); + // If all the ranges were invalid we do not need to do anything. + if (slices.isEmpty()) + continue; + + for (Slice slice : slices) + { + casRequest.addRangeDeletion(slice, statement, statementOptions, timestamp); + } - if (statement.hasConditions()) + } + else { - statement.addConditions(clustering, casRequest, statementOptions); - // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet - if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition()) - columnsWithConditions = null; - else if (columnsWithConditions != null) - Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions()); + Clustering clustering = Iterables.getOnlyElement(statement.createClustering(statementOptions)); + if (statement.hasConditions()) + { + statement.addConditions(clustering, casRequest, statementOptions); + // As soon as we have a ifNotExists, we set columnsWithConditions to null so that everything is in the resultSet + if (statement.hasIfNotExistCondition() || statement.hasIfExistCondition()) + columnsWithConditions = null; + else if (columnsWithConditions != null) + Iterables.addAll(columnsWithConditions, statement.getColumnsWithConditions()); + } + casRequest.addRowUpdate(clustering, statement, statementOptions, timestamp); } - casRequest.addRowUpdate(clustering, statement, statementOptions, timestamp); } return Pair.create(casRequest, columnsWithConditions); http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index e226a2a..e14ae6c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -56,6 +56,7 @@ public class CQL3CasRequest implements CASRequest private final TreeMap<Clustering, RowCondition> conditions; private final List<RowUpdate> updates = new ArrayList<>(); + private final List<RangeDeletion> rangeDeletions = new ArrayList<>(); public CQL3CasRequest(CFMetaData cfm, DecoratedKey key, @@ -78,6 +79,11 @@ public class CQL3CasRequest implements CASRequest updates.add(new RowUpdate(clustering, stmt, options, timestamp)); } + public void addRangeDeletion(Slice slice, ModificationStatement stmt, QueryOptions options, long timestamp) + { + rangeDeletions.add(new RangeDeletion(slice, stmt, options, timestamp)); + } + public void addNotExist(Clustering clustering) throws InvalidRequestException { addExistsCondition(clustering, new NotExistCondition(clustering), true); @@ -226,6 +232,8 @@ public class CQL3CasRequest implements CASRequest PartitionUpdate update = new PartitionUpdate(cfm, key, updatedColumns(), conditions.size()); for (RowUpdate upd : updates) upd.applyUpdates(current, update); + for (RangeDeletion upd : rangeDeletions) + upd.applyUpdates(current, update); Keyspace.openAndGetStore(cfm).indexManager.validate(update); @@ -264,6 +272,29 @@ public class CQL3CasRequest implements CASRequest } } + private class RangeDeletion + { + private final Slice slice; + private final ModificationStatement stmt; + private final QueryOptions options; + private final long timestamp; + + private RangeDeletion(Slice slice, ModificationStatement stmt, QueryOptions options, long timestamp) + { + this.slice = slice; + this.stmt = stmt; + this.options = options; + this.timestamp = timestamp; + } + + public void applyUpdates(FilteredPartition current, PartitionUpdate updates) throws InvalidRequestException + { + Map<DecoratedKey, Partition> map = stmt.requiresRead() ? Collections.<DecoratedKey, Partition>singletonMap(key, current) : null; + UpdateParameters params = new UpdateParameters(cfm, updates.columns(), options, timestamp, stmt.getTimeToLive(options), map); + stmt.addUpdateForKey(updates, slice, params); + } + } + private static abstract class RowCondition { public final Clustering clustering; http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 1722f02..0afd34d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -402,6 +402,13 @@ public abstract class ModificationStatement implements CQLStatement return !conditions.isEmpty(); } + public boolean hasSlices() + { + return type.allowClusteringColumnSlices() + && getRestrictions().hasClusteringColumnsRestriction() + && getRestrictions().isColumnRange(); + } + public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException { @@ -626,11 +633,9 @@ public abstract class ModificationStatement implements CQLStatement { List<ByteBuffer> keys = buildPartitionKeyNames(options); - if (type.allowClusteringColumnSlices() - && restrictions.hasClusteringColumnsRestriction() - && restrictions.isColumnRange()) + if (hasSlices()) { - Slices slices = createSlice(options); + Slices slices = createSlices(options); // If all the ranges were invalid we do not need to do anything. if (slices.isEmpty()) @@ -693,7 +698,7 @@ public abstract class ModificationStatement implements CQLStatement } } - private Slices createSlice(QueryOptions options) + Slices createSlices(QueryOptions options) { SortedSet<Slice.Bound> startBounds = restrictions.getClusteringColumnsBounds(Bound.START, options); SortedSet<Slice.Bound> endBounds = restrictions.getClusteringColumnsBounds(Bound.END, options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/test/unit/org/apache/cassandra/cql3/BatchTests.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/BatchTests.java b/test/unit/org/apache/cassandra/cql3/BatchTests.java index 73923fb..260db4e 100644 --- a/test/unit/org/apache/cassandra/cql3/BatchTests.java +++ b/test/unit/org/apache/cassandra/cql3/BatchTests.java @@ -30,7 +30,7 @@ import org.junit.Test; import java.io.IOException; -public class BatchTests +public class BatchTests extends CQLTester { private static EmbeddedCassandraService cassandra; @@ -39,6 +39,7 @@ public class BatchTests private static PreparedStatement counter; private static PreparedStatement noncounter; + private static PreparedStatement clustering; @BeforeClass() public static void setup() throws ConfigurationException, IOException @@ -59,58 +60,79 @@ public class BatchTests " id int PRIMARY KEY,\n" + " val counter,\n" + ");"); + session.execute("CREATE TABLE junit.clustering (\n" + + " id int,\n" + + " clustering1 int,\n" + + " clustering2 int,\n" + + " clustering3 int,\n" + + " val text, \n" + + " PRIMARY KEY(id, clustering1, clustering2, clustering3)" + + ");"); noncounter = session.prepare("insert into junit.noncounter(id, val)values(?,?)"); counter = session.prepare("update junit.counter set val = val + ? where id = ?"); + clustering = session.prepare("insert into junit.clustering(id, clustering1, clustering2, clustering3, val) values(?,?,?,?,?)"); } @Test(expected = InvalidQueryException.class) public void testMixedInCounterBatch() { - sendBatch(BatchStatement.Type.COUNTER, true, true); + sendBatch(BatchStatement.Type.COUNTER, true, true, false); } @Test(expected = InvalidQueryException.class) public void testMixedInLoggedBatch() { - sendBatch(BatchStatement.Type.LOGGED, true, true); + sendBatch(BatchStatement.Type.LOGGED, true, true, false); } @Test(expected = InvalidQueryException.class) public void testMixedInUnLoggedBatch() { - sendBatch(BatchStatement.Type.UNLOGGED, true, true); + sendBatch(BatchStatement.Type.UNLOGGED, true, true, false); } @Test(expected = InvalidQueryException.class) public void testNonCounterInCounterBatch() { - sendBatch(BatchStatement.Type.COUNTER, false, true); + sendBatch(BatchStatement.Type.COUNTER, false, true, false); } @Test public void testNonCounterInLoggedBatch() { - sendBatch(BatchStatement.Type.LOGGED, false, true); + sendBatch(BatchStatement.Type.LOGGED, false, true, false); } @Test public void testNonCounterInUnLoggedBatch() { - sendBatch(BatchStatement.Type.UNLOGGED, false, true); + sendBatch(BatchStatement.Type.UNLOGGED, false, true, false); } @Test public void testCounterInCounterBatch() { - sendBatch(BatchStatement.Type.COUNTER, true, false); + sendBatch(BatchStatement.Type.COUNTER, true, false, false); } @Test public void testCounterInUnLoggedBatch() { - sendBatch(BatchStatement.Type.UNLOGGED, true, false); + sendBatch(BatchStatement.Type.UNLOGGED, true, false, false); + } + + @Test + public void testTableWithClusteringInLoggedBatch() + { + sendBatch(BatchStatement.Type.LOGGED, false, false, true); + } + + @Test + public void testTableWithClusteringInUnLoggedBatch() + { + sendBatch(BatchStatement.Type.UNLOGGED, false, false, true); } @Test @@ -123,7 +145,7 @@ public class BatchTests @Test(expected = InvalidQueryException.class) public void testCounterInLoggedBatch() { - sendBatch(BatchStatement.Type.LOGGED, true, false); + sendBatch(BatchStatement.Type.LOGGED, true, false, false); } @Test(expected = InvalidQueryException.class) @@ -138,10 +160,10 @@ public class BatchTests session.execute(b); } - public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter) + public void sendBatch(BatchStatement.Type type, boolean addCounter, boolean addNonCounter, boolean addClustering) { - assert addCounter || addNonCounter; + assert addCounter || addNonCounter || addClustering; BatchStatement b = new BatchStatement(type); for (int i = 0; i < 10; i++) @@ -151,6 +173,11 @@ public class BatchTests if (addCounter) b.add(counter.bind((long)i, i)); + + if (addClustering) + { + b.add(clustering.bind(i, i, i, i, "foo")); + } } session.execute(b); http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java index e8f169d..87d0cde 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java @@ -239,4 +239,115 @@ public class BatchTest extends CQLTester row(1,2,2), row(1,3,3)); } + + @Test + public void testBatchAndConditionalInteraction() throws Throwable + { + + createTable(String.format("CREATE TABLE %s.clustering (\n" + + " id int,\n" + + " clustering1 int,\n" + + " clustering2 int,\n" + + " clustering3 int,\n" + + " val int, \n" + + " PRIMARY KEY(id, clustering1, clustering2, clustering3)" + + ")", KEYSPACE)); + + execute("DELETE FROM " + KEYSPACE +".clustering WHERE id=1"); + + String clusteringInsert = "INSERT INTO " + KEYSPACE + ".clustering(id, clustering1, clustering2, clustering3, val) VALUES(%s, %s, %s, %s, %s); "; + String clusteringUpdate = "UPDATE " + KEYSPACE + ".clustering SET val=%s WHERE id=%s AND clustering1=%s AND clustering2=%s AND clustering3=%s ;"; + String clusteringConditionalUpdate = "UPDATE " + KEYSPACE + ".clustering SET val=%s WHERE id=%s AND clustering1=%s AND clustering2=%s AND clustering3=%s IF val=%s ;"; + String clusteringDelete = "DELETE FROM " + KEYSPACE + ".clustering WHERE id=%s AND clustering1=%s AND clustering2=%s AND clustering3=%s ;"; + String clusteringRangeDelete = "DELETE FROM " + KEYSPACE + ".clustering WHERE id=%s AND clustering1=%s ;"; + + + execute("BEGIN BATCH " + String.format(clusteringInsert, 1, 1, 1, 1, 1) + " APPLY BATCH"); + + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), row(1, 1, 1, 1, 1)); + + StringBuilder cmd2 = new StringBuilder(); + cmd2.append("BEGIN BATCH "); + cmd2.append(String.format(clusteringInsert, 1, 1, 1, 2, 2)); + cmd2.append(String.format(clusteringConditionalUpdate, 11, 1, 1, 1, 1, 1)); + cmd2.append("APPLY BATCH "); + execute(cmd2.toString()); + + + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), + row(1, 1, 1, 1, 11), + row(1, 1, 1, 2, 2) + ); + + + StringBuilder cmd3 = new StringBuilder(); + cmd3.append("BEGIN BATCH "); + cmd3.append(String.format(clusteringInsert, 1, 1, 2, 3, 23)); + cmd3.append(String.format(clusteringConditionalUpdate, 22, 1, 1, 1, 2, 2)); + cmd3.append(String.format(clusteringDelete, 1, 1, 1, 1)); + cmd3.append("APPLY BATCH "); + execute(cmd3.toString()); + + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), + row(1, 1, 1, 2, 22), + row(1, 1, 2, 3, 23) + ); + + StringBuilder cmd4 = new StringBuilder(); + cmd4.append("BEGIN BATCH "); + cmd4.append(String.format(clusteringInsert, 1, 2, 3, 4, 1234)); + cmd4.append(String.format(clusteringConditionalUpdate, 234, 1, 1, 1, 2, 22)); + cmd4.append("APPLY BATCH "); + execute(cmd4.toString()); + + System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1")); + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), + row(1, 1, 1, 2, 234), + row(1, 1, 2, 3, 23), + row(1, 2, 3, 4, 1234) + ); + + StringBuilder cmd5 = new StringBuilder(); + cmd5.append("BEGIN BATCH "); + cmd5.append(String.format(clusteringRangeDelete, 1, 2)); + cmd5.append(String.format(clusteringConditionalUpdate, 1234, 1, 1, 1, 2, 234)); + cmd5.append("APPLY BATCH "); + execute(cmd5.toString()); + + System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1")); + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), + row(1, 1, 1, 2, 1234), + row(1, 1, 2, 3, 23) + ); + + StringBuilder cmd6 = new StringBuilder(); + cmd6.append("BEGIN BATCH "); + cmd6.append(String.format(clusteringUpdate, 345, 1, 3, 4, 5)); + cmd6.append(String.format(clusteringConditionalUpdate, 1, 1, 1, 1, 2, 1234)); + cmd6.append("APPLY BATCH "); + execute(cmd6.toString()); + + System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1")); + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), + row(1, 1, 1, 2, 1), + row(1, 1, 2, 3, 23), + row(1, 3, 4, 5, 345) + ); + + + StringBuilder cmd7 = new StringBuilder(); + cmd7.append("BEGIN BATCH "); + cmd7.append(String.format(clusteringDelete, 1, 3, 4, 5)); + cmd7.append(String.format(clusteringConditionalUpdate, 2300, 1, 1, 2, 3, 1)); // SHOULD NOT MATCH + cmd7.append("APPLY BATCH "); + execute(cmd7.toString()); + + System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1")); + assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE id=1"), + row(1, 1, 1, 2, 1), + row(1, 1, 2, 3, 23), + row(1, 3, 4, 5, 345) + ); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org