Author: eevans
Date: Tue May 11 17:20:37 2010
New Revision: 943188

URL: http://svn.apache.org/viewvc?rev=943188&view=rev
Log:
bring avro get() up to date w/ changes in trunk

 * refactoring of private methods in light of multiget() removal in trunk
 * update get() for keyspace argument removal
 * update get() for change to byte[] keys
 * added ttl attribute to column schema

Patch by eevans

Modified:
    cassandra/trunk/interface/cassandra.avpr
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java

Modified: cassandra/trunk/interface/cassandra.avpr
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.avpr?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.avpr (original)
+++ cassandra/trunk/interface/cassandra.avpr Tue May 11 17:20:37 2010
@@ -25,7 +25,8 @@
           "fields": [
             {"name": "name", "type": "bytes"},
             {"name": "value", "type": "bytes"},
-            {"name": "timestamp", "type": "long"}
+            {"name": "timestamp", "type": "long"},
+            {"name": "ttl", "type": "int"}
         ]
       },
       {"name": "SuperColumn", "type": "record",
@@ -100,8 +101,7 @@
   "messages": {
     "get": {
         "request": [
-            {"name": "keyspace", "type": "string"},
-            {"name": "key", "type": "string"},
+            {"name": "key", "type": "bytes"},
             {"name": "column_path", "type": "ColumnPath"},
             {"name": "consistency_level", "type": "ConsistencyLevel"}
         ],

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java 
Tue May 11 17:20:37 2010
@@ -120,6 +120,11 @@ class ErrorFactory
         return newNotFoundException(new Utf8(why));
     }
     
+    static NotFoundException newNotFoundException()
+    {
+        return newNotFoundException(new Utf8());
+    }
+    
     static TimedOutException newTimedOutException(Utf8 why)
     {
         TimedOutException exception = new TimedOutException();

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Tue 
May 11 17:20:37 2010
@@ -32,6 +32,7 @@ import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ColumnFamilyType;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.utils.FBUtilities;
 
 import static 
org.apache.cassandra.avro.ErrorFactory.newInvalidRequestException;
 import static org.apache.cassandra.avro.AvroRecordFactory.newColumnPath;
@@ -41,12 +42,24 @@ import static org.apache.cassandra.avro.
  */
 public class AvroValidation {
     // FIXME: could use method in ThriftValidation
+    // FIXME: remove me
     static void validateKey(String key) throws InvalidRequestException
     {
         if (key.isEmpty())
             throw newInvalidRequestException("Key may not be empty");
     }
     
+    static void validateKey(byte[] key) throws InvalidRequestException
+    {
+        if (key == null || key.length == 0)
+            throw newInvalidRequestException("Key may not be empty");
+        
+        // check that key can be handled by FBUtilities.writeShortByteArray
+        if (key.length > FBUtilities.MAX_UNSIGNED_SHORT)
+            throw newInvalidRequestException("Key length of " + key.length +
+                    " is longer than maximum of " + 
FBUtilities.MAX_UNSIGNED_SHORT);
+    }
+    
     // FIXME: could use method in ThriftValidation
     static void validateKeyspace(String keyspace) throws 
KeyspaceNotDefinedException
     {

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=943188&r1=943187&r2=943188&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Tue 
May 11 17:20:37 2010
@@ -26,6 +26,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -52,7 +53,8 @@ import static org.apache.cassandra.avro.
 public class CassandraServer implements Cassandra {
     private static Logger logger = 
LoggerFactory.getLogger(CassandraServer.class);
 
-    private final static GenericArray<Column> EMPTY_SUBCOLUMNS = new 
GenericData.Array<Column>(0, Schema.parse("{\"type\":\"array\",\"items\":" + 
Column.SCHEMA$ + "}"));
+    private final static GenericArray<Column> EMPTY_SUBCOLUMNS = new 
GenericData.Array<Column>(0, Schema.createArray(Column.SCHEMA$));
+    private final static GenericArray<ColumnOrSuperColumn> EMPTY_COLUMNS = new 
GenericData.Array<ColumnOrSuperColumn>(0, 
Schema.createArray(ColumnOrSuperColumn.SCHEMA$));
     private final static Utf8 API_VERSION = new Utf8("0.0.0");
     
     private ThreadLocal<AccessLevel> loginDone = new ThreadLocal<AccessLevel>()
@@ -67,118 +69,44 @@ public class CassandraServer implements 
     // Session keyspace.
     private ThreadLocal<String> curKeyspace = new ThreadLocal<String>();
 
-    public ColumnOrSuperColumn get(Utf8 keyspace, Utf8 key, ColumnPath 
columnPath, ConsistencyLevel consistencyLevel)
+    @Override
+    public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath columnPath, 
ConsistencyLevel consistencyLevel)
     throws AvroRemoteException, InvalidRequestException, NotFoundException, 
UnavailableException, TimedOutException {
         if (logger.isDebugEnabled())
             logger.debug("get");
         
-        ColumnOrSuperColumn column = multigetInternal(keyspace.toString(), 
Arrays.asList(key.toString()), columnPath, 
consistencyLevel).get(key.toString());
-        
-        if ((column.column == null) && (column.super_column == null))
-        {
-            throw newNotFoundException("Path not found");
-        }
-        return column;
-    }
-
-    private Map<String, ColumnOrSuperColumn> multigetInternal(String keyspace, 
List<String> keys, ColumnPath cp, ConsistencyLevel level)
-    throws InvalidRequestException, UnavailableException, TimedOutException
-    {
-        AvroValidation.validateColumnPath(keyspace, cp);
+        AvroValidation.validateColumnPath(curKeyspace.get(), columnPath);
         
         // FIXME: This is repetitive.
         byte[] column, super_column;
-        column = cp.column == null ? null : cp.column.array();
-        super_column = cp.super_column == null ? null : 
cp.super_column.array();
+        column = columnPath.column == null ? null : columnPath.column.array();
+        super_column = columnPath.super_column == null ? null : 
columnPath.super_column.array();
         
-        QueryPath path = new QueryPath(cp.column_family.toString(), column == 
null ? null : super_column);
+        QueryPath path = new QueryPath(columnPath.column_family.toString(), 
column == null ? null : super_column);
         List<byte[]> nameAsList = Arrays.asList(column == null ? super_column 
: column);
-        List<ReadCommand> commands = new ArrayList<ReadCommand>();
-        for (String key: keys)
-        {
-            AvroValidation.validateKey(key);
-            // FIXME: string key
-            commands.add(new SliceByNamesReadCommand(keyspace, 
key.getBytes(UTF8), path, nameAsList));
-        }
+        AvroValidation.validateKey(key.array());
+        ReadCommand command = new SliceByNamesReadCommand(curKeyspace.get(), 
key.array(), path, nameAsList);
         
-        Map<String, ColumnOrSuperColumn> columnFamiliesMap = new 
HashMap<String, ColumnOrSuperColumn>();
-        Map<String, Collection<IColumn>> columnsMap = 
multigetColumns(commands, level);
+        Map<DecoratedKey<?>, ColumnFamily> cfamilies = 
readColumnFamily(Arrays.asList(command), consistencyLevel);
+        ColumnFamily cf = 
cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
         
-        for (ReadCommand command: commands)
-        {
-            ColumnOrSuperColumn columnorsupercolumn;
-
-            Collection<IColumn> columns = columnsMap.get(command.key);
-            if (columns == null)
-            {
-               columnorsupercolumn = new ColumnOrSuperColumn();
-            }
-            else
-            {
-                assert columns.size() == 1;
-                IColumn col = columns.iterator().next();
-
-
-                if (col.isMarkedForDelete())
-                {
-                    columnorsupercolumn = new ColumnOrSuperColumn();
-                }
-                else
-                {
-                    columnorsupercolumn = col instanceof 
org.apache.cassandra.db.Column
-                                          ? 
newColumnOrSuperColumn(newColumn(col.name(), col.value(), col.timestamp()))
-                                          : 
newColumnOrSuperColumn(newSuperColumn(col.name(), 
avronateSubColumns(col.getSubColumns())));
-                }
-
-            }
-            // FIXME: assuming string keys
-            columnFamiliesMap.put(new String(command.key, UTF8), 
columnorsupercolumn);
-        }
-
-        return columnFamiliesMap;
-    }
-    
-    private Map<String, Collection<IColumn>> multigetColumns(List<ReadCommand> 
commands, ConsistencyLevel level)
-    throws InvalidRequestException, UnavailableException, TimedOutException
-    {
-        Map<DecoratedKey, ColumnFamily> cfamilies = readColumnFamily(commands, 
level);
-        Map<String, Collection<IColumn>> columnFamiliesMap = new 
HashMap<String, Collection<IColumn>>();
+        if (cf == null)
+            throw newNotFoundException();
         
-        for (ReadCommand command : commands)
-        {
-            ColumnFamily cfamily = 
cfamilies.get(StorageService.getPartitioner().decorateKey(command.key));
-            if (cfamily == null)
-                continue;
-
-            Collection<IColumn> columns = null;
-            if (command.queryPath.superColumnName != null)
-            {
-                IColumn column = 
cfamily.getColumn(command.queryPath.superColumnName);
-                if (column != null)
-                {
-                    columns = column.getSubColumns();
-                }
-            }
-            else
-            {
-                columns = cfamily.getSortedColumns();
-            }
-
-            if (columns != null && columns.size() != 0)
-            {
-                // FIXME: assuming string keys
-                columnFamiliesMap.put(new String(command.key, UTF8), columns);
-            }
-        }
+        GenericArray<ColumnOrSuperColumn> avroColumns = 
avronateColumnFamily(cf, command.queryPath.superColumnName != null, false);
+        
+        if (avroColumns.size() == 0)
+            throw newNotFoundException();
         
-        return columnFamiliesMap;
+        assert avroColumns.size() == 1;
+        return avroColumns.iterator().next();
     }
     
-    protected Map<DecoratedKey, ColumnFamily> 
readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency)
+    protected Map<DecoratedKey<?>, ColumnFamily> 
readColumnFamily(List<ReadCommand> commands, ConsistencyLevel consistency)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
         // TODO - Support multiple column families per row, right now row only 
contains 1 column family
-        Map<DecoratedKey, ColumnFamily> columnFamilyKeyMap = new 
HashMap<DecoratedKey, ColumnFamily>();
+        Map<DecoratedKey<?>, ColumnFamily> columnFamilyKeyMap = new 
HashMap<DecoratedKey<?>, ColumnFamily>();
         
         if (consistency == ConsistencyLevel.ZERO)
             throw newInvalidRequestException("Consistency level zero may not 
be applied to read operations");
@@ -215,7 +143,7 @@ public class CassandraServer implements 
     }
     
     // Don't playa hate, avronate.
-    public GenericArray<Column> avronateSubColumns(Collection<IColumn> columns)
+    private GenericArray<Column> avronateSubColumns(Collection<IColumn> 
columns)
     {
         if (columns == null || columns.isEmpty())
             return EMPTY_SUBCOLUMNS;
@@ -233,6 +161,77 @@ public class CassandraServer implements 
         
         return avroColumns;
     }
+    
+    private GenericArray<ColumnOrSuperColumn> 
avronateColumns(Collection<IColumn> columns, boolean reverseOrder)
+    {
+        ArrayList<ColumnOrSuperColumn> avroColumns = new 
ArrayList<ColumnOrSuperColumn>(columns.size());
+        for (IColumn column : columns)
+        {
+            if (column.isMarkedForDelete())
+                continue;
+            
+            Column avroColumn = newColumn(column.name(), column.value(), 
column.timestamp());
+            
+            if (column instanceof ExpiringColumn)
+                avroColumn.ttl = ((ExpiringColumn)column).getTimeToLive();
+            
+            avroColumns.add(newColumnOrSuperColumn(avroColumn));
+        }
+        
+        if (reverseOrder)
+            Collections.reverse(avroColumns);
+        
+        // FIXME: Teach GenericData.Array how to reverse so that this 
iteration isn't necessary.
+        GenericArray<ColumnOrSuperColumn> avroArray = new 
GenericData.Array<ColumnOrSuperColumn>(avroColumns.size(), 
Schema.createArray(ColumnOrSuperColumn.SCHEMA$));
+        for (ColumnOrSuperColumn cosc : avroColumns)
+            avroArray.add(cosc);
+        
+        return avroArray;
+    }
+    
+    private GenericArray<ColumnOrSuperColumn> 
avronateSuperColumns(Collection<IColumn> columns, boolean reverseOrder)
+    {
+        ArrayList<ColumnOrSuperColumn> avroSuperColumns = new 
ArrayList<ColumnOrSuperColumn>(columns.size());
+        for (IColumn column: columns)
+        {
+            GenericArray<Column> subColumns = 
avronateSubColumns(column.getSubColumns());
+            if (subColumns.size() == 0)
+                continue;
+            SuperColumn superColumn = newSuperColumn(column.name(), 
subColumns);
+            avroSuperColumns.add(newColumnOrSuperColumn(superColumn));
+        }
+        
+        if (reverseOrder)
+            Collections.reverse(avroSuperColumns);
+        
+        // FIXME: Teach GenericData.Array how to reverse so that this 
iteration isn't necessary.
+        GenericArray<ColumnOrSuperColumn> avroArray = new 
GenericData.Array<ColumnOrSuperColumn>(avroSuperColumns.size(), 
Schema.createArray(ColumnOrSuperColumn.SCHEMA$));
+        for (ColumnOrSuperColumn cosc : avroSuperColumns)
+            avroArray.add(cosc);
+        
+        return avroArray;
+    }
+    
+    private GenericArray<ColumnOrSuperColumn> 
avronateColumnFamily(ColumnFamily cf, boolean subColumnsOnly, boolean 
reverseOrder)
+    {
+        if (cf == null || cf.getColumnsMap().size() == 0)
+            return EMPTY_COLUMNS;
+        
+        if (subColumnsOnly)
+        {
+            IColumn column = cf.getColumnsMap().values().iterator().next();
+            Collection<IColumn> subColumns = column.getSubColumns();
+            if (subColumns == null || subColumns.isEmpty())
+                return EMPTY_COLUMNS;
+            else
+                return avronateColumns(subColumns, reverseOrder);
+        }
+        
+        if (cf.isSuper())
+            return avronateSuperColumns(cf.getSortedColumns(), reverseOrder);
+        else
+            return avronateColumns(cf.getSortedColumns(), reverseOrder);
+    }
 
     public Void insert(Utf8 keyspace, Utf8 key, ColumnPath cp, ByteBuffer 
value, long timestamp, ConsistencyLevel consistencyLevel)
     throws AvroRemoteException, InvalidRequestException, UnavailableException, 
TimedOutException


Reply via email to