Repository: cassandra Updated Branches: refs/heads/trunk f22e775fb -> ec7206ce6
Merge groupable mutations in TriggerExecutor#execute() patch by Aleksey Yeschenko and Sergio Bossa for CASSANDRA-7047 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c1eb74ce Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c1eb74ce Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c1eb74ce Branch: refs/heads/trunk Commit: c1eb74ce47988c1e75d20ccb9a0320dd305c4b1c Parents: 4e4d7bb Author: Aleksey Yeschenko <[email protected]> Authored: Wed Apr 23 21:30:36 2014 +0300 Committer: Aleksey Yeschenko <[email protected]> Committed: Wed Apr 23 21:30:36 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/RowMutation.java | 5 + .../apache/cassandra/service/StorageProxy.java | 23 +- .../cassandra/triggers/TriggerExecutor.java | 71 ++-- .../cassandra/triggers/TriggerExecutorTest.java | 332 +++++++++++++++++++ .../apache/cassandra/triggers/TriggersTest.java | 39 +-- 6 files changed, 417 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index dbed949..68c335d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,7 @@ * Queries on compact tables can return more rows that requested (CASSANDRA-7052) * USING TIMESTAMP for batches does not work (CASSANDRA-7053) * Fix performance regression from CASSANDRA-5614 (CASSANDRA-6949) + * Merge groupable mutations in TriggerExecutor#execute() (CASSANDRA-7047) Merged from 1.2: * Fix batchlog to account for CF truncation records (CASSANDRA-6999) * Fix CQLSH parsing of functions and BLOB literals (CASSANDRA-7018) http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/db/RowMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowMutation.java b/src/java/org/apache/cassandra/db/RowMutation.java index 49ee2c5..223225e 100644 --- a/src/java/org/apache/cassandra/db/RowMutation.java +++ b/src/java/org/apache/cassandra/db/RowMutation.java @@ -75,6 +75,11 @@ public class RowMutation implements IMutation this(cf.metadata().ksName, key, cf); } + public RowMutation copy() + { + return new RowMutation(keyspaceName, key, new HashMap<>(modifications)); + } + public String getKeyspaceName() { return keyspaceName; http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 8196352..14d5ee2 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -519,21 +519,20 @@ public class StorageProxy implements StorageProxyMBean } } - public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically) + @SuppressWarnings("unchecked") + public static void mutateWithTriggers(Collection<? extends IMutation> mutations, + ConsistencyLevel consistencyLevel, + boolean mutateAtomically) throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException { - Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations); - if (mutateAtomically || tmutations != null) - { - Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations); - if (tmutations != null) - allMutations.addAll(tmutations); - StorageProxy.mutateAtomically(allMutations, consistencyLevel); - } + Collection<RowMutation> augmented = TriggerExecutor.instance.execute(mutations); + + if (augmented != null) + mutateAtomically(augmented, consistencyLevel); + else if (mutateAtomically) + mutateAtomically((Collection<RowMutation>) mutations, consistencyLevel); else - { - StorageProxy.mutate(mutations, consistencyLevel); - } + mutate(mutations, consistencyLevel); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/src/java/org/apache/cassandra/triggers/TriggerExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java index 8ccf937..988c6a7 100644 --- a/src/java/org/apache/cassandra/triggers/TriggerExecutor.java +++ b/src/java/org/apache/cassandra/triggers/TriggerExecutor.java @@ -20,11 +20,9 @@ package org.apache.cassandra.triggers; import java.io.File; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.UUID; +import java.util.*; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -38,6 +36,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.HeapAllocator; +import org.apache.cassandra.utils.Pair; public class TriggerExecutor { @@ -68,7 +67,7 @@ public class TriggerExecutor public ColumnFamily execute(ByteBuffer key, ColumnFamily updates) throws InvalidRequestException { List<RowMutation> intermediate = executeInternal(key, updates); - if (intermediate == null) + if (intermediate == null || intermediate.isEmpty()) return updates; validateForSinglePartition(updates.metadata().getKeyValidator(), updates.id(), key, intermediate); @@ -80,30 +79,62 @@ public class TriggerExecutor return updates; } - public Collection<RowMutation> execute(Collection<? extends IMutation> updates) throws InvalidRequestException + public Collection<RowMutation> execute(Collection<? extends IMutation> mutations) throws InvalidRequestException { boolean hasCounters = false; - Collection<RowMutation> tmutations = null; - for (IMutation mutation : updates) + List<RowMutation> augmentedMutations = null; + + for (IMutation mutation : mutations) { + if (mutation instanceof CounterMutation) + hasCounters = true; + for (ColumnFamily cf : mutation.getColumnFamilies()) { - List<RowMutation> intermediate = executeInternal(mutation.key(), cf); - if (intermediate == null) + List<RowMutation> augmentations = executeInternal(mutation.key(), cf); + if (augmentations == null || augmentations.isEmpty()) continue; - validate(intermediate); - if (tmutations == null) - tmutations = intermediate; - else - tmutations.addAll(intermediate); + validate(augmentations); + + if (augmentedMutations == null) + augmentedMutations = new LinkedList<>(); + augmentedMutations.addAll(augmentations); } - if (mutation instanceof CounterMutation) - hasCounters = true; } - if (tmutations != null && hasCounters) + + if (augmentedMutations == null) + return null; + + if (hasCounters) throw new InvalidRequestException("Counter mutations and trigger mutations cannot be applied together atomically."); - return tmutations; + + @SuppressWarnings("unchecked") + Collection<RowMutation> originalMutations = (Collection<RowMutation>) mutations; + + return mergeMutations(Iterables.concat(originalMutations, augmentedMutations)); + } + + private Collection<RowMutation> mergeMutations(Iterable<RowMutation> mutations) + { + Map<Pair<String, ByteBuffer>, RowMutation> groupedMutations = new HashMap<>(); + + for (RowMutation mutation : mutations) + { + Pair<String, ByteBuffer> key = Pair.create(mutation.getKeyspaceName(), mutation.key()); + RowMutation current = groupedMutations.get(key); + if (current == null) + { + // copy in case the mutation's modifications map is backed by an immutable Collections#singletonMap(). + groupedMutations.put(key, mutation.copy()); + } + else + { + current.addAll(mutation); + } + } + + return groupedMutations.values(); } private void validateForSinglePartition(AbstractType<?> keyValidator, @@ -141,7 +172,7 @@ public class TriggerExecutor */ private List<RowMutation> executeInternal(ByteBuffer key, ColumnFamily columnFamily) { - Map<String,TriggerDefinition> triggers = columnFamily.metadata().getTriggers(); + Map<String, TriggerDefinition> triggers = columnFamily.metadata().getTriggers(); if (triggers.isEmpty()) return null; List<RowMutation> tmutations = Lists.newLinkedList(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java new file mode 100644 index 0000000..ab7f7c4 --- /dev/null +++ b/test/unit/org/apache/cassandra/triggers/TriggerExecutorTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.triggers; + +import java.nio.ByteBuffer; +import java.util.*; +import org.junit.Test; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.TriggerDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.InvalidRequestException; + +import static org.junit.Assert.*; + +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + +public class TriggerExecutorTest +{ + @Test + public void sameKeySameCfColumnFamilies() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName())); + ColumnFamily mutated = TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null)); + assertEquals(bytes("v1"), mutated.getColumn(getColumnName(metadata, "c1")).value()); + assertEquals(bytes("trigger"), mutated.getColumn(getColumnName(metadata, "c2")).value()); + } + + @Test(expected = InvalidRequestException.class) + public void sameKeyDifferentCfColumnFamilies() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName())); + TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null)); + } + + @Test(expected = InvalidRequestException.class) + public void differentKeyColumnFamilies() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName())); + TriggerExecutor.instance.execute(bytes("k1"), makeCf(metadata, "v1", null)); + } + + @Test + public void noTriggerMutations() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", NoOpTrigger.class.getName())); + RowMutation rm = new RowMutation(bytes("k1"), makeCf(metadata, "v1", null)); + assertNull(TriggerExecutor.instance.execute(Collections.singletonList(rm))); + } + + @Test + public void sameKeySameCfRowMutations() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfTrigger.class.getName())); + ColumnFamily cf1 = makeCf(metadata, "k1v1", null); + ColumnFamily cf2 = makeCf(metadata, "k2v1", null); + RowMutation rm1 = new RowMutation(bytes("k1"), cf1); + RowMutation rm2 = new RowMutation(bytes("k2"), cf2); + + List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); + assertEquals(2, tmutations.size()); + Collections.sort(tmutations, new RmComparator()); + + List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value()); + + mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value()); + } + + @Test + public void sameKeySameCfPartialRowMutations() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeySameCfPartialTrigger.class.getName())); + ColumnFamily cf1 = makeCf(metadata, "k1v1", null); + ColumnFamily cf2 = makeCf(metadata, "k2v1", null); + RowMutation rm1 = new RowMutation(bytes("k1"), cf1); + RowMutation rm2 = new RowMutation(bytes("k2"), cf2); + + List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); + assertEquals(2, tmutations.size()); + Collections.sort(tmutations, new RmComparator()); + + List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2"))); + + mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value()); + } + + @Test + public void sameKeyDifferentCfRowMutations() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentCfTrigger.class.getName())); + ColumnFamily cf1 = makeCf(metadata, "k1v1", null); + ColumnFamily cf2 = makeCf(metadata, "k2v1", null); + RowMutation rm1 = new RowMutation(bytes("k1"), cf1); + RowMutation rm2 = new RowMutation(bytes("k2"), cf2); + + List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); + assertEquals(2, tmutations.size()); + Collections.sort(tmutations, new RmComparator()); + + List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies()); + assertEquals(2, mutatedCFs.size()); + + Collections.sort(mutatedCFs, new CfComparator()); + assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2"))); + assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1"))); + assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value()); + + mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies()); + assertEquals(2, mutatedCFs.size()); + + Collections.sort(mutatedCFs, new CfComparator()); + assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2"))); + assertNull(mutatedCFs.get(1).getColumn(getColumnName(metadata, "c1"))); + assertEquals(bytes("trigger"), mutatedCFs.get(1).getColumn(getColumnName(metadata, "c2")).value()); + } + + @Test + public void sameKeyDifferentKsRowMutations() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", SameKeyDifferentKsTrigger.class.getName())); + ColumnFamily cf1 = makeCf(metadata, "k1v1", null); + ColumnFamily cf2 = makeCf(metadata, "k2v1", null); + RowMutation rm1 = new RowMutation(bytes("k1"), cf1); + RowMutation rm2 = new RowMutation(bytes("k2"), cf2); + + List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm1, rm2))); + assertEquals(4, tmutations.size()); + Collections.sort(tmutations, new RmComparator()); + + List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("k1v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2"))); + + mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("k2v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2"))); + + mutatedCFs = new ArrayList<>(tmutations.get(2).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1"))); + assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value()); + + mutatedCFs = new ArrayList<>(tmutations.get(3).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1"))); + assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value()); + } + + @Test + public void differentKeyRowMutations() throws ConfigurationException, InvalidRequestException + { + CFMetaData metadata = makeCfMetaData("ks1", "cf1", TriggerDefinition.create("test", DifferentKeyTrigger.class.getName())); + ColumnFamily cf = makeCf(metadata, "v1", null); + RowMutation rm = new RowMutation(UTF8Type.instance.fromString("k1"), cf); + + List<? extends IMutation> tmutations = new ArrayList<>(TriggerExecutor.instance.execute(Arrays.asList(rm))); + assertEquals(2, tmutations.size()); + Collections.sort(tmutations, new RmComparator()); + + assertEquals(bytes("k1"), tmutations.get(0).key()); + assertEquals(bytes("otherKey"), tmutations.get(1).key()); + + List<ColumnFamily> mutatedCFs = new ArrayList<>(tmutations.get(0).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertEquals(bytes("v1"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1")).value()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2"))); + + mutatedCFs = new ArrayList<>(tmutations.get(1).getColumnFamilies()); + assertEquals(1, mutatedCFs.size()); + assertNull(mutatedCFs.get(0).getColumn(getColumnName(metadata, "c1"))); + assertEquals(bytes("trigger"), mutatedCFs.get(0).getColumn(getColumnName(metadata, "c2")).value()); + } + + private static CFMetaData makeCfMetaData(String ks, String cf, TriggerDefinition trigger) + { + + CFMetaData metadata = new CFMetaData(ks, cf, ColumnFamilyType.Standard, CompositeType.getInstance(UTF8Type.instance)); + + metadata.keyValidator(UTF8Type.instance); + metadata.addOrReplaceColumnDefinition(ColumnDefinition.partitionKeyDef(UTF8Type.instance.fromString("pkey"), + UTF8Type.instance, + null)); + metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c1"), + UTF8Type.instance, + 0)); + metadata.addOrReplaceColumnDefinition(ColumnDefinition.regularDef(UTF8Type.instance.fromString("c2"), + UTF8Type.instance, + 0)); + try + { + if (trigger != null) + metadata.addTriggerDefinition(trigger); + } + catch (ConfigurationException e) + { + throw new AssertionError(e); + } + + return metadata.rebuild(); + } + + private static ColumnFamily makeCf(CFMetaData metadata, String columnValue1, String columnValue2) + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(metadata); + + if (columnValue1 != null) + cf.addColumn(new Column(getColumnName(metadata, "c1"), bytes(columnValue1))); + + if (columnValue2 != null) + cf.addColumn(new Column(getColumnName(metadata, "c2"), bytes(columnValue2))); + + return cf; + } + + private static ByteBuffer getColumnName(CFMetaData metadata, String stringName) + { + return ((CompositeType) metadata.comparator).builder().add(bytes(stringName)).build(); + } + + public static class NoOpTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + return null; + } + } + + public static class SameKeySameCfTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata()); + cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger"))); + return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf)); + } + } + + public static class SameKeySameCfPartialTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + if (!key.equals(bytes("k2"))) + return null; + + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata()); + cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger"))); + return Collections.singletonList(new RowMutation(update.metadata().ksName, key, cf)); + } + } + + public static class SameKeyDifferentCfTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData(update.metadata().ksName, "otherCf", null)); + cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger"))); + return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf)); + } + } + + public static class SameKeyDifferentKsTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(makeCfMetaData("otherKs", "otherCf", null)); + cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger"))); + return Collections.singletonList(new RowMutation(cf.metadata().ksName, key, cf)); + } + } + + public static class DifferentKeyTrigger implements ITrigger + { + public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update) + { + ColumnFamily cf = ArrayBackedSortedColumns.factory.create(update.metadata()); + cf.addColumn(new Column(getColumnName(update.metadata(), "c2"), bytes("trigger"))); + return Collections.singletonList(new RowMutation(cf.metadata().ksName, bytes("otherKey"), cf)); + } + } + + private static class RmComparator implements Comparator<IMutation> + { + public int compare(IMutation m1, IMutation m2) + { + int cmp = m1.getKeyspaceName().compareTo(m2.getKeyspaceName()); + return cmp != 0 ? cmp : m1.key().compareTo(m2.key()); + } + } + + private static class CfComparator implements Comparator<ColumnFamily> + { + public int compare(ColumnFamily cf1, ColumnFamily cf2) + { + return cf1.metadata().cfName.compareTo(cf2.metadata().cfName); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/c1eb74ce/test/unit/org/apache/cassandra/triggers/TriggersTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/triggers/TriggersTest.java b/test/unit/org/apache/cassandra/triggers/TriggersTest.java index 5b9b27d..bda13ff 100644 --- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java +++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java @@ -50,6 +50,8 @@ import org.apache.thrift.protocol.TBinaryProtocol; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.apache.cassandra.utils.ByteBufferUtil.bytes; + public class TriggersTest extends SchemaLoader { private static boolean triggerCreated = false; @@ -126,7 +128,7 @@ public class TriggersTest extends SchemaLoader new TFramedTransportFactory().openTransport( InetAddress.getLocalHost().getHostName(), 9170))); client.set_keyspace(ksName); - client.insert(ByteBufferUtil.bytes(2), + client.insert(bytes(2), new ColumnParent(cfName), getColumnForInsert("v1", 2), org.apache.cassandra.thrift.ConsistencyLevel.ONE); @@ -147,7 +149,7 @@ public class TriggersTest extends SchemaLoader cosc.setColumn(getColumnForInsert("v1", 3)); mutation.setColumn_or_supercolumn(cosc); client.batch_mutate( - Collections.singletonMap(ByteBufferUtil.bytes(3), + Collections.singletonMap(bytes(3), Collections.singletonMap(cfName, Collections.singletonList(mutation))), org.apache.cassandra.thrift.ConsistencyLevel.ONE); @@ -183,9 +185,9 @@ public class TriggersTest extends SchemaLoader new TFramedTransportFactory().openTransport( InetAddress.getLocalHost().getHostName(), 9170))); client.set_keyspace(ksName); - client.cas(ByteBufferUtil.bytes(6), + client.cas(bytes(6), cfName, - Collections.EMPTY_LIST, + Collections.<org.apache.cassandra.thrift.Column>emptyList(), Collections.singletonList(getColumnForInsert("v1", 6)), org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL, org.apache.cassandra.thrift.ConsistencyLevel.ONE); @@ -241,9 +243,9 @@ public class TriggersTest extends SchemaLoader new TFramedTransportFactory().openTransport( InetAddress.getLocalHost().getHostName(), 9170))); client.set_keyspace(ksName); - client.cas(ByteBufferUtil.bytes(9), + client.cas(bytes(9), cf, - Collections.EMPTY_LIST, + Collections.<org.apache.cassandra.thrift.Column>emptyList(), Collections.singletonList(getColumnForInsert("v1", 9)), org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL, org.apache.cassandra.thrift.ConsistencyLevel.ONE); @@ -266,9 +268,9 @@ public class TriggersTest extends SchemaLoader new TFramedTransportFactory().openTransport( InetAddress.getLocalHost().getHostName(), 9170))); client.set_keyspace(ksName); - client.cas(ByteBufferUtil.bytes(10), + client.cas(bytes(10), cf, - Collections.EMPTY_LIST, + Collections.<org.apache.cassandra.thrift.Column>emptyList(), Collections.singletonList(getColumnForInsert("v1", 10)), org.apache.cassandra.thrift.ConsistencyLevel.LOCAL_SERIAL, org.apache.cassandra.thrift.ConsistencyLevel.ONE); @@ -310,7 +312,7 @@ public class TriggersTest extends SchemaLoader { org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column(); column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName)); - column.setValue(ByteBufferUtil.bytes(value)); + column.setValue(bytes(value)); column.setTimestamp(System.currentTimeMillis()); return column; } @@ -321,10 +323,8 @@ public class TriggersTest extends SchemaLoader { ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false); extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"), - ByteBufferUtil.bytes(999))); - RowMutation rm = new RowMutation(ksName, key); - rm.add(extraUpdate); - return Collections.singletonList(rm); + bytes(999))); + return Collections.singletonList(new RowMutation(ksName, key, extraUpdate)); } } @@ -334,12 +334,10 @@ public class TriggersTest extends SchemaLoader { ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false); extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"), - ByteBufferUtil.bytes(999))); + bytes(999))); int newKey = ByteBufferUtil.toInt(key) + 1000; - RowMutation rm = new RowMutation(ksName, ByteBufferUtil.bytes(newKey)); - rm.add(extraUpdate); - return Collections.singletonList(rm); + return Collections.singletonList(new RowMutation(ksName, bytes(newKey), extraUpdate)); } } @@ -349,11 +347,8 @@ public class TriggersTest extends SchemaLoader { ColumnFamily extraUpdate = ArrayBackedSortedColumns.factory.create(ksName, otherCf); extraUpdate.addColumn(new Column(extraUpdate.metadata().comparator.fromString("v2"), - ByteBufferUtil.bytes(999))); - - RowMutation rm = new RowMutation(ksName, key); - rm.add(extraUpdate); - return Collections.singletonList(rm); + bytes(999))); + return Collections.singletonList(new RowMutation(ksName, key, extraUpdate)); } } }
