Author: eevans
Date: Wed Sep 1 20:53:49 2010
New Revision: 991701
URL: http://svn.apache.org/viewvc?rev=991701&view=rev
Log:
avro: get_indexed_slices implementation
Patch by Nick Bailey; reviewed by eevans
Modified:
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
Modified: cassandra/trunk/interface/cassandra.genavro
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Wed Sep 1 20:53:49 2010
@@ -73,6 +73,27 @@ protocol Cassandra {
array<string> endpoints;
}
+ /** Operator carried by an IndexExpression (its op field). */
+ enum IndexOperator {
+ EQ, GTE, GT, LTE, LT
+ }
+
+ /**
+ * One condition in an IndexClause: a column name, an operator and a value.
+ * NOTE(review): presumably value is what the named column is compared
+ * against using op -- confirm.
+ */
+ record IndexExpression {
+ bytes column_name;
+ IndexOperator op;
+ bytes value;
+ }
+
+ /**
+ * The row-selection part of a get_indexed_slices request. The server
+ * rejects a clause whose expressions array is empty, or that has no EQ
+ * expression on an indexed column.
+ * NOTE(review): start_key and count presumably bound where the scan starts
+ * and how many rows it returns -- confirm.
+ */
+ record IndexClause {
+ array<IndexExpression> expressions;
+ bytes start_key;
+ int count;
+ }
+
+ /** A row key together with columns for that row; get_indexed_slices returns an array of these. */
+ record KeySlice {
+ bytes key;
+ array<ColumnOrSuperColumn> columns;
+ }
+
record Deletion {
Clock clock;
union { bytes, null } super_column;
@@ -222,6 +243,16 @@ protocol Cassandra {
ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException;
+ /**
+ * Returns the subset of columns specified in SlicePredicate for
+ * the rows matching the IndexClause.
+ *
+ * The server rejects, with InvalidRequestException, an IndexClause that
+ * has no expressions or that has no EQ expression on an indexed column.
+ */
+ array<KeySlice> get_indexed_slices(ColumnParent column_parent,
+ IndexClause index_clause,
+ SlicePredicate column_predicate,
+ ConsistencyLevel consistency_level)
+ throws InvalidRequestException, UnavailableException, TimedOutException;
+
/**
* Returns the number of columns matching a predicate for a particular
* key, ColumnFamily, and optionally SuperColumn.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
Wed Sep 1 20:53:49 2010
@@ -110,6 +110,15 @@ public class AvroRecordFactory
entry.columns = columns;
return entry;
}
+
+    /**
+     * Creates a KeySlice record for a single row.
+     *
+     * @param key raw row key; it is wrapped (not copied) in a ByteBuffer, and a
+     *            null key is stored as a null buffer
+     * @param columns the columns to attach to the slice, stored as given
+     * @return a new KeySlice holding the wrapped key and the supplied column list
+     */
+    public static KeySlice newKeySlice(byte[] key, List<ColumnOrSuperColumn> columns)
+    {
+        KeySlice keySlice = new KeySlice();
+        keySlice.columns = columns;
+        if (key == null)
+            keySlice.key = null;
+        else
+            keySlice.key = ByteBuffer.wrap(key);
+        return keySlice;
+    }
+
}
class ErrorFactory
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Wed
Sep 1 20:53:49 2010
@@ -24,6 +24,7 @@ package org.apache.cassandra.avro;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.Set;
import org.apache.avro.util.Utf8;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -31,6 +32,7 @@ import org.apache.cassandra.db.ColumnFam
import org.apache.cassandra.db.IClock;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ColumnFamilyType;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.TimestampClock;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
@@ -297,4 +299,19 @@ public class AvroValidation
else
validateColumns(keyspace, cp, predicate.column_names);
}
+
+    /**
+     * Checks that an index clause can be answered from a secondary index.
+     *
+     * The clause must contain at least one expression, and at least one of its
+     * expressions must use the EQ operator on a column that the column family
+     * store reports as indexed.
+     *
+     * @param keyspace keyspace whose Table is opened to look up the column family store
+     * @param columnFamily name of the column family being queried
+     * @param index_clause the clause to validate
+     * @throws InvalidRequestException if the clause has no expressions, or none of
+     *         them is an EQ on an indexed column
+     */
+    static void validateIndexClauses(String keyspace, String columnFamily, IndexClause index_clause)
+    throws InvalidRequestException
+    {
+        if (index_clause.expressions.isEmpty())
+            throw newInvalidRequestException("index clause list may not be empty");
+
+        Set<byte[]> indexedColumns = Table.open(keyspace).getColumnFamilyStore(columnFamily).getIndexedColumns();
+        for (IndexExpression expression : index_clause.expressions)
+        {
+            if (expression.op.equals(IndexOperator.EQ) && containsColumnName(indexedColumns, expression.column_name))
+                return;
+        }
+        throw newInvalidRequestException("No indexed columns present in index clause with operator EQ");
+    }
+
+    /**
+     * Returns true if the bytes remaining in name equal one of the indexed column names.
+     *
+     * The set holds byte[] while Avro hands us a ByteBuffer, so Set.contains(name)
+     * can never match by content; compare the byte contents explicitly instead.
+     */
+    private static boolean containsColumnName(Set<byte[]> indexedColumns, ByteBuffer name)
+    {
+        // Read through a duplicate so the caller's buffer position is left untouched.
+        ByteBuffer view = name.duplicate();
+        byte[] columnName = new byte[view.remaining()];
+        view.get(columnName);
+
+        for (byte[] indexed : indexedColumns)
+        {
+            if (Arrays.equals(indexed, columnName))
+                return true;
+        }
+        return false;
+    }
+
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=991701&r1=991700&r2=991701&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Wed
Sep 1 20:53:49 2010
@@ -1062,4 +1062,93 @@ public class CassandraServer implements
}
return null;
}
+
+    /**
+     * Avro handler for index queries.
+     *
+     * Checks READ_VALUE access on the client's keyspace, validates the column
+     * parent, the slice predicate and the index clause (in that order), runs
+     * StorageProxy.scan with the Thrift equivalents of the arguments, and
+     * converts the resulting rows to KeySlice records.
+     *
+     * @throws InvalidRequestException if the access check raises a Thrift
+     *         InvalidRequestException (it is re-wrapped), or if validation fails
+     * @throws TimedOutException if the scan throws a TimeoutException
+     * @throws RuntimeException wrapping any IOException thrown by the scan
+     */
+    @Override
+    public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("scan");
+
+        // The access check reports failures as Thrift exceptions; re-wrap them for Avro callers.
+        try
+        {
+            clientState.hasKeyspaceAccess(Permission.READ_VALUE);
+        }
+        catch (org.apache.cassandra.thrift.InvalidRequestException accessError)
+        {
+            throw newInvalidRequestException(accessError);
+        }
+
+        String keyspace = clientState.getKeyspace();
+        AvroValidation.validateColumnParent(keyspace, column_parent);
+        AvroValidation.validatePredicate(keyspace, column_parent, column_predicate);
+        String columnFamily = column_parent.column_family.toString();
+        AvroValidation.validateIndexClauses(keyspace, columnFamily, index_clause);
+
+        // Convert the arguments in the same order the scan call consumes them.
+        String scanKeyspace = keyspace.toString();
+        org.apache.cassandra.thrift.IndexClause thriftClause = thriftIndexClause(index_clause);
+        org.apache.cassandra.thrift.SlicePredicate thriftPredicate = thriftSlicePredicate(column_predicate);
+
+        List<Row> scanned;
+        try
+        {
+            scanned = StorageProxy.scan(scanKeyspace, columnFamily, thriftClause, thriftPredicate, thriftConsistencyLevel(consistency_level));
+        }
+        catch (IOException ioError)
+        {
+            throw new RuntimeException(ioError);
+        }
+        catch (TimeoutException timeout)
+        {
+            throw new TimedOutException();
+        }
+        return avronateKeySlices(scanned, column_parent, column_predicate);
+    }
+
+    /**
+     * Converts storage rows into Avro KeySlice records, one per row and in the
+     * same order.
+     *
+     * @param rows rows to convert
+     * @param column_parent request column parent; whether its super_column is
+     *        non-null is passed to avronateColumnFamily
+     * @param predicate request predicate; only slice_range.reversed is read,
+     *        and a missing slice_range counts as not reversed
+     * @return a list with one KeySlice per input row
+     */
+    private List<KeySlice> avronateKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
+    {
+        List<KeySlice> result = new ArrayList<KeySlice>(rows.size());
+
+        boolean reversed = false;
+        if (predicate.slice_range != null)
+            reversed = predicate.slice_range.reversed;
+
+        for (Row row : rows)
+        {
+            boolean hasSuperColumn = column_parent.super_column != null;
+            result.add(newKeySlice(row.key.key, avronateColumnFamily(row.cf, hasSuperColumn, reversed)));
+        }
+        return result;
+    }
+
+    /**
+     * Converts an Avro SlicePredicate to its Thrift equivalent.
+     *
+     * A predicate may carry column_names, a slice_range, or both. Whichever is
+     * absent (null) is left unset on the Thrift object rather than being
+     * dereferenced, so a predicate with only one of the two no longer fails
+     * with a NullPointerException.
+     *
+     * @param avro_pred the Avro predicate to convert
+     * @return a new Thrift SlicePredicate carrying the fields that were present
+     */
+    private org.apache.cassandra.thrift.SlicePredicate thriftSlicePredicate(SlicePredicate avro_pred)
+    {
+        org.apache.cassandra.thrift.SlicePredicate thriftPred = new org.apache.cassandra.thrift.SlicePredicate();
+
+        if (avro_pred.column_names != null)
+        {
+            List<byte[]> names = new ArrayList<byte[]>();
+            for (ByteBuffer name : avro_pred.column_names)
+                names.add(name.array());
+            thriftPred.setColumn_names(names);
+        }
+
+        if (avro_pred.slice_range != null)
+            thriftPred.setSlice_range(thriftSliceRange(avro_pred.slice_range));
+
+        return thriftPred;
+    }
+
+    /**
+     * Builds a Thrift SliceRange from the start, finish, reversed and count
+     * fields of an Avro SliceRange. The start and finish buffers are passed on
+     * as their backing arrays.
+     */
+    private org.apache.cassandra.thrift.SliceRange thriftSliceRange(SliceRange avro_range)
+    {
+        byte[] start = avro_range.start.array();
+        byte[] finish = avro_range.finish.array();
+        return new org.apache.cassandra.thrift.SliceRange(start, finish, avro_range.reversed, avro_range.count);
+    }
+
+    /**
+     * Converts an Avro IndexClause to its Thrift equivalent: each expression is
+     * converted with thriftIndexExpression, and start_key (as its backing
+     * array) and count are carried over.
+     */
+    private org.apache.cassandra.thrift.IndexClause thriftIndexClause(IndexClause avro_clause)
+    {
+        List<org.apache.cassandra.thrift.IndexExpression> converted = new ArrayList<org.apache.cassandra.thrift.IndexExpression>();
+        for (IndexExpression expression : avro_clause.expressions)
+        {
+            converted.add(thriftIndexExpression(expression));
+        }
+
+        byte[] startKey = avro_clause.start_key.array();
+        return new org.apache.cassandra.thrift.IndexClause(converted, startKey, avro_clause.count);
+    }
+
+    /**
+     * Converts one Avro IndexExpression to its Thrift equivalent, passing the
+     * column name and value as their backing arrays and mapping the operator
+     * with thriftIndexOperator.
+     */
+    private org.apache.cassandra.thrift.IndexExpression thriftIndexExpression(IndexExpression avro_exp)
+    {
+        byte[] columnName = avro_exp.column_name.array();
+        org.apache.cassandra.thrift.IndexOperator operator = thriftIndexOperator(avro_exp.op);
+        byte[] value = avro_exp.value.array();
+        return new org.apache.cassandra.thrift.IndexExpression(columnName, operator, value);
+    }
+
+    /**
+     * Maps an Avro IndexOperator to the Thrift constant of the same name.
+     * Returns null if the operator matches none of the handled cases.
+     */
+    private org.apache.cassandra.thrift.IndexOperator thriftIndexOperator(IndexOperator avro_op)
+    {
+        org.apache.cassandra.thrift.IndexOperator thriftOp = null;
+        switch (avro_op)
+        {
+            case EQ:
+                thriftOp = org.apache.cassandra.thrift.IndexOperator.EQ;
+                break;
+            case GTE:
+                thriftOp = org.apache.cassandra.thrift.IndexOperator.GTE;
+                break;
+            case GT:
+                thriftOp = org.apache.cassandra.thrift.IndexOperator.GT;
+                break;
+            case LTE:
+                thriftOp = org.apache.cassandra.thrift.IndexOperator.LTE;
+                break;
+            case LT:
+                thriftOp = org.apache.cassandra.thrift.IndexOperator.LT;
+                break;
+        }
+        return thriftOp;
+    }
}