Author: rohini
Date: Tue Jan 12 06:52:01 2016
New Revision: 1724173

URL: http://svn.apache.org/viewvc?rev=1724173&view=rev
Log:
PIG-4770: OOM with POPartialAgg in some cases (rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
    pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 12 06:52:01 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4770: OOM with POPartialAgg in some cases (rohini)
+
 PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after union does ascending instead (rohini)
 
 PIG-4774: Fix NPE in SUM,AVG,MIN,MAX UDFs for null bag input (rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Tue Jan 12 06:52:01 2016
@@ -68,14 +68,23 @@ public class POPartialAgg extends Physic
     // entry in hash map and average seen reduction
     private static final int NUM_RECS_TO_SAMPLE = 10000;
 
+    // We want to allow bigger list sizes for group all.
+    // But still have a cap on it to avoid JVM finding it hard to allocate space
+    // TODO: How high can we go without performance degradation??
+    private static final int MAX_LIST_SIZE = 25000;
+
     // We want to avoid massive ArrayList copies as they get big.
     // Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
     // 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
     // large value to trigger spilling/aggregation instead of paying for yet another data
     // copy.
-    private static final int MAX_LIST_SIZE = 9368;
+    // For group all cases, we will set this to a higher value
+    private int listSizeThreshold = 9367;
 
-    private static final int DEFAULT_MIN_REDUCTION = 10;
+    // Using default min reduction 7 instead of 10 as processedInputMap size
+    // will be 4096 (hashmap size is power of 2) for both 20000/10 and 20000/7.
+    // So using the lower number 7 as even 7x reduction is worth using map side aggregation
+    private static final int DEFAULT_MIN_REDUCTION = 7;
 
     // TODO: these are temporary. The real thing should be using memory usage estimation.
     private static final int FIRST_TIER_THRESHOLD = 20000;
@@ -89,6 +98,7 @@ public class POPartialAgg extends Physic
     private ExpressionOperator keyLeaf;
     private List<PhysicalPlan> valuePlans;
     private List<ExpressionOperator> valueLeaves;
+    private boolean isGroupAll;
 
     private transient int numRecsInRawMap;
     private transient int numRecsInProcessedMap;
@@ -123,17 +133,19 @@ public class POPartialAgg extends Physic
     private transient int avgTupleSize;
     private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator;
 
-
     public POPartialAgg(OperatorKey k) {
+        this(k, false);
+    }
+
+    public POPartialAgg(OperatorKey k, boolean isGroupAll) {
         super(k);
+        this.isGroupAll = isGroupAll;
     }
 
     private void init() throws ExecException {
         ALL_POPARTS.put(this, null);
         numRecsInRawMap = 0;
         numRecsInProcessedMap = 0;
-        rawInputMap = Maps.newHashMap();
-        processedInputMap = Maps.newHashMap();
         minOutputReduction = DEFAULT_MIN_REDUCTION;
         numRecordsToSample = NUM_RECS_TO_SAMPLE;
         firstTierThreshold = FIRST_TIER_THRESHOLD;
@@ -158,11 +170,24 @@ public class POPartialAgg extends Physic
         }
         if (percentUsage <= 0) {
             LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
-            disableMapAgg();
+            disableMapAgg = true;
             // Set them to true instead of adding another check for !disableMapAgg
             sizeReductionChecked = true;
             estimatedMemThresholds = true;
         }
+        // Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0
+        // newHashMapWithExpectedSize does new HashMap(expectedSize + expectedSize/3)
+        // to factor in default load factor of 0.75.
+        // For Hashmap, internally its size is always in power of 2.
+        // So for NUM_RECS_TO_SAMPLE=10000, hashmap size will be 16384
+        // With secondTierThreshold of 2857 (minReduction 7), hashmap size will be 4096
+        if (!disableMapAgg) {
+            rawInputMap = Maps.newHashMapWithExpectedSize(NUM_RECS_TO_SAMPLE);
+            processedInputMap = Maps.newHashMapWithExpectedSize(SECOND_TIER_THRESHOLD);
+        }
+        if (isGroupAll) {
+            listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
+        }
         initialized = true;
         SpillableMemoryManager.getInstance().registerSpillable(this);
     }
@@ -224,6 +249,11 @@ public class POPartialAgg extends Physic
             if (mapAggDisabled()) {
                 // disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
                 // if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
+                if (rawInputMap != null) {
+                    // Free up the maps for garbage collection
+                    rawInputMap = null;
+                    processedInputMap = null;
+                }
                 return processInput();
             } else {
                 Result inp = processInput();
@@ -294,6 +324,9 @@ public class POPartialAgg extends Physic
                 secondTierThreshold += 1;
                 firstTierThreshold -= 1;
             }
+            if (isGroupAll) {
+                listSizeThreshold = Math.min(firstTierThreshold, MAX_LIST_SIZE);
+            }
         }
         estimatedMemThresholds = true;
     }
