Author: daijy
Date: Tue Jan 6 03:14:15 2015
New Revision: 1649730
URL: http://svn.apache.org/r1649730
Log:
PIG-4360: HBaseStorage should support setting the timestamp field
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1649730&r1=1649729&r2=1649730&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 6 03:14:15 2015
@@ -30,6 +30,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4360: HBaseStorage should support setting the timestamp field (bridiver
via daijy)
+
PIG-2949: JsonLoader only reads arrays of objects (eyal via daijy)
PIG-4213: CSVExcelStorage not quoting texts containing \r (CR) when storing
(alfonso.nishikawa via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1649730&r1=1649729&r2=1649730&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Tue Jan
6 03:14:15 2015
@@ -178,6 +178,7 @@ public class HBaseStorage extends LoadFu
private final long minTimestamp_;
private final long maxTimestamp_;
private final long timestamp_;
+ private boolean includeTimestamp_;
protected transient byte[] gt_;
protected transient byte[] gte_;
@@ -211,6 +212,7 @@ public class HBaseStorage extends LoadFu
validOptions_.addOption("minTimestamp", true, "Record must have
timestamp greater or equal to this value");
validOptions_.addOption("maxTimestamp", true, "Record must have
timestamp less then this value");
validOptions_.addOption("timestamp", true, "Record must have timestamp
equal to this value");
+ validOptions_.addOption("includeTimestamp", false, "Record will
include the timestamp after the rowkey on store");
}
/**
@@ -254,6 +256,7 @@ public class HBaseStorage extends LoadFu
* <li>-minTimestamp= Scan's timestamp for min timeRange
* <li>-maxTimestamp= Scan's timestamp for max timeRange
* <li>-timestamp= Scan's specified timestamp
+ * <li>-includeTimestamp= Record will include the timestamp after the row
key on store
* <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter)
Utf8StorageConverter is the default
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
@@ -268,7 +271,7 @@ public class HBaseStorage extends LoadFu
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte]
[-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit]
[-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]",
validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte]
[-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit]
[-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]
[-includeTimestamp]", validOptions_ );
throw e;
}
@@ -343,6 +346,14 @@ public class HBaseStorage extends LoadFu
timestamp_ = 0;
}
+ includeTimestamp_ = false;
+ if (configuredOptions_.hasOption("includeTimestamp")) {
+ String value =
configuredOptions_.getOptionValue("includeTimestamp");
+ if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value)
|| value == null ) {//the empty string and null check is for backward compat.
+ includeTimestamp_ = true;
+ }
+ }
+
initScan();
}
@@ -930,10 +941,27 @@ public class HBaseStorage extends LoadFu
public void putNext(Tuple t) throws IOException {
ResourceFieldSchema[] fieldSchemas = (schema_ == null) ? null :
schema_.getFields();
byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) :
fieldSchemas[0].getType();
- long ts=System.currentTimeMillis();
+ long ts;
Put put = createPut(t.get(0), type);
+ int startIndex=1;
+ if (includeTimestamp_) {
+ byte timestampType = (fieldSchemas == null) ?
DataType.findType(t.get(1)) : fieldSchemas[1].getType();
+ LoadStoreCaster caster = (LoadStoreCaster) caster_;
+
+ switch (timestampType) {
+ case DataType.BYTEARRAY: ts =
caster.bytesToLong(((DataByteArray)t.get(1)).get()); break;
+ case DataType.LONG: ts = ((Long)t.get(1)).longValue(); break;
+ case DataType.DATETIME: ts = ((DateTime)t.get(1)).getMillis();
break;
+ default: throw new IOException("Unable to find a converter for
tuple field " + t.get(1));
+ }
+
+ startIndex++;
+ } else {
+ ts = System.currentTimeMillis();
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("putNext -- WAL disabled: " + noWAL_);
for (ColumnInfo columnInfo : columnInfo_) {
@@ -941,8 +969,8 @@ public class HBaseStorage extends LoadFu
}
}
- for (int i=1;i<t.size();++i){
- ColumnInfo columnInfo = columnInfo_.get(i-1);
+ for (int i=startIndex;i<t.size();++i){
+ ColumnInfo columnInfo = columnInfo_.get(i-startIndex);
if (LOG.isDebugEnabled()) {
LOG.debug("putNext - tuple: " + i + ", value=" + t.get(i) +
", cf:column=" + columnInfo);
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1649730&r1=1649729&r2=1649730&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Tue Jan 6
03:14:15 2015
@@ -1047,6 +1047,147 @@ public class TestHBaseStorage {
/**
* load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it
into
+ * 'TESTTABLE_2' using HBaseBinaryFormat setting the timestamp
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_1_with_timestamp() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ long timestamp = System.currentTimeMillis();
+ pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l,
col_a, col_b, col_c;");
+ pig.store("b", "hbase://" + TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ + TESTCOLUMN_C + "','-caster HBaseBinaryConverter
-includeTimestamp true')");
+
+ HTable table = new HTable(conf, TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = String.valueOf(i);
+ String rowKey = Bytes.toString(result.getRow());
+ int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+ double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+ String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+ long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+ long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+ long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+
+ Assert.assertEquals(timestamp, col_a_ts);
+ Assert.assertEquals(timestamp, col_b_ts);
+ Assert.assertEquals(timestamp, col_c_ts);
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + i, col_c);
+ }
+ Assert.assertEquals(100, i);
+ table.close();
+ }
+
+ /**
+ * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it
into
+ * 'TESTTABLE_2' using HBaseBinaryFormat, setting the timestamp from a DateTime field
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_1_with_datetime_timestamp() throws
IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ long timestamp = System.currentTimeMillis();
+ pig.registerQuery("b = FOREACH a GENERATE rowKey, ToDate(" + timestamp
+ "l), col_a, col_b, col_c;");
+ pig.store("b", "hbase://" + TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ + TESTCOLUMN_C + "','-caster HBaseBinaryConverter
-includeTimestamp true')");
+
+ HTable table = new HTable(conf, TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = String.valueOf(i);
+ String rowKey = Bytes.toString(result.getRow());
+ int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+ double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+ String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+ long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+ long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+ long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+
+ Assert.assertEquals(timestamp, col_a_ts);
+ Assert.assertEquals(timestamp, col_b_ts);
+ Assert.assertEquals(timestamp, col_c_ts);
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + i, col_c);
+ }
+ Assert.assertEquals(100, i);
+ table.close();
+ }
+
+ /**
+ * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it
into
+ * 'TESTTABLE_2' using HBaseBinaryFormat, setting the timestamp from a bytearray field
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_1_with_bytearray_timestamp() throws
IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ prepareTable(TESTTABLE_2, false, DataFormat.HBaseBinary);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ long timestamp = System.currentTimeMillis();
+ pig.registerQuery("b = FOREACH a GENERATE rowKey, " + timestamp + "l
as timestamp:bytearray, col_a, col_b, col_c;");
+ pig.store("b", "hbase://" + TESTTABLE_2,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ + TESTCOLUMN_C + "','-caster HBaseBinaryConverter
-includeTimestamp true')");
+
+ HTable table = new HTable(conf, TESTTABLE_2);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int i = 0;
+ for (i = 0; iter.hasNext(); ++i) {
+ Result result = iter.next();
+ String v = String.valueOf(i);
+ String rowKey = Bytes.toString(result.getRow());
+ int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+ double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+ String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+ long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+ long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+ long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+
+ Assert.assertEquals(timestamp, col_a_ts);
+ Assert.assertEquals(timestamp, col_b_ts);
+ Assert.assertEquals(timestamp, col_c_ts);
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + i, col_c);
+ }
+ Assert.assertEquals(100, i);
+ table.close();
+ }
+
+ /**
+ * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it
into
* 'TESTTABLE_2' using UTF-8 Plain Text format
*
* @throws IOException
@@ -1357,4 +1498,16 @@ public class TestHBaseStorage {
return result.getValue(colArray[0], colArray[1]);
}
+
+ /**
+ * Helper to deal with fetching a timestamp based on a cf:colname string
spec
+ * @param result the scan Result to read the cell from
+ * @param colName column spec in "columnFamily:columnName" form
+ * @return the timestamp of the latest cell stored for that column
+ */
+ private static long getColTimestamp(Result result, String colName) {
+ byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
+ return result.getColumnLatestCell(colArray[0],
colArray[1]).getTimestamp();
+ }
+
}