Range deletes in a CAS batch are ignored

Patch by Jeff Jirsa; Reviewed by Jay Zhuang, Sylvain Lebresne for 
CASSANDRA-13655


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/433f24cb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/433f24cb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/433f24cb

Branch: refs/heads/trunk
Commit: 433f24cb04dbcf74029a918ee73155f78d5f8111
Parents: ae88fd6
Author: Jeff Jirsa <jji...@apple.com>
Authored: Mon Sep 11 09:35:01 2017 -0700
Committer: Jeff Jirsa <jji...@apple.com>
Committed: Mon Sep 11 09:35:01 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cql3/statements/BatchStatement.java         |  36 ++++--
 .../cql3/statements/CQL3CasRequest.java         |  31 ++++++
 .../cql3/statements/ModificationStatement.java  |  15 ++-
 .../org/apache/cassandra/cql3/BatchTests.java   |  51 +++++++--
 .../cql3/validation/operations/BatchTest.java   | 111 +++++++++++++++++++
 6 files changed, 219 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index f4360be..76d155e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.15
+ * Range deletes in a CAS batch are ignored (CASSANDRA-13655)
  * Change repair midpoint logging for tiny ranges (CASSANDRA-13603)
  * Better handle corrupt final commitlog segment (CASSANDRA-11995)
  * StreamingHistogram is not thread safe (CASSANDRA-13756)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 76a6460..cd9358c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -417,18 +417,36 @@ public class BatchStatement implements CQLStatement
                        "IN on the clustering key columns is not supported with 
conditional %s",
                        statement.type.isUpdate()? "updates" : "deletions");
 
-            Clustering clustering = 
Iterables.getOnlyElement(statement.createClustering(statementOptions));
+            if (statement.hasSlices())
+            {
+                // All of the conditions require meaningful Clustering, not 
Slices
+                assert !statement.hasConditions();
+
+                Slices slices = statement.createSlices(statementOptions);
+                // If all the ranges were invalid we do not need to do 
anything.
+                if (slices.isEmpty())
+                    continue;
+
+                for (Slice slice : slices)
+                {
+                    casRequest.addRangeDeletion(slice, statement, 
statementOptions, timestamp);
+                }
 
-            if (statement.hasConditions())
+            }
+            else
             {
-                statement.addConditions(clustering, casRequest, 
statementOptions);
-                // As soon as we have a ifNotExists, we set 
columnsWithConditions to null so that everything is in the resultSet
-                if (statement.hasIfNotExistCondition() || 
statement.hasIfExistCondition())
-                    columnsWithConditions = null;
-                else if (columnsWithConditions != null)
-                    Iterables.addAll(columnsWithConditions, 
statement.getColumnsWithConditions());
+                Clustering clustering = 
Iterables.getOnlyElement(statement.createClustering(statementOptions));
+                if (statement.hasConditions())
+                {
+                    statement.addConditions(clustering, casRequest, 
statementOptions);
+                    // As soon as we have a ifNotExists, we set 
columnsWithConditions to null so that everything is in the resultSet
+                    if (statement.hasIfNotExistCondition() || 
statement.hasIfExistCondition())
+                        columnsWithConditions = null;
+                    else if (columnsWithConditions != null)
+                        Iterables.addAll(columnsWithConditions, 
statement.getColumnsWithConditions());
+                }
+                casRequest.addRowUpdate(clustering, statement, 
statementOptions, timestamp);
             }
-            casRequest.addRowUpdate(clustering, statement, statementOptions, 
timestamp);
         }
 
         return Pair.create(casRequest, columnsWithConditions);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index e226a2a..e14ae6c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -56,6 +56,7 @@ public class CQL3CasRequest implements CASRequest
     private final TreeMap<Clustering, RowCondition> conditions;
 
     private final List<RowUpdate> updates = new ArrayList<>();
+    private final List<RangeDeletion> rangeDeletions = new ArrayList<>();
 
     public CQL3CasRequest(CFMetaData cfm,
                           DecoratedKey key,
@@ -78,6 +79,11 @@ public class CQL3CasRequest implements CASRequest
         updates.add(new RowUpdate(clustering, stmt, options, timestamp));
     }
 
