Fix queries updating the same list multiple times; patch by Benjamin Lerer; reviewed by Sylvain Lebresne for CASSANDRA-13130
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5ef8a8b4 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5ef8a8b4 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5ef8a8b4 Branch: refs/heads/trunk Commit: 5ef8a8b408d4c492f7f2ffbbbe6fce237140c7cb Parents: e4be2d0 Author: Benjamin Lerer <b.le...@gmail.com> Authored: Fri Mar 10 09:57:20 2017 +0100 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Fri Mar 10 09:57:20 2017 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/cql3/Lists.java | 10 +- .../apache/cassandra/cql3/UpdateParameters.java | 31 +++++- .../validation/entities/CollectionsTest.java | 100 +++++++++++++++++++ 4 files changed, 135 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 0982de9..09e4039 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.2.10 + * Fix queries updating multiple time the same list (CASSANDRA-13130) * Fix GRANT/REVOKE when keyspace isn't specified (CASSANDRA-13053) * Avoid race on receiver by starting streaming sender thread after sending init message (CASSANDRA-12886) * Fix "multiple versions of ant detected..." 
when running ant test (CASSANDRA-13232) http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/src/java/org/apache/cassandra/cql3/Lists.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Lists.java b/src/java/org/apache/cassandra/cql3/Lists.java index da8c48a..cc75476 100644 --- a/src/java/org/apache/cassandra/cql3/Lists.java +++ b/src/java/org/apache/cassandra/cql3/Lists.java @@ -21,15 +21,18 @@ import static org.apache.cassandra.cql3.Constants.UNSET_VALUE; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicReference; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.db.Cell; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.composites.CellName; import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.composites.CompositesBuilder; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -349,7 +352,7 @@ public abstract class Lists if (index == ByteBufferUtil.UNSET_BYTE_BUFFER) throw new InvalidRequestException("Invalid unset value for list index"); - List<Cell> existingList = params.getPrefetchedList(rowKey, column.name); + List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf); int idx = ByteBufferUtil.toInt(index); if (existingList == null || existingList.size() == 0) throw new InvalidRequestException("Attempted to set an element on a list which is null"); @@ -458,7 +461,7 @@ public abstract class Lists public void execute(ByteBuffer rowKey, ColumnFamily cf, Composite prefix, UpdateParameters params) throws InvalidRequestException { assert column.type.isMultiCell() : 
"Attempted to delete from a frozen list"; - List<Cell> existingList = params.getPrefetchedList(rowKey, column.name); + List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf); // We want to call bind before possibly returning to reject queries where the value provided is not a list. Term.Terminal value = t.bind(params.options); @@ -505,7 +508,8 @@ public abstract class Lists if (index == Constants.UNSET_VALUE) return; - List<Cell> existingList = params.getPrefetchedList(rowKey, column.name); + List<Cell> existingList = params.getPrefetchedList(rowKey, column.name, cf); + int idx = ByteBufferUtil.toInt(index.get(params.options.getProtocolVersion())); if (existingList == null || existingList.size() == 0) throw new InvalidRequestException("Attempted to delete an element from a list which is null"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index e412585..65edef7 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -91,16 +91,39 @@ public class UpdateParameters return new RangeTombstone(slice.start, slice.finish, timestamp - 1, localDeletionTime); } - public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName) + /** + * Returns the prefetched list with the already-performed modifications applied. + * <p>If no modifications have been performed yet, this method will return the fetched list.
+ * If some modifications (updates or deletions) have already been done, the returned list + * will be the result of merging the fetched list with the pending mutations.</p> + * + * @param rowKey the row key + * @param cql3ColumnName the column name + * @param cf the pending modifications + * @return the prefetched list with the already-performed modifications applied + */ + public List<Cell> getPrefetchedList(ByteBuffer rowKey, ColumnIdentifier cql3ColumnName, ColumnFamily cf) { if (prefetchedLists == null) return Collections.emptyList(); CQL3Row row = prefetchedLists.get(rowKey); - if (row == null) - return Collections.<Cell>emptyList(); - List<Cell> cql3List = row.getMultiCellColumn(cql3ColumnName); + List<Cell> cql3List = row == null ? Collections.<Cell>emptyList() : row.getMultiCellColumn(cql3ColumnName); + + if (!cf.isEmpty()) + { + ColumnFamily currentCf = cf.cloneMe(); + + for (Cell c : cql3List) + currentCf.addColumn(c); + + CFMetaData cfm = currentCf.metadata(); + CQL3Row.RowIterator iterator = cfm.comparator.CQL3RowBuilder(cfm, timestamp).group(currentCf.iterator()); + // We can only update one CQL3Row per partition key at a time (IN is not allowed on clustering keys) + cql3List = iterator.hasNext() ? iterator.next().getMultiCellColumn(cql3ColumnName) : null; + } + return (cql3List == null) ? 
Collections.<Cell>emptyList() : cql3List; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/5ef8a8b4/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java index 115b755..99d9695 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/CollectionsTest.java @@ -648,4 +648,104 @@ public class CollectionsTest extends CQLTester assertInvalidMessage("The data cannot be deserialized as a map", "INSERT INTO %s (pk, m) VALUES (?, ?)", 1, -1); } + + @Test + public void testMultipleOperationOnListWithinTheSameQuery() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, l list<int>)"); + execute("INSERT INTO %s (pk, l) VALUES (1, [1, 2, 3, 4])"); + + // Checks that when the same element is updated twice, the update with the greatest value is the one taken into account + execute("UPDATE %s SET l[?] = ?, l[?] = ? WHERE pk = ?", 2, 7, 2, 8, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 8, 4))); + + execute("UPDATE %s SET l[?] = ?, l[?] = ? WHERE pk = ?", 2, 9, 2, 6, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 9, 4))); + + // Checks that deleting the same element twice will result in the deletion of the element at that index + // and of the following element. + execute("DELETE l[?], l[?] FROM %s WHERE pk = ?", 2, 2, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2))); + + // Checks that the set operation is performed on the added elements and that the greatest value wins + execute("UPDATE %s SET l = l + ?, l[?] = ? 
WHERE pk = ?", list(3, 4), 3, 7, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 3, 7))); + + execute("UPDATE %s SET l = l + ?, l[?] = ? WHERE pk = ?", list(6, 8), 4, 5, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 3, 7, 6, 8))); + + // Checks that the order of the operations matters + assertInvalidMessage("List index 6 out of bound, list has size 6", + "UPDATE %s SET l[?] = ?, l = l + ? WHERE pk = ?", 6, 5, list(9), 1); + + // Checks that the updated element is deleted. + execute("UPDATE %s SET l[?] = ? , l = l - ? WHERE pk = ?", 2, 8, list(8), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(1, 2, 7, 6))); + + // Checks that we cannot update an element that has been removed. + assertInvalidMessage("List index 3 out of bound, list has size 3", + "UPDATE %s SET l = l - ?, l[?] = ? WHERE pk = ?", list(6), 3, 4, 1); + + // Checks that the element is updated before the other ones are shifted. + execute("UPDATE %s SET l[?] = ? , l = l - ? WHERE pk = ?", 2, 8, list(1), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(2, 8, 6))); + + // Checks that the elements are shifted before the element is updated. + execute("UPDATE %s SET l = l - ?, l[?] = ? WHERE pk = ?", list(2, 6), 0, 9, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, list(9))); + } + + @Test + public void testMultipleOperationOnMapWithinTheSameQuery() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, m map<int, int>)"); + execute("INSERT INTO %s (pk, m) VALUES (1, {0 : 1, 1 : 2, 2 : 3, 3 : 4})"); + + // Checks that when the same element is updated twice, the update with the greatest value is the one taken into account + execute("UPDATE %s SET m[?] = ?, m[?] = ? WHERE pk = ?", 2, 7, 2, 8, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, map(0, 1, 1, 2, 2, 8, 3, 4))); + + execute("UPDATE %s SET m[?] = ?, m[?] = ? 
WHERE pk = ?", 2, 9, 2, 6, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, map(0, 1, 1, 2, 2, 9, 3, 4))); + + // Checks that deleting the same element twice has no side effect + execute("DELETE m[?], m[?] FROM %s WHERE pk = ?", 2, 2, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4))); + + // Checks that the set operation is performed on the added elements and that the greatest value wins + execute("UPDATE %s SET m = m + ?, m[?] = ? WHERE pk = ?", map(4, 5), 4, 7, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 4, 7))); + + execute("UPDATE %s SET m = m + ?, m[?] = ? WHERE pk = ?", map(4, 8), 4, 6, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 4, 8))); + + // Checks that, as tombstones win over updates for the same timestamp, the removed element is not re-added + execute("UPDATE %s SET m = m - ?, m[?] = ? WHERE pk = ?", set(4), 4, 9, 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4))); + + // Checks that the update is taken into account before the removal + execute("UPDATE %s SET m[?] = ?, m = m - ? WHERE pk = ?", 5, 9, set(5), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4))); + + // Checks that the set operation is merged with the changes from the append and that the greatest value wins + execute("UPDATE %s SET m[?] = ?, m = m + ? WHERE pk = ?", 5, 9, map(5, 8, 6, 9), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 5, 9, 6, 9))); + + execute("UPDATE %s SET m[?] = ?, m = m + ? 
WHERE pk = ?", 7, 1, map(7, 2), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = ?", 1) , row(1, map(0, 1, 1, 2, 3, 4, 5, 9, 6, 9, 7, 2))); + } + + @Test + public void testMultipleOperationOnSetWithinTheSameQuery() throws Throwable + { + createTable("CREATE TABLE %s (pk int PRIMARY KEY, s set<int>)"); + execute("INSERT INTO %s (pk, s) VALUES (1, {0, 1, 2})"); + + // Checks that the two operations are merged and that the tombstone always wins + execute("UPDATE %s SET s = s + ? , s = s - ? WHERE pk = ?", set(3, 4), set(3), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, set(0, 1, 2, 4))); + + execute("UPDATE %s SET s = s - ? , s = s + ? WHERE pk = ?", set(3), set(3, 4), 1); + assertRows(execute("SELECT * FROM %s WHERE pk = 1") , row(1, set(0, 1, 2, 4))); + } }