Author: eevans
Date: Mon Apr  5 17:01:29 2010
New Revision: 930903

URL: http://svn.apache.org/viewvc?rev=930903&view=rev
Log:
batch_mutate() rpc implementation

Patch by eevans

Modified:
    cassandra/trunk/interface/cassandra.avpr
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/test/system/test_avro_server.py

Modified: cassandra/trunk/interface/cassandra.avpr
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Mon Apr  5 17:01:29 2010
@@ -131,7 +131,6 @@
     "batch_mutate": {
         "request": [
             {"name": "keyspace", "type": "string"},
-            /* Map<String, Map<String, List<Mutation>>> mutation_map */
             {"name": "mutation_map",
                 "type": {
                     "type": "map", "values": {

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java Mon Apr  5 17:01:29 2010
@@ -131,6 +131,11 @@ class ErrorFactory
     {
         return newTimedOutException(new Utf8(why));
     }
+
+    static TimedOutException newTimedOutException()
+    {
+        return newTimedOutException(new Utf8());
+    }
     
     static UnavailableException newUnavailableException(Utf8 why)
     {
@@ -143,4 +148,9 @@ class ErrorFactory
     {
         return newUnavailableException(new Utf8(why));
     }
+    
+    static UnavailableException newUnavailableException()
+    {
+        return newUnavailableException(new Utf8());
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Mon Apr  5 17:01:29 2010
@@ -21,7 +21,10 @@ package org.apache.cassandra.avro;
  */
 
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Comparator;
+
 import org.apache.avro.util.Utf8;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
@@ -88,13 +91,13 @@ public class AvroValidation {
         }
          
         if (column != null)
-            validateColumns(keyspace, column_family, super_column, 
Arrays.asList(column));
+            validateColumns(keyspace, column_family, super_column, 
Arrays.asList(cp.column));
         if (super_column != null)
-            validateColumns(keyspace, column_family, null, 
Arrays.asList(super_column));
+            validateColumns(keyspace, column_family, null, 
Arrays.asList(cp.super_column));
     }
     
     // FIXME: could use method in ThriftValidation
-    static void validateColumns(String keyspace, String cfName, byte[] 
superColumnName, Iterable<byte[]> columnNames)
+    static void validateColumns(String keyspace, String cfName, byte[] 
superColumnName, Iterable<ByteBuffer> columnNames)
     throws InvalidRequestException
     {
         if (superColumnName != null)
@@ -108,8 +111,10 @@ public class AvroValidation {
         }
         
         AbstractType comparator = ColumnFamily.getComparatorFor(keyspace, 
cfName, superColumnName);
-        for (byte[] name : columnNames)
+        for (ByteBuffer buff : columnNames)
         {
+            byte[] name = buff.array();
+
             if (name.length > IColumn.MAX_NAME_LENGTH)
                 throw newInvalidRequestException("column name length must not 
be greater than " + IColumn.MAX_NAME_LENGTH);
             if (name.length == 0)
@@ -139,4 +144,77 @@ public class AvroValidation {
         if ((cosc.column == null) && (cosc.super_column == null))
             throw newInvalidRequestException("ColumnOrSuperColumn must have 
one or both of Column or SuperColumn");
     }
+
+    static void validateRange(String keyspace, String cfName, byte[] 
superName, SliceRange range)
+    throws InvalidRequestException
+    {
+        AbstractType comparator = ColumnFamily.getComparatorFor(keyspace, 
cfName, superName);
+        byte[] start = range.start.array();
+        byte[] finish = range.finish.array();
+
+        try
+        {
+            comparator.validate(start);
+            comparator.validate(finish);
+        }
+        catch (MarshalException me)
+        {
+            throw newInvalidRequestException(me.getMessage());
+        }
+
+        if (range.count < 0)
+            throw newInvalidRequestException("Ranges require a non-negative 
count.");
+
+        Comparator<byte[]> orderedComparator = range.reversed ? 
comparator.getReverseComparator() : comparator;
+        if (start.length > 0 && finish.length > 0 && 
orderedComparator.compare(start, finish) > 0)
+            throw newInvalidRequestException("range finish must come after 
start in the order of traversal");
+    }
+
+    static void validateSlicePredicate(String keyspace, String cfName, byte[] 
superName, SlicePredicate predicate)
+    throws InvalidRequestException
+    {
+        if (predicate.column_names == null && predicate.slice_range == null)
+            throw newInvalidRequestException("A SlicePredicate must be given a 
list of Columns, a SliceRange, or both");
+
+        if (predicate.slice_range != null)
+            validateRange(keyspace, cfName, superName, predicate.slice_range);
+
+        if (predicate.column_names != null)
+            validateColumns(keyspace, cfName, superName, 
predicate.column_names);
+    }
+
+    static void validateDeletion(String keyspace, String  cfName, Deletion 
del) throws InvalidRequestException
+    {
+        if (del.super_column == null && del.predicate == null)
+            throw newInvalidRequestException("A Deletion must have a 
SuperColumn, a SlicePredicate, or both.");
+
+        if (del.predicate != null)
+        {
+            validateSlicePredicate(keyspace, cfName, del.super_column.array(), 
del.predicate);
+            if (del.predicate.slice_range != null)
+                throw newInvalidRequestException("Deletion does not yet 
support SliceRange predicates.");
+        }
+    }
+
+    static void validateMutation(String keyspace, String cfName, Mutation 
mutation) throws InvalidRequestException
+    {
+        ColumnOrSuperColumn cosc = mutation.column_or_supercolumn;
+        Deletion del = mutation.deletion;
+
+        if (cosc != null && del != null)
+            throw newInvalidRequestException("Mutation may have either a 
ColumnOrSuperColumn or a Deletion, but not both");
+
+        if (cosc != null)
+        {
+            validateColumnOrSuperColumn(keyspace, cfName, cosc);
+        }
+        else if (del != null)
+        {
+            validateDeletion(keyspace, cfName, del);
+        }
+        else
+        {
+            throw newInvalidRequestException("Mutation must have one 
ColumnOrSuperColumn, or one Deletion");
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Mon Apr  5 17:01:29 2010
@@ -30,12 +30,12 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.util.Utf8;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
@@ -326,6 +326,110 @@ public class CassandraServer implements 
         return rm;
     }
 
+    @Override
+    public Void batch_mutate(Utf8 keyspace, Map<Utf8, Map<Utf8, 
GenericArray<Mutation>>> mutationMap, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, 
TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("batch_mutate");
+        
+        String keyspaceString = keyspace.toString();
+        
+        List<RowMutation> rowMutations = new ArrayList<RowMutation>();
+        for (Map.Entry<Utf8, Map<Utf8, GenericArray<Mutation>>> mutationEntry: 
mutationMap.entrySet())
+        {
+            String key = mutationEntry.getKey().toString();
+            AvroValidation.validateKey(key);
+            
+            Map<Utf8, GenericArray<Mutation>> cfToMutations = 
mutationEntry.getValue();
+            for (Map.Entry<Utf8, GenericArray<Mutation>> cfMutations : 
cfToMutations.entrySet())
+            {
+                String cfName = cfMutations.getKey().toString();
+                
+                for (Mutation mutation : cfMutations.getValue())
+                    AvroValidation.validateMutation(keyspaceString, cfName, 
mutation);
+            }
+            rowMutations.add(getRowMutationFromMutations(keyspaceString, key, 
cfToMutations));
+        }
+        
+        if (consistencyLevel == ConsistencyLevel.ZERO)
+        {
+            StorageProxy.mutate(rowMutations);
+        }
+        else
+        {
+            try
+            {
+                StorageProxy.mutateBlocking(rowMutations, 
thriftConsistencyLevel(consistencyLevel));
+            }
+            catch (TimeoutException te)
+            {
+                throw newTimedOutException();
+            }
+            // FIXME: StorageProxy.mutateBlocking throws Thrift's 
UnavailableException
+            catch (org.apache.cassandra.thrift.UnavailableException ue)
+            {
+                throw newUnavailableException();
+            }
+        }
+        
+        return null;
+    }
+    
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, 
(RowMutation.getRowMutation uses Thrift types directly).
+    private static RowMutation getRowMutationFromMutations(String keyspace, 
String key, Map<Utf8, GenericArray<Mutation>> cfMap)
+    {
+        RowMutation rm = new RowMutation(keyspace, key.trim());
+        
+        for (Map.Entry<Utf8, GenericArray<Mutation>> entry : cfMap.entrySet())
+        {
+            String cfName = entry.getKey().toString();
+            
+            for (Mutation mutation : entry.getValue())
+            {
+                if (mutation.deletion != null)
+                    deleteColumnOrSuperColumnToRowMutation(rm, cfName, 
mutation.deletion);
+                else
+                    addColumnOrSuperColumnToRowMutation(rm, cfName, 
mutation.column_or_supercolumn);
+            }
+        }
+        
+        return rm;
+    }
+    
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, 
(RowMutation.getRowMutation uses Thrift types directly).
+    private static void addColumnOrSuperColumnToRowMutation(RowMutation rm, 
String cfName, ColumnOrSuperColumn cosc)
+    {
+        if (cosc.column == null)
+        {
+            for (Column column : cosc.super_column.columns)
+                rm.add(new QueryPath(cfName, cosc.super_column.name.array(), 
column.name.array()), column.value.array(), column.timestamp);
+        }
+        else
+        {
+            rm.add(new QueryPath(cfName, null, cosc.column.name.array()), 
cosc.column.value.array(), cosc.column.timestamp);
+        }
+    }
+    
+    // FIXME: This is copypasta from o.a.c.db.RowMutation, 
(RowMutation.getRowMutation uses Thrift types directly).
+    private static void deleteColumnOrSuperColumnToRowMutation(RowMutation rm, 
String cfName, Deletion del)
+    {
+        if (del.predicate != null && del.predicate.column_names != null)
+        {
+            for (ByteBuffer col : del.predicate.column_names)
+            {
+                if (del.super_column == null && 
DatabaseDescriptor.getColumnFamilyType(rm.getTable(), cfName).equals("Super"))
+                    rm.delete(new QueryPath(cfName, col.array()), 
del.timestamp);
+                else
+                    rm.delete(new QueryPath(cfName, del.super_column.array(), 
col.array()), del.timestamp);
+            }
+        }
+        else
+        {
+            rm.delete(new QueryPath(cfName, del.super_column.array()), 
del.timestamp);
+        }
+    }
+    
     private org.apache.cassandra.thrift.ConsistencyLevel 
thriftConsistencyLevel(ConsistencyLevel consistency)
     {
         switch (consistency)

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=930903&r1=930902&r2=930903&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Mon Apr  5 17:01:29 2010
@@ -17,6 +17,7 @@
 from . import AvroTester
 from time import time
 from random import randint
+from avro.ipc import AvroRemoteException
 
 COLUMNS = [
     dict(name="c0", value="v0", timestamp=1L),
@@ -142,6 +143,39 @@ class TestRpcOperations(AvroTester):
         for i in range(0,3):
             assert_cosc(_get_column(self.client, COLUMNS[i]['name']))
 
+    def test_batch_mutate(self):
+        "performing batch mutation operations"
+        params = dict()
+        params['keyspace'] = 'Keyspace1'
+        params['consistency_level'] = 'ONE'
+
+        mutation_map = dict()
+        mutation_map['key1'] = dict(Standard1=[
+            dict(column_or_supercolumn=dict(column=COLUMNS[0])),
+            dict(column_or_supercolumn=dict(column=COLUMNS[1])),
+            dict(column_or_supercolumn=dict(column=COLUMNS[2]))
+        ])
+
+        params['mutation_map'] = mutation_map
+
+        self.client.request('batch_mutate', params)
+
+        for i in range(0,3):
+            cosc = _get_column(self.client, COLUMNS[i]['name'])
+            assert_cosc(cosc)
+            assert_columns_match(cosc['column'], COLUMNS[i])
+
+        # FIXME: still need to apply a mutation that deletes
+
+        #try:
+        #    assert not _get_column(self.client, COLUMNS[1]['name']), \
+        #        "Mutation did not delete column %s" % COLUMNS[1]['name']
+        #    assert not _get_column(self.client, COLUMNS[2]['name']), \
+        #        "Mutation did not delete column %s" % COLUMNS[2]['name']
+        #except AvroRemoteException:
+        #    pass
+
+
     def test_get_api_version(self):
         "getting the remote api version string"
         vers = self.client.request('get_api_version', {})


Reply via email to