Author: eevans
Date: Thu Jun 17 17:14:25 2010
New Revision: 955675

URL: http://svn.apache.org/viewvc?rev=955675&view=rev
Log:
get_slice() implementation for avro + system tests

Patch by eevans

Modified:
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/test/system/test_avro_server.py

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Thu Jun 17 17:14:25 2010
@@ -63,7 +63,7 @@ protocol Cassandra {
         bytes finish;
         boolean reversed;
         int count;
-        array<bytes> bitmasks;
+        union { array<bytes>, null } bitmasks;
     }
 
     record SlicePredicate {
@@ -133,6 +133,18 @@ protocol Cassandra {
                             ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException,
            TimedOutException;
+    
+    /**
+     * Get the group of columns contained by a column_parent (either a
+     * ColumnFamily name or a ColumnFamily/SuperColumn name pair) specified
+     * by the given SlicePredicate. If no matching values are found, an empty
+     * list is returned; NotFoundException is not among the declared errors.
+     */
+    array<ColumnOrSuperColumn> get_slice(bytes key,
+                                         ColumnParent column_parent,
+                                         SlicePredicate predicate,
+                                         ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException;
 
     void insert(bytes key,
                 ColumnParent column_parent,

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Thu Jun 17 17:14:25 2010
@@ -214,6 +214,12 @@ public class AvroValidation {
         if (start.length > 0 && finish.length > 0 && 
orderedComparator.compare(start, finish) > 0)
             throw newInvalidRequestException("range finish must come after 
start in the order of traversal");
     }
+    // Overload taking a ColumnParent: unpacks it and delegates to the four-argument validateRange.
+    static void validateRange(String keyspace, ColumnParent cp, SliceRange 
range) throws InvalidRequestException
+    {
+        byte[] superName = cp.super_column == null ? null : 
cp.super_column.array(); // a null super_column is passed on as a null super column name
+        validateRange(keyspace, cp.column_family.toString(), superName, range);
+    }
 
     static void validateSlicePredicate(String keyspace, String cfName, byte[] 
superName, SlicePredicate predicate)
     throws InvalidRequestException
@@ -278,4 +284,19 @@ public class AvroValidation {
         
         throw newInvalidRequestException("Clock must have a timestamp set");
     }
+    // Requires exactly one of predicate.column_names / predicate.slice_range to be set, then validates that one.
+    static void validatePredicate(String keyspace, ColumnParent cp, 
SlicePredicate predicate)
+    throws InvalidRequestException
+    {
+        if (predicate.column_names == null && predicate.slice_range == null)
+            throw newInvalidRequestException("predicate column_names and 
slice_range may not both be null");
+        
+        if (predicate.column_names != null && predicate.slice_range != null)
+            throw newInvalidRequestException("predicate column_names and 
slice_range may not both be set");
+        
+        if (predicate.slice_range != null)
+            validateRange(keyspace, cp, predicate.slice_range);
+        else // column_names is non-null here; the both-null case was rejected above
+            validateColumns(keyspace, cp, predicate.column_names);
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Jun 17 17:14:25 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.avro;
  * 
  */
 
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -32,11 +31,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
+
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.ConfigurationException;
@@ -50,11 +53,8 @@ import org.apache.cassandra.db.marshal.M
 import org.apache.cassandra.db.migration.AddKeyspace;
 import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.service.StorageProxy;
-import static org.apache.cassandra.utils.FBUtilities.UTF8;
-
 import org.apache.cassandra.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
 import static org.apache.cassandra.avro.AvroRecordFactory.*;
 import static org.apache.cassandra.avro.ErrorFactory.*;
 
@@ -252,6 +252,72 @@ public class CassandraServer implements 
         else
             return avronateColumns(cf.getSortedColumns(), reverseOrder);
     }
+    // Single-key slice: runs multigetSliceInternal with a one-element key list and returns that key's entry.
+    @Override
+    public GenericArray<ColumnOrSuperColumn> get_slice(ByteBuffer key, 
ColumnParent columnParent,
+            SlicePredicate predicate, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, 
TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("get_slice");
+        // NOTE(review): getSlice returns a HashMap keyed by byte[], which compares keys by identity; this lookup presumably relies on key.array() handing back the same array instance that was placed in the key list - confirm.
+        return multigetSliceInternal(curKeyspace.get(), 
Arrays.asList(key.array()), columnParent, predicate, 
consistencyLevel).get(key.array());
+    }
+    // Validates the parent and predicate, builds one ReadCommand per key, and hands the commands to getSlice.
+    private Map<byte[], GenericArray<ColumnOrSuperColumn>> 
multigetSliceInternal(String keyspace, List<byte[]> keys,
+            ColumnParent columnParent, SlicePredicate predicate, 
ConsistencyLevel consistencyLevel)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        AvroValidation.validateColumnParent(keyspace, columnParent);
+        AvroValidation.validatePredicate(keyspace, columnParent, predicate);
+        // superName stays null when the parent carries no super_column; it is passed to QueryPath as-is.
+        byte[] superName = columnParent.super_column == null ? null : 
columnParent.super_column.array();
+        QueryPath queryPath = new 
QueryPath(columnParent.column_family.toString(), superName);
+
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
+        if (predicate.column_names != null) // by-name reads; the else branch builds slice-range reads
+        {
+            for (byte[] key : keys)
+            {
+                AvroValidation.validateKey(key);
+                // NOTE(review): the copy below does not depend on key, yet it is rebuilt for every key.
+                // FIXME: Copying the collection for the sake of 
SliceByNamesReadCommands
+                Collection<byte[]> column_names = new ArrayList<byte[]>();
+                for (ByteBuffer name : predicate.column_names)
+                    column_names.add(name.array());
+                
+                commands.add(new SliceByNamesReadCommand(keyspace, key, 
queryPath, column_names));
+            }
+        }
+        else // validatePredicate above rejected the both-null case, so slice_range is set here
+        {
+            SliceRange range = predicate.slice_range;
+            for (byte[] key : keys)
+            {
+                AvroValidation.validateKey(key);
+                commands.add(new SliceFromReadCommand(keyspace, key, 
queryPath, range.start.array(), range.finish.array(), range.reversed, 
range.count));
+            }
+        }
+        
+        return getSlice(commands, consistencyLevel);
+    }
+    // Executes the commands through readColumnFamily and returns each command's avronated columns keyed by cmd.key.
+    private Map<byte[], GenericArray<ColumnOrSuperColumn>> 
getSlice(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        Map<DecoratedKey<?>, ColumnFamily> columnFamilies = 
readColumnFamily(commands, consistencyLevel);
+        Map<byte[], GenericArray<ColumnOrSuperColumn>> columnFamiliesMap = new 
HashMap<byte[], GenericArray<ColumnOrSuperColumn>>();
+        // Iterate over the commands rather than the read results, so an entry is put for every command's key.
+        for (ReadCommand cmd : commands)
+        {
+            ColumnFamily cf = 
columnFamilies.get(StorageService.getPartitioner().decorateKey(cmd.key));
+            boolean reverseOrder = cmd instanceof SliceFromReadCommand && 
((SliceFromReadCommand)cmd).reversed; // reversed is read only from SliceFromReadCommand; any other command counts as not reversed
+            GenericArray<ColumnOrSuperColumn> avroColumns = 
avronateColumnFamily(cf, cmd.queryPath.superColumnName != null, reverseOrder);
+            columnFamiliesMap.put(cmd.key, avroColumns);
+        }
+        
+        return columnFamiliesMap;
+    }
 
     @Override
     public Void insert(ByteBuffer key, ColumnParent parent, Column column, 
ConsistencyLevel consistencyLevel)

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Thu Jun 17 17:14:25 2010
@@ -187,6 +187,56 @@ class TestRpcOperations(AvroTester):
         assert_cosc(cosc)
         assert_columns_match(cosc['column'], extra_column)
 
+    def test_get_slice_simple(self):
+        "performing a slice of simple columns"
+        self.__set_keyspace('Keyspace1')
+
+        columns = list(); mutations = list()
+
+        for i in range(6):  # six columns: columns[0] .. columns[5]
+            columns.append(new_column(i))
+
+        for column in columns:
+            mutation = {'column_or_supercolumn': {'column': column}}
+            mutations.append(mutation)
+
+        mutation_params = dict()
+        map_entry = {'key': 'key1', 'mutations': {'Standard1': mutations}}  # all six columns go to 'key1' in Standard1
+        mutation_params['mutation_map'] = [map_entry]
+        mutation_params['consistency_level'] = 'ONE'
+
+        self.client.request('batch_mutate', mutation_params)
+
+        # Slicing on list of column names
+        slice_params= dict()
+        slice_params['key'] = 'key1'
+        slice_params['column_parent'] = {'column_family': 'Standard1'}
+        slice_params['predicate'] = {'column_names': list()}
+        slice_params['predicate']['column_names'].append(columns[0]['name'])
+        slice_params['predicate']['column_names'].append(columns[4]['name'])
+        slice_params['consistency_level'] = 'ONE'
+
+        coscs = self.client.request('get_slice', slice_params)
+
+        for cosc in coscs: assert_cosc(cosc)
+        assert_columns_match(coscs[0]['column'], columns[0])  # the two requested columns are expected in this order
+        assert_columns_match(coscs[1]['column'], columns[4])
+
+        # Slicing on a range of column names
+        slice_range = dict()
+        slice_range['start'] = columns[2]['name']
+        slice_range['finish'] = columns[5]['name']
+        slice_range['reversed'] = False
+        slice_range['count'] = 1000
+        slice_params['predicate'] = {'slice_range': slice_range}  # replaces the column_names predicate; key, parent and consistency level are reused
+
+        coscs = self.client.request('get_slice', slice_params)
+
+        for cosc in coscs: assert_cosc(cosc)
+        assert len(coscs) == 4, "expected 4 results, got %d" % len(coscs)  # columns[2] .. columns[5], both ends included
+        assert_columns_match(coscs[0]['column'], columns[2])
+        assert_columns_match(coscs[3]['column'], columns[5])
+
     def test_describe_keyspaces(self):
         "retrieving a list of all keyspaces"
         keyspaces = self.client.request('describe_keyspaces', {})


Reply via email to