Repository: cassandra
Updated Branches:
  refs/heads/trunk 04b2528bf -> d3617a347
Keep columns for different tables separated in BatchStatement

patch by slebresne; reviewed by blambov for CASSANDRA-10554

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f901a74c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f901a74c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f901a74c

Branch: refs/heads/trunk
Commit: f901a74c88078b772e26791ba987c2545ea14b52
Parents: c10ae57
Author: Sylvain Lebresne <[email protected]>
Authored: Mon Oct 26 17:40:03 2015 +0100
Committer: Sylvain Lebresne <[email protected]>
Committed: Tue Oct 27 13:54:50 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cql3/statements/BatchStatement.java         | 35 ++++++++++++++++++--
 .../cql3/statements/ModificationStatement.java  |  2 +-
 .../cql3/statements/UpdatesCollector.java       | 10 +++---
 .../cql3/validation/operations/BatchTest.java   | 25 ++++++++++++++
 5 files changed, 65 insertions(+), 8 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f901a74c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 286e490..f8dffb0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0
+ * Fix batches on multiple tables (CASSANDRA-10554)
  * Ensure compaction options are validated when updating KeyspaceMetadata (CASSANDRA-10569)
  * Flatten Iterator Transformation Hierarchy (CASSANDRA-9975)
  * Remove token generator (CASSANDRA-5261)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f901a74c/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 
d63bbc8..1f1d507 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -27,6 +27,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.helpers.MessageFormatter;
 
+import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ColumnDefinition;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.*;
@@ -55,8 +56,12 @@ public class BatchStatement implements CQLStatement
     private final int boundTerms;
     public final Type type;
     private final List<ModificationStatement> statements;
-    private final PartitionColumns updatedColumns;
+
+    // Columns modified for each table (keyed by the table ID)
+    private final Map<UUID, PartitionColumns> updatedColumns;
+    // Columns on which there is conditions. Note that if there is any, then the batch can only be on a single partition (and thus table).
     private final PartitionColumns conditionColumns;
+
     private final boolean updatesRegularRows;
     private final boolean updatesStaticRow;
     private final Attributes attrs;
@@ -89,14 +94,14 @@ public class BatchStatement implements CQLStatement
         this.attrs = attrs;
 
         boolean hasConditions = false;
-        PartitionColumns.Builder regularBuilder = PartitionColumns.builder();
+        MultiTableColumnsBuilder regularBuilder = new MultiTableColumnsBuilder();
         PartitionColumns.Builder conditionBuilder = PartitionColumns.builder();
         boolean updateRegular = false;
         boolean updateStatic = false;
 
         for (ModificationStatement stmt : statements)
         {
-            regularBuilder.addAll(stmt.updatedColumns());
+            regularBuilder.addAll(stmt.cfm, stmt.updatedColumns());
             updateRegular |= stmt.updatesRegularRows();
             if (stmt.hasConditions())
             {
@@ -523,4 +528,28 @@ public class BatchStatement implements CQLStatement
             return new ParsedStatement.Prepared(batchStatement, boundNames, partitionKeyBindIndexes);
         }
     }
+
+    private static class MultiTableColumnsBuilder
+    {
+        private final 
Map<UUID, PartitionColumns.Builder> perTableBuilders = new HashMap<>();
+
+        public void addAll(CFMetaData table, PartitionColumns columns)
+        {
+            PartitionColumns.Builder builder = perTableBuilders.get(table.cfId);
+            if (builder == null)
+            {
+                builder = PartitionColumns.builder();
+                perTableBuilders.put(table.cfId, builder);
+            }
+            builder.addAll(columns);
+        }
+
+        public Map<UUID, PartitionColumns> build()
+        {
+            Map<UUID, PartitionColumns> m = new HashMap<>();
+            for (Map.Entry<UUID, PartitionColumns.Builder> p : perTableBuilders.entrySet())
+                m.put(p.getKey(), p.getValue().build());
+            return m;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f901a74c/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 1ea1e4d..eb0f9ff 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -602,7 +602,7 @@ public abstract class ModificationStatement implements CQLStatement
      */
     private Collection<? 
extends IMutation> getMutations(QueryOptions options, boolean local, long now)
     {
-        UpdatesCollector collector = new UpdatesCollector(updatedColumns, 1);
+        UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1);
         addUpdates(collector, options, local, now);
         collector.validateIndexedColumns();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f901a74c/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
index 8fc5ef5..1d65a78 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UpdatesCollector.java
@@ -34,9 +34,9 @@ import org.apache.cassandra.db.partitions.PartitionUpdate;
 final class UpdatesCollector
 {
     /**
-     * The columns that will be updated.
+     * The columns that will be updated for each table (keyed by the table ID).
      */
-    private final PartitionColumns updatedColumns;
+    private final Map<UUID, PartitionColumns> updatedColumns;
 
     /**
      * The estimated number of updated row.
@@ -48,7 +48,7 @@ final class UpdatesCollector
      */
     private final Map<String, Map<ByteBuffer, IMutation>> mutations = new HashMap<>();
 
-    public UpdatesCollector(PartitionColumns updatedColumns, int updatedRows)
+    public UpdatesCollector(Map<UUID, PartitionColumns> updatedColumns, int updatedRows)
     {
         super();
         this.updatedColumns = updatedColumns;
@@ -70,7 +70,9 @@ final class UpdatesCollector
         PartitionUpdate upd = mut.get(cfm);
         if (upd == null)
         {
-            upd = new PartitionUpdate(cfm, dk, updatedColumns, updatedRows);
+            PartitionColumns columns = updatedColumns.get(cfm.cfId);
+            assert columns != null;
+            upd = new PartitionUpdate(cfm, dk, columns, updatedRows);
             mut.add(upd);
         }
         return upd;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f901a74c/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
index 43e3a30..66226eb 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/BatchTest.java
@@ -174,4 +174,29 @@ public class BatchTest extends CQLTester
     {
         assertEmpty(execute("BEGIN BATCH APPLY BATCH;"));
     }
+
+    @Test
+    public void testBatchMultipleTable() throws Throwable
+    {
+        String tbl1 = KEYSPACE + "." + createTableName();
+        String tbl2 = KEYSPACE + "." 
+ createTableName();
+
+        schemaChange(String.format("CREATE TABLE %s (k1 int PRIMARY KEY, v11 int, v12 int)", tbl1));
+        schemaChange(String.format("CREATE TABLE %s (k2 int PRIMARY KEY, v21 int, v22 int)", tbl2));
+
+        execute("BEGIN BATCH " +
+                String.format("UPDATE %s SET v11 = 1 WHERE k1 = 0;", tbl1) +
+                String.format("UPDATE %s SET v12 = 2 WHERE k1 = 0;", tbl1) +
+                String.format("UPDATE %s SET v21 = 3 WHERE k2 = 0;", tbl2) +
+                String.format("UPDATE %s SET v22 = 4 WHERE k2 = 0;", tbl2) +
+                "APPLY BATCH;");
+
+        assertRows(execute(String.format("SELECT * FROM %s", tbl1)), row(0, 1, 2));
+        assertRows(execute(String.format("SELECT * FROM %s", tbl2)), row(0, 3, 4));
+
+        flush();
+
+        assertRows(execute(String.format("SELECT * FROM %s", tbl1)), row(0, 1, 2));
+        assertRows(execute(String.format("SELECT * FROM %s", tbl2)), row(0, 3, 4));
+    }
 }
