Author: eevans
Date: Mon Apr 5 17:01:29 2010
New Revision: 930903
URL: http://svn.apache.org/viewvc?rev=930903&view=rev
Log:
batch_mutate() rpc implementation
Patch by eevans
Modified:
cassandra/trunk/interface/cassandra.avpr
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/test/system/test_avro_server.py
Modified: cassandra/trunk/interface/cassandra.avpr
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Mon Apr 5 17:01:29 2010
@@ -131,7 +131,6 @@
"batch_mutate": {
"request": [
{"name": "keyspace", "type": "string"},
- /* Map<String, Map<String, List<Mutation>>> mutation_map */
{"name": "mutation_map",
"type": {
"type": "map", "values": {
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
Mon Apr 5 17:01:29 2010
@@ -131,6 +131,11 @@ class ErrorFactory
{
return newTimedOutException(new Utf8(why));
}
+
+ static TimedOutException newTimedOutException()
+ {
+ return newTimedOutException(new Utf8());
+ }
static UnavailableException newUnavailableException(Utf8 why)
{
@@ -143,4 +148,9 @@ class ErrorFactory
{
return newUnavailableException(new Utf8(why));
}
+
+ static UnavailableException newUnavailableException()
+ {
+ return newUnavailableException(new Utf8());
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Mon
Apr 5 17:01:29 2010
@@ -21,7 +21,10 @@ package org.apache.cassandra.avro;
*/
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Comparator;
+
import org.apache.avro.util.Utf8;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
@@ -88,13 +91,13 @@ public class AvroValidation {
}
if (column != null)
- validateColumns(keyspace, column_family, super_column,
Arrays.asList(column));
+ validateColumns(keyspace, column_family, super_column,
Arrays.asList(cp.column));
if (super_column != null)
- validateColumns(keyspace, column_family, null,
Arrays.asList(super_column));
+ validateColumns(keyspace, column_family, null,
Arrays.asList(cp.super_column));
}
// FIXME: could use method in ThriftValidation
- static void validateColumns(String keyspace, String cfName, byte[]
superColumnName, Iterable<byte[]> columnNames)
+ static void validateColumns(String keyspace, String cfName, byte[]
superColumnName, Iterable<ByteBuffer> columnNames)
throws InvalidRequestException
{
if (superColumnName != null)
@@ -108,8 +111,10 @@ public class AvroValidation {
}
AbstractType comparator = ColumnFamily.getComparatorFor(keyspace,
cfName, superColumnName);
- for (byte[] name : columnNames)
+ for (ByteBuffer buff : columnNames)
{
+ byte[] name = buff.array();
+
if (name.length > IColumn.MAX_NAME_LENGTH)
throw newInvalidRequestException("column name length must not
be greater than " + IColumn.MAX_NAME_LENGTH);
if (name.length == 0)
@@ -139,4 +144,77 @@ public class AvroValidation {
if ((cosc.column == null) && (cosc.super_column == null))
throw newInvalidRequestException("ColumnOrSuperColumn must have
one or both of Column or SuperColumn");
}
+
+ static void validateRange(String keyspace, String cfName, byte[]
superName, SliceRange range)
+ throws InvalidRequestException
+ {
+ AbstractType comparator = ColumnFamily.getComparatorFor(keyspace,
cfName, superName);
+ byte[] start = range.start.array();
+ byte[] finish = range.finish.array();
+
+ try
+ {
+ comparator.validate(start);
+ comparator.validate(finish);
+ }
+ catch (MarshalException me)
+ {
+ throw newInvalidRequestException(me.getMessage());
+ }
+
+ if (range.count < 0)
+ throw newInvalidRequestException("Ranges require a non-negative
count.");
+
+ Comparator<byte[]> orderedComparator = range.reversed ?
comparator.getReverseComparator() : comparator;
+ if (start.length > 0 && finish.length > 0 &&
orderedComparator.compare(start, finish) > 0)
+ throw newInvalidRequestException("range finish must come after
start in the order of traversal");
+ }
+
+ static void validateSlicePredicate(String keyspace, String cfName, byte[]
superName, SlicePredicate predicate)
+ throws InvalidRequestException
+ {
+ if (predicate.column_names == null && predicate.slice_range == null)
+ throw newInvalidRequestException("A SlicePredicate must be given a
list of Columns, a SliceRange, or both");
+
+ if (predicate.slice_range != null)
+ validateRange(keyspace, cfName, superName, predicate.slice_range);
+
+ if (predicate.column_names != null)
+ validateColumns(keyspace, cfName, superName,
predicate.column_names);
+ }
+
+ static void validateDeletion(String keyspace, String cfName, Deletion
del) throws InvalidRequestException
+ {
+ if (del.super_column == null && del.predicate == null)
+ throw newInvalidRequestException("A Deletion must have a
SuperColumn, a SlicePredicate, or both.");
+
+ if (del.predicate != null)
+ {
+ validateSlicePredicate(keyspace, cfName, del.super_column.array(),
del.predicate);
+ if (del.predicate.slice_range != null)
+ throw newInvalidRequestException("Deletion does not yet
support SliceRange predicates.");
+ }
+ }
+
+ static void validateMutation(String keyspace, String cfName, Mutation
mutation) throws InvalidRequestException
+ {
+ ColumnOrSuperColumn cosc = mutation.column_or_supercolumn;
+ Deletion del = mutation.deletion;
+
+ if (cosc != null && del != null)
+ throw newInvalidRequestException("Mutation may have either a
ColumnOrSuperColumn or a Deletion, but not both");
+
+ if (cosc != null)
+ {
+ validateColumnOrSuperColumn(keyspace, cfName, cosc);
+ }
+ else if (del != null)
+ {
+ validateDeletion(keyspace, cfName, del);
+ }
+ else
+ {
+ throw newInvalidRequestException("Mutation must have one
ColumnOrSuperColumn, or one Deletion");
+ }
+ }
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon
Apr 5 17:01:29 2010
@@ -30,12 +30,12 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
-
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.util.Utf8;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ReadCommand;
@@ -326,6 +326,110 @@ public class CassandraServer implements
return rm;
}
+ @Override
+ public Void batch_mutate(Utf8 keyspace, Map<Utf8, Map<Utf8,
GenericArray<Mutation>>> mutationMap, ConsistencyLevel consistencyLevel)
+ throws AvroRemoteException, InvalidRequestException, UnavailableException,
TimedOutException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("batch_mutate");
+
+ String keyspaceString = keyspace.toString();
+
+ List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+ for (Map.Entry<Utf8, Map<Utf8, GenericArray<Mutation>>> mutationEntry:
mutationMap.entrySet())
+ {
+ String key = mutationEntry.getKey().toString();
+ AvroValidation.validateKey(key);
+
+ Map<Utf8, GenericArray<Mutation>> cfToMutations =
mutationEntry.getValue();
+ for (Map.Entry<Utf8, GenericArray<Mutation>> cfMutations :
cfToMutations.entrySet())
+ {
+ String cfName = cfMutations.getKey().toString();
+
+ for (Mutation mutation : cfMutations.getValue())
+ AvroValidation.validateMutation(keyspaceString, cfName,
mutation);
+ }
+ rowMutations.add(getRowMutationFromMutations(keyspaceString, key,
cfToMutations));
+ }
+
+ if (consistencyLevel == ConsistencyLevel.ZERO)
+ {
+ StorageProxy.mutate(rowMutations);
+ }
+ else
+ {
+ try
+ {
+ StorageProxy.mutateBlocking(rowMutations,
thriftConsistencyLevel(consistencyLevel));
+ }
+ catch (TimeoutException te)
+ {
+ throw newTimedOutException();
+ }
+ // FIXME: StorageProxy.mutateBlocking throws Thrift's
UnavailableException
+ catch (org.apache.cassandra.thrift.UnavailableException ue)
+ {
+ throw newUnavailableException();
+ }
+ }
+
+ return null;
+ }
+
+ // FIXME: This is copypasta from o.a.c.db.RowMutation,
(RowMutation.getRowMutation uses Thrift types directly).
+ private static RowMutation getRowMutationFromMutations(String keyspace,
String key, Map<Utf8, GenericArray<Mutation>> cfMap)
+ {
+ RowMutation rm = new RowMutation(keyspace, key.trim());
+
+ for (Map.Entry<Utf8, GenericArray<Mutation>> entry : cfMap.entrySet())
+ {
+ String cfName = entry.getKey().toString();
+
+ for (Mutation mutation : entry.getValue())
+ {
+ if (mutation.deletion != null)
+ deleteColumnOrSuperColumnToRowMutation(rm, cfName,
mutation.deletion);
+ else
+ addColumnOrSuperColumnToRowMutation(rm, cfName,
mutation.column_or_supercolumn);
+ }
+ }
+
+ return rm;
+ }
+
+ // FIXME: This is copypasta from o.a.c.db.RowMutation,
(RowMutation.getRowMutation uses Thrift types directly).
+ private static void addColumnOrSuperColumnToRowMutation(RowMutation rm,
String cfName, ColumnOrSuperColumn cosc)
+ {
+ if (cosc.column == null)
+ {
+ for (Column column : cosc.super_column.columns)
+ rm.add(new QueryPath(cfName, cosc.super_column.name.array(),
column.name.array()), column.value.array(), column.timestamp);
+ }
+ else
+ {
+ rm.add(new QueryPath(cfName, null, cosc.column.name.array()),
cosc.column.value.array(), cosc.column.timestamp);
+ }
+ }
+
+ // FIXME: This is copypasta from o.a.c.db.RowMutation,
(RowMutation.getRowMutation uses Thrift types directly).
+ private static void deleteColumnOrSuperColumnToRowMutation(RowMutation rm,
String cfName, Deletion del)
+ {
+ if (del.predicate != null && del.predicate.column_names != null)
+ {
+ for (ByteBuffer col : del.predicate.column_names)
+ {
+ if (del.super_column == null &&
DatabaseDescriptor.getColumnFamilyType(rm.getTable(), cfName).equals("Super"))
+ rm.delete(new QueryPath(cfName, col.array()),
del.timestamp);
+ else
+ rm.delete(new QueryPath(cfName, del.super_column.array(),
col.array()), del.timestamp);
+ }
+ }
+ else
+ {
+ rm.delete(new QueryPath(cfName, del.super_column.array()),
del.timestamp);
+ }
+ }
+
private org.apache.cassandra.thrift.ConsistencyLevel
thriftConsistencyLevel(ConsistencyLevel consistency)
{
switch (consistency)
Modified: cassandra/trunk/test/system/test_avro_server.py
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Mon Apr 5 17:01:29 2010
@@ -17,6 +17,7 @@
from . import AvroTester
from time import time
from random import randint
+from avro.ipc import AvroRemoteException
COLUMNS = [
dict(name="c0", value="v0", timestamp=1L),
@@ -142,6 +143,39 @@ class TestRpcOperations(AvroTester):
for i in range(0,3):
assert_cosc(_get_column(self.client, COLUMNS[i]['name']))
+ def test_batch_mutate(self):
+ "performing batch mutation operations"
+ params = dict()
+ params['keyspace'] = 'Keyspace1'
+ params['consistency_level'] = 'ONE'
+
+ mutation_map = dict()
+ mutation_map['key1'] = dict(Standard1=[
+ dict(column_or_supercolumn=dict(column=COLUMNS[0])),
+ dict(column_or_supercolumn=dict(column=COLUMNS[1])),
+ dict(column_or_supercolumn=dict(column=COLUMNS[2]))
+ ])
+
+ params['mutation_map'] = mutation_map
+
+ self.client.request('batch_mutate', params)
+
+ for i in range(0,3):
+ cosc = _get_column(self.client, COLUMNS[i]['name'])
+ assert_cosc(cosc)
+ assert_columns_match(cosc['column'], COLUMNS[i])
+
+ # FIXME: still need to apply a mutation that deletes
+
+ #try:
+ # assert not _get_column(self.client, COLUMNS[1]['name']), \
+ # "Mutation did not delete column %s" % COLUMNS[1]['name']
+ # assert not _get_column(self.client, COLUMNS[2]['name']), \
+ # "Mutation did not delete column %s" % COLUMNS[2]['name']
+ #except AvroRemoteException:
+ # pass
+
+
def test_get_api_version(self):
"getting the remote api version string"
vers = self.client.request('get_api_version', {})