Author: brandonwilliams
Date: Thu Jul 21 20:10:54 2011
New Revision: 1149341

URL: http://svn.apache.org/viewvc?rev=1149341&view=rev
Log:
Use a UDF-specific context signature.
Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2869

Modified:
    cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1149341&r1=1149340&r2=1149341&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original)
+++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Thu Jul 21 20:10:54 2011
@@ -68,8 +68,6 @@ public class CassandraStorage extends Lo
     public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS";
     public final static String PIG_PARTITIONER = "PIG_PARTITIONER";
 
-    private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema";
-
     private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER;
     private static final Log logger = LogFactory.getLog(CassandraStorage.class);
 
@@ -78,6 +76,8 @@ public class CassandraStorage extends Lo
     private boolean slice_reverse = false;
     private String keyspace;
     private String column_family;
+    private String loadSignature;
+    private String storeSignature;
 
     private Configuration conf;
     private RecordReader reader;
@@ -112,7 +112,7 @@ public class CassandraStorage extends Lo
             if (!reader.nextKeyValue())
                 return null;
 
-            CfDef cfDef = getCfDef();
+            CfDef cfDef = getCfDef(loadSignature);
             ByteBuffer key = (ByteBuffer)reader.getCurrentKey();
             SortedMap<ByteBuffer,IColumn> cf = (SortedMap<ByteBuffer,IColumn>)reader.getCurrentValue();
             assert key != null && cf != null;
@@ -165,11 +165,11 @@ public class CassandraStorage extends Lo
         return pair;
     }
 
-    private CfDef getCfDef()
+    private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
-        return cfdefFromString(property.getProperty(getSchemaContextKey()));
+        return cfdefFromString(property.getProperty(signature));
     }
 
     private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException
@@ -289,7 +289,7 @@ public class CassandraStorage extends Lo
         }
         ConfigHelper.setInputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(loadSignature);
     }
 
     @Override
@@ -298,9 +298,16 @@ public class CassandraStorage extends Lo
         return location;
     }
 
+    @Override
+    public void setUDFContextSignature(String signature)
+    {
+        this.loadSignature = signature;
+    }
+
     /* StoreFunc methods */
     public void setStoreFuncUDFContextSignature(String signature)
     {
+        this.storeSignature = signature;
     }
 
     public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException
@@ -314,7 +321,7 @@ public class CassandraStorage extends Lo
         setLocationFromUri(location);
         ConfigHelper.setOutputColumnFamily(conf, keyspace, column_family);
         setConnectionInformation();
-        initSchema();
+        initSchema(storeSignature);
     }
 
     public OutputFormat getOutputFormat()
@@ -346,7 +353,7 @@ public class CassandraStorage extends Lo
         ByteBuffer key = objToBB(t.get(0));
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
-        CfDef cfDef = getCfDef();
+        CfDef cfDef = getCfDef(storeSignature);
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         try
@@ -404,7 +411,6 @@ public class CassandraStorage extends Lo
                        column.timestamp = System.currentTimeMillis() * 1000;
                        mutation.column_or_supercolumn = new ColumnOrSuperColumn();
                        mutation.column_or_supercolumn.column = column;
-                       mutationList.add(mutation);
                    }
                }
                mutationList.add(mutation);
@@ -412,7 +418,7 @@ public class CassandraStorage extends Lo
         }
         catch (ClassCastException e)
         {
-            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily");
+            throw new IOException(e + " Output must be (key, {(column,value)...}) for ColumnFamily or (key, {supercolumn:{(column,value)...}...}) for SuperColumnFamily", e);
         }
         try
         {
@@ -430,14 +436,13 @@ public class CassandraStorage extends Lo
 
     /* Methods to get the column family schema from Cassandra */
 
-    private void initSchema()
+    private void initSchema(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(CassandraStorage.class);
 
-        String schemaContextKey = getSchemaContextKey();
         // Only get the schema if we haven't already gotten it
-        if (!property.containsKey(schemaContextKey))
+        if (!property.containsKey(signature))
         {
             Cassandra.Client client = null;
             try
@@ -455,7 +460,7 @@ public class CassandraStorage extends Lo
                         break;
                     }
                 }
-                property.setProperty(schemaContextKey, cfdefToString(cfDef));
+                property.setProperty(signature, cfdefToString(cfDef));
             }
             catch (TException e)
             {
@@ -521,14 +526,4 @@ public class CassandraStorage extends Lo
         }
         return cfDef;
     }
-
-    private String getSchemaContextKey()
-    {
-        StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX);
-        sb.append('.');
-        sb.append(keyspace);
-        sb.append('.');
-        sb.append(column_family);
-        return sb.toString();
-    }
 }


Reply via email to