Fix trigger mutations when base mutation list is immutable

patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for
CASSANDRA-6790


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f7eca98a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f7eca98a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f7eca98a

Branch: refs/heads/cassandra-2.0
Commit: f7eca98a7487b5e4013fbc07e43ebf0055520856
Parents: 553401d
Author: Sam Tunnicliffe <[email protected]>
Authored: Tue Mar 11 14:55:16 2014 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Mar 11 14:55:16 2014 +0300

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/service/StorageProxy.java  |   6 +-
 .../apache/cassandra/triggers/TriggersTest.java | 179 +++++++++++++++++++
 3 files changed, 183 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7eca98a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 39656ff..91037d1 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.0.7
  * Fix saving triggers to schema (CASSANDRA-6789)
+ * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
 
 
 2.0.6

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7eca98a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 14c1ce3..a6db9cd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -508,13 +508,13 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically) throws WriteTimeoutException, UnavailableException,
-            OverloadedException, InvalidRequestException
+    public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+    throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
     {
         Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
         if (mutateAtomically || tmutations != null)
         {
-            Collection<RowMutation> allMutations = (Collection<RowMutation>) mutations;
+            Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations);
             if (tmutations != null)
                 allMutations.addAll(tmutations);
             StorageProxy.mutateAtomically(allMutations, consistencyLevel);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7eca98a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
new file mode 100644
index 0000000..6ca3880
--- /dev/null
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.triggers;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.Column;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.TFramedTransportFactory;
+import org.apache.cassandra.thrift.ThriftServer;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.thrift.protocol.TBinaryProtocol;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriggersTest extends SchemaLoader
+{
+    private static boolean triggerCreated = false;
+    private static ThriftServer thriftServer;
+
+    private static String ksName = "triggers_test_ks";
+    private static String cfName = "test_table";
+
+    @Before
+    public void setup() throws Exception
+    {
+        StorageService.instance.initServer(0);
+        if (thriftServer == null || ! thriftServer.isRunning())
+        {
+            thriftServer = new ThriftServer(InetAddress.getLocalHost(), 9170);
+            thriftServer.start();
+        }
+
+        String cql = String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
+                                   "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}",
+                                   ksName);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+        cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cfName);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+        // no conditional execution of create trigger stmt yet
+        if (! triggerCreated)
+        {
+            cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'",
+                                ksName, cfName, TestTrigger.class.getName());
+            QueryProcessor.process(cql, ConsistencyLevel.ONE);
+            triggerCreated = true;
+        }
+    }
+
+    @AfterClass
+    public static void teardown()
+    {
+        if (thriftServer != null && thriftServer.isRunning())
+        {
+            thriftServer.stop();
+        }
+    }
+
+    @Test
+    public void executeTriggerOnCqlInsert() throws Exception
+    {
+        String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+        assertUpdateIsAugmented(0);
+    }
+
+    @Test
+    public void executeTriggerOnCqlBatchInsert() throws Exception
+    {
+        String cql = String.format("BEGIN BATCH " +
+                                   "    INSERT INTO %s.%s (k, v1) VALUES (1, 1); " +
+                                   "APPLY BATCH",
+                                   ksName, cfName);
+        QueryProcessor.process(cql, ConsistencyLevel.ONE);
+        assertUpdateIsAugmented(1);
+    }
+
+    @Test
+    public void executeTriggerOnThriftInsert() throws Exception
+    {
+        Cassandra.Client client = new Cassandra.Client(
+                                        new TBinaryProtocol(
+                                            new TFramedTransportFactory().openTransport(
+                                                InetAddress.getLocalHost().getHostName(), 9170)));
+        client.set_keyspace(ksName);
+        client.insert(ByteBufferUtil.bytes(2),
+                      new ColumnParent(cfName),
+                      getColumnForInsert("v1", 2),
+                      org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+        assertUpdateIsAugmented(2);
+    }
+
+    @Test
+    public void executeTriggerOnThriftBatchUpdate() throws Exception
+    {
+        Cassandra.Client client = new Cassandra.Client(
+                                    new TBinaryProtocol(
+                                        new TFramedTransportFactory().openTransport(
+                                            InetAddress.getLocalHost().getHostName(), 9170)));
+        client.set_keyspace(ksName);
+        Mutation mutation = new Mutation();
+        ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+        cosc.setColumn(getColumnForInsert("v1", 3));
+        mutation.setColumn_or_supercolumn(cosc);
+        client.batch_mutate(
+            Collections.singletonMap(ByteBufferUtil.bytes(3),
+                                     Collections.singletonMap(cfName,
+                                                              Collections.singletonList(mutation))),
+            org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+        assertUpdateIsAugmented(3);
+    }
+
+    private void assertUpdateIsAugmented(int key)
+    {
+        UntypedResultSet rs = QueryProcessor.processInternal(
+                                String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key));
+        assertEquals(999, rs.one().getInt("v2"));
+    }
+
+    private org.apache.cassandra.thrift.Column getColumnForInsert(String columnName, int value)
+    {
+        org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
+        column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName));
+        column.setValue(ByteBufferUtil.bytes(value));
+        column.setTimestamp(System.currentTimeMillis());
+        return column;
+    }
+
+    public static class TestTrigger implements ITrigger
+    {
+        public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+        {
+            ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
+            extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
+                                             ByteBufferUtil.bytes(999)));
+            RowMutation rm = new RowMutation(ksName, key);
+            rm.add(extraUpdate);
+            return Collections.singletonList(rm);
+        }
+    }
+}

Reply via email to