Repository: cassandra Updated Branches: refs/heads/trunk 83f4acc82 -> 8cedb6079
Extend triggers to support CAS updates patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-6882 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/75ff51e1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/75ff51e1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/75ff51e1 Branch: refs/heads/trunk Commit: 75ff51e12485c16f3c408d40f357e07bb26905ea Parents: 5b3b52f Author: Sam Tunnicliffe <[email protected]> Authored: Tue Mar 18 23:42:33 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Tue Mar 18 23:44:53 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/service/StorageProxy.java | 10 ++ .../cassandra/triggers/TriggerExecutor.java | 40 ++++- .../apache/cassandra/triggers/TriggersTest.java | 180 +++++++++++++++++++ 4 files changed, 229 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e738a2e..9caca38 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,7 @@ * Properly use the Paxos consistency for (non-protocol) batch (CASSANDRA-6837) * Add paranoid disk failure option (CASSANDRA-6646) * Improve PerRowSecondaryIndex performance (CASSANDRA-6876) + * Extend triggers to support CAS updates (CASSANDRA-6882) Merged from 1.2: * add extra SSL cipher suites (CASSANDRA-6613) * fix nodetool getsstables for blob PK (CASSANDRA-6803) http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index a5542e6..fda9819 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -234,6 +234,16 @@ public class StorageProxy implements StorageProxyMBean // finish the paxos round w/ the desired updates // TODO turn null updates into delete? + + // Apply triggers to cas updates. A consideration here is that + // triggers emit RowMutations, and so a given trigger implementation + // may generate mutations for partitions other than the one this + // paxos round is scoped for. In this case, TriggerExecutor will + // validate that the generated mutations are targetted at the same + // partition as the initial updates and reject (via an + // InvalidRequestException) any which aren't. + updates = TriggerExecutor.instance.execute(key, updates); + Commit proposal = Commit.newProposal(key, ballot, updates); Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot); if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/src/java/org/apache/cassandra/triggers/TriggerExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java index 4b3c24a..8ccf937 100644 --- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java +++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java @@ -23,6 +23,7 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -33,8 +34,10 @@ import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.CounterMutation; import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.HeapAllocator; public class TriggerExecutor { @@ -62,6 +65,21 @@ public class TriggerExecutor cachedTriggers.clear(); } + public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException + { + List<RowMutation> intermediate = executeInternal(key, updates); + if (intermediate == null) + return updates; + + validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate); + + for (RowMutation mutation : intermediate) + for (ColumnFamily cf : mutation.getColumnFamilies()) + updates.addAll(cf, HeapAllocator.instance); + + return updates; + } + public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException { boolean hasCounters = false; @@ -70,7 +88,7 @@ public class TriggerExecutor { for (ColumnFamily cf : mutation.getColumnFamilies()) { - List<RowMutation> intermediate = execute(mutation.key(), cf); + List<RowMutation> intermediate = executeInternal(mutation.key(), cf); if (intermediate == null) continue; @@ -88,6 +106,24 @@ public class TriggerExecutor return tmutations; } + private void validateForSinglePartition(AbstractType<?> keyValidator, + UUID cfId, + ByteBuffer key, + Collection<RowMutation> tmutations) + throws InvalidRequestException + { + for (RowMutation mutation : tmutations) + { + if (keyValidator.compare(mutation.key(), key) != 0) + throw new InvalidRequestException("Partition key of additional mutation does not match primary update key"); + + for (ColumnFamily cf : mutation.getColumnFamilies()) + if (!cf.id().equals(cfId)) + throw new InvalidRequestException("Column family of additional mutation does not match primary update cf"); + } + validate(tmutations); + } + private void validate(Collection<RowMutation> tmutations) throws InvalidRequestException { for (RowMutation mutation : tmutations) @@ -103,7 +139,7 @@ public class TriggerExecutor * Switch class loader before using the triggers for the column family, if * not loaded them with the custom class loader. */ - private List<RowMutation> execute(ByteBuffer key, ColumnFamily columnFamily) + private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily) { Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers(); if (triggers.isEmpty()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/75ff51e1/test/unit/org/apache/cassandra/triggers/TriggersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java index 6ca3880..5b9b27d 100644 --- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java +++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java @@ -35,10 +35,12 @@ import org.apache.cassandra.db.Column; import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.RowMutation; +import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.ColumnOrSuperColumn; import org.apache.cassandra.thrift.ColumnParent; +import org.apache.cassandra.thrift.InvalidRequestException; import org.apache.cassandra.thrift.Mutation; import org.apache.cassandra.thrift.TFramedTransportFactory; import org.apache.cassandra.thrift.ThriftServer; @@ -46,6 +48,7 @@ import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.thrift.protocol.TBinaryProtocol; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; public class TriggersTest extends SchemaLoader { @@ -54,6 +57,7 @@ public class TriggersTest extends SchemaLoader private static String ksName = "triggers_test_ks"; private static String cfName = "test_table"; + private static String otherCf = "other_table"; @Before public void setup() throws Exception @@ -73,6 +77,9 @@ public class TriggersTest extends SchemaLoader cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cfName); QueryProcessor.process(cql, ConsistencyLevel.ONE); + cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, otherCf); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + // no conditional execution of create trigger stmt yet if (! triggerCreated) { @@ -148,13 +155,157 @@ public class TriggersTest extends SchemaLoader assertUpdateIsAugmented(3); } + @Test + public void executeTriggerOnCqlInsertWithConditions() throws Exception + { + String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (4, 4) IF NOT EXISTS", ksName, cfName); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + assertUpdateIsAugmented(4); + } + + @Test + public void executeTriggerOnCqlBatchWithConditions() throws Exception + { + String cql = String.format("BEGIN BATCH " + + " INSERT INTO %1$s.%2$s (k, v1) VALUES (5, 5) IF NOT EXISTS; " + + " INSERT INTO %1$s.%2$s (k, v1) VALUES (5, 5); " + + "APPLY BATCH", + ksName, cfName); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + assertUpdateIsAugmented(5); + } + + @Test + public void executeTriggerOnThriftCASOperation() throws Exception + { + Cassandra.Client client = new Cassandra.Client( + new TBinaryProtocol( + new TFramedTransportFactory().openTransport( + InetAddress.getLocalHost().getHostName(), 9170))); + client.set_keyspace(ksName); + client.cas(ByteBufferUtil.bytes(6), + cfName, + Collections.EMPTY_LIST, + Collections.singletonList(getColumnForInsert("v1", 6)), + org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL, + org.apache.cassandra.thrift.ConsistencyLevel.ONE); + + assertUpdateIsAugmented(6); + } + + // Unfortunately, an IRE thrown from StorageProxy.cas + // results in a RuntimeException from QueryProcessor.process + @Test(expected=RuntimeException.class) + public void onCqlUpdateWithConditionsRejectGeneratedUpdatesForDifferentPartition() throws Exception + { + String cf = "cf" + System.nanoTime(); + try + { + setupTableWithTrigger(cf, CrossPartitionTrigger.class); + String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (7, 7) IF NOT EXISTS", ksName, cf); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + } + finally + { + assertUpdateNotExecuted(cf, 7); + } + } + + // Unfortunately, an IRE thrown from StorageProxy.cas + // results in a RuntimeException from QueryProcessor.process + @Test(expected=RuntimeException.class) + public void onCqlUpdateWithConditionsRejectGeneratedUpdatesForDifferentTable() throws Exception + { + String cf = "cf" + System.nanoTime(); + try + { + setupTableWithTrigger(cf, CrossTableTrigger.class); + String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (8, 8) IF NOT EXISTS", ksName, cf); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + } + finally + { + assertUpdateNotExecuted(cf, 7); + } + } + + @Test(expected=InvalidRequestException.class) + public void onThriftCASRejectGeneratedUpdatesForDifferentPartition() throws Exception + { + String cf = "cf" + System.nanoTime(); + try + { + setupTableWithTrigger(cf, CrossPartitionTrigger.class); + Cassandra.Client client = new Cassandra.Client( + new TBinaryProtocol( + new TFramedTransportFactory().openTransport( + InetAddress.getLocalHost().getHostName(), 9170))); + client.set_keyspace(ksName); + client.cas(ByteBufferUtil.bytes(9), + cf, + Collections.EMPTY_LIST, + Collections.singletonList(getColumnForInsert("v1", 9)), + org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL, + org.apache.cassandra.thrift.ConsistencyLevel.ONE); + } + finally + { + assertUpdateNotExecuted(cf, 9); + } + } + + @Test(expected=InvalidRequestException.class) + public void onThriftCASRejectGeneratedUpdatesForDifferentCF() throws Exception + { + String cf = "cf" + System.nanoTime(); + try + { + setupTableWithTrigger(cf, CrossTableTrigger.class); + Cassandra.Client client = new Cassandra.Client( + new TBinaryProtocol( + new TFramedTransportFactory().openTransport( + InetAddress.getLocalHost().getHostName(), 9170))); + client.set_keyspace(ksName); + client.cas(ByteBufferUtil.bytes(10), + cf, + Collections.EMPTY_LIST, + Collections.singletonList(getColumnForInsert("v1", 10)), + org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL, + org.apache.cassandra.thrift.ConsistencyLevel.ONE); + } + finally + { + assertUpdateNotExecuted(cf, 10); + } + } + + private void setupTableWithTrigger(String cf, Class<? extends ITrigger> triggerImpl) + throws RequestExecutionException + { + String cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cf); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + + // no conditional execution of create trigger stmt yet + cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'", + ksName, cf, triggerImpl.getName()); + QueryProcessor.process(cql, ConsistencyLevel.ONE); + } + private void assertUpdateIsAugmented(int key) { UntypedResultSet rs = QueryProcessor.processInternal( String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key)); + assertTrue(String.format("Expected value (%s) for augmented cell v2 was not found", key), rs.one().has("v2")); assertEquals(999, rs.one().getInt("v2")); } + private void assertUpdateNotExecuted(String cf, int key) + { + UntypedResultSet rs = QueryProcessor.processInternal( + String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cf, key)); + assertTrue(rs.isEmpty()); + } + private org.apache.cassandra.thrift.Column getColumnForInsert(String columnName, int value) { org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); @@ -176,4 +327,33 @@ public class TriggersTest extends SchemaLoader return Collections.singletonList(rm); } } + + public static class CrossPartitionTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false); + extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"), + ByteBufferUtil.bytes(999))); + + int newKey = ByteBufferUtil.toInt(key) + 1000; + RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey)); + rm.add(extraUpdate); + return Collections.singletonList(rm); + } + } + + public static class CrossTableTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf); + extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"), + ByteBufferUtil.bytes(999))); + + RowMutation rm = new RowMutation(ksName, key); + rm.add(extraUpdate); + return Collections.singletonList(rm); + } + } }
