Author: brandonwilliams
Date: Wed Apr 13 17:42:15 2011
New Revision: 1091857

URL: http://svn.apache.org/viewvc?rev=1091857&view=rev
Log:
Allow pig to use multiple schemas, fix BytesTypes cast during storage.
Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2465

Modified:
    
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: 
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1091857&r1=1091856&r2=1091857&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 (original)
+++ 
cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 Wed Apr 13 17:42:15 2011
@@ -68,7 +68,7 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
-    private static String UDFCONTEXT_SCHEMA_KEY = "cassandra.schema";
+    private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
 
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = 
LogFactory.getLog(CassandraStorage.class);
@@ -169,7 +169,7 @@ public class CassandraStorage extends Lo
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY));
+        return cfdefFromString(property.getProperty(getSchemaContextKey()));
     }
 
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws 
IOException
@@ -226,7 +226,7 @@ public class CassandraStorage extends Lo
         this.reader = reader;
     }
 
-     private void setLocationFromUri(String location) throws IOException
+    private void setLocationFromUri(String location) throws IOException
     {
         // parse uri into keyspace and columnfamily
         String names[];
@@ -396,7 +396,7 @@ public class CassandraStorage extends Lo
                        if (validators.get(column.name) == null)
                            // Have to special case BytesType to convert 
DataByteArray into ByteBuffer
                            if (marshallers.get(1) instanceof BytesType)
-                               column.value = ByteBuffer.wrap(((DataByteArray) 
pair.get(1)).get());
+                               column.value = objToBB(pair.get(1));
                            else
                                column.value = 
marshallers.get(1).decompose(pair.get(1));
                        else
@@ -446,9 +446,10 @@ public class CassandraStorage extends Lo
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        
+
+        String schemaContextKey = getSchemaContextKey();
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY))
+        if (!property.containsKey(schemaContextKey))
         {
             Cassandra.Client client = null;
             try
@@ -466,7 +467,7 @@ public class CassandraStorage extends Lo
                         break;
                     }
                 }
-                property.setProperty(UDFCONTEXT_SCHEMA_KEY, 
cfdefToString(cfDef));
+                property.setProperty(schemaContextKey, cfdefToString(cfDef));
             }
             catch (TException e)
             {
@@ -532,4 +533,14 @@ public class CassandraStorage extends Lo
         }
         return cfDef;
     }
+
+    private String getSchemaContextKey()
+    {
+        StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
+        sb.append('.');
+        sb.append(keyspace);
+        sb.append('.');
+        sb.append(column_family);
+        return sb.toString();
+    }
 }


Reply via email to