Author: brandonwilliams Date: Thu Sep 2 16:17:21 2010 New Revision: 992001
URL: http://svn.apache.org/viewvc?rev=992001&view=rev Log: avro: get_range_slices implementation. Patch by Jeremy Hanna, reviewed by brandonwilliams Modified: cassandra/trunk/interface/cassandra.genavro cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Modified: cassandra/trunk/interface/cassandra.genavro URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=992001&r1=992000&r2=992001&view=diff ============================================================================== --- cassandra/trunk/interface/cassandra.genavro (original) +++ cassandra/trunk/interface/cassandra.genavro Thu Sep 2 16:17:21 2010 @@ -89,6 +89,22 @@ protocol Cassandra { int count; } + /** + * The semantics of start keys and tokens are slightly different. + * Keys are start-inclusive; tokens are start-exclusive. Token + * ranges may also wrap -- that is, the end token may be less + * than the start one. Thus, a range from keyX to keyX is a + * one-element range, but a range from tokenY to tokenY is the + * full ring. + */ + record KeyRange { + union { bytes, null } start_key; + union { bytes, null } end_key; + union { string, null } start_token; + union { string, null } end_token; + int count; + } + record KeySlice { bytes key; array<ColumnOrSuperColumn> columns; @@ -346,4 +362,12 @@ protocol Cassandra { */ array<TokenRange> describe_ring(string keyspace) throws InvalidRequestException; + + /** + *returns a subset of columns for a contiguous range of keys. + */ + array<KeySlice> get_range_slices(ColumnParent column_parent, + SlicePredicate predicate, + KeyRange range, + union { ConsistencyLevel, null } consistency_level); } Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=992001&r1=992000&r2=992001&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Thu Sep 2 16:17:21 2010 @@ -36,6 +36,10 @@ import org.apache.cassandra.db.Table; import org.apache.cassandra.db.TimestampClock; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.MarshalException; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.RandomPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; import static org.apache.cassandra.avro.ErrorFactory.newInvalidRequestException; @@ -300,6 +304,42 @@ public class AvroValidation validateColumns(keyspace, cp, predicate.column_names); } + public static void validateKeyRange(KeyRange range) + throws InvalidRequestException + { + if ((range.start_key == null) != (range.end_key == null)) + { + throw newInvalidRequestException("start key and end key must either both be non-null, or both be null"); + } + if ((range.start_token == null) != (range.end_token == null)) + { + throw newInvalidRequestException("start token and end token must either both be non-null, or both be null"); + } + if ((range.start_key == null) == (range.start_token == null)) + { + throw newInvalidRequestException("exactly one of {start key, end key} or {start token, end token} must be specified"); + } + + if (range.start_key != null) + { + IPartitioner p = StorageService.getPartitioner(); + Token startToken = p.getToken(range.start_key.array()); + Token endToken = p.getToken(range.end_key.array()); + if (startToken.compareTo(endToken) > 0 && !endToken.equals(p.getMinimumToken())) + { + if (p instanceof RandomPartitioner) + throw newInvalidRequestException("start key's md5 sorts after end key's md5. this is not allowed; you probably should not specify end key at all, under RandomPartitioner"); + else + throw newInvalidRequestException("start key must sort before (or equal to) finish key in your partitioner!"); + } + } + + if (range.count <= 0) + { + throw newInvalidRequestException("maxRows must be positive"); + } + } + static void validateIndexClauses(String keyspace, String columnFamily, IndexClause index_clause) throws InvalidRequestException { Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=992001&r1=992000&r2=992001&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Sep 2 16:17:21 2010 @@ -43,6 +43,8 @@ import org.apache.avro.util.Utf8; import org.apache.cassandra.avro.InvalidRequestException; import org.apache.cassandra.db.migration.DropKeyspace; import org.apache.cassandra.db.migration.RenameKeyspace; +import org.apache.cassandra.dht.*; +import org.apache.cassandra.thrift.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -65,8 +67,6 @@ import org.apache.cassandra.db.migration import org.apache.cassandra.db.migration.DropColumnFamily; import org.apache.cassandra.db.migration.Migration; import org.apache.cassandra.db.migration.RenameColumnFamily; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.scheduler.IRequestScheduler; import org.apache.cassandra.service.ClientState; @@ -1064,6 +1064,71 @@ public class CassandraServer implements } @Override + public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate slice_predicate, KeyRange range, ConsistencyLevel consistency_level) + throws InvalidRequestException, TimedOutException + { + String keyspace = clientState.getKeyspace(); + try + { + clientState.hasKeyspaceAccess(Permission.READ_VALUE); + } + catch (org.apache.cassandra.thrift.InvalidRequestException thriftE) + { + throw newInvalidRequestException(thriftE); + } + + AvroValidation.validateColumnParent(keyspace, column_parent); + AvroValidation.validatePredicate(keyspace, column_parent, slice_predicate); + AvroValidation.validateKeyRange(range); + + List<Row> rows; + try + { + IPartitioner p = StorageService.getPartitioner(); + AbstractBounds bounds; + if (range.start_key == null) + { + Token.TokenFactory tokenFactory = p.getTokenFactory(); + Token left = tokenFactory.fromString(range.start_token.toString()); + Token right = tokenFactory.fromString(range.end_token.toString()); + bounds = new Range(left, right); + } + else + { + bounds = new Bounds(p.getToken(range.start_key.array()), p.getToken(range.end_key.array())); + } + try + { + schedule(); + rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, + thriftColumnParent(column_parent), + thriftSlicePredicate(slice_predicate), + bounds, + range.count), + thriftConsistencyLevel(consistency_level)); + } + catch (org.apache.cassandra.thrift.UnavailableException thriftE) + { + throw newUnavailableException(thriftE); + } + finally + { + release(); + } + assert rows != null; + } + catch (TimeoutException e) + { + throw new TimedOutException(); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + return avronateKeySlices(rows, column_parent, slice_predicate); + } + + @Override public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException { @@ -1074,7 +1139,8 @@ public class CassandraServer implements { clientState.hasKeyspaceAccess(Permission.READ_VALUE); } - catch (org.apache.cassandra.thrift.InvalidRequestException thriftE) { + catch (org.apache.cassandra.thrift.InvalidRequestException thriftE) + { throw newInvalidRequestException(thriftE); } @@ -1115,13 +1181,32 @@ public class CassandraServer implements return keySlices; } - + + private org.apache.cassandra.thrift.ColumnParent thriftColumnParent(ColumnParent avro_column_parent) + { + org.apache.cassandra.thrift.ColumnParent cp = new org.apache.cassandra.thrift.ColumnParent(avro_column_parent.column_family.toString()); + if (avro_column_parent.super_column != null) + cp.super_column = avro_column_parent.super_column.array(); + + return cp; + } + private org.apache.cassandra.thrift.SlicePredicate thriftSlicePredicate(SlicePredicate avro_pred) { - List<byte[]> bufs = new ArrayList<byte[]>(); - for(ByteBuffer buf : avro_pred.column_names) - bufs.add(buf.array()); + // One or the other are set, so check for nulls of either + + List<byte[]> bufs = null; + if (avro_pred.column_names != null) + { + bufs = new ArrayList<byte[]>(); + for(ByteBuffer buf : avro_pred.column_names) + bufs.add(buf.array()); + } + + org.apache.cassandra.thrift.SliceRange slice_range = (avro_pred.slice_range != null) + ? thriftSliceRange(avro_pred.slice_range) + : null; - return new org.apache.cassandra.thrift.SlicePredicate().setColumn_names(bufs).setSlice_range(thriftSliceRange(avro_pred.slice_range)); + return new org.apache.cassandra.thrift.SlicePredicate().setColumn_names(bufs).setSlice_range(slice_range); } private org.apache.cassandra.thrift.SliceRange thriftSliceRange(SliceRange avro_range) {
