Author: rohini
Date: Tue Jan 12 06:52:01 2016
New Revision: 1724173
URL: http://svn.apache.org/viewvc?rev=1724173&view=rev
Log:
PIG-4770: OOM with POPartialAgg in some cases (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jan 12 06:52:01 2016
@@ -81,6 +81,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4770: OOM with POPartialAgg in some cases (rohini)
+
PIG-4773: [Pig on Tez] Secondary key descending sort in nested foreach after
union does ascending instead (rohini)
PIG-4774: Fix NPE in SUM,AVG,MIN,MAX UDFs for null bag input (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Tue Jan 12 06:52:01 2016
@@ -68,14 +68,23 @@ public class POPartialAgg extends Physic
// entry in hash map and average seen reduction
private static final int NUM_RECS_TO_SAMPLE = 10000;
+ // We want to allow bigger list sizes for group all.
+ // But still have a cap on it to avoid JVM finding it hard to allocate space
+ // TODO: How high can we go without performance degradation??
+ private static final int MAX_LIST_SIZE = 25000;
+
// We want to avoid massive ArrayList copies as they get big.
// Array Lists grow by prevSize + prevSize/2. Given default initial size of 10,
// 9369 is the size of the array after 18 such resizings. This seems like a sufficiently
// large value to trigger spilling/aggregation instead of paying for yet another data
// copy.
- private static final int MAX_LIST_SIZE = 9368;
+ // For group all cases, we will set this to a higher value
+ private int listSizeThreshold = 9367;
- private static final int DEFAULT_MIN_REDUCTION = 10;
+ // Using default min reduction 7 instead of 10 as processedInputMap size
+ // will be 4096 (hashmap size is power of 2) for both 20000/10 and 20000/7.
+ // So using the lower number 7 as even 7x reduction is worth using map side aggregation
+ private static final int DEFAULT_MIN_REDUCTION = 7;
// TODO: these are temporary. The real thing should be using memory usage estimation.
private static final int FIRST_TIER_THRESHOLD = 20000;
@@ -89,6 +98,7 @@ public class POPartialAgg extends Physic
private ExpressionOperator keyLeaf;
private List<PhysicalPlan> valuePlans;
private List<ExpressionOperator> valueLeaves;
+ private boolean isGroupAll;
private transient int numRecsInRawMap;
private transient int numRecsInProcessedMap;
@@ -123,17 +133,19 @@ public class POPartialAgg extends Physic
private transient int avgTupleSize;
private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator;
-
public POPartialAgg(OperatorKey k) {
+ this(k, false);
+ }
+
+ public POPartialAgg(OperatorKey k, boolean isGroupAll) {
super(k);
+ this.isGroupAll = isGroupAll;
}
private void init() throws ExecException {
ALL_POPARTS.put(this, null);
numRecsInRawMap = 0;
numRecsInProcessedMap = 0;
- rawInputMap = Maps.newHashMap();
- processedInputMap = Maps.newHashMap();
minOutputReduction = DEFAULT_MIN_REDUCTION;
numRecordsToSample = NUM_RECS_TO_SAMPLE;
firstTierThreshold = FIRST_TIER_THRESHOLD;
@@ -158,11 +170,24 @@ public class POPartialAgg extends Physic
}
if (percentUsage <= 0) {
LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation.");
- disableMapAgg();
+ disableMapAgg = true;
// Set them to true instead of adding another check for !disableMapAgg
sizeReductionChecked = true;
estimatedMemThresholds = true;
}
+ // Avoid hashmap resizing. TODO: Investigate loadfactor of 0.90 or 1.0
+ // newHashMapWithExpectedSize does new HashMap(expectedSize + expectedSize/3)
+ // to factor in default load factor of 0.75.
+ // For Hashmap, internally its size is always in power of 2.
+ // So for NUM_RECS_TO_SAMPLE=10000, hashmap size will be 16384
+ // With secondTierThreshold of 2857 (minReduction 7), hashmap size will be 4096
+ if (!disableMapAgg) {
+ rawInputMap = Maps.newHashMapWithExpectedSize(NUM_RECS_TO_SAMPLE);
+ processedInputMap = Maps.newHashMapWithExpectedSize(SECOND_TIER_THRESHOLD);
+ }
+ if (isGroupAll) {
+ listSizeThreshold = Math.min(numRecordsToSample, MAX_LIST_SIZE);
+ }
initialized = true;
SpillableMemoryManager.getInstance().registerSpillable(this);
}
@@ -224,6 +249,11 @@ public class POPartialAgg extends Physic
if (mapAggDisabled()) {
// disableMapAgg() sets doSpill, so we can't get here while there is still contents in the buffered maps.
// if we get to this point, everything is flushed, so we can simply return the raw tuples from now on.
+ if (rawInputMap != null) {
+ // Free up the maps for garbage collection
+ rawInputMap = null;
+ processedInputMap = null;
+ }
return processInput();
} else {
Result inp = processInput();
@@ -294,6 +324,9 @@ public class POPartialAgg extends Physic
secondTierThreshold += 1;
firstTierThreshold -= 1;
}
+ if (isGroupAll) {
+ listSizeThreshold = Math.min(firstTierThreshold, MAX_LIST_SIZE);
+ }
}
estimatedMemThresholds = true;
}
@@ -344,17 +377,26 @@ public class POPartialAgg extends Physic
Object key, Tuple inpTuple) throws ExecException {
List<Tuple> value = map.get(key);
if (value == null) {
- value = new ArrayList<Tuple>();
+ if (isGroupAll) {
+ // Set exact array initial size to avoid array copies
+ // listSizeThreshold = numRecordsToSample before estimating memory
+ // thresholds and firstTierThreshold after memory estimation
+ int listSize = (map == rawInputMap) ? listSizeThreshold : Math.min(secondTierThreshold, MAX_LIST_SIZE);
+ value = new ArrayList<Tuple>(listSize);
+ } else {
+ value = new ArrayList<Tuple>();
+ }
map.put(key, value);
}
value.add(inpTuple);
- if (value.size() >= MAX_LIST_SIZE) {
+ if (value.size() > listSizeThreshold) {
boolean isFirst = (map == rawInputMap);
if (LOG.isDebugEnabled()){
LOG.debug("The cache for key " + key + " has grown too large. Aggregating " + ((isFirst) ? "first level." : "second level."));
}
if (isFirst) {
- aggregateRawRow(key);
+ // Aggregate and remove just this key to keep size in check
+ aggregateRawRow(key, value);
} else {
aggregateSecondLevel();
}
@@ -389,8 +431,8 @@ public class POPartialAgg extends Physic
}
}
- private void aggregateRawRow(Object key) throws ExecException {
- List<Tuple> value = rawInputMap.get(key);
+ private void aggregateRawRow(Object key, List<Tuple> value) throws ExecException {
+ numRecsInRawMap -= value.size();
Tuple valueTuple = createValueTuple(key, value);
Result res = getOutput(key, valueTuple);
rawInputMap.remove(key);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Tue Jan 12 06:52:01 2016
@@ -253,7 +253,7 @@ public class CombinerOptimizerUtil {
POLocalRearrange mlr = getNewRearrange(rearrange);
POPartialAgg mapAgg = null;
if (doMapAgg) {
- mapAgg = createPartialAgg(cfe);
+ mapAgg = createPartialAgg(cfe, isGroupAll(rearrange));
}
// A specialized local rearrange operator will replace
@@ -287,16 +287,31 @@ public class CombinerOptimizerUtil {
}
}
+ private static boolean isGroupAll(POLocalRearrange lr) {
+ // B: Local Rearrange[tuple]{chararray}(false) - scope-3 -> scope-14
+ // | |
+ // | Constant(all) - scope-4
+ boolean isGroupAll = false;
+ if (lr.getPlans().size() == 1) {
+ PhysicalPlan plan = lr.getPlans().get(0);
+ if (plan.getKeys().size() == 1 && (plan.getRoots().get(0) instanceof ConstantExpression)) {
+ ConstantExpression constExpr = (ConstantExpression) plan.getRoots().get(0);
+ isGroupAll = ("all").equals(constExpr.getValue());
+ }
+ }
+ return isGroupAll;
+ }
+
/**
* Translate POForEach in combiner into a POPartialAgg
* @param combineFE
* @return partial aggregate operator
* @throws CloneNotSupportedException
*/
- private static POPartialAgg createPartialAgg(POForEach combineFE) throws CloneNotSupportedException {
+ private static POPartialAgg createPartialAgg(POForEach combineFE, boolean isGroupAll) throws CloneNotSupportedException {
String scope = combineFE.getOperatorKey().scope;
POPartialAgg poAgg = new POPartialAgg(new OperatorKey(scope,
- NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+ NodeIdGenerator.getGenerator().getNextNodeId(scope)), isGroupAll);
poAgg.addOriginalLocation(combineFE.getAlias(), combineFE.getOriginalLocations());
poAgg.setResultType(combineFE.getResultType());
Modified: pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/SpillableMemoryManager.java Tue Jan 12 06:52:01 2016
@@ -219,10 +219,8 @@ public class SpillableMemoryManager impl
Collections.sort(spillablesSR, new Comparator<SpillablePtr>() {
@Override
public int compare(SpillablePtr o1Ref, SpillablePtr o2Ref) {
- Spillable o1 = o1Ref.get();
- Spillable o2 = o2Ref.get();
- long o1Size = o1.getMemorySize();
- long o2Size = o2.getMemorySize();
+ long o1Size = o1Ref.getMemorySize();
+ long o2Size = o2Ref.getMemorySize();
if (o1Size == o2Size) {
return 0;
Modified: pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java?rev=1724173&r1=1724172&r2=1724173&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPOPartialAgg.java Tue Jan 12 06:52:01 2016
@@ -77,8 +77,12 @@ public class TestPOPartialAgg {
}
private void createPOPartialPlan(int valueCount) throws PlanException {
+ createPOPartialPlan(valueCount, false);
+ }
+
+ private void createPOPartialPlan(int valueCount, boolean isGroupAll) throws PlanException {
parentPlan = new PhysicalPlan();
- partAggOp = GenPhyOp.topPOPartialAgg();
+ partAggOp = new POPartialAgg(GenPhyOp.getOK(), isGroupAll);
partAggOp.setParentPlan(parentPlan);
// setup key plan
@@ -357,6 +361,25 @@ public class TestPOPartialAgg {
assertEquals(new Long(1), spilled.get());
}
+ @Test
+ public void testGroupAll() throws Exception {
+ createPOPartialPlan(1, true);
+ Result res;
+ for (long i=1; i <= 10010; i ++) {
+ Tuple t = tuple("all", tuple(i));
+ partAggOp.attachInput(t);
+ res = partAggOp.getNextTuple();
+ assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ }
+ // end of all input, now expecting all tuples
+ parentPlan.endOfAllInput = true;
+ res = partAggOp.getNextTuple();
+ assertEquals(tuple("all", tuple(50105055L)), res.result);
+ assertEquals(POStatus.STATUS_OK, res.returnStatus);
+ res = partAggOp.getNextTuple();
+ assertEquals(POStatus.STATUS_EOP, res.returnStatus);
+ }
+
private static class Spill implements Callable<Long> {
private Spillable spillable;