Author: brandonwilliams
Date: Wed Sep 28 22:01:08 2011
New Revision: 1177084

URL: http://svn.apache.org/viewvc?rev=1177084&view=rev
Log:
Fix handling of integer types in pig.
Patch by brandonwilliams, reviewed by Jeremy Hanna for CASSANDRA-2810

Modified:
    
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java

Modified: 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java?rev=1177084&r1=1177083&r2=1177084&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 (original)
+++ 
cassandra/branches/cassandra-0.8/contrib/pig/src/java/org/apache/cassandra/hadoop/pig/CassandraStorage.java
 Wed Sep 28 22:01:08 2011
@@ -17,11 +17,13 @@
 package org.apache.cassandra.hadoop.pig;
 
 import java.io.IOException;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
 import java.util.*;
 
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.IntegerType;
 import org.apache.cassandra.db.marshal.TypeParser;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.FBUtilities;
@@ -143,18 +145,14 @@ public class CassandraStorage extends Lo
         List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
         Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
 
+        setTupleValue(pair, 0, marshallers.get(0).compose(name));
         if (col instanceof Column)
         {
             // standard
-            pair.set(0, marshallers.get(0).compose(name));
             if (validators.get(name) == null)
-                // Have to special case BytesType because compose returns a 
ByteBuffer
-                if (marshallers.get(1) instanceof BytesType)
-                    pair.set(1, new 
DataByteArray(ByteBufferUtil.getArray(col.value())));
-                else
-                    pair.set(1, marshallers.get(1).compose(col.value()));
+                setTupleValue(pair, 1, 
marshallers.get(1).compose(col.value()));
             else
-                pair.set(1, validators.get(name).compose(col.value()));
+                setTupleValue(pair, 1, 
validators.get(name).compose(col.value()));
             return pair;
         }
 
@@ -167,6 +165,16 @@ public class CassandraStorage extends Lo
         return pair;
     }
 
+    private void setTupleValue(Tuple pair, int position, Object value) throws 
ExecException
+    {
+       if (value instanceof BigInteger)
+           pair.set(position, ((BigInteger) value).intValue());
+       else if (value instanceof ByteBuffer)
+           pair.set(position, new 
DataByteArray(ByteBufferUtil.getArray((ByteBuffer) value)));
+       else
+           pair.set(position, value);
+    }
+
     private CfDef getCfDef(String signature)
     {
         UDFContext context = UDFContext.getUDFContext();
@@ -453,8 +461,6 @@ public class CassandraStorage extends Lo
         DefaultDataBag pairs = (DefaultDataBag) t.get(1);
         ArrayList<Mutation> mutationList = new ArrayList<Mutation>();
         CfDef cfDef = getCfDef(storeSignature);
-        List<AbstractType> marshallers = getDefaultMarshallers(cfDef);
-        Map<ByteBuffer,AbstractType> validators = getValidatorMap(cfDef);
         try
         {
             for (Tuple pair : pairs)
@@ -498,15 +504,8 @@ public class CassandraStorage extends Lo
                    else
                    {
                        org.apache.cassandra.thrift.Column column = new 
org.apache.cassandra.thrift.Column();
-                       column.name = 
marshallers.get(0).decompose((pair.get(0)));
-                       if (validators.get(column.name) == null)
-                           // Have to special case BytesType to convert 
DataByteArray into ByteBuffer
-                           if (marshallers.get(1) instanceof BytesType)
-                               column.value = objToBB(pair.get(1));
-                           else
-                               column.value = 
marshallers.get(1).decompose(pair.get(1));
-                       else
-                           column.value = 
validators.get(column.name).decompose(pair.get(1));
+                       column.name = objToBB(pair.get(0));
+                       column.value = objToBB(pair.get(1));
                        column.setTimestamp(System.currentTimeMillis() * 1000);
                        mutation.column_or_supercolumn = new 
ColumnOrSuperColumn();
                        mutation.column_or_supercolumn.column = column;
@@ -626,3 +625,4 @@ public class CassandraStorage extends Lo
         return cfDef;
     }
 }
+


Reply via email to