Author: daijy
Date: Tue Jan  6 03:14:15 2015
New Revision: 1649730

URL: http://svn.apache.org/r1649730
Log:
PIG-4360: HBaseStorage should support setting the timestamp field

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1649730&r1=1649729&r2=1649730&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan  6 03:14:15 2015
@@ -30,6 +30,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4360: HBaseStorage should support setting the timestamp field (bridiver 
via daijy)
+
 PIG-2949: JsonLoader only reads arrays of objects (eyal via daijy)
 
 PIG-4213: CSVExcelStorage not quoting texts containing \r (CR) when storing 
(alfonso.nishikawa via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1649730&r1=1649729&r2=1649730&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java 
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan 
 6 03:14:15 2015
@@ -178,6 +178,7 @@ public class HBaseStorage extends LoadFu
     private final long minTimestamp_;
     private final long maxTimestamp_;
     private final long timestamp_;
+    private boolean includeTimestamp_;
 
     protected transient byte[] gt_;
     protected transient byte[] gte_;
@@ -211,6 +212,7 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("minTimestamp", true, "Record must have 
timestamp greater or equal to this value");
 validOptions_.addOption("maxTimestamp", true, "Record must have 
timestamp less than this value");
         validOptions_.addOption("timestamp", true, "Record must have timestamp 
equal to this value");
+        validOptions_.addOption("includeTimestamp", false, "Record will 
include the timestamp after the rowkey on store");
     }
 
     /**
@@ -254,6 +256,7 @@ public class HBaseStorage extends LoadFu
      * <li>-minTimestamp= Scan's timestamp for min timeRange
      * <li>-maxTimestamp= Scan's timestamp for max timeRange
      * <li>-timestamp= Scan's specified timestamp
+     * <li>-includeTimestamp= Record will include the timestamp after the row 
key on store
      * <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) 
Utf8StorageConverter is the default
      * To be used with extreme caution, since this could result in data loss
      * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
@@ -268,7 +271,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] 
[-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] 
[-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]", 
validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] 
[-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] 
[-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] 
[-includeTimestamp]", validOptions_ );
             throw e;
         }
 
@@ -343,6 +346,14 @@ public class HBaseStorage extends LoadFu
             timestamp_ = 0;
         }
 
+        includeTimestamp_ = false;
+        if (configuredOptions_.hasOption("includeTimestamp")) {
+            String value = 
configuredOptions_.getOptionValue("includeTimestamp");
+            if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) 
|| value == null ) {//the empty string and null check is for backward compat.
+                includeTimestamp_ = true;
+            }
+        }
+
         initScan();
     }
 
@@ -930,10 +941,27 @@ public class HBaseStorage extends LoadFu
     public void putNext(Tuple t) throws IOException {
         ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null : 
schema_.getFields();
         byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : 
fieldSchemas[0].getType();
-        long ts=System.currentTimeMillis();
+        long ts;
 
         Put put = createPut(t.get(0), type);
 
+        int startIndex=1;
+        if (includeTimestamp_) {
+            byte timestampType = (fieldSchemas == null) ? 
DataType.findType(t.get(1)) : fieldSchemas[1].getType();
+            LoadStoreCaster caster = (LoadStoreCaster) caster_;
+
+            switch (timestampType) {
+            case DataType.BYTEARRAY: ts = 
caster.bytesToLong(((DataByteArray)t.get(1)).get()); break;
+            case DataType.LONG: ts = ((Long)t.get(1)).longValue(); break;
+            case DataType.DATETIME: ts = ((DateTime)t.get(1)).getMillis(); 
break;
+            default: throw new IOException("Unable to find a converter for 
tuple field " + t.get(1));
+            }
+
+            startIndex++;
+        } else {
+            ts = System.currentTimeMillis();
+        }
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("putNext -- WAL disabled: " + noWAL_);
             for (ColumnInfo columnInfo : columnInfo_) {
@@ -941,8 +969,8 @@ public class HBaseStorage extends LoadFu
             }
         }
 
-        for (int i=1;i<t.size();++i){
-            ColumnInfo columnInfo = columnInfo_.get(i-1);
+        for (int i=startIndex;i<t.size();++i){
+            ColumnInfo columnInfo = columnInfo_.get(i-startIndex);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
                         ", cf:column=" + columnInfo);

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1649730&r1=1649729&r2=1649730&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Tue Jan  6 
03:14:15 2015
@@ -1047,6 +1047,147 @@ public class TestHBaseStorage {
 
     /**
      * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_timestamp() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        long timestamp = System.currentTimeMillis();
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l, 
col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTimestamp true')");
+
+        HTable table = new HTable(conf, TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals(timestamp, col_a_ts);
+            Assert.assertEquals(timestamp, col_b_ts);
+            Assert.assertEquals(timestamp, col_c_ts);
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+        }
+        Assert.assertEquals(100, i);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_datetime_timestamp() throws 
IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        long timestamp = System.currentTimeMillis();
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, ToDate(" + timestamp 
+ "l), col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTimestamp true')");
+
+        HTable table = new HTable(conf, TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals(timestamp, col_a_ts);
+            Assert.assertEquals(timestamp, col_b_ts);
+            Assert.assertEquals(timestamp, col_c_ts);
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+        }
+        Assert.assertEquals(100, i);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
+     * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_bytearray_timestamp() throws 
IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        long timestamp = System.currentTimeMillis();
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l 
as timestamp:bytearray, col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_2,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter 
-includeTimestamp true')");
+
+        HTable table = new HTable(conf, TESTTABLE_2);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int i = 0;
+        for (i = 0; iter.hasNext(); ++i) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals(timestamp, col_a_ts);
+            Assert.assertEquals(timestamp, col_b_ts);
+            Assert.assertEquals(timestamp, col_c_ts);
+
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+        }
+        Assert.assertEquals(100, i);
+        table.close();
+    }
+
+    /**
+     * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it 
into
      * 'TESTTABLE_2' using UTF-8 Plain Text format
      *
      * @throws IOException
@@ -1357,4 +1498,16 @@ public class TestHBaseStorage {
         return result.getValue(colArray[0], colArray[1]);
 
     }
+
+    /**
+     * Helper to deal with fetching a timestamp based on a cf:colname string 
spec
+     * @param result
+     * @param colName
+     * @return
+     */
+    private static long getColTimestamp(Result result, String colName) {
+        byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
+        return result.getColumnLatestCell(colArray[0], 
colArray[1]).getTimestamp();
+    }
+
 }


Reply via email to