PHOENIX-1050: Support for DataByteArray

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2ccb62d1
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2ccb62d1
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2ccb62d1

Branch: refs/heads/4.0
Commit: 2ccb62d181afb3e54fe4940f78490f858b9a710a
Parents: 27b3865
Author: mravi <maghamraviki...@gmail.com>
Authored: Sun Sep 21 10:27:51 2014 -0700
Committer: mravi <maghamraviki...@gmail.com>
Committed: Sun Sep 21 10:27:51 2014 -0700

----------------------------------------------------------------------
 .../phoenix/pig/PhoenixHBaseStorerIT.java       | 115 +++++++++++++++++++
 .../org/apache/phoenix/pig/util/TypeUtil.java   |  48 ++++----
 2 files changed, 139 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ccb62d1/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java 
b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
index 1d82362..e0021d9 100644
--- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
+++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java
@@ -23,6 +23,7 @@ import static 
org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR;
 import static org.apache.phoenix.util.TestUtil.LOCALHOST;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertArrayEquals;
 
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -30,6 +31,7 @@ import java.sql.ResultSet;
 import java.sql.Statement;
 import java.util.Collection;
 
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT;
 import org.apache.phoenix.end2end.HBaseManagedTimeTest;
 import org.apache.pig.ExecType;
@@ -37,8 +39,10 @@ import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -182,4 +186,115 @@ public class PhoenixHBaseStorerIT extends 
BaseHBaseManagedTimeIT {
             assertEquals(0, rs.getInt(3));
         }
     }