+    public void addRangeDeletion(Slice slice, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+    {
+        rangeDeletions.add(new RangeDeletion(slice, stmt, options, timestamp));
+    }
+
     public void addNotExist(Clustering clustering) throws 
InvalidRequestException
     {
         addExistsCondition(clustering, new NotExistCondition(clustering), 
true);
@@ -226,6 +232,8 @@ public class CQL3CasRequest implements CASRequest
         PartitionUpdate update = new PartitionUpdate(cfm, key, 
updatedColumns(), conditions.size());
         for (RowUpdate upd : updates)
             upd.applyUpdates(current, update);
+        for (RangeDeletion upd : rangeDeletions)
+            upd.applyUpdates(current, update);
 
         Keyspace.openAndGetStore(cfm).indexManager.validate(update);
 
@@ -264,6 +272,29 @@ public class CQL3CasRequest implements CASRequest
         }
     }
 
+    private class RangeDeletion
+    {
+        private final Slice slice;
+        private final ModificationStatement stmt;
+        private final QueryOptions options;
+        private final long timestamp;
+
+        private RangeDeletion(Slice slice, ModificationStatement stmt, 
QueryOptions options, long timestamp)
+        {
+            this.slice = slice;
+            this.stmt = stmt;
+            this.options = options;
+            this.timestamp = timestamp;
+        }
+
+        public void applyUpdates(FilteredPartition current, PartitionUpdate 
updates) throws InvalidRequestException
+        {
+            Map<DecoratedKey, Partition> map = stmt.requiresRead() ? 
Collections.<DecoratedKey, Partition>singletonMap(key, current) : null;
+            UpdateParameters params = new UpdateParameters(cfm, 
updates.columns(), options, timestamp, stmt.getTimeToLive(options), map);
+            stmt.addUpdateForKey(updates, slice, params);
+        }
+    }
+
     private static abstract class RowCondition
     {
         public final Clustering clustering;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 1722f02..0afd34d 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -402,6 +402,13 @@ public abstract class ModificationStatement implements 
CQLStatement
         return !conditions.isEmpty();
     }
 
+    public boolean hasSlices()
+    {
+        return type.allowClusteringColumnSlices()
+               && getRestrictions().hasClusteringColumnsRestriction()
+               && getRestrictions().isColumnRange();
+    }
+
     public ResultMessage execute(QueryState queryState, QueryOptions options)
     throws RequestExecutionException, RequestValidationException
     {
@@ -626,11 +633,9 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
 
-        if (type.allowClusteringColumnSlices()
-                && restrictions.hasClusteringColumnsRestriction()
-                && restrictions.isColumnRange())
+        if (hasSlices())
         {
-            Slices slices = createSlice(options);
+            Slices slices = createSlices(options);
 
             // If all the ranges were invalid we do not need to do anything.
             if (slices.isEmpty())
@@ -693,7 +698,7 @@ public abstract class ModificationStatement implements 
CQLStatement
         }
     }
 
