Author: brandonwilliams Date: Wed Apr 13 17:42:15 2011 New Revision: 1091857
URL: http://svn.apache.org/viewvc?rev=1091857&view=rev Log: Allow pig to use multiple schemas, fix BytesTypes cast during storage. Patch by Jeremy Hanna, reviewed by brandonwilliams for CASSANDRA-2465 Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Modified: cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1091857&r1=1091856&r2=1091857&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java (original) +++ cassandra/branches/cassandra-0.7/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java Wed Apr 13 17:42:15 2011 @@ -68,7 +68,7 @@ public class CassandraStorage extends Lo public final static String PIG_INITIAL_ADDRESS = "PIG_INITIAL_ADDRESS"; public final static String PIG_PARTITIONER = "PIG_PARTITIONER"; - private static String UDFCONTEXT_SCHEMA_KEY = "cassandra.schema"; + private static String UDFCONTEXT_SCHEMA_KEY_PREFIX = "cassandra.schema"; private final static ByteBuffer BOUND = ByteBufferUtil.EMPTY_BYTE_BUFFER; private static final Log logger = LogFactory.getLog(CassandraStorage.class); @@ -169,7 +169,7 @@ public class CassandraStorage extends Lo { UDFContext context = UDFContext.getUDFContext(); Properties property = context.getUDFProperties(CassandraStorage.class); - return cfdefFromString(property.getProperty(UDFCONTEXT_SCHEMA_KEY)); + return cfdefFromString(property.getProperty(getSchemaContextKey())); } private List<AbstractType> getDefaultMarshallers(CfDef cfDef) throws IOException @@ -226,7 +226,7 @@ public class CassandraStorage extends Lo this.reader = reader; } - private void setLocationFromUri(String location) throws IOException + private void setLocationFromUri(String location) throws IOException { // parse uri into keyspace and columnfamily String names[]; @@ -396,7 +396,7 @@ public class CassandraStorage extends Lo if (validators.get(column.name) == null) // Have to special case BytesType to convert DataByteArray into ByteBuffer if (marshallers.get(1) instanceof BytesType) - column.value = ByteBuffer.wrap(((DataByteArray) pair.get(1)).get()); + column.value = objToBB(pair.get(1)); else column.value = marshallers.get(1).decompose(pair.get(1)); else @@ -446,9 +446,10 @@ public class CassandraStorage extends Lo { UDFContext context = UDFContext.getUDFContext(); Properties property = context.getUDFProperties(CassandraStorage.class); - + + String schemaContextKey = getSchemaContextKey(); // Only get the schema if we haven't already gotten it - if (!property.containsKey(UDFCONTEXT_SCHEMA_KEY)) + if (!property.containsKey(schemaContextKey)) { Cassandra.Client client = null; try @@ -466,7 +467,7 @@ public class CassandraStorage extends Lo break; } } - property.setProperty(UDFCONTEXT_SCHEMA_KEY, cfdefToString(cfDef)); + property.setProperty(schemaContextKey, cfdefToString(cfDef)); } catch (TException e) { @@ -532,4 +533,14 @@ public class CassandraStorage extends Lo } return cfDef; } + + private String getSchemaContextKey() + { + StringBuilder sb = new StringBuilder(UDFCONTEXT_SCHEMA_KEY_PREFIX); + sb.append('.'); + sb.append(keyspace); + sb.append('.'); + sb.append(column_family); + return sb.toString(); + } }
