Author: eevans
Date: Tue May 11 17:20:37 2010
New Revision: 943188
URL: http://svn.apache.org/viewvc?rev=943188&view=rev
Log:
bring avro get() up to date w/ changes in trunk
* refactoring of private methods in light of multiget() removal in trunk
* update get() for keyspace argument removal
* update get() for change to byte[] keys
* added ttl attribute to column schema
Patch by eevans
Modified:
cassandra/trunk/interface/cassandra.avpr
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
Modified: cassandra/trunk/interface/cassandra.avpr
URL:
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Tue May 11 17:20:37 2010
@@ -25,7 +25,8 @@
"fields": [
{"name": "name", "type": "bytes"},
{"name": "value", "type": "bytes"},
- {"name": "timestamp", "type": "long"}
+ {"name": "timestamp", "type": "long"},
+ {"name": "ttl", "type": "int"}
]
},
{"name": "SuperColumn", "type": "record",
@@ -100,8 +101,7 @@
"messages": {
"get": {
"request": [
- {"name": "keyspace", "type": "string"},
- {"name": "key", "type": "string"},
+ {"name": "key", "type": "bytes"},
{"name": "column_path", "type": "ColumnPath"},
{"name": "consistency_level", "type": "ConsistencyLevel"}
],
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
Tue May 11 17:20:37 2010
@@ -120,6 +120,11 @@ class ErrorFactory
return newNotFoundException(new Utf8(why));
}
+ static NotFoundException newNotFoundException()
+ {
+ return newNotFoundException(new Utf8());
+ }
+
static TimedOutException newTimedOutException(Utf8 why)
{
TimedOutException exception = new TimedOutException();
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Tue
May 11 17:20:37 2010
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
import static
org.apache.cassandra.avro.ErrorFactory.newInvalidRequestException;
import static org.apache.cassandra.avro.AvroRecordFactory.newColumnPath;
@@ -41,12 +42,24 @@ import static org.apache.cassandra.avro.
*/
public class AvroValidation {
// FIXME: could use method in ThriftValidation
+ // FIXME: remove me
static void validateKey(String key) throws InvalidRequestException
{
if (key.isEmpty())
throw newInvalidRequestException("Key may not be empty");
}
+ static void validateKey(byte[] key) throws InvalidRequestException
+ {
+ if (key == null || key.length == 0)
+ throw newInvalidRequestException("Key may not be empty");
+
+ // check that key can be handled by FBUtilities.writeShortByteArray
+ if (key.length > FBUtilities.MAX_UNSIGNED_SHORT)
+ throw newInvalidRequestException("Key length of " + key.length +
+ " is longer than maximum of " +
FBUtilities.MAX_UNSIGNED_SHORT);
+ }
+
// FIXME: could use method in ThriftValidation
static void validateKeyspace(String keyspace) throws
KeyspaceNotDefinedException
{
Modified:
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Tue
May 11 17:20:37 2010
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -52,7 +53,8 @@ import static org.apache.cassandra.avro.
public class CassandraServer implements Cassandra {
private static Logger logger =
LoggerFactory.getLogger(CassandraServer.class);
- private final static GenericArray<Column> EMPTY_SUBCOLUMNS = new
GenericData.Array<Column>(0, Schema.parse("{\"type\":\"array\",\"items\":" +
Column.SCHEMA$ + "}"));
+ private final static GenericArray<Column> EMPTY_SUBCOLUMNS = new
GenericData.Array<Column>(0, Schema.createArray(Column.SCHEMA$));
+ private final static GenericArray<ColumnOrSuperColumn> EMPTY_COLUMNS = new
GenericData.Array<ColumnOrSuperColumn>(0,
Schema.createArray(ColumnOrSuperColumn.SCHEMA$));
private final static Utf8 API_VERSION = new Utf8("0.0.0");
private ThreadLocal<AccessLevel> loginDone = new ThreadLocal<AccessLevel>()
@@ -67,118 +69,44 @@ public class CassandraServer implements
// Session keyspace.
private ThreadLocal<String> curKeyspace = new ThreadLocal<String>();
- public ColumnOrSuperColumn get(Utf8 keyspace, Utf8 key, ColumnPath
columnPath, ConsistencyLevel consistencyLevel)
+ @Override
+ public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath columnPath,
ConsistencyLevel consistencyLevel)
throws AvroRemoteException, InvalidRequestException, NotFoundException,
UnavailableException, TimedOutException {
if (logger.isDebugEnabled())
logger.debug("get");
- ColumnOrSuperColumn column = multigetInternal(keyspace.toString(),
Arrays.asList(key.toString()), columnPath,
consistencyLevel).get(key.toString());
-
- if ((column.column == null) && (column.super_column == null))
- {
- throw newNotFoundException("Path not found");
- }
- return column;
- }
-
- private Map<String, ColumnOrSuperColumn> multigetInternal(String keyspace,
List<String> keys, ColumnPath cp, ConsistencyLevel level)
- throws InvalidRequestException, UnavailableException, TimedOutException
- {
- AvroValidation.validateColumnPath(keyspace, cp);
+ AvroValidation.validateColumnPath(curKeyspace.get(), columnPath);
// FIXME: This is repetitive.
byte[] column, super_column;
- column = cp.column == null ? null : cp.column.array();
- super_column = cp.super_column == null ? null :
cp.super_column.array();
+ column = columnPath.column == null ? null : columnPath.column.array();
+ super_column = columnPath.super_column == null ? null :
columnPath.super_column.array();
- QueryPath path = new QueryPath(cp.column_family.toString(), column ==
null ? null : super_column);
+ QueryPath path = new QueryPath(columnPath.column_family.toString(),
column == null ? null : super_column);
List<byte[]> nameAsList = Arrays.asList(column == null ? super_column
: column);
- List<ReadCommand> commands = new ArrayList<ReadCommand>();
- for (String key: keys)
- {
- AvroValidation.validateKey(key);
- // FIXME: string key
- commands.add(new SliceByNamesReadCommand(keyspace,
key.getBytes(UTF8), path, nameAsList));
- }
+ AvroValidation.validateKey(key.array());
+ ReadCommand command = new SliceByNamesReadCommand(curKeyspace.get(),
key.array(), path, nameAsList);
- Map<String, ColumnOrSuperColumn> columnFamiliesMap = new
HashMap<String, ColumnOrSuperColumn>();
- Map<String, Collection<IColumn>> columnsMap =
multigetColumns(commands, level);
+ Map<DecoratedKey<?>, ColumnFamily> cfamilies =
readColumnFamily(Arrays.asList(command), consistencyLevel);
+ ColumnFamily cf =
cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
- for (ReadCommand command: commands)
- {
- ColumnOrSuperColumn columnorsupercolumn;
-
- Collection<IColumn> columns = columnsMap.get(command.key);
- if (columns == null)
- {
- columnorsupercolumn = new ColumnOrSuperColumn();
- }
- else
- {
- assert columns.size() == 1;
- IColumn col = columns.iterator().next();
-
-
- if (col.isMarkedForDelete())
- {
- columnorsupercolumn = new ColumnOrSuperColumn();
- }
- else
- {
- columnorsupercolumn = col instanceof
org.apache.cassandra.db.Column
- ?
newColumnOrSuperColumn(newColumn(col.name(), col.value(), col.timestamp()))
- :
newColumnOrSuperColumn(newSuperColumn(col.name(),
avronateSubColumns(col.getSubColumns())));
- }
-
- }
- // FIXME: assuming string keys
- columnFamiliesMap.put(new String(command.key, UTF8),
columnorsupercolumn);
- }
-
- return columnFamiliesMap;
- }
-
- private Map<String, Collection<IColumn>> multigetColumns(List<ReadCommand>
commands, ConsistencyLevel level)
- throws InvalidRequestException, UnavailableException, TimedOutException
- {
- Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(commands,
level);
- Map<String, Collection<IColumn>> columnFamiliesMap = new
HashMap<String, Collection<IColumn>>();
+ if (cf == null)
+ throw newNotFoundException();
- for (ReadCommand command : commands)
- {
- ColumnFamily cfamily =
cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
- if (cfamily == null)
- continue;
-
- Collection<IColumn> columns = null;
- if (command.queryPath.superColumnName != null)
- {
- IColumn column =
cfamily.getColumn(command.queryPath.superColumnName);
- if (column != null)
- {
- columns = column.getSubColumns();
- }
- }
- else
- {
- columns = cfamily.getSortedColumns();
- }
-
- if (columns != null && columns.size() != 0)
- {
- // FIXME: assuming string keys
- columnFamiliesMap.put(new String(command.key, UTF8), columns);
- }
- }
+ GenericArray<ColumnOrSuperColumn> avroColumns =
avronateColumnFamily(cf, command.queryPath.superColumnName != null, false);
+
+ if (avroColumns.size() == 0)
+ throw newNotFoundException();
- return columnFamiliesMap;
+ assert avroColumns.size() == 1;
+ return avroColumns.iterator().next();
}
- protected Map<DecoratedKey, ColumnFamily>
readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency)
+ protected Map<DecoratedKey<?>, ColumnFamily>
readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency)
throws InvalidRequestException, UnavailableException, TimedOutException
{
// TODO - Support multiple column families per row, right now row only
contains 1 column family
- Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new
HashMap<DecoratedKey, ColumnFamily>();
+ Map<DecoratedKey<?>, ColumnFamily> columnFamilyKeyMap = new
HashMap<DecoratedKey<?>, ColumnFamily>();
if (consistency == ConsistencyLevel.ZERO)
throw newInvalidRequestException("Consistency level zero may not
be applied to read operations");
@@ -215,7 +143,7 @@ public class CassandraServer implements
}
// Don't playa hate, avronate.
- public GenericArray<Column> avronateSubColumns(Collection<IColumn> columns)
+ private GenericArray<Column> avronateSubColumns(Collection<IColumn>
columns)
{
if (columns == null || columns.isEmpty())
return EMPTY_SUBCOLUMNS;
@@ -233,6 +161,77 @@ public class CassandraServer implements
return avroColumns;
}
+
+ private GenericArray<ColumnOrSuperColumn>
avronateColumns(Collection<IColumn> columns, boolean reverseOrder)
+ {
+ ArrayList<ColumnOrSuperColumn> avroColumns = new
ArrayList<ColumnOrSuperColumn>(columns.size());
+ for (IColumn column : columns)
+ {
+ if (column.isMarkedForDelete())
+ continue;
+
+ Column avroColumn = newColumn(column.name(), column.value(),
column.timestamp());
+
+ if (column instanceof ExpiringColumn)
+ avroColumn.ttl = ((ExpiringColumn)column).getTimeToLive();
+
+ avroColumns.add(newColumnOrSuperColumn(avroColumn));
+ }
+
+ if (reverseOrder)
+ Collections.reverse(avroColumns);
+
+ // FIXME: Teach GenericData.Array how to reverse so that this
iteration isn't necessary.
+ GenericArray<ColumnOrSuperColumn> avroArray = new
GenericData.Array<ColumnOrSuperColumn>(avroColumns.size(),
Schema.createArray(ColumnOrSuperColumn.SCHEMA$));
+ for (ColumnOrSuperColumn cosc : avroColumns)
+ avroArray.add(cosc);
+
+ return avroArray;
+ }
+
+ private GenericArray<ColumnOrSuperColumn>
avronateSuperColumns(Collection<IColumn> columns, boolean reverseOrder)
+ {
+ ArrayList<ColumnOrSuperColumn> avroSuperColumns = new
ArrayList<ColumnOrSuperColumn>(columns.size());
+ for (IColumn column: columns)
+ {
+ GenericArray<Column> subColumns =
avronateSubColumns(column.getSubColumns());
+ if (subColumns.size() == 0)
+ continue;
+ SuperColumn superColumn = newSuperColumn(column.name(),
subColumns);
+ avroSuperColumns.add(newColumnOrSuperColumn(superColumn));
+ }
+
+ if (reverseOrder)
+ Collections.reverse(avroSuperColumns);
+
+ // FIXME: Teach GenericData.Array how to reverse so that this
iteration isn't necessary.
+ GenericArray<ColumnOrSuperColumn> avroArray = new
GenericData.Array<ColumnOrSuperColumn>(avroSuperColumns.size(),
Schema.createArray(ColumnOrSuperColumn.SCHEMA$));
+ for (ColumnOrSuperColumn cosc : avroSuperColumns)
+ avroArray.add(cosc);
+
+ return avroArray;
+ }
+
+ private GenericArray<ColumnOrSuperColumn>
avronateColumnFamily(ColumnFamily cf, boolean subColumnsOnly, boolean
reverseOrder)
+ {
+ if (cf == null || cf.getColumnsMap().size() == 0)
+ return EMPTY_COLUMNS;
+
+ if (subColumnsOnly)
+ {
+ IColumn column = cf.getColumnsMap().values().iterator().next();
+ Collection<IColumn> subColumns = column.getSubColumns();
+ if (subColumns == null || subColumns.isEmpty())
+ return EMPTY_COLUMNS;
+ else
+ return avronateColumns(subColumns, reverseOrder);
+ }
+
+ if (cf.isSuper())
+ return avronateSuperColumns(cf.getSortedColumns(), reverseOrder);
+ else
+ return avronateColumns(cf.getSortedColumns(), reverseOrder);
+ }
public Void insert(Utf8 keyspace, Utf8 key, ColumnPath cp, ByteBuffer
value, long timestamp, ConsistencyLevel consistencyLevel)
throws AvroRemoteException, InvalidRequestException, UnavailableException,
TimedOutException