+    
+    /**
+     * Test storage of DataByteArray columns to Phoenix
+     * Maps the DataByteArray with the target PhoenixDataType and persists in 
HBase. 
+    * @throws Exception
+     */
+    @Test
+    public void testStoreWithBinaryDataTypes() throws Exception {
+     
+       final String tableName = "TABLE3";
+        final Statement stmt = conn.createStatement();
+
+        stmt.execute("CREATE TABLE " + tableName +
+                " (col1 BIGINT NOT NULL, col2 INTEGER , col3 FLOAT, col4 
DOUBLE , col5 TINYINT , " +
+                "  col6 BOOLEAN , col7 VARBINARY CONSTRAINT my_pk PRIMARY KEY 
(col1))");
+
+        final Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+
+        int rows = 10;
+        for (int i = 1; i <= rows; i++) {
+            Tuple t = tupleFactory.newTuple();
+            t.append(i);
+            t.append(new DataByteArray(Bytes.toBytes(i * 5)));
+            t.append(new DataByteArray(Bytes.toBytes(i * 10.0F)));
+            t.append(new DataByteArray(Bytes.toBytes(i * 15.0D)));
+            t.append(new DataByteArray(Bytes.toBytes(i)));
+            t.append(new DataByteArray(Bytes.toBytes( i % 2 == 0)));
+            t.append(new DataByteArray(Bytes.toBytes(i)));
+            list.add(t);
+        }
+        data.set("in", 
"col1:int,col2:bytearray,col3:bytearray,col4:bytearray,col5:bytearray,col6:bytearray,col7:bytearray
 ", list);
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+
+        pigServer.registerQuery("Store A into 'hbase://" + tableName
+                               + "' using " + 
PhoenixHBaseStorage.class.getName() + "('"
+                                + zkQuorum + "', '-batchSize 1000');");
+
+        if (pigServer.executeBatch().get(0).getStatus() != 
JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", pigServer.executeBatch()
+                    .get(0).getException());
+        }
+
+        final ResultSet rs = stmt
+                .executeQuery(String.format("SELECT col1 , col2 , col3 , col4 
, col5 , col6, col7  FROM %s ORDER BY col1" , tableName));
+
+        int count = 0;
+        for (int i = 1; i <= rows; i++) {
+            assertTrue(rs.next());
+            assertEquals(i, rs.getInt(1));
+            assertEquals(i * 5, rs.getInt(2));
+            assertEquals(i * 10.0F, rs.getFloat(3),0.0);
+            assertEquals(i * 15.0D, rs.getInt(4),0.0);
+            assertEquals(i,rs.getInt(5));
+            assertEquals(i % 2 == 0, rs.getBoolean(6));
+            assertArrayEquals(Bytes.toBytes(i), rs.getBytes(7));
+            count++;
+        }
+        assertEquals(rows, count);
+     }
+    
+    @Test
+    public void testStoreWithDateTime() throws Exception {
+     
+       final String tableName = "TABLE4";
+        final Statement stmt = conn.createStatement();
+
+        stmt.execute("CREATE TABLE " + tableName +
+                " (col1 BIGINT NOT NULL, col2 DATE , col3 TIME, " +
+                " col4 TIMESTAMP CONSTRAINT my_pk PRIMARY KEY (col1))");
+
+        long now = System.currentTimeMillis();
+        final DateTime dt = new DateTime(now);
+        
+        final Data data = Storage.resetData(pigServer);
+        final Collection<Tuple> list = Lists.newArrayList();
+        Tuple t = tupleFactory.newTuple();
+        
+        t.append(1);
+        t.append(dt);
+        t.append(dt);
+        t.append(dt);
+       
+        list.add(t);
+        
+        data.set("in", "col1:int,col2:datetime,col3:datetime,col4:datetime", 
list);
+
+        pigServer.setBatchOn();
+        pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();");
+
+        pigServer.registerQuery("Store A into 'hbase://" + tableName
+                               + "' using " + 
PhoenixHBaseStorage.class.getName() + "('"
+                                + zkQuorum + "', '-batchSize 1000');");
+
+        if (pigServer.executeBatch().get(0).getStatus() != 
JOB_STATUS.COMPLETED) {
+            throw new RuntimeException("Job failed", pigServer.executeBatch()
+                    .get(0).getException());
+        }
+
+        final ResultSet rs = stmt
+                .executeQuery(String.format("SELECT col1 , col2 , col3 , col4 
FROM %s " , tableName));
+
+        assertTrue(rs.next());
+        assertEquals(1, rs.getInt(1));
+        assertEquals(now, rs.getDate(2).getTime());
+        assertEquals(now, rs.getTime(3).getTime());
+        assertEquals(now, rs.getTimestamp(4).getTime());
+     
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2ccb62d1/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java 
b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
index f3cacfd..1cdd66d 100644
--- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
+++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java
@@ -31,6 +31,7 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.pig.PigException;
 import org.apache.pig.ResourceSchema.ResourceFieldSchema;
+import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter;
 import org.apache.pig.builtin.Utf8StorageConverter;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
@@ -45,7 +46,7 @@ import com.google.common.collect.ImmutableMap.Builder;
 public final class TypeUtil {
        
     private static final Log LOG = LogFactory.getLog(TypeUtil.class);
-       private static final Utf8StorageConverter utf8Converter = new 
Utf8StorageConverter();
+    private static final HBaseBinaryConverter binaryConverter = new 
HBaseBinaryConverter ();
        private static final ImmutableMap<PDataType,Byte> 
phoenixTypeToPigDataType = init();    
        
        private TypeUtil(){
@@ -97,7 +98,6 @@ public final class TypeUtil {
                if (obj == null) {
                        return null;
                }
-       
                PDataType sqlType;
 
                switch (type) {
@@ -108,6 +108,7 @@ public final class TypeUtil {
                        sqlType = PDataType.VARCHAR;
                        break;
                case DataType.DOUBLE:
+               case DataType.BIGDECIMAL:
                        sqlType = PDataType.DOUBLE;
                        break;
                case DataType.FLOAT:
@@ -117,6 +118,7 @@ public final class TypeUtil {
                        sqlType = PDataType.INTEGER;
                        break;
                case DataType.LONG:
+               case DataType.BIGINTEGER:
                        sqlType = PDataType.LONG;
                        break;
                case DataType.BOOLEAN:
@@ -125,6 +127,9 @@ public final class TypeUtil {
                case DataType.DATETIME:
                        sqlType = PDataType.DATE;
                        break;
+               case DataType.BYTE:
+                       sqlType = PDataType.TINYINT;
+                       break;
                default:
                        throw new RuntimeException("Unknown type " + 
obj.getClass().getName()
                                        + " passed to PhoenixHBaseStorage");
@@ -150,16 +155,17 @@ public final class TypeUtil {
                if(inferredPType == null) {
                        return null;
                }
-               
-               if(inferredPType == PDataType.VARBINARY && targetPhoenixType != 
PDataType.VARBINARY) {
+
+               if(inferredPType == PDataType.VARBINARY) {
                        try {
                                o = castBytes(o, targetPhoenixType);
-                               inferredPType = getType(o, 
DataType.findType(o));
+                               if(targetPhoenixType != PDataType.VARBINARY && 
targetPhoenixType != PDataType.BINARY) {
+                                       inferredPType = getType(o, 
DataType.findType(o));       
+                               }
                        } catch (IOException e) {
                                throw new RuntimeException("Error while casting 
bytes for object " +o);
                        }
                }
-
                if(inferredPType == PDataType.DATE) {
                        int inferredSqlType = targetPhoenixType.getSqlType();
 
@@ -192,42 +198,36 @@ public final class TypeUtil {
         * @return Object
         * @throws IOException
         */
-    public static Object castBytes(Object o, PDataType targetPhoenixType) 
throws IOException {
+       private static Object castBytes(Object o, PDataType targetPhoenixType) 
throws IOException {
         byte[] bytes = ((DataByteArray)o).get();
         
         switch(targetPhoenixType) {
         case CHAR:
         case VARCHAR:
-            return utf8Converter.bytesToCharArray(bytes);
+            return binaryConverter.bytesToCharArray(bytes);
         case UNSIGNED_SMALLINT:
         case SMALLINT:
-            return utf8Converter.bytesToInteger(bytes).shortValue();
+            return binaryConverter.bytesToInteger(bytes).shortValue();
         case UNSIGNED_TINYINT:
         case TINYINT:
-            return utf8Converter.bytesToInteger(bytes).byteValue();
+            return binaryConverter.bytesToInteger(bytes).byteValue();
         case UNSIGNED_INT:
         case INTEGER:
-            return utf8Converter.bytesToInteger(bytes);
+               return binaryConverter.bytesToInteger(bytes);
         case BOOLEAN:
-            return utf8Converter.bytesToBoolean(bytes);
-        case DECIMAL:
-            return utf8Converter.bytesToBigDecimal(bytes);
+            return binaryConverter.bytesToBoolean(bytes);
         case FLOAT:
         case UNSIGNED_FLOAT:
-            return utf8Converter.bytesToFloat(bytes);
+            return binaryConverter.bytesToFloat(bytes);
         case DOUBLE:
         case UNSIGNED_DOUBLE:
-            return utf8Converter.bytesToDouble(bytes);
+            return binaryConverter.bytesToDouble(bytes);
         case UNSIGNED_LONG:
         case LONG:
-            return utf8Converter.bytesToLong(bytes);
-        case TIME:
-        case TIMESTAMP:
-        case DATE:
-        case UNSIGNED_TIME:
-        case UNSIGNED_TIMESTAMP:
-        case UNSIGNED_DATE:
-               return utf8Converter.bytesToDateTime(bytes);
+            return binaryConverter.bytesToLong(bytes);
+        case VARBINARY : 
+        case BINARY:
+                return bytes;
         default:
                return o;
         }        

Reply via email to