-    private Slices createSlice(QueryOptions options)
+    Slices createSlices(QueryOptions options)
     {
         SortedSet<Slice.Bound> startBounds = 
restrictions.getClusteringColumnsBounds(Bound.START, options);
         SortedSet<Slice.Bound> endBounds = 
restrictions.getClusteringColumnsBounds(Bound.END, options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/test/unit/org/apache/cassandra/cql3/BatchTests.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/BatchTests.java 
b/test/unit/org/apache/cassandra/cql3/BatchTests.java
index 73923fb..260db4e 100644
--- a/test/unit/org/apache/cassandra/cql3/BatchTests.java
+++ b/test/unit/org/apache/cassandra/cql3/BatchTests.java
@@ -30,7 +30,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
-public class BatchTests
+public class BatchTests extends  CQLTester
 {
     private static EmbeddedCassandraService cassandra;
 
@@ -39,6 +39,7 @@ public class BatchTests
 
     private static PreparedStatement counter;
     private static PreparedStatement noncounter;
+    private static PreparedStatement clustering;
 
     @BeforeClass()
     public static void setup() throws ConfigurationException, IOException
@@ -59,58 +60,79 @@ public class BatchTests
                 "  id int PRIMARY KEY,\n" +
                 "  val counter,\n" +
                 ");");
+        session.execute("CREATE TABLE junit.clustering (\n" +
+                "  id int,\n" +
+                "  clustering1 int,\n" +
+                "  clustering2 int,\n" +
+                "  clustering3 int,\n" +
+                "  val text, \n" +
+                " PRIMARY KEY(id, clustering1, clustering2, clustering3)" +
+                ");");
 
 
         noncounter = session.prepare("insert into junit.noncounter(id, 
val)values(?,?)");
         counter = session.prepare("update junit.counter set val = val + ? 
where id = ?");
+        clustering = session.prepare("insert into junit.clustering(id, 
clustering1, clustering2, clustering3, val) values(?,?,?,?,?)");
     }
 
     @Test(expected = InvalidQueryException.class)
     public void testMixedInCounterBatch()
     {
-       sendBatch(BatchStatement.Type.COUNTER, true, true);
+       sendBatch(BatchStatement.Type.COUNTER, true, true, false);
     }
 
     @Test(expected = InvalidQueryException.class)
     public void testMixedInLoggedBatch()
     {
-        sendBatch(BatchStatement.Type.LOGGED, true, true);
+        sendBatch(BatchStatement.Type.LOGGED, true, true, false);
     }
 
     @Test(expected = InvalidQueryException.class)
     public void testMixedInUnLoggedBatch()
     {
-        sendBatch(BatchStatement.Type.UNLOGGED, true, true);
+        sendBatch(BatchStatement.Type.UNLOGGED, true, true, false);
     }
 
     @Test(expected = InvalidQueryException.class)
     public void testNonCounterInCounterBatch()
     {
-        sendBatch(BatchStatement.Type.COUNTER, false, true);
+        sendBatch(BatchStatement.Type.COUNTER, false, true, false);
     }
 
     @Test
     public void testNonCounterInLoggedBatch()
     {
-        sendBatch(BatchStatement.Type.LOGGED, false, true);
+        sendBatch(BatchStatement.Type.LOGGED, false, true, false);
     }
 
     @Test
     public void testNonCounterInUnLoggedBatch()
     {
-        sendBatch(BatchStatement.Type.UNLOGGED, false, true);
+        sendBatch(BatchStatement.Type.UNLOGGED, false, true, false);
     }
 
     @Test
     public void testCounterInCounterBatch()
     {
-        sendBatch(BatchStatement.Type.COUNTER, true, false);
+        sendBatch(BatchStatement.Type.COUNTER, true, false, false);
     }
 
     @Test
     public void testCounterInUnLoggedBatch()
     {
-        sendBatch(BatchStatement.Type.UNLOGGED, true, false);
+        sendBatch(BatchStatement.Type.UNLOGGED, true, false, false);
+    }
+
+    @Test
+    public void testTableWithClusteringInLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.LOGGED, false, false, true);
+    }
+
+    @Test
+    public void testTableWithClusteringInUnLoggedBatch()
+    {
+        sendBatch(BatchStatement.Type.UNLOGGED, false, false, true);
     }
 
     @Test
@@ -123,7 +145,7 @@ public class BatchTests
     @Test(expected = InvalidQueryException.class)
     public void testCounterInLoggedBatch()
     {
-        sendBatch(BatchStatement.Type.LOGGED, true, false);
+        sendBatch(BatchStatement.Type.LOGGED, true, false, false);
     }
 
     @Test(expected = InvalidQueryException.class)
@@ -138,10 +160,10 @@ public class BatchTests
         session.execute(b);
     }
 
-    public void sendBatch(BatchStatement.Type type, boolean addCounter, 
boolean addNonCounter)
+    public void sendBatch(BatchStatement.Type type, boolean addCounter, 
boolean addNonCounter, boolean addClustering)
     {
 
-        assert addCounter || addNonCounter;
+        assert addCounter || addNonCounter || addClustering;
         BatchStatement b = new BatchStatement(type);
 
         for (int i = 0; i < 10; i++)
@@ -151,6 +173,11 @@ public class BatchTests
 
             if (addCounter)
                 b.add(counter.bind((long)i, i));
+
+            if (addClustering)
+            {
+                b.add(clustering.bind(i, i, i, i, "foo"));
+            }
         }
 
         session.execute(b);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/433f24cb/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
----------------------------------------------------------------------
diff --git 
a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java 
b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
index e8f169d..87d0cde 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
@@ -239,4 +239,115 @@ public class BatchTest extends CQLTester
                    row(1,2,2),
                    row(1,3,3));
     }
