Author: eevans
Date: Thu Jun 17 17:14:25 2010
New Revision: 955675
URL: http://svn.apache.org/viewvc?rev=955675&view=rev
Log:
get_slice() implementation for avro + system tests
Patch by eevans
Modified:
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/test/system/test_avro_server.py
Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Thu Jun 17 17:14:25 2010
@@ -63,7 +63,7 @@ protocol Cassandra {
bytes finish;
boolean reversed;
int count;
- array<bytes> bitmasks;
+ union { array<bytes>, null } bitmasks;
}
record SlicePredicate {
@@ -133,6 +133,18 @@ protocol Cassandra {
ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException,
TimedOutException;
+
+ /**
+ * Get the group of columns contained by a column_parent (either a
+ * ColumnFamily name or a ColumnFamily/SuperColumn name pair) specified
+ * by the given SlicePredicate. If no matching values are found, an empty
+ * list is returned.
+ */
+ array<ColumnOrSuperColumn> get_slice(bytes key,
+ ColumnParent column_parent,
+ SlicePredicate predicate,
+ ConsistencyLevel consistency_level)
+ throws InvalidRequestException, UnavailableException, TimedOutException;
void insert(bytes key,
ColumnParent column_parent,
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Thu Jun 17 17:14:25 2010
@@ -214,6 +214,12 @@ public class AvroValidation {
if (start.length > 0 && finish.length > 0 &&
orderedComparator.compare(start, finish) > 0)
throw newInvalidRequestException("range finish must come after
start in the order of traversal");
}
+
+    /**
+     * Convenience overload: validates a slice range against the column family
+     * (and optional super column) carried by a ColumnParent, by unpacking the
+     * parent and delegating to the four-argument validateRange.
+     *
+     * @param keyspace keyspace name, forwarded unchanged to the delegate
+     * @param cp       column parent supplying the column family name and,
+     *                 when non-null, the super column name
+     * @param range    slice range to validate
+     * @throws InvalidRequestException if the delegated validation rejects the range
+     */
+    static void validateRange(String keyspace, ColumnParent cp, SliceRange range) throws InvalidRequestException
+    {
+        // No super column on the parent: pass null through to the delegate.
+        byte[] superColumnName = null;
+        if (cp.super_column != null)
+            superColumnName = cp.super_column.array();
+
+        String columnFamily = cp.column_family.toString();
+        validateRange(keyspace, columnFamily, superColumnName, range);
+    }
static void validateSlicePredicate(String keyspace, String cfName, byte[]
superName, SlicePredicate predicate)
throws InvalidRequestException
@@ -278,4 +284,19 @@ public class AvroValidation {
throw newInvalidRequestException("Clock must have a timestamp set");
}
+
+    /**
+     * Validates a SlicePredicate for the given column parent.
+     *
+     * The predicate must set exactly one of column_names or slice_range;
+     * whichever is set is then checked by validateColumns or validateRange.
+     *
+     * @param keyspace  keyspace name, forwarded to the delegated validators
+     * @param cp        column parent the predicate applies to
+     * @param predicate predicate to validate
+     * @throws InvalidRequestException if both fields are null, both are set,
+     *         or the delegated validation fails
+     */
+    static void validatePredicate(String keyspace, ColumnParent cp, SlicePredicate predicate)
+    throws InvalidRequestException
+    {
+        boolean hasNames = predicate.column_names != null;
+        boolean hasRange = predicate.slice_range != null;
+
+        if (!hasNames && !hasRange)
+            throw newInvalidRequestException("predicate column_names and slice_range may not both be null");
+
+        if (hasNames && hasRange)
+            throw newInvalidRequestException("predicate column_names and slice_range may not both be set");
+
+        // Exactly one of the two fields is set from here on.
+        if (hasNames)
+            validateColumns(keyspace, cp, predicate.column_names);
+        else
+            validateRange(keyspace, cp, predicate.slice_range);
+    }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Jun 17 17:14:25 2010
@@ -20,7 +20,6 @@ package org.apache.cassandra.avro;
*
*/
-
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -32,11 +31,15 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeoutException;
+
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.avro.util.Utf8;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ConfigurationException;
@@ -50,11 +53,8 @@ import org.apache.cassandra.db.marshal.M
import org.apache.cassandra.db.migration.AddKeyspace;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.service.StorageProxy;
-import static org.apache.cassandra.utils.FBUtilities.UTF8;
-
import org.apache.cassandra.service.StorageService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import static org.apache.cassandra.avro.AvroRecordFactory.*;
import static org.apache.cassandra.avro.ErrorFactory.*;
@@ -252,6 +252,72 @@ public class CassandraServer implements
else
return avronateColumns(cf.getSortedColumns(), reverseOrder);
}
+
+    /**
+     * Reads a slice of columns for a single row key.
+     *
+     * Delegates to multigetSliceInternal with a one-element key list for the
+     * current keyspace and returns the entry stored under that key.
+     *
+     * @param key              row key
+     * @param columnParent     column family (and optional super column) to read from
+     * @param predicate        selects columns by name or by range
+     * @param consistencyLevel consistency level forwarded to the read
+     * @return the columns found for the key
+     */
+    @Override
+    public GenericArray<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent columnParent,
+            SlicePredicate predicate, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("get_slice");
+
+        // The result map is keyed by byte[], which hashes and compares by identity,
+        // so the lookup must use the same array instance that was submitted.
+        // NOTE(review): presumably the read command keeps this exact reference -- confirm.
+        byte[] rawKey = key.array();
+        Map<byte[], GenericArray<ColumnOrSuperColumn>> slices =
+            multigetSliceInternal(curKeyspace.get(), Arrays.asList(rawKey), columnParent, predicate, consistencyLevel);
+
+        return slices.get(rawKey);
+    }
+
+    /**
+     * Builds and executes one read command per key for the given predicate.
+     *
+     * The column parent and predicate are validated first. Each key is then
+     * validated and turned into a SliceByNamesReadCommand when the predicate
+     * carries column_names, or a SliceFromReadCommand built from slice_range
+     * otherwise. The commands are handed to getSlice.
+     *
+     * @param keyspace         keyspace to read from
+     * @param keys             row keys to read
+     * @param columnParent     column family (and optional super column) to read from
+     * @param predicate        selects columns by name or by range
+     * @param consistencyLevel consistency level forwarded to the read
+     * @return the map produced by getSlice for the generated commands
+     */
+    private Map<byte[], GenericArray<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<byte[]> keys,
+            ColumnParent columnParent, SlicePredicate predicate, ConsistencyLevel consistencyLevel)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        AvroValidation.validateColumnParent(keyspace, columnParent);
+        AvroValidation.validatePredicate(keyspace, columnParent, predicate);
+
+        byte[] superColumnName = null;
+        if (columnParent.super_column != null)
+            superColumnName = columnParent.super_column.array();
+        QueryPath path = new QueryPath(columnParent.column_family.toString(), superColumnName);
+
+        boolean sliceByNames = predicate.column_names != null;
+        List<ReadCommand> readCommands = new ArrayList<ReadCommand>();
+
+        for (byte[] key : keys)
+        {
+            AvroValidation.validateKey(key);
+
+            if (sliceByNames)
+            {
+                // FIXME: Copying the collection for the sake of SliceByNamesReadCommands
+                Collection<byte[]> names = new ArrayList<byte[]>();
+                for (ByteBuffer name : predicate.column_names)
+                    names.add(name.array());
+
+                readCommands.add(new SliceByNamesReadCommand(keyspace, key, path, names));
+            }
+            else
+            {
+                SliceRange range = predicate.slice_range;
+                readCommands.add(new SliceFromReadCommand(keyspace, key, path,
+                                                          range.start.array(), range.finish.array(),
+                                                          range.reversed, range.count));
+            }
+        }
+
+        return getSlice(readCommands, consistencyLevel);
+    }
+
+    /**
+     * Executes the given read commands and converts each result to Avro.
+     *
+     * Rows are fetched in one readColumnFamily call. For every command, the
+     * row is looked up by its decorated key, converted with
+     * avronateColumnFamily, and stored under the command's key array.
+     *
+     * @param commands         read commands to execute
+     * @param consistencyLevel consistency level forwarded to readColumnFamily
+     * @return one entry per command, keyed by the command's key array
+     */
+    private Map<byte[], GenericArray<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, ConsistencyLevel consistencyLevel)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        Map<DecoratedKey<?>, ColumnFamily> rowsByKey = readColumnFamily(commands, consistencyLevel);
+        Map<byte[], GenericArray<ColumnOrSuperColumn>> slicesByKey = new HashMap<byte[], GenericArray<ColumnOrSuperColumn>>();
+
+        for (ReadCommand command : commands)
+        {
+            ColumnFamily row = rowsByKey.get(StorageService.getPartitioner().decorateKey(command.key));
+
+            // Only range slices carry a direction; every other command reads in forward order.
+            boolean reversed = false;
+            if (command instanceof SliceFromReadCommand)
+                reversed = ((SliceFromReadCommand) command).reversed;
+
+            boolean hasSuperColumn = command.queryPath.superColumnName != null;
+            slicesByKey.put(command.key, avronateColumnFamily(row, hasSuperColumn, reversed));
+        }
+
+        return slicesByKey;
+    }
@Override
public Void insert(ByteBuffer key, ColumnParent parent, Column column,
ConsistencyLevel consistencyLevel)
Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=955675&r1=955674&r2=955675&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Thu Jun 17 17:14:25 2010
@@ -187,6 +187,56 @@ class TestRpcOperations(AvroTester):
assert_cosc(cosc)
assert_columns_match(cosc['column'], extra_column)
+    def test_get_slice_simple(self):
+        "performing a slice of simple columns"
+        self.__set_keyspace('Keyspace1')
+
+        # Write six simple columns to row 'key1' of Standard1 in one batch.
+        columns = [new_column(i) for i in range(6)]
+        mutations = [{'column_or_supercolumn': {'column': column}}
+                     for column in columns]
+
+        mutation_params = dict()
+        map_entry = {'key': 'key1', 'mutations': {'Standard1': mutations}}
+        mutation_params['mutation_map'] = [map_entry]
+        mutation_params['consistency_level'] = 'ONE'
+
+        self.client.request('batch_mutate', mutation_params)
+
+        # Slicing on list of column names
+        slice_params = dict()
+        slice_params['key'] = 'key1'
+        slice_params['column_parent'] = {'column_family': 'Standard1'}
+        slice_params['predicate'] = {'column_names': list()}
+        slice_params['predicate']['column_names'].append(columns[0]['name'])
+        slice_params['predicate']['column_names'].append(columns[4]['name'])
+        slice_params['consistency_level'] = 'ONE'
+
+        coscs = self.client.request('get_slice', slice_params)
+
+        for cosc in coscs: assert_cosc(cosc)
+        # Check the count before indexing, mirroring the range-slice check below;
+        # otherwise a short or over-long response is not reported clearly.
+        assert len(coscs) == 2, "expected 2 results, got %d" % len(coscs)
+        assert_columns_match(coscs[0]['column'], columns[0])
+        assert_columns_match(coscs[1]['column'], columns[4])
+
+        # Slicing on a range of column names
+        slice_range = dict()
+        slice_range['start'] = columns[2]['name']
+        slice_range['finish'] = columns[5]['name']
+        slice_range['reversed'] = False
+        slice_range['count'] = 1000
+        slice_params['predicate'] = {'slice_range': slice_range}
+
+        coscs = self.client.request('get_slice', slice_params)
+
+        for cosc in coscs: assert_cosc(cosc)
+        assert len(coscs) == 4, "expected 4 results, got %d" % len(coscs)
+        assert_columns_match(coscs[0]['column'], columns[2])
+        assert_columns_match(coscs[3]['column'], columns[5])
+
def test_describe_keyspaces(self):
"retrieving a list of all keyspaces"
keyspaces = self.client.request('describe_keyspaces', {})