@@ -344,17 +377,26 @@ public class POPartialAgg extends Physic
             Object key, Tuple inpTuple) throws ExecException {
         List<Tuple> value = map.get(key);
         if (value == null) {
-            value = new ArrayList<Tuple>();
+            if (isGroupAll) {
+                // Set exact array initial size to avoid array copies
+                // listSizeThreshold = numRecordsToSample before estimating memory
+                // thresholds and firstTierThreshold after memory estimation
+                int listSize = (map == rawInputMap) ? listSizeThreshold : Math.min(secondTierThreshold, MAX_LIST_SIZE);
+                value = new ArrayList<Tuple>(listSize);
+            } else {
+                value = new ArrayList<Tuple>();
+            }
             map.put(key, value);
         }
         value.add(inpTuple);
-        if (value.size() >= MAX_LIST_SIZE) {
+        if (value.size() > listSizeThreshold) {
             boolean isFirst = (map == rawInputMap);
             if (LOG.isDebugEnabled()){
                 LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
             }
             if (isFirst) {
-                aggregateRawRow(key);
+                // Aggregate and remove just this key to keep size in check
+                aggregateRawRow(key, value);
             } else {
                 aggregateSecondLevel();
             }
@@ -389,8 +431,8 @@ public class POPartialAgg extends Physic
         }
     }
 
-    private void aggregateRawRow(Object key) throws ExecException {
-        List<Tuple> value = rawInputMap.get(key);
+    private void aggregateRawRow(Object key, List<Tuple> value) throws ExecException {
+        numRecsInRawMap -= value.size();
         Tuple valueTuple = createValueTuple(key, value);
         Result res = getOutput(key, valueTuple);
         rawInputMap.remove(key);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Tue Jan 12 06:52:01 2016
@@ -253,7 +253,7 @@ public class CombinerOptimizerUtil {
                 POLocalRearrange mlr = getNewRearrange(rearrange);
                 POPartialAgg mapAgg = null;
                 if (doMapAgg) {
-                    mapAgg = createPartialAgg(cfe);
+                    mapAgg = createPartialAgg(cfe, isGroupAll(rearrange));
                 }
 
                 // A specialized local rearrange operator will replace
@@ -287,16 +287,31 @@ public class CombinerOptimizerUtil {
         }
     }
 
+    private static boolean isGroupAll(POLocalRearrange lr) {
+        // B: Local Rearrange[tuple]{chararray}(false) - scope-3    ->   scope-14
+        // |   |
+        // |   Constant(all) - scope-4
+        boolean isGroupAll = false;
+        if (lr.getPlans().size() == 1) {
+            PhysicalPlan plan = lr.getPlans().get(0);
+            if (plan.getKeys().size() == 1 && (plan.getRoots().get(0) instanceof ConstantExpression)) {
+                ConstantExpression constExpr = (ConstantExpression) plan.getRoots().get(0);
+                isGroupAll = ("all").equals(constExpr.getValue());
+            }
+        }
+        return isGroupAll;
+    }
+
     /**
      * Translate POForEach in combiner into a POPartialAgg
      * @param combineFE
      * @return partial aggregate operator
      * @throws CloneNotSupportedException
      */
-    private static POPartialAgg createPartialAgg(POForEach combineFE) throws CloneNotSupportedException {
+    private static POPartialAgg createPartialAgg(POForEach combineFE, boolean isGroupAll) throws CloneNotSupportedException {
         String scope = combineFE.getOperatorKey().scope;
         POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope,
-                NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                NodeIdGenerator.getGenerator().getNextNodeId(scope)), isGroupAll);
         poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations());
         poAgg.setResultType(combineFE.getResultType());
 

Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Tue Jan 12 06:52:01 2016
@@ -219,10 +219,8 @@ public class SpillableMemoryManager impl
                 Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
                     @Override
                     public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
-                        Spillable o1 = o1Ref.get();
-                        Spillable o2 = o2Ref.get();
-                        long o1Size = o1.getMemorySize();
-                        long o2Size = o2.getMemorySize();
+                        long o1Size = o1Ref.getMemorySize();
+                        long o2Size = o2Ref.getMemorySize();
 
                         if (o1Size == o2Size) {
                             return 0;

Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java Tue Jan 12 06:52:01 2016
@@ -77,8 +77,12 @@ public class TestPOPartialAgg {
     }
 
     private void createPOPartialPlan(int valueCount) throws PlanException {
+        createPOPartialPlan(valueCount, false);
+    }
+
+    private void createPOPartialPlan(int valueCount, boolean isGroupAll) throws PlanException {
         parentPlan = new PhysicalPlan();
-        partAggOp = GenPhyOp.topPOPartialAgg();
+        partAggOp = new POPartialAgg(GenPhyOp.getOK(), isGroupAll);
         partAggOp.setParentPlan(parentPlan);
 
         // setup key plan
@@ -357,6 +361,25 @@ public class TestPOPartialAgg {
         assertEquals(new Long(1), spilled.get());
     }
 
+    @Test
+    public void testGroupAll() throws Exception {
+        createPOPartialPlan(1, true);
+        Result res;
+        for (long i=1; i <= 10010; i ++) {
+            Tuple t = tuple("all", tuple(i));
+            partAggOp.attachInput(t);
+            res = partAggOp.getNextTuple();
+            assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+        }
+        // end of all input, now expecting all tuples
+        parentPlan.endOfAllInput = true;
+        res = partAggOp.getNextTuple();
+        assertEquals(tuple("all", tuple(50105055L)), res.result);
+        assertEquals(POStatus.STATUS_OK, res.returnStatus);
+        res = partAggOp.getNextTuple();
+        assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+    }
+
     private static class Spill implements Callable<Long> {
 
         private Spillable spillable;


Reply via email to