+
+    @Test
+    public void testBatchAndConditionalInteraction() throws Throwable
+    {
+
+        createTable(String.format("CREATE TABLE %s.clustering (\n" +
+                "  id int,\n" +
+                "  clustering1 int,\n" +
+                "  clustering2 int,\n" +
+                "  clustering3 int,\n" +
+                "  val int, \n" +
+                " PRIMARY KEY(id, clustering1, clustering2, clustering3)" +
+                ")", KEYSPACE));
+
+        execute("DELETE FROM " + KEYSPACE +".clustering WHERE id=1");
+
+        String clusteringInsert = "INSERT INTO " + KEYSPACE + ".clustering(id, 
clustering1, clustering2, clustering3, val) VALUES(%s, %s, %s, %s, %s); ";
+        String clusteringUpdate = "UPDATE " + KEYSPACE + ".clustering SET 
val=%s WHERE id=%s AND clustering1=%s AND clustering2=%s AND clustering3=%s ;";
+        String clusteringConditionalUpdate = "UPDATE " + KEYSPACE + 
".clustering SET val=%s WHERE id=%s AND clustering1=%s AND clustering2=%s AND 
clustering3=%s IF val=%s ;";
+        String clusteringDelete = "DELETE FROM " + KEYSPACE + ".clustering 
WHERE id=%s AND clustering1=%s AND clustering2=%s AND clustering3=%s ;";
+        String clusteringRangeDelete = "DELETE FROM " + KEYSPACE + 
".clustering WHERE id=%s AND clustering1=%s ;";
+
+
+        execute("BEGIN BATCH " + String.format(clusteringInsert, 1, 1, 1, 1, 
1) + " APPLY BATCH");
+
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"), row(1, 1, 1, 1, 1));
+
+        StringBuilder cmd2 = new StringBuilder();
+        cmd2.append("BEGIN BATCH ");
+        cmd2.append(String.format(clusteringInsert, 1, 1, 1, 2, 2));
+        cmd2.append(String.format(clusteringConditionalUpdate, 11, 1, 1, 1, 1, 
1));
+        cmd2.append("APPLY BATCH ");
+        execute(cmd2.toString());
+
+
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"),
+                row(1, 1, 1, 1, 11),
+                row(1, 1, 1, 2, 2)
+        );
+
+
+        StringBuilder cmd3 = new StringBuilder();
+        cmd3.append("BEGIN BATCH ");
+        cmd3.append(String.format(clusteringInsert, 1, 1, 2, 3, 23));
+        cmd3.append(String.format(clusteringConditionalUpdate, 22, 1, 1, 1, 2, 
2));
+        cmd3.append(String.format(clusteringDelete, 1, 1, 1, 1));
+        cmd3.append("APPLY BATCH ");
+        execute(cmd3.toString());
+
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"),
+                row(1, 1, 1, 2, 22),
+                row(1, 1, 2, 3, 23)
+        );
+
+        StringBuilder cmd4 = new StringBuilder();
+        cmd4.append("BEGIN BATCH ");
+        cmd4.append(String.format(clusteringInsert, 1, 2, 3, 4, 1234));
+        cmd4.append(String.format(clusteringConditionalUpdate, 234, 1, 1, 1, 
2, 22));
+        cmd4.append("APPLY BATCH ");
+        execute(cmd4.toString());
+
+        System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering 
WHERE id=1"));
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"),
+                row(1, 1, 1, 2, 234),
+                row(1, 1, 2, 3, 23),
+                row(1, 2, 3, 4, 1234)
+        );
+
+        StringBuilder cmd5 = new StringBuilder();
+        cmd5.append("BEGIN BATCH ");
+        cmd5.append(String.format(clusteringRangeDelete, 1, 2));
+        cmd5.append(String.format(clusteringConditionalUpdate, 1234, 1, 1, 1, 
2, 234));
+        cmd5.append("APPLY BATCH ");
+        execute(cmd5.toString());
+
+        System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering 
WHERE id=1"));
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"),
+                row(1, 1, 1, 2, 1234),
+                row(1, 1, 2, 3, 23)
+        );
+
+        StringBuilder cmd6 = new StringBuilder();
+        cmd6.append("BEGIN BATCH ");
+        cmd6.append(String.format(clusteringUpdate, 345, 1, 3, 4, 5));
+        cmd6.append(String.format(clusteringConditionalUpdate, 1, 1, 1, 1, 2, 
1234));
+        cmd6.append("APPLY BATCH ");
+        execute(cmd6.toString());
+
+        System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering 
WHERE id=1"));
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"),
+                row(1, 1, 1, 2, 1),
+                row(1, 1, 2, 3, 23),
+                row(1, 3, 4, 5, 345)
+        );
+
+
+        StringBuilder cmd7 = new StringBuilder();
+        cmd7.append("BEGIN BATCH ");
+        cmd7.append(String.format(clusteringDelete, 1, 3, 4, 5));
+        cmd7.append(String.format(clusteringConditionalUpdate, 2300, 1, 1, 2, 
3, 1));  // SHOULD NOT MATCH
+        cmd7.append("APPLY BATCH ");
+        execute(cmd7.toString());
+
+        System.out.println(execute("SELECT * FROM " + KEYSPACE+".clustering 
WHERE id=1"));
+        assertRows(execute("SELECT * FROM " + KEYSPACE+".clustering WHERE 
id=1"),
+                row(1, 1, 1, 2, 1),
+                row(1, 1, 2, 3, 23),
+                row(1, 3, 4, 5, 345)
+        );
+    }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to