Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Feb 24 03:34:37 2017 @@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp // The array of Hashtables one per replicated input. replicates[fragment] = // null fragment is the input which is fragmented and not replicated. - protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates; + protected transient TupleToMapKey replicates[]; // varaible which denotes whether we are returning tuples from the foreach // operator protected transient boolean processingPlan; @@ -234,10 +234,7 @@ public class POFRJoin extends PhysicalOp Result res = null; Result inp = null; if (!setUp) { - replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size()); - for (int i = 0 ; i < phyPlanLists.size(); i++) { - replicates.add(null); - } + replicates = new TupleToMapKey[phyPlanLists.size()]; dumTup = mTupleFactory.newTuple(1); setUpHashMap(); setUp = true; @@ -285,7 +282,8 @@ public class POFRJoin extends PhysicalOp return new Result(); } Tuple lrOutTuple = (Tuple) lrOut.result; - Object key = lrOutTuple.get(1); + Tuple key = mTupleFactory.newTuple(1); + key.set(0, lrOutTuple.get(1)); Tuple value = getValueTuple(lr, lrOutTuple); lr.detachInput(); // Configure the for each operator with the relevant bags @@ -298,7 +296,7 @@ public class POFRJoin extends PhysicalOp ce.setValue(value); continue; } - Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i); + TupleToMapKey replicate = replicates[i]; if (replicate.get(key) == null) { if (isLeftOuterJoin) { ce.setValue(nullBag); @@ -306,7 +304,7 @@ public class POFRJoin extends PhysicalOp noMatch = true; break; } - ce.setValue(new NonSpillableDataBag(replicate.get(key))); + ce.setValue(new NonSpillableDataBag(replicate.get(key).getList())); } // If this is not LeftOuter Join and there was no match we @@ -329,28 +327,27 @@ public class POFRJoin extends PhysicalOp } } - protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> { + protected static class TupleToMapKey { + private HashMap<Tuple, TuplesToSchemaTupleList> tuples; private SchemaTupleFactory tf; public TupleToMapKey(int ct, SchemaTupleFactory tf) { - super(ct); + tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct); this.tf = tf; } - @Override - public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) { - if (tf != null && key instanceof Tuple) { - key = TuplesToSchemaTupleList.convert((Tuple)key, tf); + public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) { + if (tf != null) { + key = TuplesToSchemaTupleList.convert(key, tf); } - return (TuplesToSchemaTupleList) super.put(key, val); + return tuples.put(key, val); } - @Override - public TuplesToSchemaTupleList get(Object key) { - if (tf != null && key instanceof Tuple) { - key = TuplesToSchemaTupleList.convert((Tuple)key, tf); + public TuplesToSchemaTupleList get(Tuple key) { + if (tf != null) { + key = TuplesToSchemaTupleList.convert(key, tf); } - return (TuplesToSchemaTupleList) super.get(key); + return tuples.get(key); } } @@ -385,7 +382,7 @@ public class POFRJoin extends PhysicalOp SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; if (i == fragment) { - replicates.set(i, null); + replicates[i] = null; continue; } @@ -404,34 +401,25 @@ public class POFRJoin extends PhysicalOp POLocalRearrange lr = LRs[i]; lr.setInputs(Arrays.asList((PhysicalOperator) ld)); - Map<Object, ArrayList<Tuple>> replicate; - if (keySchemaTupleFactory == null) { - replicate = new HashMap<Object, ArrayList<Tuple>>(1000); - } else { - replicate = new TupleToMapKey(1000, keySchemaTupleFactory); - } + TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory); log.debug("Completed setup. Trying to build replication hash table"); for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) { if (getReporter() != null) getReporter().progress(); Tuple tuple = (Tuple) res.result; - Object key = tuple.get(1); - if (isKeyNull(key)) continue; + if (isKeyNull(tuple.get(1))) continue; + Tuple key = mTupleFactory.newTuple(1); + key.set(0, tuple.get(1)); Tuple value = getValueTuple(lr, tuple); - ArrayList<Tuple> values = replicate.get(key); - if (values == null) { - if (inputSchemaTupleFactory == null) { - values = new ArrayList<Tuple>(1); - } else { - values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory); - } - replicate.put(key, values); + if (replicate.get(key) == null) { + replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); } - values.add(value); + + replicate.get(key).add(value); } - replicates.set(i, replicate); + replicates[i] = replicate; } long time2 = System.currentTimeMillis(); log.debug("Hash Table built. Time taken: " + (time2 - time1));
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Fri Feb 24 03:34:37 2017 @@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i); } - replicates.set(fragment, null); + replicates[fragment] = null; int i = -1; long start = System.currentTimeMillis(); for (int k = 0; k < inputSchemas.length; ++k) { @@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; if (i == fragment) { - replicates.set(fragment, null); + replicates[i] = null; continue; } @@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ replicate.get(key).add(value); } - replicates.set(i, replicate); + replicates[i] = replicate; } long end = System.currentTimeMillis(); log.debug("Hash Table built. Time taken: " + (end - start)); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Fri Feb 24 03:34:37 2017 @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; @@ -56,7 +55,6 @@ import org.apache.pig.pen.util.LineageTr @SuppressWarnings("unchecked") public class POForEach extends PhysicalOperator { private static final long serialVersionUID = 1L; - private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); protected List<PhysicalPlan> inputPlans; @@ -266,7 +264,7 @@ public class POForEach extends PhysicalO if (inp.returnStatus == POStatus.STATUS_EOP) { if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) { // continue pull one more output - inp = UNLIMITED_NULL_RESULT; + inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); } else { return inp; } @@ -443,8 +441,6 @@ public class POForEach extends PhysicalO if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) { its[i] = ((DataBag)bags[i]).iterator(); - } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) { - its[i] = ((Map)bags[i]).entrySet().iterator(); } else { its[i] = null; } @@ -470,7 +466,7 @@ public class POForEach extends PhysicalO //we instantiate the template array and start populating it with data data = new Object[noItems]; for(int i = 0; i < noItems; ++i) { - if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) { + if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) { if(its[i].hasNext()) { data[i] = its[i].next(); } else { @@ -544,15 +540,6 @@ public class POForEach extends PhysicalO out.append(t.get(j)); } } - } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) { - Map.Entry entry = (Map.Entry)in; - if (knownSize) { - out.set(idx++, entry.getKey()); - out.set(idx++, entry.getValue()); - } else { - out.append(entry.getKey()); - out.append(entry.getValue()); - } } else { if (knownSize) { out.set(idx++, in); @@ -751,12 +738,9 @@ public class POForEach extends PhysicalO opsToBeReset.add(sort); } - @Override - public void visitCross(POCross c) throws VisitorException { - // FIXME: add only if limit is present - opsToBeReset.add(c); - } - + /* (non-Javadoc) + * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject) + */ @Override public void visitProject(POProject proj) throws VisitorException { if(proj instanceof PORelationToExprProject) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Feb 24 03:34:37 2017 @@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE import org.apache.pig.impl.util.MultiMap; import org.apache.pig.newplan.logical.relational.LOJoin; -/** This operator implements merge join algorithm to do map side joins. +/** This operator implements merge join algorithm to do map side joins. * Currently, only two-way joins are supported. One input of join is identified as left * and other is identified as right. Left input tuples are the input records in map. * Right tuples are read from HDFS by opening right stream. - * + * * This join doesn't support outer join. * Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order. */ @@ -99,7 +99,7 @@ public class POMergeJoin extends Physica private FuncSpec rightLoaderFuncSpec; private String rightInputFileName; - + private String indexFile; // Buffer to hold accumulated left tuples. @@ -249,11 +249,12 @@ public class POMergeJoin extends Physica * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples * from the source, though in the future that is what we would like to do. */ - public static class TuplesToSchemaTupleList extends ArrayList<Tuple> { + public static class TuplesToSchemaTupleList { + private List<Tuple> tuples; private SchemaTupleFactory tf; public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) { - super(ct); + tuples = new ArrayList<Tuple>(ct); if (tf instanceof SchemaTupleFactory) { this.tf = (SchemaTupleFactory)tf; } @@ -272,24 +273,24 @@ public class POMergeJoin extends Physica } } - @Override public boolean add(Tuple t) { if (tf != null) { t = convert(t, tf); } - return super.add(t); + return tuples.add(t); } - @Override public Tuple get(int i) { - return super.get(i); + return tuples.get(i); } - @Override public int size() { - return super.size(); + return tuples.size(); } + public List<Tuple> getList() { + return tuples; + } } @SuppressWarnings("unchecked") @@ -356,7 +357,7 @@ public class POMergeJoin extends Physica } else{ Object rightKey = extractKeysFromTuple(rightInp, 1); - if(null == rightKey) // If we see tuple having null keys in stream, we drop them + if(null == rightKey) // If we see tuple having null keys in stream, we drop them continue; // and fetch next tuple. int cmpval = ((Comparable)rightKey).compareTo(curJoinKey); @@ -398,7 +399,7 @@ public class POMergeJoin extends Physica "Last two tuples encountered were: \n"+ curJoiningRightTup+ "\n" + (Tuple)rightInp.result ; throw new ExecException(errMsg,errCode); - } + } } } } @@ -429,17 +430,17 @@ public class POMergeJoin extends Physica prevLeftKey+ "\n" + curLeftKey ; throw new ExecException(errMsg,errCode); } - + case POStatus.STATUS_EOP: if(this.parentPlan.endOfAllInput || isEndOfInput()){ - // We hit the end on left input. + // We hit the end on left input. // Tuples in bag may still possibly join with right side. curJoinKey = prevLeftKey; curLeftKey = null; if (isEndOfInput()) { leftInputConsumedInSpark = true; } - break; + break; } else // Fetch next left input. return curLeftInp; @@ -464,7 +465,7 @@ public class POMergeJoin extends Physica // Accumulated tuples with same key on left side. // But since we are reading ahead we still haven't checked the read ahead right tuple. // Accumulated left tuples may potentially join with that. So, lets check that first. - + if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){ curJoiningRightTup = (Tuple)prevRightInp.result; @@ -486,17 +487,17 @@ public class POMergeJoin extends Physica slidingToNextRecord = false; } else rightInp = getNextRightInp(prevLeftKey); - + if(rightInp.returnStatus != POStatus.STATUS_OK) return rightInp; Object extractedRightKey = extractKeysFromTuple(rightInp, 1); - - if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them + + if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them continue; // and fetch next tuple. - + Comparable rightKey = (Comparable)extractedRightKey; - + if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){ // Sanity check. int errCode = 1102; @@ -527,7 +528,7 @@ public class POMergeJoin extends Physica else{ // We got ahead on right side. Store currently read right tuple. prevRightKey = rightKey; prevRightInp = rightInp; - // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. + // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. leftTuples = newLeftTupleArray(); leftTuples.add((Tuple)curLeftInp.result); prevLeftInp = curLeftInp; @@ -554,7 +555,7 @@ public class POMergeJoin extends Physica DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader; loader.setIndexFile(indexFile); } - + // Pass signature of the loader to rightLoader // make a copy of the conf to use in calls to rightLoader. rightLoader.setUDFContextSignature(signature); @@ -607,11 +608,11 @@ public class POMergeJoin extends Physica // run the tuple through the pipeline rightPipelineRoot.attachInput(t); return this.getNextRightInp(); - + } default: // We don't deal with ERR/NULL. just pass them down throwProcessingException(false, null); - + } } } catch (IOException e) { @@ -642,8 +643,8 @@ public class POMergeJoin extends Physica int errCode = 2167; String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly"; throw new ExecException(errMsg,errCode,PigException.BUG); - } - + } + return ((Tuple) lrOut.result).get(1); } @@ -659,7 +660,7 @@ public class POMergeJoin extends Physica noInnerPlanOnRightSide = false; this.rightPipelineLeaf = rightPipeline.getLeaves().get(0); this.rightPipelineRoot = rightPipeline.getRoots().get(0); - this.rightPipelineRoot.setInputs(null); + this.rightPipelineRoot.setInputs(null); } else noInnerPlanOnRightSide = true; @@ -710,18 +711,18 @@ public class POMergeJoin extends Physica public boolean supportsMultipleOutputs() { return false; } - + /** * @param rightInputFileName the rightInputFileName to set */ public void setRightInputFileName(String rightInputFileName) { this.rightInputFileName = rightInputFileName; } - + public String getSignature() { return signature; } - + public void setSignature(String signature) { this.signature = signature; } @@ -733,12 +734,12 @@ public class POMergeJoin extends Physica public String getIndexFile() { return indexFile; } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { return null; } - + public LOJoin.JOINTYPE getJoinType() { return joinType; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Fri Feb 24 03:34:37 2017 @@ -44,9 +44,6 @@ public class POPoissonSample extends Phy private transient boolean initialized; - // num of rows skipped so far - private transient int numSkipped; - // num of rows sampled so far private transient int numRowsSampled; @@ -92,7 +89,6 @@ public class POPoissonSample extends Phy @Override public Result getNextTuple() throws ExecException { if (!initialized) { - numSkipped = 0; numRowsSampled = 0; avgTupleMemSz = 0; rowNum = 0; @@ -138,7 +134,7 @@ public class POPoissonSample extends Phy } // skip tuples - while (numSkipped < skipInterval) { + for (long numSkipped = 0; numSkipped < skipInterval; numSkipped++) { res = processInput(); if (res.returnStatus == POStatus.STATUS_NULL) { continue; @@ -152,7 +148,6 @@ public class POPoissonSample extends Phy return res; } rowNum++; - numSkipped++; } // skipped enough, get new sample @@ -178,8 +173,6 @@ public class POPoissonSample extends Phy rowNum++; newSample = res; - // reset skipped - numSkipped = 0; return currentSample; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Fri Feb 24 03:34:37 2017 @@ -125,7 +125,7 @@ public class POReservoirSample extends P } // collect samples until input is exhausted - int rand = randGen.nextInt(rowProcessed + 1); + int rand = randGen.nextInt(rowProcessed); if (rand < numSamples) { samples[rand] = res; } @@ -133,13 +133,8 @@ public class POReservoirSample extends P } } - if (res.returnStatus == POStatus.STATUS_EOP) { - if (this.parentPlan.endOfAllInput) { - sampleCollectionDone = true; - } else { - // In case of Split can get EOP in between. - return res; - } + if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) { + sampleCollectionDone = true; } return getSample(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Fri Feb 24 03:34:37 2017 @@ -51,13 +51,13 @@ public class Packager implements Illustr protected DataBag[] bags; public static enum PackageType { - GROUP, JOIN, BLOOMJOIN + GROUP, JOIN }; protected transient Illustrator illustrator = null; // The key being worked on - protected Object key; + Object key; // marker to indicate if key is a tuple protected boolean isKeyTuple = false; @@ -65,7 +65,7 @@ public class Packager implements Illustr protected boolean isKeyCompound = false; // key's type - protected byte keyType; + byte keyType; // The number of inputs to this // co-group. 0 indicates a distinct, which means there will only be a Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Fri Feb 24 03:34:37 2017 @@ -60,7 +60,7 @@ public class StoreFuncDecorator { private boolean allowErrors() { return UDFContext.getUDFContext().getJobConf() - .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false); + .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Fri Feb 24 03:34:37 2017 @@ -162,13 +162,8 @@ public class LoadConverter implements RD private SparkEngineConf sparkEngineConf; private boolean initialized; - //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing - //invalid number of input records, 'skip' flag below indicates first load is finished. - private boolean skip; - public ToTupleFunction(SparkEngineConf sparkEngineConf){ this.sparkEngineConf = sparkEngineConf; - } @Override @@ -177,14 +172,9 @@ public class LoadConverter implements RD long partitionId = TaskContext.get().partitionId(); PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId)); - //We're in POSplit and already counted all input records, - //in a multiquery case skip will be set to true after the first load is finished: - if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) { - skip=true; - } initialized = true; } - if (sparkCounters != null && disableCounter == false && skip == false) { + if (sparkCounters != null && disableCounter == false) { sparkCounters.increment(counterGroupName, counterName, 1L); } return v1._2(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Fri Feb 24 03:34:37 2017 @@ -19,14 +19,13 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -44,7 +43,6 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; -import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -58,7 +56,6 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; -import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator; @@ -90,6 +87,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; @@ -110,6 +108,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; +import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -175,7 +174,6 @@ public class TezDagBuilder extends TezOp private PigContext pc; private Configuration globalConf; private Configuration pigContextConf; - private Configuration shuffleVertexManagerBaseConf; private FileSystem fs; private long intermediateTaskInputSize; private Set<String> inputSplitInDiskVertices; @@ -193,8 +191,6 @@ public class TezDagBuilder extends TezOp private String mapTaskLaunchCmdOpts; private String reduceTaskLaunchCmdOpts; - private boolean disableDAGRecovery = false; - public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); @@ -214,10 +210,6 @@ public class TezDagBuilder extends TezOp } } - public boolean shouldDisableDAGRecovery() { - return disableDAGRecovery; - } - private void initialize(PigContext pc) throws IOException { this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); @@ -225,16 +217,6 @@ public class TezDagBuilder extends TezOp this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); MRToTezHelper.processMRSettings(pigContextConf, globalConf); - shuffleVertexManagerBaseConf = new Configuration(false); - // Only copy tez.shuffle-vertex-manager config to keep payload size small - Iterator<Entry<String, String>> iter = pigContextConf.iterator(); - while (iter.hasNext()) { - Entry<String, String> entry = iter.next(); - if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) { - shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue()); - } - } - // Add credentials from binary token file and get tokens for namenodes // specified in mapreduce.job.hdfs-servers SecurityHelper.populateTokenCache(globalConf, dag.getCredentials()); @@ -283,7 +265,7 @@ public class TezDagBuilder extends TezOp if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) { // If tez setting is not defined MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true); - MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false); + MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true); } if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) { @@ -297,7 +279,7 @@ public class TezDagBuilder extends TezOp try { fs = FileSystem.get(globalConf); - intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc)); + intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc)); } catch (Exception e) { log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e); intermediateTaskInputSize = 134217728L; @@ -415,11 +397,7 @@ public class TezDagBuilder extends TezOp tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup); POStore store = tezOp.getVertexGroupInfo().getStore(); if (store != null) { - String outputKey = store.getOperatorKey().toString(); - if (store instanceof POStoreTez) { - outputKey = ((POStoreTez) store).getOutputKey(); - } - vertexGroup.addDataSink(outputKey, + vertexGroup.addDataSink(store.getOperatorKey().toString(), DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(), OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials())); } @@ -463,14 +441,7 @@ public class TezDagBuilder extends TezOp Configuration conf = new Configuration(pigContextConf); - if (edge.needsDistinctCombiner()) { - conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, - MRCombiner.class.getName()); - conf.set(MRJobConfig.COMBINE_CLASS_ATTR, - DistinctCombiner.Combine.class.getName()); - log.info("Setting distinct combiner class between " - + from.getOperatorKey() + " and " + to.getOperatorKey()); - } else if (!combinePlan.isEmpty()) { + if (!combinePlan.isEmpty()) { udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC); addCombiner(combinePlan, to, conf, isMergedInput); } @@ -479,7 +450,7 @@ public class TezDagBuilder extends TezOp POLocalRearrangeTez.class); for (POLocalRearrangeTez lr : lrs) { - if (lr.containsOutputKey(to.getOperatorKey().toString())) { + if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { byte keyType = lr.getKeyType(); setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput); // In case of secondary key sort, main key type is the actual key type @@ -508,8 +479,7 @@ public class TezDagBuilder extends TezOp conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true); - conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal()); - conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties())); + conf.set("pig.pigContext", serializedPigContext); conf.set("udf.import.list", serializedUDFImportList); if(to.isGlobalSort() || to.isLimitAfterSort()){ @@ -540,36 +510,26 @@ public class TezDagBuilder extends TezOp UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); out.setUserPayload(payLoad); - in.setUserPayload(payLoad); - // Remove combiner and reset payload if (!combinePlan.isEmpty()) { boolean noCombineInReducer = false; - boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap(); String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER); - if (edge.getCombinerInReducer() != null) { - noCombineInReducer = !edge.getCombinerInReducer(); - } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) { + if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) { noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan); } else { noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner); } - if (noCombineInReducer || noCombineInMapper) { + if (noCombineInReducer) { log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey()); conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS); conf.unset(MRJobConfig.COMBINE_CLASS_ATTR); conf.unset("pig.combinePlan"); conf.unset("pig.combine.package"); conf.unset("pig.map.keytype"); - UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf); - if (noCombineInMapper) { - out.setUserPayload(payLoadWithoutCombiner); - } - if (noCombineInReducer) { - in.setUserPayload(payLoadWithoutCombiner); - } + payLoad = TezUtils.createUserPayloadFromConf(conf); } } + in.setUserPayload(payLoad); if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) { // Use custom edge @@ -633,8 +593,6 @@ public class TezDagBuilder extends TezOp setOutputFormat(job); payloadConf.set("udf.import.list", serializedUDFImportList); payloadConf.set("exectype", "TEZ"); - payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal()); - payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties())); // Process stores LinkedList<POStore> stores = processStores(tezOp, payloadConf, job); @@ -653,7 +611,11 @@ public class TezDagBuilder extends TezOp payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists())); payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits())); inputPayLoad = new Configuration(payloadConf); + if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) { + inputPayLoad.set("pig.pigContext", serializedPigContext); + } } + payloadConf.set("pig.pigContext", serializedPigContext); if (tezOp.getSampleOperator() != null) { payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString()); @@ -727,7 +689,7 @@ public class TezDagBuilder extends TezOp PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class); for (POLocalRearrangeTez lr : lrs) { if (lr.isConnectedToPackage() - && lr.containsOutputKey(tezOp.getOperatorKey().toString())) { + && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) { localRearrangeMap.put((int) lr.getIndex(), inputKey); if (isVertexGroup) { isMergedInput = true; @@ -810,25 +772,9 @@ public class TezDagBuilder extends TezOp String vmPluginName = null; Configuration vmPluginConf = null; - boolean containScatterGather = false; - boolean containCustomPartitioner = false; - for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { - if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { - containScatterGather = true; - } - if (edge.partitionerClass != null) { - containCustomPartitioner = true; - } - } - - if(containScatterGather) { - vmPluginName = ShuffleVertexManager.class.getName(); - vmPluginConf = new Configuration(shuffleVertexManagerBaseConf); - } // Set the right VertexManagerPlugin if (tezOp.getEstimatedParallelism() != -1) { - boolean autoParallelism = false; if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { if (tezOp.getVertexParallelism()==-1 && ( tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1|| @@ -837,12 +783,33 @@ public class TezDagBuilder extends TezOp // to decrease/increase parallelism of sorting vertex dynamically // based on the numQuantiles calculated by sample aggregation vertex vmPluginName = PartitionerDefinedVertexManager.class.getName(); - autoParallelism = true; log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); } } else { + boolean containScatterGather = false; + boolean containCustomPartitioner = false; + for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { + if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { + containScatterGather = true; + } + if (edge.partitionerClass!=null) { + containCustomPartitioner = true; + } + } if (containScatterGather && !containCustomPartitioner) { - + vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf; + // Use auto-parallelism feature of ShuffleVertexManager to dynamically + // reduce the parallelism of the vertex + if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) + && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) { + vmPluginName = PigGraceShuffleVertexManager.class.getName(); + tezOp.setUseGraceParallelism(true); + vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); + vmPluginConf.set("pig.pigContext", serializedPigContext); + } else { + vmPluginName = ShuffleVertexManager.class.getName(); + } + vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); // For Intermediate reduce, set the bytes per reducer to be block size. long bytesPerReducer = intermediateTaskInputSize; // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user. @@ -851,8 +818,8 @@ public class TezDagBuilder extends TezOp // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce // as map input sizes are mostly always high compared to map output. if (stores.size() > 0) { - if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { - bytesPerReducer = pigContextConf.getLong( + if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { + bytesPerReducer = vmPluginConf.getLong( InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); } else if (tezOp.isGroupBy()) { @@ -861,28 +828,10 @@ public class TezDagBuilder extends TezOp bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT; } } - - // Use auto-parallelism feature of ShuffleVertexManager to dynamically - // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager - // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on - if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) - && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty() - && tezOp.getCrossKeys() == null) { - vmPluginName = PigGraceShuffleVertexManager.class.getName(); - tezOp.setUseGraceParallelism(true); - vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); - vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext); - vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer); - } - vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer); - autoParallelism = true; log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); } } - if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) { - disableDAGRecovery = true; - } } if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())|| vmPluginName.equals(ShuffleVertexManager.class.getName()))) { @@ -1460,12 +1409,22 @@ public class TezDagBuilder extends TezOp private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if it is supported by + // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class); + try { + Class<?> clazz = PigContext + .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); + Method method = clazz.getMethod("setOutputFormatClass", + org.apache.hadoop.mapreduce.Job.class, Class.class); + method.invoke(null, job, PigOutputFormatTez.class); + } catch (Exception e) { + job.setOutputFormatClass(PigOutputFormatTez.class); + log.warn(PigConfiguration.PIG_OUTPUT_LAZY + + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); + } } else { job.setOutputFormatClass(PigOutputFormatTez.class); } } - } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Feb 24 03:34:37 2017 @@ -30,11 +30,6 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.PigConfiguration; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; -import org.apache.pig.impl.plan.DependencyOrderWalker; -import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.apache.tez.client.TezClient; @@ -56,7 +51,7 @@ import com.google.common.collect.Maps; */ public class TezJob implements Runnable { private static final Log log = LogFactory.getLog(TezJob.class); - private TezConfiguration conf; + private Configuration conf; private EnumSet<StatusGetOpts> statusGetOpts; private Map<String, LocalResource> requestAMResources; private ApplicationId appId; @@ -74,71 +69,31 @@ public class TezJob implements Runnable public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources, - TezOperPlan tezPlan) throws IOException { + int estimatedTotalParallelism) throws IOException { this.conf = conf; this.dag = dag; this.requestAMResources = requestAMResources; this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true); this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - tezJobConf = new TezJobConfig(tezPlan); + tezJobConf = new TezJobConfig(estimatedTotalParallelism); } static class TezJobConfig { private int estimatedTotalParallelism = -1; - private int maxOutputsinSingleVertex; - private int totalVertices = 0; - public TezJobConfig(TezOperPlan tezPlan) throws VisitorException { - this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism(); - MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan); - finder.visit(); - this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex(); - this.totalVertices = finder.getTotalVertices(); + public TezJobConfig(int estimatedTotalParallelism) { + this.estimatedTotalParallelism = estimatedTotalParallelism; } public int getEstimatedTotalParallelism() { return estimatedTotalParallelism; } - public int getMaxOutputsinSingleVertex() { - return maxOutputsinSingleVertex; + public void setEstimatedTotalParallelism(int estimatedTotalParallelism) { + this.estimatedTotalParallelism = estimatedTotalParallelism; } - public int getTotalVertices() { - return totalVertices; - } - - } - - private static class MaxOutputsFinder extends TezOpPlanVisitor { - - private int maxOutputsinSingleVertex = 1; - private int totalVertices = 0; - - public MaxOutputsFinder(TezOperPlan plan) { - super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); - } - - public int getMaxOutputsinSingleVertex() { - return maxOutputsinSingleVertex; - } - - public int getTotalVertices() { - return totalVertices; - } - - @Override - public void visitTezOp(TezOperator tezOperator) throws VisitorException { - if (!tezOperator.isVertexGroup()) { - totalVertices++; - int outputs = tezOperator.outEdges.keySet().size(); - maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs; - } - } - - - } public DAG getDAG() { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Feb 24 03:34:37 2017 @@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex import java.io.File; import java.io.IOException; -import java.lang.reflect.Method; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -31,7 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.pig.PigException; -import org.apache.pig.backend.hadoop.PigATSClient; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; @@ -52,12 +50,11 @@ public class TezJobCompiler { private static final Log log = LogFactory.getLog(TezJobCompiler.class); private PigContext pigContext; - private Configuration conf; - private boolean disableDAGRecovery; + private TezConfiguration tezConf; public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException { this.pigContext = pigContext; - this.conf = conf; + this.tezConf = new TezConfiguration(conf); } public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources) @@ -67,7 +64,6 @@ public class TezJobCompiler { TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources); dagBuilder.visit(); dagBuilder.avoidContainerReuseIfInputSplitInDisk(); - disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery(); return tezDag; } @@ -89,7 +85,6 @@ public class TezJobCompiler { return job; } - @SuppressWarnings({ "rawtypes", "unchecked" }) private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer) throws JobCreationException { try { @@ -112,34 +107,8 @@ public class TezJobCompiler { } DAG tezDag = buildDAG(tezPlanNode, localResources); tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript())); - // set Tez caller context - // Reflection for the following code since it is only available since tez 0.8.1: - // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext), - // ATSService.EntityType, ""); - // tezDag.setCallerContext(context); - Class callerContextClass = null; - try { - callerContextClass = Class.forName("org.apache.tez.client.CallerContext"); - } catch (ClassNotFoundException e) { - // If pre-Tez 0.8.1, skip setting CallerContext - } - if (callerContextClass != null) { - Method builderBuildMethod = callerContextClass.getMethod("create", String.class, - String.class, String.class, String.class); - Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT, - PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, ""); - Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext", - context.getClass()); - dagSetCallerContext.invoke(tezDag, context); - } log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism()); - TezConfiguration tezConf = new TezConfiguration(conf); - if (disableDAGRecovery - && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, - TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { - tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); - } - return new TezJob(tezConf, tezDag, localResources, tezPlan); + return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism()); } catch (Exception e) { int errCode = 2017; String msg = "Internal error creating job configuration."; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Feb 24 03:34:37 2017 @@ -22,7 +22,6 @@ import java.io.PrintStream; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -167,7 +166,7 @@ public class TezLauncher extends Launche tezStats = new TezPigScriptStats(pc); PigStats.start(tezStats); - conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); + conf.set(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true"); TezJobCompiler jc = new TezJobCompiler(pc, conf); TezPlanContainer tezPlanContainer = compile(php, pc); @@ -175,10 +174,6 @@ public class TezLauncher extends Launche tezScriptState.emitInitialPlanNotification(tezPlanContainer); tezScriptState.emitLaunchStartedNotification(tezPlanContainer.size()); //number of DAGs to Launch - boolean stop_on_failure = - Boolean.valueOf(pc.getProperties().getProperty("stop.on.failure", "false")); - boolean stoppedOnFailure = false; - TezPlanContainerNode tezPlanContainerNode; TezOperPlan tezPlan; int processedDAGs = 0; @@ -257,18 +252,7 @@ public class TezLauncher extends Launche ((tezPlanContainer.size() - processedDAGs)/tezPlanContainer.size()) * 100); } handleUnCaughtException(pc); - boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed(); - tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded); - // if stop_on_failure is enabled, we need to stop immediately when any job has failed - if (!tezDAGSucceeded) { - if (stop_on_failure) { - stoppedOnFailure = true; - break; - } else { - log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you " - + "want Pig to stop immediately on failure."); - } - } + tezPlanContainer.updatePlan(tezPlan, reporter.notifyFinishedOrFailed()); } tezStats.finish(); @@ -295,11 +279,6 @@ public class TezLauncher extends Launche } } - if (stoppedOnFailure) { - throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017, - PigException.REMOTE_ENVIRONMENT); - } - return tezStats; } @@ -423,11 +402,9 @@ public class TezLauncher extends Launche TezCompiler comp = new TezCompiler(php, pc); comp.compile(); TezPlanContainer planContainer = comp.getPlanContainer(); - // Doing a sort so that test plan printed remains same between jdk7 and jdk8 - List<OperatorKey> opKeys = new ArrayList<>(planContainer.getKeys().keySet()); - Collections.sort(opKeys); - for (OperatorKey opKey : opKeys) { - TezOperPlan tezPlan = planContainer.getOperator(opKey).getTezOperPlan(); + for (Map.Entry<OperatorKey, TezPlanContainerNode> entry : planContainer + .getKeys().entrySet()) { + TezOperPlan tezPlan = entry.getValue().getTezOperPlan(); optimize(tezPlan, pc); } return planContainer; @@ -522,7 +499,7 @@ public class TezLauncher extends Launche @Override public void killJob(String jobID, Configuration conf) throws BackendException { - if (runningJob != null && runningJob.getApplicationId().toString().equals(jobID)) { + if (runningJob != null && runningJob.getApplicationId().toString() == jobID) { try { runningJob.killJob(); } catch (Exception e) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Fri Feb 24 03:34:37 2017 @@ -39,8 +39,6 @@ import org.apache.pig.PigConfiguration; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; -import com.google.common.annotations.VisibleForTesting; - public class TezResourceManager { private static TezResourceManager instance = null; private boolean inited = false; @@ -61,7 +59,6 @@ public class TezResourceManager { /** * This method is only used by test code to reset state. */ - @VisibleForTesting public static void dropInstance() { instance = null; } @@ -69,7 +66,7 @@ public class TezResourceManager { public void init(PigContext pigContext, Configuration conf) throws IOException { if (!inited) { this.resourcesDir = FileLocalizer.getTemporaryResourcePath(pigContext); - this.remoteFs = resourcesDir.getFileSystem(conf); + this.remoteFs = FileSystem.get(conf); this.conf = conf; this.pigContext = pigContext; this.inited = true; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1784224&r1=1784223&r2=1784224&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Feb 24 03:34:37 2017 @@ -18,9 +18,7 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Calendar; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -31,11 +29,9 @@ import org.apache.commons.logging.LogFac import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.yarn.api.records.LocalResource; -import org.apache.pig.PigConfiguration; import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.PigImplConstants; import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.tez.TezScriptState; import org.apache.tez.client.TezAppMasterStatus; @@ -50,13 +46,13 @@ public class TezSessionManager { private static final Log log = LogFactory.getLog(TezSessionManager.class); static { - Utils.addShutdownHookWithPriority(new Runnable() { + Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { TezSessionManager.shutdown(); } - }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY); + }); } private static ReentrantReadWriteLock sessionPoolLock = new ReentrantReadWriteLock(); @@ -65,17 +61,11 @@ public class TezSessionManager { private TezSessionManager() { } - private static class SessionInfo { - - public SessionInfo(TezClient session, TezConfiguration config, Map<String, LocalResource> resources) { + public static class SessionInfo { + SessionInfo(TezClient session, Map<String, LocalResource> resources) { this.session = session; - this.config = config; this.resources = resources; } - - public TezConfiguration getConfig() { - return config; - } public Map<String, LocalResource> getResources() { return resources; } @@ -87,23 +77,20 @@ public class TezSessionManager { } private TezClient session; private Map<String, LocalResource> resources; - private TezConfiguration config; private boolean inUse = false; } private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>(); - private static SessionInfo createSession(TezConfiguration amConf, + private static SessionInfo createSession(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { - MRToTezHelper.translateMRSettingsForTezAM(amConf); + TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf); TezScriptState ss = TezScriptState.get(); ss.addDAGSettingsToConf(amConf); - if (amConf.getBoolean(PigConfiguration.PIG_TEZ_CONFIGURE_AM_MEMORY, true)) { - adjustAMConfig(amConf, tezJobConf); - } - String jobName = amConf.get(PigContext.JOB_NAME, "pig"); + adjustAMConfig(amConf, tezJobConf); + String jobName = conf.get(PigContext.JOB_NAME, "pig"); TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds); try { tezClient.start(); @@ -117,10 +104,12 @@ public class TezSessionManager { tezClient.stop(); throw new RuntimeException(e); } - return new SessionInfo(tezClient, amConf, requestedAMResources); + return new SessionInfo(tezClient, requestedAMResources); } private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) { + int requiredAMMaxHeap = -1; + int requiredAMResourceMB = -1; String amLaunchOpts = amConf.get( TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT); @@ -133,10 +122,8 @@ public class TezSessionManager { // Need more room for native memory/virtual address space // when close to 4G due to 32-bit jvm 4G limit - int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200; - int maxAMResourceMB = 4096; - int requiredAMResourceMB = maxAMResourceMB; - int requiredAMMaxHeap = maxAMHeap; + int minAMMaxHeap = 3200; + int minAMResourceMB = 4096; // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb // Increment container size by 512 mb for every additional 5K tasks. @@ -148,38 +135,22 @@ public class TezSessionManager { // 5000 and above - 1024Xmx, 1536 (512 native memory) for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) { if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) { + requiredAMMaxHeap = minAMMaxHeap; + requiredAMResourceMB = minAMResourceMB; break; } - requiredAMResourceMB = requiredAMResourceMB - 512; - requiredAMMaxHeap = requiredAMResourceMB - 512; - } - - if (tezJobConf.getTotalVertices() > 30) { - //Add 512 mb per 30 vertices - int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30); - requiredAMResourceMB = requiredAMResourceMB + additionaMem; - requiredAMMaxHeap = requiredAMResourceMB - 512; - } - - if (tezJobConf.getMaxOutputsinSingleVertex() > 10) { - //Add 256 mb per 5 outputs if a vertex has more than 10 outputs - int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5); - requiredAMResourceMB = requiredAMResourceMB + additionaMem; - requiredAMMaxHeap = requiredAMResourceMB - 512; + minAMResourceMB = minAMResourceMB - 512; + minAMMaxHeap = minAMResourceMB - 512; } - requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB); - requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap); - if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) { amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB); log.info("Increasing " + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from " + configuredAMResourceMB + " to " + requiredAMResourceMB - + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism() - + ", total vertices = " + tezJobConf.getTotalVertices() - + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex()); + + " as the number of total estimated tasks is " + + tezJobConf.getEstimatedTotalParallelism()); if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) { amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, @@ -187,9 +158,8 @@ public class TezSessionManager { log.info("Increasing Tez AM Heap Size from " + configuredAMMaxHeap + "M to " + requiredAMMaxHeap - + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism() - + ", total vertices = " + tezJobConf.getTotalVertices() - + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex()); + + "M as the number of total estimated tasks is " + + tezJobConf.getEstimatedTotalParallelism()); log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now " + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)); } @@ -208,22 +178,7 @@ public class TezSessionManager { return true; } - private static boolean validateSessionConfig(SessionInfo currentSession, - Configuration newSessionConfig) - throws TezException, IOException { - // If DAG recovery is disabled for one and enabled for another, do not reuse - if (currentSession.getConfig().getBoolean( - TezConfiguration.DAG_RECOVERY_ENABLED, - TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT) - != newSessionConfig.getBoolean( - TezConfiguration.DAG_RECOVERY_ENABLED, - TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { - return false; - } - return true; - } - - static TezClient getClient(TezConfiguration conf, Map<String, LocalResource> requestedAMResources, + static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException { List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>(); SessionInfo newSession = null; @@ -241,8 +196,7 @@ public class TezSessionManager { sessionsToRemove.add(sessionInfo); } else if (!sessionInfo.inUse && appMasterStatus.equals(TezAppMasterStatus.READY) - && validateSessionResources(sessionInfo,requestedAMResources) - && validateSessionConfig(sessionInfo, conf)) { + && validateSessionResources(sessionInfo,requestedAMResources)) { sessionInfo.inUse = true; return sessionInfo.session; } @@ -299,11 +253,6 @@ public class TezSessionManager { synchronized (sessionInfo) { if (sessionInfo.session == session) { log.info("Stopping Tez session " + session); - String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") - .format(Calendar.getInstance().getTime()); - System.err.println(timeStamp + " Shutting down Tez session " - + ", sessionName=" + session.getClientName() - + ", applicationId=" + session.getAppMasterApplicationId()); session.stop(); sessionToRemove = sessionInfo; break; @@ -330,30 +279,19 @@ public class TezSessionManager { shutdown = true; for (SessionInfo sessionInfo : sessionPool) { synchronized (sessionInfo) { - TezClient session = sessionInfo.session; try { - String timeStamp = new SimpleDateFormat( - "yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime()); - if (session.getAppMasterStatus().equals( + if (sessionInfo.session.getAppMasterStatus().equals( TezAppMasterStatus.SHUTDOWN)) { log.info("Tez session is already shutdown " - + session); - System.err.println(timeStamp - + " Tez session is already shutdown " + session - + ", sessionName=" + session.getClientName() - + ", applicationId=" + session.getAppMasterApplicationId()); + + sessionInfo.session); continue; } - log.info("Shutting down Tez session " + session); - // Since hadoop calls org.apache.log4j.LogManager.shutdown(); - // the log.info message is not displayed with shutdown hook in Oozie - System.err.println(timeStamp + " Shutting down Tez session " - + ", sessionName=" + session.getClientName() - + ", applicationId=" + session.getAppMasterApplicationId()); - session.stop(); + log.info("Shutting down Tez session " + + sessionInfo.session); + sessionInfo.session.stop(); } catch (Exception e) { log.error("Error shutting down Tez session " - + session, e); + + sessionInfo.session, e); } } }
