Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Thu Nov 27 12:49:54 2014 @@ -108,20 +108,22 @@ public class POLimit extends PhysicalOpe default: throw new RuntimeException("Limit requires an integer parameter"); } - if (variableLimit <= 0) - throw new RuntimeException("Limit requires a positive integer parameter"); + if (variableLimit < 0) + throw new RuntimeException("Limit requires a zero or a positive integer parameter"); this.setLimit(variableLimit); } Result inp = null; while (true) { + // illustrator ignore LIMIT before the post processing + if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) { + inp = RESULT_EOP; + break; + } inp = processInput(); if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR) break; illustratorMarkup(inp.result, null, 0); - // illustrator ignore LIMIT before the post processing - if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit) - inp.returnStatus = POStatus.STATUS_EOP; soFar++; break;
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Thu Nov 27 12:49:54 2014 @@ -18,6 +18,7 @@ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; import java.io.IOException; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -65,6 +66,9 @@ public class POLoad extends PhysicalOper private boolean isTmpLoad; private long limit=-1; + + private transient List<String> cacheFiles = null; + private transient List<String> shipFiles = null; public POLoad(OperatorKey k) { this(k,-1, null); @@ -252,4 +256,20 @@ public class POLoad extends PhysicalOper public void setLimit(long limit) { this.limit = limit; } + + public List<String> getCacheFiles() { + return cacheFiles; + } + + public void setCacheFiles(List<String> cf) { + cacheFiles = cf; + } + + public List<String> getShipFiles() { + return shipFiles; + } + + public void setShipFiles(List<String> sf) { + shipFiles = sf; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 27 12:49:54 2014 @@ -22,11 +22,13 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil; import org.apache.pig.data.AccumulativeBag; import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; @@ -75,16 +77,16 @@ public class POPackage extends PhysicalO protected static final BagFactory mBagFactory = BagFactory.getInstance(); protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); - private boolean firstTime = true; - - private boolean useDefaultBag = false; - private boolean lastBagReadOnly = true; protected Packager pkgr; protected PigNullableWritable keyWritable; + private transient boolean initialized; + private transient boolean useDefaultBag; + private transient int accumulativeBatchSize; + public POPackage(OperatorKey k) { this(k, -1, null); } @@ -189,15 +191,17 @@ public class POPackage extends PhysicalO */ @Override public Result getNextTuple() throws ExecException { - if(firstTime){ - firstTime = false; + if (!initialized) { + initialized = true; if (PigMapReduce.sJobConfInternal.get() != null) { String bagType = PigMapReduce.sJobConfInternal.get().get( - "pig.cachedbag.type"); + PigConfiguration.PIG_CACHEDBAG_TYPE); if (bagType != null && bagType.equalsIgnoreCase("default")) { useDefaultBag = true; } } + accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize(); + // If multiquery, the last bag is InternalCachedBag and should not // set ReadOnly flag, otherwise we will materialize again to another // InternalCachedBag @@ -220,9 +224,7 @@ public class POPackage extends PhysicalO // create bag wrapper to pull tuples in many batches // all bags have reference to the sample tuples buffer // which contains tuples from one batch - POPackageTupleBuffer buffer = new POPackageTupleBuffer(); - buffer.setKey(key); - buffer.setIterator(tupIter); + POPackageTupleBuffer buffer = new POPackageTupleBuffer(accumulativeBatchSize, key, tupIter); for (int i = 0; i < numInputs; i++) { dbs[i] = new AccumulativeBag(buffer, i); } @@ -317,29 +319,16 @@ public class POPackage extends PhysicalO private Object currKey; @SuppressWarnings("unchecked") - public POPackageTupleBuffer() { - batchSize = 20000; - if (PigMapReduce.sJobConfInternal.get() != null) { - String size = PigMapReduce.sJobConfInternal.get().get("pig.accumulative.batchsize"); - if (size != null) { - batchSize = Integer.parseInt(size); - } - } - + public POPackageTupleBuffer(int batchSize, Object key, Iterator<NullableTuple> iter) { + this.batchSize = batchSize; + this.currKey = key; + this.iter = iter; this.bags = new List[numInputs]; for(int i=0; i<numInputs; i++) { - this.bags[i] = new ArrayList<Tuple>(); + this.bags[i] = new ArrayList<Tuple>(batchSize); } } - public void setKey(Object key) { - this.currKey = key; - } - - public void setIterator(Iterator<NullableTuple> iter) { - this.iter = iter; - } - @Override public boolean hasNextBatch() { return iter.hasNext(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java Thu Nov 27 12:49:54 2014 @@ -36,14 +36,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; +import org.apache.pig.data.InternalCachedBag; import org.apache.pig.data.SelfSpillBag.MemoryLimits; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; +import org.apache.pig.impl.util.GroupingSpillable; import org.apache.pig.impl.util.Spillable; import org.apache.pig.impl.util.SpillableMemoryManager; @@ -56,7 +57,7 @@ import com.google.common.collect.Maps; * map. Once that map fills up or all input has been seen, results are * piped out into the next operator (caller of getNext()). */ -public class POPartialAgg extends PhysicalOperator implements Spillable { +public class POPartialAgg extends PhysicalOperator implements Spillable, GroupingSpillable { private static final Log LOG = LogFactory.getLog(POPartialAgg.class); private static final long serialVersionUID = 1L; @@ -83,33 +84,44 @@ public class POPartialAgg extends Physic private static final WeakHashMap<POPartialAgg, Byte> ALL_POPARTS = new WeakHashMap<POPartialAgg, Byte>(); private static final TupleFactory TF = TupleFactory.getInstance(); - private static final BagFactory BG = BagFactory.getInstance(); private PhysicalPlan keyPlan; private ExpressionOperator keyLeaf; - private List<PhysicalPlan> valuePlans; private List<ExpressionOperator> valueLeaves; - private int numRecsInRawMap = 0; - private int numRecsInProcessedMap = 0; + private transient int numRecsInRawMap; + private transient int numRecsInProcessedMap; - private Map<Object, List<Tuple>> rawInputMap = Maps.newHashMap(); - private Map<Object, List<Tuple>> processedInputMap = Maps.newHashMap(); + private transient Map<Object, List<Tuple>> rawInputMap; + private transient Map<Object, List<Tuple>> processedInputMap; - private boolean disableMapAgg = false; - private boolean sizeReductionChecked = false; - private boolean inputsExhausted = false; - private volatile boolean doSpill = false; - private transient MemoryLimits memLimits; - - private transient boolean initialized = false; - private int firstTierThreshold = FIRST_TIER_THRESHOLD; - private int secondTierThreshold = SECOND_TIER_THRESHOLD; - private int sizeReduction = 1; - private int avgTupleSize = 0; - private Iterator<Entry<Object, List<Tuple>>> spillingIterator; - private boolean estimatedMemThresholds = false; + //Transient booleans always initialize to false + private transient boolean initialized; + private transient boolean disableMapAgg; + private transient boolean sizeReductionChecked; + private transient boolean inputsExhausted; + private transient boolean estimatedMemThresholds; + // The doSpill flag is set when spilling is running or needs to run. + // It is set by POPartialAgg when its buffers are full after having run aggregations and + // the records have to be emitted to the map output. + // The doContingentSpill flag is set when the SpillableMemoryManager is notified + // by GC that the runtime is low on memory and the SpillableMemoryManager identifies + // the particular buffer as a good spill candidate because it is large. The contingent spill logic tries + // to satisfy the memory manager's request for freeing memory by aggregating data + // rather than just spilling records to disk. + private transient volatile boolean doSpill; + private transient volatile boolean doContingentSpill; + private transient volatile Object spillLock; + + private transient int minOutputReduction; + private transient float percentUsage; + private transient int numRecordsToSample; + private transient int firstTierThreshold; + private transient int secondTierThreshold; + private transient int sizeReduction; + private transient int avgTupleSize; + private transient Iterator<Entry<Object, List<Tuple>>> spillingIterator; public POPartialAgg(OperatorKey k) { @@ -118,10 +130,38 @@ public class POPartialAgg extends Physic private void init() throws ExecException { ALL_POPARTS.put(this, null); - float percent = getPercentUsageFromProp(); - if (percent <= 0) { + numRecsInRawMap = 0; + numRecsInProcessedMap = 0; + rawInputMap = Maps.newHashMap(); + processedInputMap = Maps.newHashMap(); + minOutputReduction = DEFAULT_MIN_REDUCTION; + numRecordsToSample = NUM_RECS_TO_SAMPLE; + firstTierThreshold = FIRST_TIER_THRESHOLD; + secondTierThreshold = SECOND_TIER_THRESHOLD; + sizeReduction = 1; + avgTupleSize = 0; + percentUsage = 0.2F; + spillLock = new Object(); + if (PigMapReduce.sJobConfInternal.get() != null) { + String usage = PigMapReduce.sJobConfInternal.get().get( + PigConfiguration.PIG_CACHEDBAG_MEMUSAGE); + if (usage != null) { + percentUsage = Float.parseFloat(usage); + } + minOutputReduction = PigMapReduce.sJobConfInternal.get().getInt( + PigConfiguration.PIG_EXEC_MAP_PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION); + if (minOutputReduction <= 0) { + LOG.info("Specified reduction is < 0 (" + minOutputReduction + "). Using default " + + DEFAULT_MIN_REDUCTION); + minOutputReduction = DEFAULT_MIN_REDUCTION; + } + } + if (percentUsage <= 0) { LOG.info("No memory allocated to intermediate memory buffers. Turning off partial aggregation."); disableMapAgg(); + // Set them to true instead of adding another check for !disableMapAgg + sizeReductionChecked = true; + estimatedMemThresholds = true; } initialized = true; SpillableMemoryManager.getInstance().registerSpillable(this); @@ -145,17 +185,36 @@ public class POPartialAgg extends Physic } while (true) { - if (!sizeReductionChecked && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) { + if (!sizeReductionChecked && numRecsInRawMap >= numRecordsToSample) { checkSizeReduction(); + if (doContingentSpill && !doSpill) { + LOG.info("Avoided emitting records during spill memory call."); + doContingentSpill = false; + } } - if (!estimatedMemThresholds && numRecsInRawMap >= NUM_RECS_TO_SAMPLE) { + if (!estimatedMemThresholds && numRecsInRawMap >= numRecordsToSample) { estimateMemThresholds(); } + if (doContingentSpill) { + // Don't aggregate if spilling. Avoid concurrent update of spilling iterator. + if (doSpill == false) { + // SpillableMemoryManager requested a spill to reduce memory + // consumption. See if we can avoid it. + aggregateBothLevels(false, false); + if (shouldSpill()) { + startSpill(false); + } else { + LOG.info("Avoided emitting records during spill memory call."); + doContingentSpill = false; + } + } + } if (doSpill) { - startSpill(); + startSpill(true); Result result = spillResult(); if (result.returnStatus == POStatus.STATUS_EOP) { doSpill = false; + doContingentSpill = false; } if (result.returnStatus != POStatus.STATUS_EOP || inputsExhausted) { @@ -174,8 +233,8 @@ public class POPartialAgg extends Physic if (parentPlan.endOfAllInput) { // parent input is over. flush what we have. inputsExhausted = true; - startSpill(); LOG.info("Spilling last bits."); + startSpill(true); continue; } else { return EOP_RESULT; @@ -197,15 +256,9 @@ public class POPartialAgg extends Physic numRecsInRawMap += 1; addKeyValToMap(rawInputMap, key, inpTuple); - if (shouldAggregateFirstLevel()) { - aggregateFirstLevel(); - } - if (shouldAggregateSecondLevel()) { - aggregateSecondLevel(); - } + aggregateBothLevels(true, true); if (shouldSpill()) { - LOG.info("Starting spill."); - startSpill(); // next time around, we'll start emitting. + startSpill(false); // next time around, we'll start emitting. } } } @@ -214,10 +267,10 @@ public class POPartialAgg extends Physic private void estimateMemThresholds() { if (!mapAggDisabled()) { - LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + " POPArtialAgg objects."); - - float percent = getPercentUsageFromProp(); - memLimits = new MemoryLimits(ALL_POPARTS.size(), percent); + LOG.info("Getting mem limits; considering " + ALL_POPARTS.size() + + " POPArtialAgg objects." + " with memory percentage " + + percentUsage); + MemoryLimits memLimits = new MemoryLimits(ALL_POPARTS.size(), percentUsage); int estTotalMem = 0; int estTuples = 0; for (Map.Entry<Object, List<Tuple>> entry : rawInputMap.entrySet()) { @@ -234,30 +287,39 @@ public class POPartialAgg extends Physic firstTierThreshold = (int) (0.5 + totalTuples * (1f - (1f / sizeReduction))); secondTierThreshold = (int) (0.5 + totalTuples * (1f / sizeReduction)); LOG.info("Setting thresholds. Primary: " + firstTierThreshold + ". Secondary: " + secondTierThreshold); + // The second tier should at least allow one tuple before it tries to aggregate. + // This code retains the total number of tuples in the buffer while guaranteeing + // the second tier has at least one tuple. + if (secondTierThreshold == 0) { + secondTierThreshold += 1; + firstTierThreshold -= 1; + } } estimatedMemThresholds = true; } private void checkSizeReduction() throws ExecException { - int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap; - aggregateFirstLevel(); - aggregateSecondLevel(); - int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap; - LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap); - int minReduction = getMinOutputReductionFromProp(); - LOG.info("Observed reduction factor: from " + numBeforeReduction + - " to " + numAfterReduction + - " => " + numBeforeReduction / numAfterReduction + "."); - if ( numBeforeReduction / numAfterReduction < minReduction) { - LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minReduction); - disableMapAgg(); + if (!mapAggDisabled()) { + int numBeforeReduction = numRecsInProcessedMap + numRecsInRawMap; + aggregateBothLevels(false, false); + int numAfterReduction = numRecsInProcessedMap + numRecsInRawMap; + LOG.info("After reduction, processed map: " + numRecsInProcessedMap + "; raw map: " + numRecsInRawMap); + LOG.info("Observed reduction factor: from " + numBeforeReduction + + " to " + numAfterReduction + + " => " + numBeforeReduction / numAfterReduction + "."); + if ( numBeforeReduction / numAfterReduction < minOutputReduction) { + LOG.info("Disabling in-memory aggregation, since observed reduction is less than " + minOutputReduction); + disableMapAgg(); + } + sizeReduction = numBeforeReduction / numAfterReduction; + sizeReductionChecked = true; } - sizeReduction = numBeforeReduction / numAfterReduction; - sizeReductionChecked = true; } private void disableMapAgg() throws ExecException { - startSpill(); + // Do not aggregate as when disableMapAgg is called aggregation is + // called and size reduction checked + startSpill(false); disableMapAgg = true; } @@ -266,16 +328,10 @@ public class POPartialAgg extends Physic } private boolean shouldAggregateFirstLevel() { - if (LOG.isInfoEnabled() && numRecsInRawMap > firstTierThreshold) { - LOG.info("Aggregating " + numRecsInRawMap + " raw records."); - } return (numRecsInRawMap > firstTierThreshold); } private boolean shouldAggregateSecondLevel() { - if (LOG.isInfoEnabled() && numRecsInProcessedMap > secondTierThreshold) { - LOG.info("Aggregating " + numRecsInProcessedMap + " secondary records."); - } return (numRecsInProcessedMap > secondTierThreshold); } @@ -305,27 +361,13 @@ public class POPartialAgg extends Physic } } - private void startSpill() throws ExecException { + private void startSpill(boolean aggregate) throws ExecException { // If spillingIterator is null, we are already spilling and don't need to set up. if (spillingIterator != null) return; - if (!rawInputMap.isEmpty()) { - if (LOG.isInfoEnabled()) { - LOG.info("In startSpill(), aggregating raw inputs. " + numRecsInRawMap + " tuples."); - } - aggregateFirstLevel(); - if (LOG.isInfoEnabled()) { - LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples."); - } - } - if (!processedInputMap.isEmpty()) { - if (LOG.isInfoEnabled()) { - LOG.info("In startSpill(), aggregating processed inputs. " + numRecsInProcessedMap + " tuples."); - } - aggregateSecondLevel(); - if (LOG.isInfoEnabled()) { - LOG.info("processed inputs: " + numRecsInProcessedMap + " tuples."); - } + LOG.info("Starting spill."); + if (aggregate) { + aggregateBothLevels(false, true); } doSpill = true; spillingIterator = processedInputMap.entrySet().iterator(); @@ -374,15 +416,41 @@ public class POPartialAgg extends Physic return numEntriesInTarget; } + private void aggregateBothLevels(boolean checkThresholdForFirst, + boolean checkThresholdForSecond) throws ExecException { + // When processed map is initially empty, just aggregate first level as + // aggregating second level immediately would not yield anything + boolean aggregateSecondLevel = !processedInputMap.isEmpty(); + if (!checkThresholdForFirst || shouldAggregateFirstLevel()) { + aggregateFirstLevel(); + } + if (aggregateSecondLevel && (!checkThresholdForSecond || shouldAggregateSecondLevel())) { + aggregateSecondLevel(); + } + } + private void aggregateFirstLevel() throws ExecException { + if (rawInputMap.isEmpty()) { + return; + } + int rawTuples = numRecsInRawMap; + int processedTuples = numRecsInProcessedMap; numRecsInProcessedMap = aggregate(rawInputMap, processedInputMap, numRecsInProcessedMap); numRecsInRawMap = 0; + LOG.info("Aggregated " + rawTuples+ " raw tuples." + + " Processed tuples before aggregation = " + processedTuples + + ", after aggregation = " + numRecsInProcessedMap); } private void aggregateSecondLevel() throws ExecException { + if (processedInputMap.isEmpty()) { + return; + } + int processedTuples = numRecsInProcessedMap; Map<Object, List<Tuple>> newMap = Maps.newHashMapWithExpectedSize(processedInputMap.size()); numRecsInProcessedMap = aggregate(processedInputMap, newMap, 0); processedInputMap = newMap; + LOG.info("Aggregated " + processedTuples + " processed tuples to " + numRecsInProcessedMap + " tuples"); } private Tuple createValueTuple(Object key, List<Tuple> inpTuples) throws ExecException { @@ -390,7 +458,14 @@ public class POPartialAgg extends Physic valueTuple.set(0, key); for (int i = 0; i < valuePlans.size(); i++) { - DataBag bag = BG.newDefaultBag(); + DataBag bag = null; + if (doContingentSpill) { + // Don't use additional memory since we already have memory stress + bag = new InternalCachedBag(); + } else { + // Take 10% of memory, need fine tune later + bag = new InternalCachedBag(1, 0.1F); + } valueTuple.set(i + 1, bag); } for (Tuple t : inpTuples) { @@ -424,29 +499,6 @@ public class POPartialAgg extends Physic v.visitPartialAgg(this); } - private int getMinOutputReductionFromProp() { - int minReduction = PigMapReduce.sJobConfInternal.get().getInt( - PigConfiguration.PARTAGG_MINREDUCTION, DEFAULT_MIN_REDUCTION); - if (minReduction <= 0) { - LOG.info("Specified reduction is < 0 (" + minReduction + "). Using default " + DEFAULT_MIN_REDUCTION); - minReduction = DEFAULT_MIN_REDUCTION; - } - return minReduction; - } - - private float getPercentUsageFromProp() { - float percent = 0.2F; - if (PigMapReduce.sJobConfInternal.get() != null) { - String usage = PigMapReduce.sJobConfInternal.get().get( - PigConfiguration.PROP_CACHEDBAG_MEMUSAGE); - if (usage != null) { - percent = Float.parseFloat(usage); - } - } - return percent; - } - - private Result getResult(ExpressionOperator op) throws ExecException { Result res; switch (op.getResultType()) { @@ -536,9 +588,26 @@ public class POPartialAgg extends Physic @Override public long spill() { - LOG.info("Spill triggered by SpillableMemoryManager"); - doSpill = true; - return 0; + if (mapAggDisabled()) { + return 0; + } else { + LOG.info("Spill triggered by SpillableMemoryManager"); + doContingentSpill = true; + synchronized(spillLock) { + if (!sizeReductionChecked) { + numRecordsToSample = numRecsInRawMap; + } + try { + while (doContingentSpill == true) { + Thread.sleep(50); //Keeping it on the lower side for now. Tune later + } + } catch (InterruptedException e) { + LOG.warn("Interrupted exception while waiting for spill to finish", e); + } + LOG.info("Finished spill for SpillableMemoryManager call"); + return 1; + } + } } @Override Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Thu Nov 27 12:49:54 2014 @@ -25,6 +25,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.PigConfiguration; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; @@ -77,7 +78,10 @@ public class POSort extends PhysicalOper private long limit; public boolean isUDFComparatorUsed = false; private DataBag sortedBag; - transient Iterator<Tuple> it; + + private transient Iterator<Tuple> it; + private transient boolean initialized; + private transient boolean useDefaultBag; public POSort( OperatorKey k, @@ -256,17 +260,19 @@ public class POSort extends PhysicalOper if (!inputsAccumulated) { res = processInput(); + if (!initialized) { + initialized = true; + if (PigMapReduce.sJobConfInternal.get() != null) { + String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_SORT_TYPE); + if (bagType != null && bagType.equalsIgnoreCase("default")) { + useDefaultBag = true; + } + } + } // by default, we create InternalSortedBag, unless user configures - // explicitly to use old bag - String bagType = null; - if (PigMapReduce.sJobConfInternal.get() != null) { - bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.sort.type"); - } - if (bagType != null && bagType.equalsIgnoreCase("default")) { - sortedBag = BagFactory.getInstance().newSortedBag(mComparator); - } else { - sortedBag = new InternalSortedBag(3, mComparator); - } + // explicitly to use old bag + sortedBag = useDefaultBag ? BagFactory.getInstance().newSortedBag(mComparator) + : new InternalSortedBag(3, mComparator); while (res.returnStatus != POStatus.STATUS_EOP) { if (res.returnStatus == POStatus.STATUS_ERR) { @@ -377,6 +383,7 @@ public class POSort extends PhysicalOper } + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { if(illustrator != null) { illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) in); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Thu Nov 27 12:49:54 2014 @@ -81,6 +81,9 @@ public class POStore extends PhysicalOpe private String signature; + private transient List<String> cacheFiles = null; + private transient List<String> shipFiles = null; + public POStore(OperatorKey k) { this(k, -1, null); } @@ -313,4 +316,20 @@ public class POStore extends PhysicalOpe public void setStoreFunc(StoreFuncInterface storeFunc) { this.storer = storeFunc; } + + public List<String> getCacheFiles() { + return cacheFiles; + } + + public void setCacheFiles(List<String> cf) { + cacheFiles = cf; + } + + public List<String> getShipFiles() { + return shipFiles; + } + + public void setShipFiles(List<String> sf) { + shipFiles = sf; + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 27 12:49:54 2014 @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.Map; +import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; @@ -92,8 +93,8 @@ public class Packager implements Illustr private PackageType pkgType; - boolean firstTime = true; - boolean useDefaultBag = false; + private transient boolean initialized; + private transient boolean useDefaultBag; protected POPackage parent = null; @@ -473,10 +474,10 @@ public class Packager implements Illustr } public void checkBagType() { - if(firstTime){ - firstTime = false; + if(!initialized){ + initialized = true; if (PigMapReduce.sJobConfInternal.get() != null) { - String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type"); + String bagType = PigMapReduce.sJobConfInternal.get().get(PigConfiguration.PIG_CACHEDBAG_TYPE); if (bagType != null && bagType.equalsIgnoreCase("default")) { useDefaultBag = true; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 27 12:49:54 2014 @@ -19,16 +19,20 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.TreeMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.WritableComparable; @@ -89,11 +93,26 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper; +import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -102,6 +121,7 @@ import org.apache.pig.impl.plan.Operator import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.ObjectSerializer; import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.DataSinkDescriptor; @@ -135,6 +155,9 @@ import org.apache.tez.runtime.library.ap import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput; import org.apache.tez.runtime.library.input.OrderedGroupedKVInput; import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput; +import org.apache.tez.runtime.library.input.UnorderedKVInput; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; /** * A visitor to construct DAG out of Tez plan. @@ -146,6 +169,7 @@ public class TezDagBuilder extends TezOp private Map<String, LocalResource> localResources; private PigContext pc; private Configuration globalConf; + private long intermediateTaskInputSize; public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) { @@ -162,6 +186,19 @@ public class TezDagBuilder extends TezOp } catch (IOException e) { throw new RuntimeException("Error while fetching delegation tokens", e); } + + try { + intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), FileLocalizer.getTemporaryResourcePath(pc)); + } catch (Exception e) { + log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e); + intermediateTaskInputSize = 134217728L; + } + // At least 128MB. Else we will end up with too many tasks + intermediateTaskInputSize = Math.max(intermediateTaskInputSize, 134217728L); + intermediateTaskInputSize = Math.min(intermediateTaskInputSize, + globalConf.getLong( + InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); } @Override @@ -173,8 +210,7 @@ public class TezDagBuilder extends TezOp Vertex to = null; try { if (!tezOp.isVertexGroup()) { - boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false; - to = newVertex(tezOp, isMap); + to = newVertex(tezOp); dag.addVertex(to); } else { // For union, we construct VertexGroup after iterating the @@ -248,7 +284,8 @@ public class TezDagBuilder extends TezOp } return GroupInputEdge.create(from, to, edgeProperty, - InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())); + InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()) + .setHistoryText(edgeProperty.getEdgeDestination().getHistoryText())); } /** @@ -339,8 +376,9 @@ public class TezDagBuilder extends TezOp MRToTezHelper.processMRSettings(conf, globalConf); - in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); - out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + String historyString = convertToHistoryText("", conf); + in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString); + out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString); if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) { // Use custom edge @@ -378,7 +416,7 @@ public class TezDagBuilder extends TezOp .serialize(new byte[] { combRearrange.getKeyType() })); } - private Vertex newVertex(TezOperator tezOp, boolean isMap) throws IOException, + private Vertex newVertex(TezOperator tezOp) throws IOException, ClassNotFoundException, InterruptedException { ProcessorDescriptor procDesc = ProcessorDescriptor.create( tezOp.getProcessorName()); @@ -395,12 +433,24 @@ public class TezDagBuilder extends TezOp Job job = new Job(payloadConf); payloadConf = (JobConf) job.getConfiguration(); - if (tezOp.sampleOperator != null) { - payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.sampleOperator.getOperatorKey().toString()); + if (tezOp.getSampleOperator() != null) { + payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString()); } - if (tezOp.sortOperator != null) { - payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString()); + if (tezOp.getSortOperator() != null) { + // Required by Sample Aggregation job for estimating quantiles + payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString()); + // PIG-4162: Order by/Skew Join in intermediate stage. + // Increasing order by parallelism may not be required as it is + // usually followed by limit other than store. But would benefit + // cases like skewed join followed by group by. + if (tezOp.getSortOperator().getEstimatedParallelism() != -1 + && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) { + payloadConf.setLong( + InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + intermediateTaskInputSize); + } + } payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp())); @@ -443,8 +493,7 @@ public class TezDagBuilder extends TezOp tezOp.plan.remove(pack); payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack)); setIntermediateOutputKeyValue(keyType, payloadConf, tezOp); - POShuffleTezLoad newPack; - newPack = new POShuffleTezLoad(pack); + POShuffleTezLoad newPack = new POShuffleTezLoad(pack); if (tezOp.isSkewedJoin()) { newPack.setSkewedJoins(true); } @@ -455,7 +504,7 @@ public class TezDagBuilder extends TezOp // backend. Map<Integer, String> localRearrangeMap = new TreeMap<Integer, String>(); for (TezOperator pred : mPlan.getPredecessors(tezOp)) { - if (tezOp.sampleOperator != null && tezOp.sampleOperator == pred) { + if (tezOp.getSampleOperator() != null && tezOp.getSampleOperator() == pred) { // skip sample vertex input } else { String inputKey = pred.getOperatorKey().toString(); @@ -511,7 +560,7 @@ public class TezDagBuilder extends TezOp } } } - JobControlCompiler.setOutputFormat(job); + setOutputFormat(job); // set parent plan in all operators. currently the parent plan is really // used only when POStream, POSplit are present in the plan @@ -546,19 +595,22 @@ public class TezDagBuilder extends TezOp } // set various parallelism into the job conf for later analysis, PIG-2779 - payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel); - payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism()); - payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism()); + payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel); + payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism()); + payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism()); + + TezScriptState ss = TezScriptState.get(); + ss.addVertexSettingsToConf(dag.getName(), tezOp, payloadConf); // Take our assembled configuration and create a vertex UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf); - procDesc.setUserPayload(userPayload); + procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(), payloadConf)); Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), procDesc, tezOp.getVertexParallelism(), - isMap ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf)); + tezOp.isUseMRMapSettings() ? MRHelpers.getResourceForMRMapper(globalConf) : MRHelpers.getResourceForMRReducer(globalConf)); Map<String, String> taskEnv = new HashMap<String, String>(); - MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, isMap); + MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, tezOp.isUseMRMapSettings()); vertex.setTaskEnvironment(taskEnv); // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012 @@ -571,7 +623,7 @@ public class TezDagBuilder extends TezOp MRApps.setupDistributedCache(globalConf, localResources); vertex.addTaskLocalFiles(localResources); - vertex.setTaskLaunchCmdOpts(isMap ? MRHelpers.getJavaOptsForMRMapper(globalConf) + vertex.setTaskLaunchCmdOpts(tezOp.isUseMRMapSettings() ? MRHelpers.getJavaOptsForMRMapper(globalConf) : MRHelpers.getJavaOptsForMRReducer(globalConf)); log.info("For vertex - " + tezOp.getOperatorKey().toString() @@ -591,7 +643,8 @@ public class TezDagBuilder extends TezOp DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName()) .setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder() .setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf)) - .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())), + .setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer())) + .setHistoryText(convertToHistoryText("", payloadConf)), InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), dag.getCredentials())); } @@ -609,7 +662,8 @@ public class TezDagBuilder extends TezOp OutputDescriptor storeOutDescriptor = OutputDescriptor.create( MROutput.class.getName()).setUserPayload(TezUtils - .createUserPayloadFromConf(outputPayLoad)); + .createUserPayloadFromConf(outputPayLoad)) + .setHistoryText(convertToHistoryText("", outputPayLoad)); if (tezOp.getVertexGroupStores() != null) { OperatorKey vertexGroupKey = tezOp.getVertexGroupStores().get(store.getOperatorKey()); if (vertexGroupKey != null) { @@ -632,14 +686,16 @@ public class TezDagBuilder extends TezOp new PigOutputFormat().checkOutputSpecs(job); } + String vmPluginName = null; + Configuration vmPluginConf = null; + // Set the right VertexManagerPlugin if (tezOp.getEstimatedParallelism() != -1) { if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able // to decrease/increase parallelism of sorting vertex dynamically // based on the numQuantiles calculated by sample aggregation vertex - vertex.setVertexManagerPlugin(VertexManagerPluginDescriptor.create( - PartitionerDefinedVertexManager.class.getName())); + vmPluginName = PartitionerDefinedVertexManager.class.getName(); log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); } else { boolean containScatterGather = false; @@ -655,24 +711,50 @@ public class TezDagBuilder extends TezOp if (containScatterGather && !containCustomPartitioner) { // Use auto-parallelism feature of ShuffleVertexManager to dynamically // reduce the parallelism of the vertex - VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create( - ShuffleVertexManager.class.getName()); - Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); + vmPluginName = ShuffleVertexManager.class.getName(); + vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf; vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); - if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, - InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!= + if (stores.size() <= 0) { + // Intermediate reduce. Set the bytes per reducer to be block size. + vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + intermediateTaskInputSize); + } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) != InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) { vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); } - vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf)); - vertex.setVertexManagerPlugin(vmPluginDescriptor); log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); } } } - + if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(ShuffleVertexManager.class.getName()))) { + if (tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName())) { + // Setting SRC_FRACTION to 0.00001 so that even if there are 100K source tasks, + // limit job starts when 1 source task finishes. + // If limit is part of a group by or join because their parallelism is 1, + // we should leave the configuration with the defaults. + vmPluginName = ShuffleVertexManager.class.getName(); + vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf; + vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, "0.00001"); + vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, "0.00001"); + log.info("Set " + ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 for limit vertex " + tezOp.getOperatorKey().toString()); + } + } + // else if(tezOp.isLimitAfterSort()) + // TODO: PIG-4049 If standalone Limit we need a new VertexManager or new input + // instead of ShuffledMergedInput. For limit part of the sort (order by parallel 1) itself + // need to enhance PartitionerDefinedVertexManager + + if (vmPluginName != null) { + VertexManagerPluginDescriptor vmPluginDescriptor = VertexManagerPluginDescriptor.create(vmPluginName); + if (vmPluginConf != null) { + vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf)) + .setHistoryText(convertToHistoryText(vmPluginName, vmPluginConf)); + } + vertex.setVertexManagerPlugin(vmPluginDescriptor); + } // Reset udfcontext jobconf. It is not supposed to be set in the front end UDFContext.getUDFContext().addJobConf(null); return vertex; @@ -943,4 +1025,48 @@ public class TezDagBuilder extends TezOp conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_SECONDARY_COMPARATOR_CLASS, comparatorClass); } + + private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { + // the OutputFormat we report to Hadoop is always PigOutputFormat which + // can be wrapped with LazyOutputFormat provided if it is supported by + // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set + if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { + try { + Class<?> clazz = PigContext + .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); + Method method = clazz.getMethod("setOutputFormatClass", + org.apache.hadoop.mapreduce.Job.class, Class.class); + method.invoke(null, job, PigOutputFormatTez.class); + } catch (Exception e) { + job.setOutputFormatClass(PigOutputFormatTez.class); + log.warn(PigConfiguration.PIG_OUTPUT_LAZY + + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); + } + } else { + job.setOutputFormatClass(PigOutputFormatTez.class); + } + } + + // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2 + public static String convertToHistoryText(String description, Configuration conf) throws IOException { + // Add a version if this serialization is changed + JSONObject jsonObject = new JSONObject(); + try { + if (description != null && !description.isEmpty()) { + jsonObject.put("desc", description); + } + if (conf != null) { + JSONObject confJson = new JSONObject(); + Iterator<Entry<String, String>> iter = conf.iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + confJson.put(entry.getKey(), entry.getValue()); + } + jsonObject.put("config", confJson); + } + } catch (JSONException e) { + throw new IOException("Error when trying to convert description/conf to JSON", e); + } + return jsonObject.toString(); + } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java Thu Nov 27 12:49:54 2014 @@ -24,8 +24,8 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.impl.PigContext; import org.apache.pig.tools.pigstats.PigStats; import org.apache.pig.tools.pigstats.ScriptState; +import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.apache.pig.tools.pigstats.tez.TezScriptState; -import org.apache.pig.tools.pigstats.tez.TezStats; public class TezExecutionEngine extends HExecutionEngine { @@ -43,6 +43,6 @@ public class TezExecutionEngine extends @Override public PigStats instantiatePigStats() { - return new TezStats(pigContext); + return new TezPigScriptStats(pigContext); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Thu Nov 27 12:49:54 2014 @@ -20,7 +20,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.util.EnumSet; import java.util.HashSet; -import java.util.Iterator; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -32,14 +31,13 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.PigConfiguration; +import org.apache.pig.impl.util.UDFContext; +import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.apache.tez.client.TezClient; -import org.apache.tez.common.counters.CounterGroup; -import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; -import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.DAGStatus; import org.apache.tez.dag.api.client.Progress; @@ -64,19 +62,39 @@ public class TezJob implements Runnable private TezClient tezClient; private boolean reuseSession; private TezCounters dagCounters; - // Vertex, CounterGroup, Counter, Value - private Map<String, Map<String, Map<String, Long>>> vertexCounters; + // Timer for DAG status reporter private Timer timer; + private TezJobConfig tezJobConf; + private TezPigScriptStats pigStats; - public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources) - throws IOException { + public TezJob(TezConfiguration conf, DAG dag, + Map<String, LocalResource> requestAMResources, + int estimatedTotalParallelism) throws IOException { this.conf = conf; this.dag = dag; this.requestAMResources = requestAMResources; - this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true); + this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true); this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - this.vertexCounters = Maps.newHashMap(); + tezJobConf = new TezJobConfig(estimatedTotalParallelism); + } + + static class TezJobConfig { + + private int estimatedTotalParallelism = -1; + + public TezJobConfig(int estimatedTotalParallelism) { + this.estimatedTotalParallelism = estimatedTotalParallelism; + } + + public int getEstimatedTotalParallelism() { + return estimatedTotalParallelism; + } + + public void setEstimatedTotalParallelism(int estimatedTotalParallelism) { + this.estimatedTotalParallelism = estimatedTotalParallelism; + } + } public DAG getDAG() { @@ -84,7 +102,7 @@ public class TezJob implements Runnable } public String getName() { - return dag == null ? "" : dag.getName(); + return dag.getName(); } public Configuration getConfiguration() { @@ -103,14 +121,6 @@ public class TezJob implements Runnable return dagCounters; } - public Map<String, Map<String, Long>> getVertexCounters(String group) { - return vertexCounters.get(group); - } - - public Map<String, Long> getVertexCounters(String group, String name) { - return vertexCounters.get(group).get(name); - } - public float getDAGProgress() { Progress p = dagStatus.getDAGProgress(); return p == null ? 0 : (float)p.getSucceededTaskCount() / (float)p.getTotalTaskCount(); @@ -126,10 +136,28 @@ public class TezJob implements Runnable return vertexProgress; } + public VertexStatus getVertexStatus(String vertexName) { + VertexStatus vs = null; + try { + vs = dagClient.getVertexStatus(vertexName, statusGetOpts); + } catch (Exception e) { + // Don't fail the job even if vertex status couldn't + // be retrieved. + log.warn("Cannot retrieve status for vertex " + vertexName, e); + } + return vs; + } + + public void setPigStats(TezPigScriptStats pigStats) { + this.pigStats = pigStats; + } + @Override public void run() { + UDFContext udfContext = UDFContext.getUDFContext(); try { - tezClient = TezSessionManager.getClient(conf, requestAMResources, dag.getCredentials()); + tezClient = TezSessionManager.getClient(conf, requestAMResources, + dag.getCredentials(), tezJobConf); log.info("Submitting DAG " + dag.getName()); dagClient = tezClient.submitDAG(dag); appId = tezClient.getAppMasterApplicationId(); @@ -145,7 +173,7 @@ public class TezJob implements Runnable timer = new Timer(); timer.schedule(new DAGStatusReporter(), 1000, conf.getLong( - PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 10) * 1000); + PigConfiguration.PIG_TEZ_DAG_STATUS_REPORT_INTERVAL, 20) * 1000); while (true) { try { @@ -156,10 +184,18 @@ public class TezJob implements Runnable } if (dagStatus.isCompleted()) { + // For tez_local mode where PigProcessor destroys all UDFContext + UDFContext.setUdfContext(udfContext); + + log.info("DAG Status: " + dagStatus); dagCounters = dagStatus.getDAGCounters(); - collectVertexCounters(); TezSessionManager.freeSession(tezClient); try { + pigStats.accumulateStats(this); + } catch (Exception e) { + log.warn("Exception while gathering stats", e); + } + try { if (!reuseSession) { TezSessionManager.stopSession(tezClient); } @@ -182,36 +218,17 @@ public class TezJob implements Runnable } private class DAGStatusReporter extends TimerTask { + + private final String LINE_SEPARATOR = System.getProperty("line.separator"); + @Override public void run() { - log.info("DAG Status: " + dagStatus); - } - } - - private void collectVertexCounters() { - for (Vertex v : dag.getVertices()) { - String name = v.getName(); - try { - VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts); - TezCounters counters = s.getVertexCounters(); - Map<String, Map<String, Long>> grpCounters = Maps.newHashMap(); - Iterator<CounterGroup> grpIt = counters.iterator(); - while (grpIt.hasNext()) { - CounterGroup grp = grpIt.next(); - Iterator<TezCounter> cntIt = grp.iterator(); - Map<String, Long> cntMap = Maps.newHashMap(); - while (cntIt.hasNext()) { - TezCounter cnt = cntIt.next(); - cntMap.put(cnt.getName(), cnt.getValue()); - } - grpCounters.put(grp.getName(), cntMap); - } - vertexCounters.put(name, grpCounters); - } catch (Exception e) { - // Don't fail the job even if vertex counters couldn't - // be retrieved. - log.info("Cannot retrieve counters for vertex " + name, e); - } + if (dagStatus == null) return; + String msg = "status=" + dagStatus.getState() + + ", progress=" + dagStatus.getDAGProgress() + + ", diagnostics=" + + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR); + log.info("DAG Status: " + msg); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1642132&r1=1642131&r2=1642132&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Thu Nov 27 12:49:54 2014 @@ -32,6 +32,9 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.pig.PigException; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode; import org.apache.pig.impl.PigContext; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.TezConfiguration; @@ -43,7 +46,6 @@ import org.apache.tez.dag.api.TezConfigu */ public class TezJobCompiler { private static final Log log = LogFactory.getLog(TezJobCompiler.class); - private static int dagIdentifier = 0; private PigContext pigContext; private TezConfiguration tezConf; @@ -53,24 +55,22 @@ public class TezJobCompiler { this.tezConf = new TezConfiguration(conf); } - public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources) + public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources) throws IOException, YarnException { - String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig"); - DAG tezDag = DAG.create(jobName + "-" + dagIdentifier); - dagIdentifier++; + DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString()); tezDag.setCredentials(new Credentials()); - TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources); + TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources); dagBuilder.visit(); return tezDag; } - public TezJob compile(TezOperPlan tezPlan, String grpName, TezPlanContainer planContainer) + public TezJob compile(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer) throws JobCreationException { TezJob job = null; try { // A single Tez job always pack only 1 Tez plan. We will track // Tez job asynchronously to exploit parallel execution opportunities. - job = getJob(tezPlan, planContainer); + job = getJob(tezPlanNode, planContainer); } catch (JobCreationException jce) { throw jce; } catch(Exception e) { @@ -82,11 +82,12 @@ public class TezJobCompiler { return job; } - private TezJob getJob(TezOperPlan tezPlan, TezPlanContainer planContainer) + private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer) throws JobCreationException { try { Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); localResources.putAll(planContainer.getLocalResources()); + TezOperPlan tezPlan = tezPlanNode.getTezOperPlan(); localResources.putAll(tezPlan.getExtraResources()); String shipFiles = pigContext.getProperties().getProperty("pig.streaming.ship.files"); if (shipFiles != null) { @@ -101,8 +102,11 @@ public class TezJobCompiler { TezResourceManager.getInstance().addTezResource(new Path(new URI(file.trim())).toUri()); } } - DAG tezDag = buildDAG(tezPlan, localResources); - return new TezJob(tezConf, tezDag, localResources); + for (Map.Entry<String, LocalResource> entry : localResources.entrySet()) { + log.info("Local resource: " + entry.getKey()); + } + DAG tezDag = buildDAG(tezPlanNode, localResources); + return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism()); } catch (Exception e) { int errCode = 2017; String msg = "Internal error creating job configuration.";
