Author: rohini
Date: Mon Sep 21 23:04:34 2015
New Revision: 1704439

URL: http://svn.apache.org/viewvc?rev=1704439&view=rev
Log:
PIG-4663: HBaseStorage should allow the MaxResultsPerColumnFamily limit to avoid memory or scan timeout issues (pmazak via rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
    pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1704439&r1=1704438&r2=1704439&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Sep 21 23:04:34 2015
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4663: HBaseStorage should allow the MaxResultsPerColumnFamily limit to avoid memory or scan timeout issues (pmazak via rohini)
+
 PIG-4673: Built In UDF - REPLACE_MULTI : For a given string, search and 
replace all occurrences
  of search keys with replacement values ([email protected] via daijy)
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1704439&r1=1704438&r2=1704439&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java 
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Mon Sep 
21 23:04:34 2015
@@ -204,6 +204,7 @@ public class HBaseStorage extends LoadFu
         validOptions_.addOption("cacheBlocks", true, "Set whether blocks 
should be cached for the scan");
         validOptions_.addOption("caching", true, "Number of rows scanners 
should cache");
         validOptions_.addOption("limit", true, "Per-region limit");
+        validOptions_.addOption("maxResultsPerColumnFamily", true, "Limit the maximum number of values returned per row per column family");
         validOptions_.addOption("delim", true, "Column delimiter");
         validOptions_.addOption("ignoreWhitespace", true, "Ignore spaces when 
parsing columns");
         validOptions_.addOption("caster", true, "Caster to use for converting 
values. A class name, " +
@@ -250,6 +251,7 @@ public class HBaseStorage extends LoadFu
      * <li>-lte=maxKeyVal
      * <li>-regex=match regex on KeyVal
      * <li>-limit=numRowsPerRegion max number of rows to retrieve per region
+     * <li>-maxResultsPerColumnFamily= Limit the maximum number of values returned per row per column family
      * <li>-delim=char delimiter to use when parsing column names (default is 
space or comma)
      * <li>-ignoreWhitespace=(true|false) ignore spaces when parsing column 
names (default true)
      * <li>-cacheBlocks=(true|false) Set whether blocks should be cached for 
the scan (default false).
@@ -274,7 +276,7 @@ public class HBaseStorage extends LoadFu
             configuredOptions_ = parser_.parse(validOptions_, optsArr);
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
-            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
+            formatter.printHelp( "[-loadKey] [-gt] [-gte] [-lt] [-lte] [-regex] [-cacheBlocks] [-caching] [-caster] [-noWAL] [-limit] [-maxResultsPerColumnFamily] [-delim] [-ignoreWhitespace] [-minTimestamp] [-maxTimestamp] [-timestamp] [-includeTimestamp] [-includeTombstone]", validOptions_ );
             throw e;
         }
 
@@ -468,6 +470,10 @@ public class HBaseStorage extends LoadFu
         if (configuredOptions_.hasOption("timestamp")){
             scan.setTimeStamp(timestamp_);
         }
+        if (configuredOptions_.hasOption("maxResultsPerColumnFamily")){
+            int maxResultsPerColumnFamily_ = Integer.valueOf(configuredOptions_.getOptionValue("maxResultsPerColumnFamily"));
+            scan.setMaxResultsPerColumnFamily(maxResultsPerColumnFamily_);
+        }
 
         // if the group of columnInfos for this family doesn't contain a 
prefix, we don't need
         // to set any filters, we can just call addColumn or addFamily. See 
javadocs below.

Modified: pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java?rev=1704439&r1=1704438&r2=1704439&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestHBaseStorageParams.java Mon Sep 21 
23:04:34 2015
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
 import org.apache.pig.impl.util.UDFContext;
 import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Properties;
 
 public class TestHBaseStorageParams {
@@ -77,6 +79,23 @@ public class TestHBaseStorageParams {
       doColumnParseTest(storage, "foo:a", "foo:b ", " foo:c,d");
     }
 
+    /**
+     * Assert that -maxResultsPerColumnFamily actually gets set on Scan
+     */
+    @Test
+    public void testSetsMaxResultsPerColumnFamily() throws Exception {
+        Field scanField = HBaseStorage.class.getDeclaredField("scan");
+        scanField.setAccessible(true);
+
+        HBaseStorage storageNoMax = new HBaseStorage("", "");
+        Scan scan = (Scan)scanField.get(storageNoMax);
+        Assert.assertEquals(-1, scan.getMaxResultsPerColumnFamily());
+
+        HBaseStorage storageWithMax = new HBaseStorage("", "-maxResultsPerColumnFamily 123");
+        scan = (Scan)scanField.get(storageWithMax);
+        Assert.assertEquals(123, scan.getMaxResultsPerColumnFamily());
+    }
+
     private void doColumnParseTest(HBaseStorage storage, String... names) {
       Assert.assertEquals("Wrong column count",
         names.length, storage.getColumnInfoList().size());


Reply via email to