PHOENIX-1050: Support for DataByteArray.
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/598bc323 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/598bc323 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/598bc323 Branch: refs/heads/master Commit: 598bc323f49c8d0413c45019fafcbe94036cf67a Parents: f2c9bba Author: mravi <maghamraviki...@gmail.com> Authored: Sun Sep 21 10:17:26 2014 -0700 Committer: mravi <maghamraviki...@gmail.com> Committed: Sun Sep 21 10:17:26 2014 -0700 ---------------------------------------------------------------------- .../phoenix/pig/PhoenixHBaseStorerIT.java | 115 +++++++++++++++++++ .../org/apache/phoenix/pig/util/TypeUtil.java | 48 ++++---- 2 files changed, 139 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/598bc323/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java index 1d82362..e0021d9 100644 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java +++ b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixHBaseStorerIT.java @@ -23,6 +23,7 @@ import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR; import static org.apache.phoenix.util.TestUtil.LOCALHOST; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertArrayEquals; import java.sql.Connection; import java.sql.DriverManager; @@ -30,6 +31,7 @@ import java.sql.ResultSet; import java.sql.Statement; import java.util.Collection; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; import 
org.apache.phoenix.end2end.HBaseManagedTimeTest; import org.apache.pig.ExecType; @@ -37,8 +39,10 @@ import org.apache.pig.PigServer; import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS; import org.apache.pig.builtin.mock.Storage; import org.apache.pig.builtin.mock.Storage.Data; +import org.apache.pig.data.DataByteArray; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.joda.time.DateTime; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -182,4 +186,115 @@ public class PhoenixHBaseStorerIT extends BaseHBaseManagedTimeIT { assertEquals(0, rs.getInt(3)); } } + + /** + * Test storage of DataByteArray columns to Phoenix + * Maps the DataByteArray with the target PhoenixDataType and persists in HBase. + * @throws Exception + */ + @Test + public void testStoreWithBinaryDataTypes() throws Exception { + + final String tableName = "TABLE3"; + final Statement stmt = conn.createStatement(); + + stmt.execute("CREATE TABLE " + tableName + + " (col1 BIGINT NOT NULL, col2 INTEGER , col3 FLOAT, col4 DOUBLE , col5 TINYINT , " + + " col6 BOOLEAN , col7 VARBINARY CONSTRAINT my_pk PRIMARY KEY (col1))"); + + final Data data = Storage.resetData(pigServer); + final Collection<Tuple> list = Lists.newArrayList(); + + int rows = 10; + for (int i = 1; i <= rows; i++) { + Tuple t = tupleFactory.newTuple(); + t.append(i); + t.append(new DataByteArray(Bytes.toBytes(i * 5))); + t.append(new DataByteArray(Bytes.toBytes(i * 10.0F))); + t.append(new DataByteArray(Bytes.toBytes(i * 15.0D))); + t.append(new DataByteArray(Bytes.toBytes(i))); + t.append(new DataByteArray(Bytes.toBytes( i % 2 == 0))); + t.append(new DataByteArray(Bytes.toBytes(i))); + list.add(t); + } + data.set("in", "col1:int,col2:bytearray,col3:bytearray,col4:bytearray,col5:bytearray,col6:bytearray,col7:bytearray ", list); + + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();"); + + pigServer.registerQuery("Store A 
into 'hbase://" + tableName + + "' using " + PhoenixHBaseStorage.class.getName() + "('" + + zkQuorum + "', '-batchSize 1000');"); + + if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) { + throw new RuntimeException("Job failed", pigServer.executeBatch() + .get(0).getException()); + } + + final ResultSet rs = stmt + .executeQuery(String.format("SELECT col1 , col2 , col3 , col4 , col5 , col6, col7 FROM %s ORDER BY col1" , tableName)); + + int count = 0; + for (int i = 1; i <= rows; i++) { + assertTrue(rs.next()); + assertEquals(i, rs.getInt(1)); + assertEquals(i * 5, rs.getInt(2)); + assertEquals(i * 10.0F, rs.getFloat(3),0.0); + assertEquals(i * 15.0D, rs.getInt(4),0.0); + assertEquals(i,rs.getInt(5)); + assertEquals(i % 2 == 0, rs.getBoolean(6)); + assertArrayEquals(Bytes.toBytes(i), rs.getBytes(7)); + count++; + } + assertEquals(rows, count); + } + + @Test + public void testStoreWithDateTime() throws Exception { + + final String tableName = "TABLE4"; + final Statement stmt = conn.createStatement(); + + stmt.execute("CREATE TABLE " + tableName + + " (col1 BIGINT NOT NULL, col2 DATE , col3 TIME, " + + " col4 TIMESTAMP CONSTRAINT my_pk PRIMARY KEY (col1))"); + + long now = System.currentTimeMillis(); + final DateTime dt = new DateTime(now); + + final Data data = Storage.resetData(pigServer); + final Collection<Tuple> list = Lists.newArrayList(); + Tuple t = tupleFactory.newTuple(); + + t.append(1); + t.append(dt); + t.append(dt); + t.append(dt); + + list.add(t); + + data.set("in", "col1:int,col2:datetime,col3:datetime,col4:datetime", list); + + pigServer.setBatchOn(); + pigServer.registerQuery("A = LOAD 'in' USING mock.Storage();"); + + pigServer.registerQuery("Store A into 'hbase://" + tableName + + "' using " + PhoenixHBaseStorage.class.getName() + "('" + + zkQuorum + "', '-batchSize 1000');"); + + if (pigServer.executeBatch().get(0).getStatus() != JOB_STATUS.COMPLETED) { + throw new RuntimeException("Job failed", pigServer.executeBatch() + 
.get(0).getException()); + } + + final ResultSet rs = stmt + .executeQuery(String.format("SELECT col1 , col2 , col3 , col4 FROM %s " , tableName)); + + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals(now, rs.getDate(2).getTime()); + assertEquals(now, rs.getTime(3).getTime()); + assertEquals(now, rs.getTimestamp(4).getTime()); + + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/598bc323/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java index f3cacfd..1cdd66d 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java @@ -31,6 +31,7 @@ import org.apache.phoenix.pig.hadoop.PhoenixRecord; import org.apache.phoenix.schema.PDataType; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.backend.hadoop.hbase.HBaseBinaryConverter; import org.apache.pig.builtin.Utf8StorageConverter; import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; @@ -45,7 +46,7 @@ import com.google.common.collect.ImmutableMap.Builder; public final class TypeUtil { private static final Log LOG = LogFactory.getLog(TypeUtil.class); - private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter(); + private static final HBaseBinaryConverter binaryConverter = new HBaseBinaryConverter (); private static final ImmutableMap<PDataType,Byte> phoenixTypeToPigDataType = init(); private TypeUtil(){ @@ -97,7 +98,6 @@ public final class TypeUtil { if (obj == null) { return null; } - PDataType sqlType; switch (type) { @@ -108,6 +108,7 @@ public final class TypeUtil { sqlType = PDataType.VARCHAR; break; case DataType.DOUBLE: + case 
DataType.BIGDECIMAL: sqlType = PDataType.DOUBLE; break; case DataType.FLOAT: @@ -117,6 +118,7 @@ public final class TypeUtil { sqlType = PDataType.INTEGER; break; case DataType.LONG: + case DataType.BIGINTEGER: sqlType = PDataType.LONG; break; case DataType.BOOLEAN: @@ -125,6 +127,9 @@ public final class TypeUtil { case DataType.DATETIME: sqlType = PDataType.DATE; break; + case DataType.BYTE: + sqlType = PDataType.TINYINT; + break; default: throw new RuntimeException("Unknown type " + obj.getClass().getName() + " passed to PhoenixHBaseStorage"); @@ -150,16 +155,17 @@ public final class TypeUtil { if(inferredPType == null) { return null; } - - if(inferredPType == PDataType.VARBINARY && targetPhoenixType != PDataType.VARBINARY) { + + if(inferredPType == PDataType.VARBINARY) { try { o = castBytes(o, targetPhoenixType); - inferredPType = getType(o, DataType.findType(o)); + if(targetPhoenixType != PDataType.VARBINARY && targetPhoenixType != PDataType.BINARY) { + inferredPType = getType(o, DataType.findType(o)); + } } catch (IOException e) { throw new RuntimeException("Error while casting bytes for object " +o); } } - if(inferredPType == PDataType.DATE) { int inferredSqlType = targetPhoenixType.getSqlType(); @@ -192,42 +198,36 @@ public final class TypeUtil { * @return Object * @throws IOException */ - public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException { + private static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException { byte[] bytes = ((DataByteArray)o).get(); switch(targetPhoenixType) { case CHAR: case VARCHAR: - return utf8Converter.bytesToCharArray(bytes); + return binaryConverter.bytesToCharArray(bytes); case UNSIGNED_SMALLINT: case SMALLINT: - return utf8Converter.bytesToInteger(bytes).shortValue(); + return binaryConverter.bytesToInteger(bytes).shortValue(); case UNSIGNED_TINYINT: case TINYINT: - return utf8Converter.bytesToInteger(bytes).byteValue(); + return 
binaryConverter.bytesToInteger(bytes).byteValue(); case UNSIGNED_INT: case INTEGER: - return utf8Converter.bytesToInteger(bytes); + return binaryConverter.bytesToInteger(bytes); case BOOLEAN: - return utf8Converter.bytesToBoolean(bytes); - case DECIMAL: - return utf8Converter.bytesToBigDecimal(bytes); + return binaryConverter.bytesToBoolean(bytes); case FLOAT: case UNSIGNED_FLOAT: - return utf8Converter.bytesToFloat(bytes); + return binaryConverter.bytesToFloat(bytes); case DOUBLE: case UNSIGNED_DOUBLE: - return utf8Converter.bytesToDouble(bytes); + return binaryConverter.bytesToDouble(bytes); case UNSIGNED_LONG: case LONG: - return utf8Converter.bytesToLong(bytes); - case TIME: - case TIMESTAMP: - case DATE: - case UNSIGNED_TIME: - case UNSIGNED_TIMESTAMP: - case UNSIGNED_DATE: - return utf8Converter.bytesToDateTime(bytes); + return binaryConverter.bytesToLong(bytes); + case VARBINARY : + case BINARY: + return bytes; default: return o; }