Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/config/CFMetaData.java
src/java/org/apache/cassandra/service/StorageProxy.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/362148dd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/362148dd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/362148dd
Branch: refs/heads/cassandra-2.1
Commit: 362148dd233001e3139b7631a9d4f3b06f51b6f2
Parents: 639ddac f7eca98
Author: Aleksey Yeschenko <[email protected]>
Authored: Tue Mar 11 15:20:45 2014 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Mar 11 15:20:45 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
doc/cql3/CQL.textile | 3 -
.../org/apache/cassandra/config/CFMetaData.java | 3 +
.../apache/cassandra/service/StorageProxy.java | 6 +-
.../cassandra/triggers/TriggersSchemaTest.java | 126 +++++++++++++
.../apache/cassandra/triggers/TriggersTest.java | 179 +++++++++++++++++++
6 files changed, 313 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362148dd/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 709b05a,91037d1..607e2dc
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,9 +1,18 @@@
-2.0.7
+2.1.0-beta2
+ * Add broadcast_rpc_address option to cassandra.yaml (CASSANDRA-5899)
+ * Auto reload GossipingPropertyFileSnitch config (CASSANDRA-5897)
+ * Fix overflow of memtable_total_space_in_mb (CASSANDRA-6573)
+ * Fix ABTC NPE (CASSANDRA-6692)
+ * Allow nodetool to use a file or prompt for password (CASSANDRA-6660)
+ * Fix AIOOBE when concurrently accessing ABSC (CASSANDRA-6742)
+ * Fix assertion error in ALTER TYPE RENAME (CASSANDRA-6705)
+ * Scrub should not always clear out repaired status (CASSANDRA-5351)
+ * Improve handling of range tombstone for wide partitions (CASSANDRA-6446)
+ * Fix ClassCastException for compact table with composites (CASSANDRA-6738)
+ * Fix potentially repairing with wrong nodes (CASSANDRA-6808)
+Merged from 2.0:
+ * Fix saving triggers to schema (CASSANDRA-6789)
+ * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790)
-
-
-2.0.6
* Avoid race-prone second "scrub" of system keyspace (CASSANDRA-6797)
* Pool CqlRecordWriter clients by inetaddress rather than Range
(CASSANDRA-6665)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362148dd/doc/cql3/CQL.textile
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362148dd/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/config/CFMetaData.java
index 25b7314,ff40e65..ac5dea7
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@@ -1670,45 -1507,39 +1670,48 @@@ public final class CFMetaDat
*
* @param timestamp Timestamp to use
*
- * @return RowMutation to use to completely remove cf from schema
+ * @return Mutation to use to completely remove cf from schema
*/
- public RowMutation dropFromSchema(long timestamp)
+ public Mutation dropFromSchema(long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
- ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+ ColumnFamily cf = mutation.addOrGet(SchemaColumnFamiliesCf);
int ldt = (int) (System.currentTimeMillis() / 1000);
- ColumnNameBuilder builder = SchemaColumnFamiliesCf.getCfDef().getColumnNameBuilder();
- builder.add(ByteBufferUtil.bytes(cfName));
- cf.addAtom(new RangeTombstone(builder.build(), builder.buildAsEndOfRange(), timestamp, ldt));
+ Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
+ cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
- for (ColumnDefinition cd : column_metadata.values())
- cd.deleteFromSchema(rm, cfName, getColumnDefinitionComparator(cd), timestamp);
+ for (ColumnDefinition cd : allColumns())
+ cd.deleteFromSchema(mutation, timestamp);
for (TriggerDefinition td : triggers.values())
- td.deleteFromSchema(rm, cfName, timestamp);
+ td.deleteFromSchema(mutation, cfName, timestamp);
+
+ return mutation;
+ }
- return rm;
+ public boolean isPurged()
+ {
+ return isPurged;
+ }
+
+ void markPurged()
+ {
+ isPurged = true;
}
- public void toSchema(RowMutation rm, long timestamp)
+ public void toSchema(Mutation mutation, long timestamp)
{
- toSchemaNoColumnsNoTriggers(rm, timestamp);
+ toSchemaNoColumnsNoTriggers(mutation, timestamp);
+ for (TriggerDefinition td : triggers.values())
- td.toSchema(rm, cfName, timestamp);
++ td.toSchema(mutation, cfName, timestamp);
+
- for (ColumnDefinition cd : column_metadata.values())
- cd.toSchema(rm, cfName, getColumnDefinitionComparator(cd), timestamp);
+ for (ColumnDefinition cd : allColumns())
+ cd.toSchema(mutation, timestamp);
}
- private void toSchemaNoColumnsNoTriggers(RowMutation rm, long timestamp)
+ private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
{
// For property that can be null (and can be changed), we insert tombstones, to make sure
// we don't keep a property the user has removed
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362148dd/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageProxy.java
index 5a51838,a6db9cd..22f050b
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@@ -511,13 -508,13 +511,13 @@@ public class StorageProxy implements St
}
}
- public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically) throws WriteTimeoutException, UnavailableException,
- OverloadedException, InvalidRequestException
+ public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, boolean mutateAtomically)
+ throws WriteTimeoutException, UnavailableException, OverloadedException, InvalidRequestException
{
- Collection<RowMutation> tmutations = TriggerExecutor.instance.execute(mutations);
+ Collection<Mutation> tmutations = TriggerExecutor.instance.execute(mutations);
if (mutateAtomically || tmutations != null)
{
- Collection<Mutation> allMutations = (Collection<Mutation>) mutations;
- Collection<RowMutation> allMutations = new ArrayList<>((Collection<RowMutation>) mutations);
++ Collection<Mutation> allMutations = new ArrayList<>((Collection<Mutation>) mutations);
if (tmutations != null)
allMutations.addAll(tmutations);
StorageProxy.mutateAtomically(allMutations, consistencyLevel);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/362148dd/test/unit/org/apache/cassandra/triggers/TriggersTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/triggers/TriggersTest.java
index 0000000,6ca3880..947674f
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/triggers/TriggersTest.java
+++ b/test/unit/org/apache/cassandra/triggers/TriggersTest.java
@@@ -1,0 -1,179 +1,179 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.triggers;
+
+ import java.net.InetAddress;
+ import java.nio.ByteBuffer;
+ import java.util.Collection;
+ import java.util.Collections;
+
+ import org.junit.AfterClass;
+ import org.junit.Before;
+ import org.junit.Test;
+
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.config.Schema;
+ import org.apache.cassandra.cql3.QueryProcessor;
+ import org.apache.cassandra.cql3.UntypedResultSet;
+ import org.apache.cassandra.db.ArrayBackedSortedColumns;
-import org.apache.cassandra.db.Column;
++import org.apache.cassandra.db.Cell;
+ import org.apache.cassandra.db.ColumnFamily;
+ import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.RowMutation;
++import org.apache.cassandra.db.Mutation;
++import org.apache.cassandra.db.composites.CellNames;
+ import org.apache.cassandra.service.StorageService;
+ import org.apache.cassandra.thrift.Cassandra;
+ import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+ import org.apache.cassandra.thrift.ColumnParent;
-import org.apache.cassandra.thrift.Mutation;
+ import org.apache.cassandra.thrift.TFramedTransportFactory;
+ import org.apache.cassandra.thrift.ThriftServer;
-import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.thrift.protocol.TBinaryProtocol;
+
++import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
+ import static org.junit.Assert.assertEquals;
+
+ public class TriggersTest extends SchemaLoader
+ {
+ private static boolean triggerCreated = false;
+ private static ThriftServer thriftServer;
+
+ private static String ksName = "triggers_test_ks";
+ private static String cfName = "test_table";
+
+ @Before
+ public void setup() throws Exception
+ {
+ StorageService.instance.initServer(0);
+ if (thriftServer == null || ! thriftServer.isRunning())
+ {
- thriftServer = new ThriftServer(InetAddress.getLocalHost(), 9170);
++ thriftServer = new ThriftServer(InetAddress.getLocalHost(), 9170, 50);
+ thriftServer.start();
+ }
+
+ String cql = String.format("CREATE KEYSPACE IF NOT EXISTS %s " +
+ "WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1}",
+ ksName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+ cql = String.format("CREATE TABLE IF NOT EXISTS %s.%s (k int, v1 int, v2 int, PRIMARY KEY (k))", ksName, cfName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+
+ // no conditional execution of create trigger stmt yet
+ if (! triggerCreated)
+ {
+ cql = String.format("CREATE TRIGGER trigger_1 ON %s.%s USING '%s'",
+ ksName, cfName, TestTrigger.class.getName());
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ triggerCreated = true;
+ }
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (thriftServer != null && thriftServer.isRunning())
+ {
+ thriftServer.stop();
+ }
+ }
+
+ @Test
+ public void executeTriggerOnCqlInsert() throws Exception
+ {
+ String cql = String.format("INSERT INTO %s.%s (k, v1) VALUES (0, 0)", ksName, cfName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ assertUpdateIsAugmented(0);
+ }
+
+ @Test
+ public void executeTriggerOnCqlBatchInsert() throws Exception
+ {
+ String cql = String.format("BEGIN BATCH " +
+ " INSERT INTO %s.%s (k, v1) VALUES (1, 1); " +
+ "APPLY BATCH",
+ ksName, cfName);
+ QueryProcessor.process(cql, ConsistencyLevel.ONE);
+ assertUpdateIsAugmented(1);
+ }
+
+ @Test
+ public void executeTriggerOnThriftInsert() throws Exception
+ {
+ Cassandra.Client client = new Cassandra.Client(
+ new TBinaryProtocol(
+ new TFramedTransportFactory().openTransport(
+ InetAddress.getLocalHost().getHostName(), 9170)));
+ client.set_keyspace(ksName);
- client.insert(ByteBufferUtil.bytes(2),
++ client.insert(bytes(2),
+ new ColumnParent(cfName),
+ getColumnForInsert("v1", 2),
+ org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+ assertUpdateIsAugmented(2);
+ }
+
+ @Test
+ public void executeTriggerOnThriftBatchUpdate() throws Exception
+ {
+ Cassandra.Client client = new Cassandra.Client(
+ new TBinaryProtocol(
+ new TFramedTransportFactory().openTransport(
+ InetAddress.getLocalHost().getHostName(), 9170)));
+ client.set_keyspace(ksName);
- Mutation mutation = new Mutation();
++ org.apache.cassandra.thrift.Mutation mutation = new org.apache.cassandra.thrift.Mutation();
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.setColumn(getColumnForInsert("v1", 3));
+ mutation.setColumn_or_supercolumn(cosc);
+ client.batch_mutate(
- Collections.singletonMap(ByteBufferUtil.bytes(3),
++ Collections.singletonMap(bytes(3),
+ Collections.singletonMap(cfName,
+ Collections.singletonList(mutation))),
+ org.apache.cassandra.thrift.ConsistencyLevel.ONE);
+
+ assertUpdateIsAugmented(3);
+ }
+
+ private void assertUpdateIsAugmented(int key)
+ {
+ UntypedResultSet rs = QueryProcessor.processInternal(
+ String.format("SELECT * FROM %s.%s WHERE k=%s", ksName, cfName, key));
+ assertEquals(999, rs.one().getInt("v2"));
+ }
+
+ private org.apache.cassandra.thrift.Column getColumnForInsert(String columnName, int value)
+ {
+ org.apache.cassandra.thrift.Column column = new org.apache.cassandra.thrift.Column();
- column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.fromString(columnName));
- column.setValue(ByteBufferUtil.bytes(value));
++ column.setName(Schema.instance.getCFMetaData(ksName, cfName).comparator.asAbstractType().fromString(columnName));
++ column.setValue(bytes(value));
+ column.setTimestamp(System.currentTimeMillis());
+ return column;
+ }
+
+ public static class TestTrigger implements ITrigger
+ {
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
++ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
+ {
+ ColumnFamily extraUpdate = update.cloneMeShallow(ArrayBackedSortedColumns.factory, false);
- extraUpdate.addColumn(new Column(update.metadata().comparator.fromString("v2"),
- ByteBufferUtil.bytes(999)));
- RowMutation rm = new RowMutation(ksName, key);
- rm.add(extraUpdate);
- return Collections.singletonList(rm);
++ extraUpdate.addColumn(new Cell(CellNames.compositeDense(bytes("v2")),
++ bytes(999)));
++ Mutation mutation = new Mutation(ksName, key);
++ mutation.add(extraUpdate);
++ return Collections.singletonList(mutation);
+ }
+ }
+ }