Author: daijy
Date: Wed Jan  7 00:41:05 2015
New Revision: 1649981

URL: http://svn.apache.org/r1649981
Log:
PIG-4370: HBaseStorage should support delete markers
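
For context, a minimal Pig Latin sketch of how the new -includeTombstone option is intended to be used (the input path, table name, column, and field names below are hypothetical, not taken from this patch): with the option set, the field after the row key (and after the timestamp, if -includeTimestamp is also used) is a boolean; tuples where it is true are written as row Deletes instead of Puts.

    -- minimal sketch, assuming a table 'mytable' with a single column 'cf:a'
    rows = LOAD 'input' AS (rowkey:chararray, is_delete:boolean, cf_a:int);
    STORE rows INTO 'hbase://mytable'
        USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
            'cf:a', '-includeTombstone true');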

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1649981&r1=1649980&r2=1649981&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan  7 00:41:05 2015
@@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4370: HBaseStorage should support delete markers (bridiver via daijy)
+
+PIG-4360: HBaseStorage should support setting the timestamp field (bridiver via daijy)
+
 PIG-4337: Split Types and MultiQuery e2e tests into multiple groups (rohini)
 
 PIG-4066: An optimization for ROLLUP operation in Pig (hxquangnhat via cheolsoo)
@@ -38,8 +42,6 @@ PIG-4367: Port local mode tests to Tez -
 
 PIG-4339: e2e test framework assumes default exectype as mapred (rohini)
 
-PIG-4360: HBaseStorage should support setting the timestamp field (bridiver via daijy)
-
 PIG-2949: JsonLoader only reads arrays of objects (eyal via daijy)
 
 PIG-4213: CSVExcelStorage not quoting texts containing \r (CR) when storing (alfonso.nishikawa via daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1649981&r1=1649980&r2=1649981&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Jan  7 00:41:05 2015
@@ -49,6 +49,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -179,6 +180,7 @@ public class HBaseStorage extends LoadFu
     private final long maxTimestamp_;
     private final long timestamp_;
     private boolean includeTimestamp_;
+    private boolean includeTombstone_;
 
     protected transient byte[] gt_;
     protected transient byte[] gte_;
@@ -212,7 +214,8 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("minTimestamp", true, "Record must have timestamp greater or equal to this value");
         validOptions_.addOption("maxTimestamp", true, "Record must have timestamp less then this value");
         validOptions_.addOption("timestamp", true, "Record must have timestamp equal to this value");
-        validOptions_.addOption("includeTimestamp", false, "Record will include the timestamp after the rowkey on store");
+        validOptions_.addOption("includeTimestamp", false, "Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)");
+        validOptions_.addOption("includeTombstone", false, "Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)");
     }
 
     /**
@@ -256,7 +259,8 @@ public class HBaseStorage extends LoadFu
      * <li>-minTimestamp= Scan's timestamp for min timeRange
      * <li>-maxTimestamp= Scan's timestamp for max timeRange
      * <li>-timestamp= Scan's specified timestamp
-     * <li>-includeTimestamp= Record will include the timestamp after the row key on store
+     * <li>-includeTimestamp= Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)
+     * <li>-includeTombstone= Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)
      * <li>-caster=(HBaseBinaryConverter|Utf8StorageConverter) Utf8StorageConverter is the default
      * To be used with extreme caution, since this could result in data loss
      * (see http://hbase.apache.org/book.html#perf.hbase.client.putwal).
@@ -271,7 +275,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-columnPrefix] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
             throw e;
         }
 
@@ -354,6 +358,14 @@ public class HBaseStorage extends LoadFu
             }
         }
 
+        includeTombstone_ = false;
+        if (configuredOptions_.hasOption("includeTombstone")) {
+            String value = configuredOptions_.getOptionValue("includeTombstone");
+            if ("true".equalsIgnoreCase(value) || "".equalsIgnoreCase(value) || value == null ) {
+                includeTombstone_ = true;
+            }
+        }
+
         initScan();
     }
 
@@ -943,18 +955,16 @@ public class HBaseStorage extends LoadFu
         byte type = (fieldSchemas == null) ? DataType.findType(t.get(0)) : fieldSchemas[0].getType();
         long ts;
 
-        Put put = createPut(t.get(0), type);
-
         int startIndex=1;
         if (includeTimestamp_) {
-            byte timestampType = (fieldSchemas == null) ? DataType.findType(t.get(1)) : fieldSchemas[1].getType();
+            byte timestampType = (fieldSchemas == null) ? DataType.findType(t.get(startIndex)) : fieldSchemas[startIndex].getType();
             LoadStoreCaster caster = (LoadStoreCaster) caster_;
 
             switch (timestampType) {
-            case DataType.BYTEARRAY: ts = caster.bytesToLong(((DataByteArray)t.get(1)).get()); break;
-            case DataType.LONG: ts = ((Long)t.get(1)).longValue(); break;
-            case DataType.DATETIME: ts = ((DateTime)t.get(1)).getMillis(); break;
-            default: throw new IOException("Unable to find a converter for tuple field " + t.get(1));
+            case DataType.BYTEARRAY: ts = caster.bytesToLong(((DataByteArray)t.get(startIndex)).get()); break;
+            case DataType.LONG: ts = ((Long)t.get(startIndex)).longValue(); break;
+            case DataType.DATETIME: ts = ((DateTime)t.get(startIndex)).getMillis(); break;
+            default: throw new IOException("Unable to find a converter for timestamp field " + t.get(startIndex));
             }
 
             startIndex++;
@@ -962,6 +972,25 @@ public class HBaseStorage extends LoadFu
             ts = System.currentTimeMillis();
         }
 
+        // check for deletes
+        if (includeTombstone_) {
+            if (((Boolean)t.get(startIndex)).booleanValue()) {
+                Delete delete = createDelete(t.get(0), type, ts);
+                try {
+                    // this is a delete so there will be
+                    // no put and we are done here
+                    writer.write(null, delete);
+                    return;
+                } catch (InterruptedException e) {
+                    throw new IOException(e);
+                }
+            }
+
+            startIndex++;
+        }
+
+        Put put = createPut(t.get(0), type);
+
         if (LOG.isDebugEnabled()) {
             LOG.debug("putNext -- WAL disabled: " + noWAL_);
             for (ColumnInfo columnInfo : columnInfo_) {
@@ -1007,6 +1036,25 @@ public class HBaseStorage extends LoadFu
     }
 
     /**
+     * Public method to initialize a Delete.
+     *
+     * @param key
+     * @param type
+     * @param timestamp
+     * @return new delete
+     * @throws IOException
+     */
+    public Delete createDelete(Object key, byte type, long timestamp) throws IOException {
+        Delete delete = new Delete(objToBytes(key, type), timestamp);
+
+        if(noWAL_) {
+            delete.setWriteToWAL(false);
+        }
+
+        return delete;
+    }
+
+    /**
      * Public method to initialize a Put. Used to allow assertions of how Puts
      * are initialized by unit tests.
      *

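As a further illustration of the field ordering putNext() now expects, a sketch combining both options (names are hypothetical; the option syntax mirrors the test below): the tuple is read as (rowkey, timestamp, tombstone, column values...).

    -- sketch: row key, then timestamp, then the delete marker, then the columns
    rows = LOAD 'input' AS (rowkey:chararray, ts:long, is_delete:boolean, cf_a:int);
    STORE rows INTO 'hbase://mytable'
        USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
            'cf:a', '-includeTimestamp true -includeTombstone true');
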
Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java?rev=1649981&r1=1649980&r2=1649981&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorage.java Wed Jan  7 00:41:05 2015
@@ -1188,6 +1188,49 @@ public class TestHBaseStorage {
 
     /**
      * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
+     * 'TESTTABLE_1' deleting odd row keys
+     *
+     * @throws IOException
+     */
+    @Test
+    public void testStoreToHBase_1_with_delete() throws IOException {
+        prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
+        scanTable1(pig, DataFormat.HBaseBinary);
+        pig.registerQuery("b = FOREACH a GENERATE rowKey, (boolean)(((int)rowKey) % 2), col_a, col_b, col_c;");
+        pig.store("b",  "hbase://" + TESTTABLE_1,
+                "org.apache.pig.backend.hadoop.hbase.HBaseStorage('"
+                + TESTCOLUMN_A + " " + TESTCOLUMN_B + " "
+                + TESTCOLUMN_C + "','-caster HBaseBinaryConverter -includeTombstone true')");
+
+        HTable table = new HTable(conf, TESTTABLE_1);
+        ResultScanner scanner = table.getScanner(new Scan());
+        Iterator<Result> iter = scanner.iterator();
+        int count = 0;
+        for (int i = 0; iter.hasNext(); i = i + 2) {
+            Result result = iter.next();
+            String v = String.valueOf(i);
+            String rowKey = Bytes.toString(result.getRow());
+            int col_a = Bytes.toInt(getColValue(result, TESTCOLUMN_A));
+            double col_b = Bytes.toDouble(getColValue(result, TESTCOLUMN_B));
+            String col_c = Bytes.toString(getColValue(result, TESTCOLUMN_C));
+
+            long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
+            long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
+            long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
+            
+            Assert.assertEquals("00".substring(v.length()) + v, rowKey);
+            Assert.assertEquals(i, col_a);
+            Assert.assertEquals(i + 0.0, col_b, 1e-6);
+            Assert.assertEquals("Text_" + i, col_c);
+
+            count++;
+        }
+        Assert.assertEquals(50, count);
+        table.close();
+    }
+
+    /**
      * load from hbase 'TESTTABLE_1' using HBaseBinary format, and store it into
      * 'TESTTABLE_2' using UTF-8 Plain Text format
      *
      * @throws IOException
@@ -1235,6 +1278,7 @@ public class TestHBaseStorage {
         Object key = "somekey";
         byte type = DataType.CHARARRAY;
         Assert.assertFalse(hbaseStorage.createPut(key, type).getWriteToWAL());
+        Assert.assertFalse(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL());
     }
 
     /**
@@ -1249,6 +1293,7 @@ public class TestHBaseStorage {
         Object key = "somekey";
         byte type = DataType.CHARARRAY;
         Assert.assertTrue(hbaseStorage.createPut(key, type).getWriteToWAL());
+        Assert.assertTrue(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL());
     }
 
     /**

