Author: daijy
Date: Wed Jan 7 00:41:05 2015
New Revision: 1649981
URL: http://svn.apache.org/r1649981
Log:
PIG-4370: HBaseStorage should support delete markers
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1649981&r1=1649980&r2=1649981&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 7 00:41:05 2015
@@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4370: HBaseStorage should support delete markers (bridiver via daijy)
+
+PIG-4360: HBaseStorage should support setting the timestamp field (bridiver via daijy)
+
PIG-4337: Split Types and MultiQuery e2e tests into multiple groups (rohini)
PIG-4066: An optimization for ROLLUP operation in Pig (hxquangnhat via cheolsoo)
@@ -38,8 +42,6 @@ PIG-4367: Port local mode tests to Tez -
PIG-4339: e2e test framework assumes default exectype as mapred (rohini)
-PIG-4360: HBaseStorage should support setting the timestamp field (bridiver via daijy)
-
PIG-2949: JsonLoader only reads arrays of objects (eyal via daijy)
PIG-4213: CSVExcelStorage not quoting texts containing \r (CR) when storing (alfonso.nishikawa via daijy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1649981&r1=1649980&r2=1649981&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Jan 7 00:41:05 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -179,6 +180,7 @@ public class HBaseStorage extends LoadFu
private final long maxTimestamp_;
private final long timestamp_;
private boolean includeTimestamp_;
+ private boolean includeTombstone_;
protected transient byte[] gt_;
protected transient byte[] gte_;
@@ -212,7 +214,8 @@ public class HBaseStorage extends LoadFu
validOptions_.addOption("minTimestamp", true, "Record must have
timestamp greater or equal to this value");
validOptions_.addOption("maxTimestamp", true, "Record must have
timestamp less then this value");
validOptions_.addOption("timestamp", true, "Record must have timestamp
equal to this value");
- validOptions_.addOption("includeTimestamp", false, "Record will
include the timestamp after the rowkey on store");
+ validOptions_.addOption("includeTimestamp", false, "Record will
include the timestamp after the rowkey on store (rowkey, timestamp, ...)");
+ validOptions_.addOption("includeTombstone", false, "Record will
include a tombstone marker on store after the rowKey and timestamp (if
included) (rowkey, [timestamp,] tombstone, ...)");
}
/**
@@ -256,7 +259,8 @@ public class HBaseStorage extends LoadFu
* <li>-minTimestamp= Scan's timestamp for min timeRange
* <li>-maxTimestamp= Scan's timestamp for max timeRange
* <li>-timestamp= Scan's specified timestamp
- * <li>-includeTimestamp= Record will include the timestamp after the row
key on store
+ * <li>-includeTimestamp= Record will include the timestamp after the
rowkey on store (rowkey, timestamp, ...)
+ * <li>-includeTombstone= Record will include a tombstone marker on store
after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone,
...)
* <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter)
Utf8StorageConverter is the default
* To be used with extreme caution, since this could result in data loss
* (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
@@ -271,7 +275,7 @@ public class HBaseStorage extends LoadFu
configuredOptions_ = parser_.parse(validOptions_, optsArr);
} catch (ParseException e) {
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte]
[-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit]
[-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]
[-includeTimestamp]", validOptions_ );
+ formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte]
[-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim]
[-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp]
[-includeTimestamp] [-includeTombstone]", validOptions_ );
throw e;
}
@@ -354,6 +358,14 @@ public class HBaseStorage extends LoadFu
}
}
+ includeTombstone_ = false;
+ if (configuredOptions_.hasOption("includeTombstone")) {
+ String value =
configuredOptions_.getOptionValue("includeTombstone");
+ if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value)
|| value == null ) {
+ includeTombstone_ = true;
+ }
+ }
+
initScan();
}
@@ -943,18 +955,16 @@ public class HBaseStorage extends LoadFu
byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) :
fieldSchemas[0].getType();
long ts;
- Put put = createPut(t.get(0), type);
-
int startIndex=1;
if (includeTimestamp_) {
- byte timestampType = (fieldSchemas == null) ?
DataType.findType(t.get(1)) : fieldSchemas[1].getType();
+ byte timestampType = (fieldSchemas == null) ?
DataType.findType(t.get(startIndex)) : fieldSchemas[startIndex].getType();
LoadStoreCaster caster = (LoadStoreCaster) caster_;
switch (timestampType) {
- case DataType.BYTEARRAY: ts =
caster.bytesToLong(((DataByteArray)t.get(1)).get()); break;
- case DataType.LONG: ts = ((Long)t.get(1)).longValue(); break;
- case DataType.DATETIME: ts = ((DateTime)t.get(1)).getMillis();
break;
- default: throw new IOException("Unable to find a converter for
tuple field " + t.get(1));
+ case DataType.BYTEARRAY: ts =
caster.bytesToLong(((DataByteArray)t.get(startIndex)).get()); break;
+ case DataType.LONG: ts = ((Long)t.get(startIndex)).longValue();
break;
+ case DataType.DATETIME: ts =
((DateTime)t.get(startIndex)).getMillis(); break;
+ default: throw new IOException("Unable to find a converter for
timestamp field " + t.get(startIndex));
}
startIndex++;
@@ -962,6 +972,25 @@ public class HBaseStorage extends LoadFu
ts = System.currentTimeMillis();
}
+ // check for deletes
+ if (includeTombstone_) {
+ if (((Boolean)t.get(startIndex)).booleanValue()) {
+ Delete delete = createDelete(t.get(0), type, ts);
+ try {
+ // this is a delete so there will be
+ // no put and we are done here
+ writer.write(null, delete);
+ return;
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ startIndex++;
+ }
+
+ Put put = createPut(t.get(0), type);
+
if (LOG.isDebugEnabled()) {
LOG.debug("putNext -- WAL disabled: " + noWAL_);
for (ColumnInfo columnInfo : columnInfo_) {
@@ -1007,6 +1036,25 @@ public class HBaseStorage extends LoadFu
}
/**
+ * Public method to initialize a Delete.
+ *
+ * @param key
+ * @param type
+ * @param timestamp
+ * @return new delete
+ * @throws IOException
+ */
+ public Delete createDelete(Object key, byte type, long timestamp) throws
IOException {
+ Delete delete = new Delete(objToBytes(key, type), timestamp);
+
+ if(noWAL_) {
+ delete.setWriteToWAL(false);
+ }
+
+ return delete;
+ }
+
+ /**
* Public method to initialize a Put. Used to allow assertions of how Puts
* are initialized by unit tests.
*
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1649981&r1=1649980&r2=1649981&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Wed Jan 7 00:41:05 2015
@@ -1188,6 +1188,49 @@ public class TestHBaseStorage {
/**
* load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it
into
+ * 'TESTTABLE_1' deleting odd row keys
+ *
+ * @throws IOException
+ */
+ @Test
+ public void testStoreToHBase_1_with_delete() throws IOException {
+ prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+ scanTable1(pig, DataFormat.HBaseBinary);
+ pig.registerQuery("b = FOREACH a GENERATE rowKey,
(boolean)(((int)rowKey) % 2), col_a, col_b, col_c;");
+ pig.store("b", "hbase://" + TESTTABLE_1,
+ "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+ + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+ + TESTCOLUMN_C + "','-caster HBaseBinaryConverter
-includeTombstone true')");
+
+ HTable table = new HTable(conf, TESTTABLE_1);
+ ResultScanner scanner = table.getScanner(new Scan());
+ Iterator<Result> iter = scanner.iterator();
+ int count = 0;
+ for (int i = 0; iter.hasNext(); i = i + 2) {
+ Result result = iter.next();
+ String v = String.valueOf(i);
+ String rowKey = Bytes.toString(result.getRow());
+ int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+ double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+ String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+ long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+ long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+ long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+
+ Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+ Assert.assertEquals(i, col_a);
+ Assert.assertEquals(i + 0.0, col_b, 1e-6);
+ Assert.assertEquals("Text_" + i, col_c);
+
+ count++;
+ }
+ Assert.assertEquals(50, count);
+ table.close();
+ }
+
+ /**
+ * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it
into
* 'TESTTABLE_2' using UTF-8 Plain Text format
*
* @throws IOException
@@ -1235,6 +1278,7 @@ public class TestHBaseStorage {
Object key = "somekey";
byte type = DataType.CHARARRAY;
Assert.assertFalse(hbaseStorage.createPut(key, type).getWriteToWAL());
+ Assert.assertFalse(hbaseStorage.createDelete(key, type,
System.currentTimeMillis()).getWriteToWAL());
}
/**
@@ -1249,6 +1293,7 @@ public class TestHBaseStorage {
Object key = "somekey";
byte type = DataType.CHARARRAY;
Assert.assertTrue(hbaseStorage.createPut(key, type).getWriteToWAL());
+ Assert.assertTrue(hbaseStorage.createDelete(key, type,
System.currentTimeMillis()).getWriteToWAL());
}
/**