Author: brandonwilliams
Date: Wed Sep 28 22:01:01 2011
New Revision: 1177083

URL: http://svn.apache.org/viewvc?rev=1177083&view=rev
Log:
Pig storage handler implements LoadMetadata interface.
Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-2777

Modified:
    
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1177083&r1=1177082&r2=1177083&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 Wed Sep 28 22:01:01 2011
@@ -31,7 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.cassandra.db.Column;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.SuperColumn;
-import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.*;
 import org.apache.cassandra.hadoop.*;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.thrift.Deletion;
@@ -46,6 +46,7 @@ import org.apache.pig.*;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.*;
+import org.apache.pig.ResourceSchema.ResourceFieldSchema;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.thrift.TDeserializer;
@@ -61,7 +62,7 @@ import org.apache.thrift.transport.TTran
  *
  * A row from a standard CF will be returned as nested tuples: (key, ((name1, val1), (name2, val2))).
  */
-public class CassandraStorage extends LoadFunc implements StoreFuncInterface
+public class CassandraStorage extends LoadFunc implements StoreFuncInterface, 
LoadMetadata
 {
     // system environment variables that can be set to configure connection info:
     // alternatively, Hadoop JobConf variables can be set using keys from ConfigHelper
@@ -293,6 +294,103 @@ public class CassandraStorage extends Lo
         initSchema(loadSignature);
     }
 
+    /**
+     * Builds the Pig schema describing rows loaded from the column family
+     * addressed by {@code location}.
+     *
+     * The schema has two top-level fields: "key" (chararray) and "columns", a
+     * bag holding a single tuple. That tuple starts with a generic
+     * ("name", "value") pair typed by the default marshallers, followed by one
+     * (column name, "value") pair for every entry in the column family's
+     * column metadata.
+     *
+     * @param location load location, handed to setLocation
+     * @param job      job handed to setLocation
+     * @return the schema, or null when the column family's type is "Super"
+     * @throws IOException propagated from the calls made here, including a
+     *                     failure to decode a column name
+     */
+    public ResourceSchema getSchema(String location, Job job) throws IOException
+    {
+        setLocation(location, job);
+        CfDef cfDef = getCfDef(loadSignature);
+
+        // no schema is reported for super column families
+        if ("Super".equals(cfDef.column_type))
+            return null;
+        // top-level schema, no type
+        ResourceSchema schema = new ResourceSchema();
+
+        // add key
+        ResourceFieldSchema keyFieldSchema = new ResourceFieldSchema();
+        keyFieldSchema.setName("key");
+        keyFieldSchema.setType(DataType.CHARARRAY); //TODO: get key type
+
+        // will become the bag of tuples
+        ResourceFieldSchema bagFieldSchema = new ResourceFieldSchema();
+        bagFieldSchema.setName("columns");
+        bagFieldSchema.setType(DataType.BAG);
+        ResourceSchema bagSchema = new ResourceSchema();
+
+        // marshallers: index 0 types column names, index 1 is the default value validator
+        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
+        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
+        List<ResourceFieldSchema> tupleFields = new ArrayList<ResourceFieldSchema>();
+
+        // default comparator/validator
+        ResourceSchema innerTupleSchema = new ResourceSchema();
+        ResourceFieldSchema tupleField = new ResourceFieldSchema();
+        tupleField.setType(DataType.TUPLE);
+        tupleField.setSchema(innerTupleSchema);
+
+        ResourceFieldSchema colSchema = new ResourceFieldSchema();
+        colSchema.setName("name");
+        colSchema.setType(getPigType(marshallers.get(0)));
+        tupleFields.add(colSchema);
+
+        ResourceFieldSchema valSchema = new ResourceFieldSchema();
+        AbstractType validator = marshallers.get(1);
+        valSchema.setName("value");
+        valSchema.setType(getPigType(validator));
+        tupleFields.add(valSchema);
+
+        // defined validators/indexes
+        for (ColumnDef cdef : cfDef.column_metadata)
+        {
+            byte[] rawName = cdef.getName();
+
+            colSchema = new ResourceFieldSchema();
+            // decode with an explicit charset so the field name does not depend
+            // on the platform default encoding of the JVM running the job
+            colSchema.setName(new String(rawName, "UTF-8"));
+            colSchema.setType(getPigType(marshallers.get(0)));
+            tupleFields.add(colSchema);
+
+            valSchema = new ResourceFieldSchema();
+            // the validator map is keyed by ByteBuffer; a raw byte[] never
+            // equals a ByteBuffer key, so wrap the name before the lookup
+            validator = validators.get(ByteBuffer.wrap(rawName));
+            if (validator == null)
+                validator = marshallers.get(1);
+            valSchema.setName("value");
+            valSchema.setType(getPigType(validator));
+            tupleFields.add(valSchema);
+        }
+        innerTupleSchema.setFields(tupleFields.toArray(new ResourceFieldSchema[tupleFields.size()]));
+
+        // a bag can contain only one tuple, but that tuple can contain anything
+        bagSchema.setFields(new ResourceFieldSchema[] { tupleField });
+        bagFieldSchema.setSchema(bagSchema);
+        // top level schema contains everything
+        schema.setFields(new ResourceFieldSchema[] { keyFieldSchema, bagFieldSchema });
+        return schema;
+    }
+
+    /**
+     * Picks the Pig {@code DataType} constant used in the schema for a given
+     * marshaller: LongType maps to LONG, IntegerType to INTEGER, AsciiType and
+     * UTF8Type to CHARARRAY, and every other type to BYTEARRAY.
+     *
+     * @param type the marshaller to translate
+     * @return the matching {@code DataType} constant
+     */
+    private byte getPigType(AbstractType type)
+    {
+        if (type instanceof LongType)
+            return DataType.LONG;
+        if (type instanceof IntegerType)
+            return DataType.INTEGER;
+        // both textual marshallers surface as Pig chararrays
+        if (type instanceof AsciiType || type instanceof UTF8Type)
+            return DataType.CHARARRAY;
+        // anything unrecognised is handed to Pig as raw bytes
+        return DataType.BYTEARRAY;
+    }
+
+    /**
+     * Always returns null: this handler supplies no statistics.
+     * Neither argument is used.
+     * NOTE(review): presumably part of the LoadMetadata contract the class now
+     * declares - confirm against the Pig interface.
+     */
+    public ResourceStatistics getStatistics(String location, Job job)
+    {
+        return null;
+    }
+
+    /**
+     * Always returns null: this handler reports no partition keys.
+     * Neither argument is used.
+     */
+    public String[] getPartitionKeys(String location, Job job)
+    {
+        return null;
+    }
+
+    /**
+     * Intentionally does nothing; the supplied filter is ignored.
+     */
+    public void setPartitionFilter(Expression partitionFilter)
+    {
+        // no-op
+    }
+
     @Override
     public String relativeToAbsolutePath(String location, Path curDir) throws 
IOException
     {


Reply via email to