Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POCast.java Wed Feb 22 09:43:41 2017 @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.pig.EvalFunc; import org.apache.pig.FuncSpec; import org.apache.pig.LoadCaster; import org.apache.pig.LoadFunc; @@ -89,6 +90,8 @@ public class POCast extends ExpressionOp caster = ((LoadFunc)obj).getLoadCaster(); } else if (obj instanceof StreamToPig) { caster = ((StreamToPig)obj).getLoadCaster(); + } else if (obj instanceof EvalFunc) { + caster = ((EvalFunc)obj).getLoadCaster(); } else { throw new IOException("Invalid class type " + funcSpec.getClassName()); @@ -165,7 +168,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBigInteger(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigInteger."; + String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -281,7 +284,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBigDecimal(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigDecimal."; + String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -396,7 +399,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBoolean(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "boolean."; + String msg = unknownByteArrayErrorMessage + "boolean for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -510,7 +513,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToInteger(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int."; + String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -636,7 +639,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToLong(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "long."; + String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -759,7 +762,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToDouble(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "double."; + String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -881,7 +884,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToFloat(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float."; + String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1007,7 +1010,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToDateTime(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "datetime."; + String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1118,7 +1121,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToCharArray(dba.get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "string."; + String msg = unknownByteArrayErrorMessage + "string for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1270,7 +1273,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToTuple(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple."; + String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1332,7 +1335,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBag(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "bag."; + String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1363,7 +1366,7 @@ public class POCast extends ExpressionOp result = caster.bytesToTuple(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple."; + String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1388,7 +1391,7 @@ public class POCast extends ExpressionOp result = caster.bytesToMap(((DataByteArray)obj).get(), fs); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "tuple."; + String msg = unknownByteArrayErrorMessage + "tuple for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } else { @@ -1402,7 +1405,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBoolean(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int."; + String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1441,7 +1444,7 @@ public class POCast extends ExpressionOp result = caster.bytesToInteger(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "int."; + String msg = unknownByteArrayErrorMessage + "int for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1487,7 +1490,7 @@ public class POCast extends ExpressionOp result = caster.bytesToDouble(((DataByteArray) obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "double."; + String msg = unknownByteArrayErrorMessage + "double for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1533,7 +1536,7 @@ public class POCast extends ExpressionOp result = caster.bytesToLong(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "long."; + String msg = unknownByteArrayErrorMessage + "long for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1579,7 +1582,7 @@ public class POCast extends ExpressionOp result = caster.bytesToFloat(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float."; + String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1625,7 +1628,7 @@ public class POCast extends ExpressionOp result = caster.bytesToDateTime(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "datetime."; + String msg = unknownByteArrayErrorMessage + "datetime for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1664,7 +1667,7 @@ public class POCast extends ExpressionOp result = caster.bytesToCharArray(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "float."; + String msg = unknownByteArrayErrorMessage + "float for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1712,7 +1715,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBigInteger(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigInteger."; + String msg = unknownByteArrayErrorMessage + "BigInteger for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1757,7 +1760,7 @@ public class POCast extends ExpressionOp result = caster.bytesToBigDecimal(((DataByteArray)obj).get()); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "BigDecimal."; + String msg = unknownByteArrayErrorMessage + "BigDecimal for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } break; @@ -1795,6 +1798,10 @@ public class POCast extends ExpressionOp default: throw new ExecException("Cannot convert "+ obj + " to " + fs, 1120, PigException.INPUT); } + case DataType.BYTEARRAY: + //no-op (PIG-4933) + result = obj; + break; default: throw new ExecException("Don't know how to convert "+ obj + " to " + fs, 1120, PigException.INPUT); } @@ -1861,7 +1868,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToBag(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "bag."; + String msg = unknownByteArrayErrorMessage + "bag for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) { @@ -1952,7 +1959,7 @@ public class POCast extends ExpressionOp res.result = caster.bytesToMap(dba.get(), fieldSchema); } else { int errCode = 1075; - String msg = unknownByteArrayErrorMessage + "map."; + String msg = unknownByteArrayErrorMessage + "map for " + this.getOriginalLocations(); throw new ExecException(msg, errCode, PigException.INPUT); } } catch (ExecException ee) {
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Wed Feb 22 09:43:41 2017 @@ -158,23 +158,19 @@ public class POProject extends Expressio illustratorMarkup(inpValue, res.result, -1); return res; } else if(columns.size() == 1) { - try { + if ( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + res.returnStatus = POStatus.STATUS_OK; + ret = null; + } else if( inpValue.size() > columns.get(0) ) { ret = inpValue.get(columns.get(0)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } res.returnStatus = POStatus.STATUS_OK; ret = null; - } catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - res.returnStatus = POStatus.STATUS_OK; - ret = null; } } else if(isProjectToEnd){ ret = getRangeTuple(inpValue); @@ -215,23 +211,18 @@ public class POProject extends Expressio */ private void addColumn(ArrayList<Object> objList, Tuple inpValue, int i) throws ExecException { - try { + if( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + objList.add(null); + } else if( inpValue.size() > i ) { objList.add(inpValue.get(i)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + i + " which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } objList.add(null); } - catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - objList.add(null); - } } @Override @@ -406,21 +397,17 @@ public class POProject extends Expressio Object ret; if(columns.size() == 1) { - try{ + if( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + ret = null; + } else if( inpValue.size() > columns.get(0) ) { ret = inpValue.get(columns.get(0)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } ret = null; - } catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - ret = null; } } else if(isProjectToEnd) { ret = getRangeTuple(inpValue); @@ -428,21 +415,17 @@ public class POProject extends Expressio ArrayList<Object> objList = new ArrayList<Object>(columns.size()); for(int col: columns) { - try { + if( inpValue == null ) { + // the tuple is null, so a dereference should also produce a null + objList.add(null); + } else if( inpValue.size() > col ) { objList.add(inpValue.get(col)); - } catch (IndexOutOfBoundsException ie) { + } else { if(pigLogger != null) { pigLogger.warn(this,"Attempt to access field " + "which was not found in the input", PigWarning.ACCESSING_NON_EXISTENT_FIELD); } objList.add(null); - } catch (NullPointerException npe) { - // the tuple is null, so a dereference should also produce a null - // there is a slight danger here that the Tuple implementation - // may have given the exception for a different reason but if we - // don't catch it, we will die and the most common case for the - // exception would be because the tuple is null - objList.add(null); } } ret = mTupleFactory.newTuple(objList); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Wed Feb 22 09:43:41 2017 @@ -49,7 +49,7 @@ public class CombinerPackager extends Pa private Map<Integer, Integer> keyLookup; private int numBags; - + private transient boolean initialized; private transient boolean useDefaultBag; @@ -77,6 +77,15 @@ public class CombinerPackager extends Pa } } + @Override + public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) + throws ExecException { + this.key = key; + this.bags = bags; + this.readOnce = readOnce; + // Bag can be read directly and need not be materialized again + } + /** * @param keyInfo the keyInfo to set */ Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Wed Feb 22 09:43:41 2017 @@ -17,7 +17,7 @@ */ /** - * + * */ package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; @@ -28,6 +28,7 @@ import java.util.Map; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.data.DataBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -48,6 +49,15 @@ public class LitePackager extends Packag private PigNullableWritable keyWritable; @Override + public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) + throws ExecException { + this.key = key; + this.bags = bags; + this.readOnce = readOnce; + // Bag can be read directly and need not be materialized again + } + + @Override public boolean[] getInner() { return null; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Wed Feb 22 09:43:41 2017 @@ -256,4 +256,9 @@ public class POCross extends PhysicalOpe data = null; } + @Override + public void reset() { + clearMemory(); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Wed Feb 22 09:43:41 2017 @@ -97,7 +97,7 @@ public class POFRJoin extends PhysicalOp // The array of Hashtables one per replicated input. replicates[fragment] = // null fragment is the input which is fragmented and not replicated. - protected transient TupleToMapKey replicates[]; + protected transient List<Map<? extends Object, ? extends List<Tuple>>> replicates; // varaible which denotes whether we are returning tuples from the foreach // operator protected transient boolean processingPlan; @@ -234,7 +234,10 @@ public class POFRJoin extends PhysicalOp Result res = null; Result inp = null; if (!setUp) { - replicates = new TupleToMapKey[phyPlanLists.size()]; + replicates = new ArrayList<Map<? extends Object, ? extends List<Tuple>>>(phyPlanLists.size()); + for (int i = 0 ; i < phyPlanLists.size(); i++) { + replicates.add(null); + } dumTup = mTupleFactory.newTuple(1); setUpHashMap(); setUp = true; @@ -282,8 +285,7 @@ public class POFRJoin extends PhysicalOp return new Result(); } Tuple lrOutTuple = (Tuple) lrOut.result; - Tuple key = mTupleFactory.newTuple(1); - key.set(0, lrOutTuple.get(1)); + Object key = lrOutTuple.get(1); Tuple value = getValueTuple(lr, lrOutTuple); lr.detachInput(); // Configure the for each operator with the relevant bags @@ -296,7 +298,7 @@ public class POFRJoin extends PhysicalOp ce.setValue(value); continue; } - TupleToMapKey replicate = replicates[i]; + Map<? extends Object, ? extends List<Tuple>> replicate = replicates.get(i); if (replicate.get(key) == null) { if (isLeftOuterJoin) { ce.setValue(nullBag); @@ -304,7 +306,7 @@ public class POFRJoin extends PhysicalOp noMatch = true; break; } - ce.setValue(new NonSpillableDataBag(replicate.get(key).getList())); + ce.setValue(new NonSpillableDataBag(replicate.get(key))); } // If this is not LeftOuter Join and there was no match we @@ -327,27 +329,28 @@ public class POFRJoin extends PhysicalOp } } - protected static class TupleToMapKey { - private HashMap<Tuple, TuplesToSchemaTupleList> tuples; + protected static class TupleToMapKey extends HashMap<Object, ArrayList<Tuple>> { private SchemaTupleFactory tf; public TupleToMapKey(int ct, SchemaTupleFactory tf) { - tuples = new HashMap<Tuple, TuplesToSchemaTupleList>(ct); + super(ct); this.tf = tf; } - public TuplesToSchemaTupleList put(Tuple key, TuplesToSchemaTupleList val) { - if (tf != null) { - key = TuplesToSchemaTupleList.convert(key, tf); + @Override + public TuplesToSchemaTupleList put(Object key, ArrayList<Tuple> val) { + if (tf != null && key instanceof Tuple) { + key = TuplesToSchemaTupleList.convert((Tuple)key, tf); } - return tuples.put(key, val); + return (TuplesToSchemaTupleList) super.put(key, val); } - public TuplesToSchemaTupleList get(Tuple key) { - if (tf != null) { - key = TuplesToSchemaTupleList.convert(key, tf); + @Override + public TuplesToSchemaTupleList get(Object key) { + if (tf != null && key instanceof Tuple) { + key = TuplesToSchemaTupleList.convert((Tuple)key, tf); } - return tuples.get(key); + return (TuplesToSchemaTupleList) super.get(key); } } @@ -382,7 +385,7 @@ public class POFRJoin extends PhysicalOp SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; if (i == fragment) { - replicates[i] = null; + replicates.set(i, null); continue; } @@ -401,25 +404,34 @@ public class POFRJoin extends PhysicalOp POLocalRearrange lr = LRs[i]; lr.setInputs(Arrays.asList((PhysicalOperator) ld)); - TupleToMapKey replicate = new TupleToMapKey(1000, keySchemaTupleFactory); + Map<Object, ArrayList<Tuple>> replicate; + if (keySchemaTupleFactory == null) { + replicate = new HashMap<Object, ArrayList<Tuple>>(1000); + } else { + replicate = new TupleToMapKey(1000, keySchemaTupleFactory); + } log.debug("Completed setup. Trying to build replication hash table"); for (Result res = lr.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = lr.getNextTuple()) { if (getReporter() != null) getReporter().progress(); Tuple tuple = (Tuple) res.result; - if (isKeyNull(tuple.get(1))) continue; - Tuple key = mTupleFactory.newTuple(1); - key.set(0, tuple.get(1)); + Object key = tuple.get(1); + if (isKeyNull(key)) continue; Tuple value = getValueTuple(lr, tuple); - if (replicate.get(key) == null) { - replicate.put(key, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); + ArrayList<Tuple> values = replicate.get(key); + if (values == null) { + if (inputSchemaTupleFactory == null) { + values = new ArrayList<Tuple>(1); + } else { + values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory); + } + replicate.put(key, values); } - - replicate.get(key).add(value); + values.add(value); } - replicates[i] = replicate; + replicates.set(i, replicate); } long time2 = System.currentTimeMillis(); log.debug("Hash Table built. Time taken: " + (time2 - time1)); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoinSpark.java Wed Feb 22 09:43:41 2017 @@ -51,7 +51,7 @@ public class POFRJoinSpark extends POFRJ addSchemaToFactories(keySchemas[i], keySchemaTupleFactories, i); } - replicates[fragment] = null; + replicates.set(fragment, null); int i = -1; long start = System.currentTimeMillis(); for (int k = 0; k < inputSchemas.length; ++k) { @@ -61,7 +61,7 @@ public class POFRJoinSpark extends POFRJ SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[i]; if (i == fragment) { - replicates[i] = null; + replicates.set(fragment, null); continue; } @@ -91,7 +91,7 @@ public class POFRJoinSpark extends POFRJ replicate.get(key).add(value); } - replicates[i] = replicate; + replicates.set(i, replicate); } long end = System.currentTimeMillis(); log.debug("Hash Table built. Time taken: " + (end - start)); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Wed Feb 22 09:43:41 2017 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.BitSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import org.apache.pig.PigException; import org.apache.pig.backend.executionengine.ExecException; @@ -55,6 +56,7 @@ import org.apache.pig.pen.util.LineageTr @SuppressWarnings("unchecked") public class POForEach extends PhysicalOperator { private static final long serialVersionUID = 1L; + private static final Result UNLIMITED_NULL_RESULT = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); protected List<PhysicalPlan> inputPlans; @@ -264,7 +266,7 @@ public class POForEach extends PhysicalO if (inp.returnStatus == POStatus.STATUS_EOP) { if (parentPlan!=null && parentPlan.endOfAllInput && !endOfAllInputProcessed && endOfAllInputProcessing) { // continue pull one more output - inp = new Result(POStatus.STATUS_OK, new UnlimitedNullTuple()); + inp = UNLIMITED_NULL_RESULT; } else { return inp; } @@ -441,6 +443,8 @@ public class POForEach extends PhysicalO if(inputData.result instanceof DataBag && isToBeFlattenedArray[i]) { its[i] = ((DataBag)bags[i]).iterator(); + } else if (inputData.result instanceof Map && isToBeFlattenedArray[i]) { + its[i] = ((Map)bags[i]).entrySet().iterator(); } else { its[i] = null; } @@ -466,7 +470,7 @@ public class POForEach extends PhysicalO //we instantiate the template array and start populating it with data data = new Object[noItems]; for(int i = 0; i < noItems; ++i) { - if(isToBeFlattenedArray[i] && bags[i] instanceof DataBag) { + if(isToBeFlattenedArray[i] && (bags[i] instanceof DataBag || bags[i] instanceof Map)) { if(its[i].hasNext()) { data[i] = its[i].next(); } else { @@ -540,6 +544,15 @@ public class POForEach extends PhysicalO out.append(t.get(j)); } } + } else if (isToBeFlattenedArray[i] && in instanceof Map.Entry) { + Map.Entry entry = (Map.Entry)in; + if (knownSize) { + out.set(idx++, entry.getKey()); + out.set(idx++, entry.getValue()); + } else { + out.append(entry.getKey()); + out.append(entry.getValue()); + } } else { if (knownSize) { out.set(idx++, in); @@ -738,9 +751,12 @@ public class POForEach extends PhysicalO opsToBeReset.add(sort); } - /* (non-Javadoc) - * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitProject(org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject) - */ + @Override + public void visitCross(POCross c) throws VisitorException { + // FIXME: add only if limit is present + opsToBeReset.add(c); + } + @Override public void visitProject(POProject proj) throws VisitorException { if(proj instanceof PORelationToExprProject) { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Wed Feb 22 09:43:41 2017 @@ -56,11 +56,11 @@ import org.apache.pig.impl.plan.VisitorE import org.apache.pig.impl.util.MultiMap; import org.apache.pig.newplan.logical.relational.LOJoin; -/** This operator implements merge join algorithm to do map side joins. +/** This operator implements merge join algorithm to do map side joins. * Currently, only two-way joins are supported. One input of join is identified as left * and other is identified as right. Left input tuples are the input records in map. * Right tuples are read from HDFS by opening right stream. - * + * * This join doesn't support outer join. * Data is assumed to be sorted in ascending order. It will fail if data is sorted in descending order. */ @@ -99,7 +99,7 @@ public class POMergeJoin extends Physica private FuncSpec rightLoaderFuncSpec; private String rightInputFileName; - + private String indexFile; // Buffer to hold accumulated left tuples. @@ -249,12 +249,11 @@ public class POMergeJoin extends Physica * from Tuple to SchemaTuple. This is necessary because we are not getting SchemaTuples * from the source, though in the future that is what we would like to do. */ - public static class TuplesToSchemaTupleList { - private List<Tuple> tuples; + public static class TuplesToSchemaTupleList extends ArrayList<Tuple> { private SchemaTupleFactory tf; public TuplesToSchemaTupleList(int ct, TupleMaker<?> tf) { - tuples = new ArrayList<Tuple>(ct); + super(ct); if (tf instanceof SchemaTupleFactory) { this.tf = (SchemaTupleFactory)tf; } @@ -273,24 +272,24 @@ public class POMergeJoin extends Physica } } + @Override public boolean add(Tuple t) { if (tf != null) { t = convert(t, tf); } - return tuples.add(t); + return super.add(t); } + @Override public Tuple get(int i) { - return tuples.get(i); + return super.get(i); } + @Override public int size() { - return tuples.size(); + return super.size(); } - public List<Tuple> getList() { - return tuples; - } } @SuppressWarnings("unchecked") @@ -357,7 +356,7 @@ public class POMergeJoin extends Physica } else{ Object rightKey = extractKeysFromTuple(rightInp, 1); - if(null == rightKey) // If we see tuple having null keys in stream, we drop them + if(null == rightKey) // If we see tuple having null keys in stream, we drop them continue; // and fetch next tuple. int cmpval = ((Comparable)rightKey).compareTo(curJoinKey); @@ -399,7 +398,7 @@ public class POMergeJoin extends Physica "Last two tuples encountered were: \n"+ curJoiningRightTup+ "\n" + (Tuple)rightInp.result ; throw new ExecException(errMsg,errCode); - } + } } } } @@ -430,17 +429,17 @@ public class POMergeJoin extends Physica prevLeftKey+ "\n" + curLeftKey ; throw new ExecException(errMsg,errCode); } - + case POStatus.STATUS_EOP: if(this.parentPlan.endOfAllInput || isEndOfInput()){ - // We hit the end on left input. + // We hit the end on left input. // Tuples in bag may still possibly join with right side. curJoinKey = prevLeftKey; curLeftKey = null; if (isEndOfInput()) { leftInputConsumedInSpark = true; } - break; + break; } else // Fetch next left input. return curLeftInp; @@ -465,7 +464,7 @@ public class POMergeJoin extends Physica // Accumulated tuples with same key on left side. // But since we are reading ahead we still haven't checked the read ahead right tuple. // Accumulated left tuples may potentially join with that. So, lets check that first. - + if((null != prevRightKey) && prevRightKey.equals(prevLeftKey)){ curJoiningRightTup = (Tuple)prevRightInp.result; @@ -487,17 +486,17 @@ public class POMergeJoin extends Physica slidingToNextRecord = false; } else rightInp = getNextRightInp(prevLeftKey); - + if(rightInp.returnStatus != POStatus.STATUS_OK) return rightInp; Object extractedRightKey = extractKeysFromTuple(rightInp, 1); - - if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them + + if(null == extractedRightKey) // If we see tuple having null keys in stream, we drop them continue; // and fetch next tuple. - + Comparable rightKey = (Comparable)extractedRightKey; - + if( prevRightKey != null && rightKey.compareTo(prevRightKey) < 0){ // Sanity check. int errCode = 1102; @@ -528,7 +527,7 @@ public class POMergeJoin extends Physica else{ // We got ahead on right side. Store currently read right tuple. prevRightKey = rightKey; prevRightInp = rightInp; - // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. + // Since we didn't find any matching right tuple we throw away the buffered left tuples and add the one read in this function call. leftTuples = newLeftTupleArray(); leftTuples.add((Tuple)curLeftInp.result); prevLeftInp = curLeftInp; @@ -555,7 +554,7 @@ public class POMergeJoin extends Physica DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader; loader.setIndexFile(indexFile); } - + // Pass signature of the loader to rightLoader // make a copy of the conf to use in calls to rightLoader. rightLoader.setUDFContextSignature(signature); @@ -608,11 +607,11 @@ public class POMergeJoin extends Physica // run the tuple through the pipeline rightPipelineRoot.attachInput(t); return this.getNextRightInp(); - + } default: // We don't deal with ERR/NULL. just pass them down throwProcessingException(false, null); - + } } } catch (IOException e) { @@ -643,8 +642,8 @@ public class POMergeJoin extends Physica int errCode = 2167; String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly"; throw new ExecException(errMsg,errCode,PigException.BUG); - } - + } + return ((Tuple) lrOut.result).get(1); } @@ -660,7 +659,7 @@ public class POMergeJoin extends Physica noInnerPlanOnRightSide = false; this.rightPipelineLeaf = rightPipeline.getLeaves().get(0); this.rightPipelineRoot = rightPipeline.getRoots().get(0); - this.rightPipelineRoot.setInputs(null); + this.rightPipelineRoot.setInputs(null); } else noInnerPlanOnRightSide = true; @@ -711,18 +710,18 @@ public class POMergeJoin extends Physica public boolean supportsMultipleOutputs() { return false; } - + /** * @param rightInputFileName the rightInputFileName to set */ public void setRightInputFileName(String rightInputFileName) { this.rightInputFileName = rightInputFileName; } - + public String getSignature() { return signature; } - + public void setSignature(String signature) { this.signature = signature; } @@ -734,12 +733,12 @@ public class POMergeJoin extends Physica public String getIndexFile() { return indexFile; } - + @Override public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { return null; } - + public LOJoin.JOINTYPE getJoinType() { return joinType; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Wed Feb 22 09:43:41 2017 @@ -44,6 +44,9 @@ public class POPoissonSample extends Phy private transient boolean initialized; + // num of rows skipped so far + private transient int numSkipped; + // num of rows sampled so far private transient int numRowsSampled; @@ -89,6 +92,7 @@ public class POPoissonSample extends Phy @Override public Result getNextTuple() throws ExecException { if (!initialized) { + numSkipped = 0; numRowsSampled = 0; avgTupleMemSz = 0; rowNum = 0; @@ -134,7 +138,7 @@ public class POPoissonSample extends Phy } // skip tuples - for (long numSkipped = 0; numSkipped < skipInterval; numSkipped++) { + while (numSkipped < skipInterval) { res = processInput(); if (res.returnStatus == POStatus.STATUS_NULL) { continue; @@ -148,6 +152,7 @@ public class POPoissonSample extends Phy return res; } rowNum++; + numSkipped++; } // skipped enough, get new sample @@ -173,6 +178,8 @@ public class POPoissonSample extends Phy rowNum++; newSample = res; + // reset skipped + numSkipped = 0; return currentSample; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Wed Feb 22 09:43:41 2017 @@ -125,7 +125,7 @@ public class POReservoirSample extends P } // collect samples until input is exhausted - int rand = randGen.nextInt(rowProcessed); + int rand = randGen.nextInt(rowProcessed + 1); if (rand < numSamples) { samples[rand] = res; } @@ -133,8 +133,13 @@ public class POReservoirSample extends P } } - if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) { - sampleCollectionDone = true; + if (res.returnStatus == POStatus.STATUS_EOP) { + if (this.parentPlan.endOfAllInput) { + sampleCollectionDone = true; + } else { + // In case of Split can get EOP in between. + return res; + } } return getSample(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Wed Feb 22 09:43:41 2017 @@ -51,13 +51,13 @@ public class Packager implements Illustr protected DataBag[] bags; public static enum PackageType { - GROUP, JOIN + GROUP, JOIN, BLOOMJOIN }; protected transient Illustrator illustrator = null; // The key being worked on - Object key; + protected Object key; // marker to indicate if key is a tuple protected boolean isKeyTuple = false; @@ -65,7 +65,7 @@ public class Packager implements Illustr protected boolean isKeyCompound = false; // key's type - byte keyType; + protected byte keyType; // The number of inputs to this // co-group. 0 indicates a distinct, which means there will only be a Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/StoreFuncDecorator.java Wed Feb 22 09:43:41 2017 @@ -60,7 +60,7 @@ public class StoreFuncDecorator { private boolean allowErrors() { return UDFContext.getUDFContext().getJobConf() - .getBoolean(PigConfiguration.PIG_ALLOW_STORE_ERRORS, false); + .getBoolean(PigConfiguration.PIG_ERROR_HANDLING_ENABLED, false); } /** Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Wed Feb 22 09:43:41 2017 @@ -162,8 +162,13 @@ public class LoadConverter implements RD private SparkEngineConf sparkEngineConf; private boolean initialized; + //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing + //invalid number of input records, 'skip' flag below indicates first load is finished. + private boolean skip; + public ToTupleFunction(SparkEngineConf sparkEngineConf){ this.sparkEngineConf = sparkEngineConf; + } @Override @@ -172,9 +177,14 @@ public class LoadConverter implements RD long partitionId = TaskContext.get().partitionId(); PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, Long.toString(partitionId)); + //We're in POSplit and already counted all input records, + //in a multiquery case skip will be set to true after the first load is finished: + if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) { + skip=true; + } initialized = true; } - if (sparkCounters != null && disableCounter == false) { + if (sparkCounters != null && disableCounter == false && skip == false) { sparkCounters.increment(counterGroupName, counterName, 1L); } return v1._2(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Feb 22 09:43:41 2017 @@ -19,13 +19,14 @@ package org.apache.pig.backend.hadoop.executionengine.tez; import java.io.IOException; -import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.TreeMap; @@ -43,6 +44,7 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; +import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; @@ -56,6 +58,7 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.HDataType; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator; @@ -87,7 +90,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; -import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; @@ -108,7 +110,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; -import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -174,6 +175,7 @@ public class TezDagBuilder extends TezOp private PigContext pc; private Configuration globalConf; private Configuration pigContextConf; + private Configuration shuffleVertexManagerBaseConf; private FileSystem fs; private long intermediateTaskInputSize; private Set<String> inputSplitInDiskVertices; @@ -191,6 +193,8 @@ public class TezDagBuilder extends TezOp private String mapTaskLaunchCmdOpts; private String reduceTaskLaunchCmdOpts; + private boolean disableDAGRecovery = false; + public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); @@ -210,6 +214,10 @@ public class TezDagBuilder extends TezOp } } + public boolean shouldDisableDAGRecovery() { + return disableDAGRecovery; + } + private void initialize(PigContext pc) throws IOException { this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true); @@ -217,6 +225,16 @@ public class TezDagBuilder extends TezOp this.pigContextConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); MRToTezHelper.processMRSettings(pigContextConf, globalConf); + shuffleVertexManagerBaseConf = new Configuration(false); + // Only copy tez.shuffle-vertex-manager config to keep payload size small + Iterator<Entry<String, String>> iter = pigContextConf.iterator(); + while (iter.hasNext()) { + Entry<String, String> entry = iter.next(); + if (entry.getKey().startsWith("tez.shuffle-vertex-manager")) { + shuffleVertexManagerBaseConf.set(entry.getKey(), entry.getValue()); + } + } + // Add credentials from binary token file and get tokens for namenodes // specified in mapreduce.job.hdfs-servers SecurityHelper.populateTokenCache(globalConf, dag.getCredentials()); @@ -265,7 +283,7 @@ public class TezDagBuilder extends TezOp if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) { // If tez setting is not defined MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true); - MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, true); + MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, false); } if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) { @@ -279,7 +297,7 @@ public class TezDagBuilder extends TezOp try { fs = FileSystem.get(globalConf); - intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, FileLocalizer.getTemporaryResourcePath(pc)); + intermediateTaskInputSize = fs.getDefaultBlockSize(FileLocalizer.getTemporaryResourcePath(pc)); } catch (Exception e) { log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e); intermediateTaskInputSize = 134217728L; @@ -397,7 +415,11 @@ public class TezDagBuilder extends TezOp tezOp.getVertexGroupInfo().setVertexGroup(vertexGroup); POStore store = tezOp.getVertexGroupInfo().getStore(); if (store != null) { - vertexGroup.addDataSink(store.getOperatorKey().toString(), + String outputKey = store.getOperatorKey().toString(); + if (store instanceof POStoreTez) { + outputKey = ((POStoreTez) store).getOutputKey(); + } + vertexGroup.addDataSink(outputKey, DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(), OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), dag.getCredentials())); } @@ -441,7 +463,14 @@ public class TezDagBuilder extends TezOp Configuration conf = new Configuration(pigContextConf); - if (!combinePlan.isEmpty()) { + if (edge.needsDistinctCombiner()) { + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, + MRCombiner.class.getName()); + conf.set(MRJobConfig.COMBINE_CLASS_ATTR, + DistinctCombiner.Combine.class.getName()); + log.info("Setting distinct combiner class between " + + from.getOperatorKey() + " and " + to.getOperatorKey()); + } else if (!combinePlan.isEmpty()) { udfContextSeparator.serializeUDFContextForEdge(conf, from, to, UDFType.USERFUNC); addCombiner(combinePlan, to, conf, isMergedInput); } @@ -450,7 +479,7 @@ public class TezDagBuilder extends TezOp POLocalRearrangeTez.class); for (POLocalRearrangeTez lr : lrs) { - if (lr.getOutputKey().equals(to.getOperatorKey().toString())) { + if (lr.containsOutputKey(to.getOperatorKey().toString())) { byte keyType = lr.getKeyType(); setIntermediateOutputKeyValue(keyType, conf, to, lr.isConnectedToPackage(), isMergedInput); // In case of secondary key sort, main key type is the actual key type @@ -479,7 +508,8 @@ public class TezDagBuilder extends TezOp conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true); conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true); - conf.set("pig.pigContext", serializedPigContext); + conf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal()); + conf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties())); conf.set("udf.import.list", serializedUDFImportList); if(to.isGlobalSort() || to.isLimitAfterSort()){ @@ -510,26 +540,36 @@ public class TezDagBuilder extends TezOp UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf); out.setUserPayload(payLoad); + in.setUserPayload(payLoad); + // Remove combiner and reset payload if (!combinePlan.isEmpty()) { boolean noCombineInReducer = false; + boolean noCombineInMapper = edge.getCombinerInMap() == null ? false : !edge.getCombinerInMap(); String reducerNoCombiner = globalConf.get(PigConfiguration.PIG_EXEC_NO_COMBINER_REDUCER); - if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) { + if (edge.getCombinerInReducer() != null) { + noCombineInReducer = !edge.getCombinerInReducer(); + } else if (reducerNoCombiner == null || reducerNoCombiner.equals("auto")) { noCombineInReducer = TezCompilerUtil.bagDataTypeInCombinePlan(combinePlan); } else { noCombineInReducer = Boolean.parseBoolean(reducerNoCombiner); } - if (noCombineInReducer) { + if (noCombineInReducer || noCombineInMapper) { log.info("Turning off combiner in reducer vertex " + to.getOperatorKey() + " for edge from " + from.getOperatorKey()); conf.unset(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS); conf.unset(MRJobConfig.COMBINE_CLASS_ATTR); conf.unset("pig.combinePlan"); conf.unset("pig.combine.package"); conf.unset("pig.map.keytype"); - payLoad = TezUtils.createUserPayloadFromConf(conf); + UserPayload payLoadWithoutCombiner = TezUtils.createUserPayloadFromConf(conf); + if (noCombineInMapper) { + out.setUserPayload(payLoadWithoutCombiner); + } + if (noCombineInReducer) { + in.setUserPayload(payLoadWithoutCombiner); + } } } - in.setUserPayload(payLoad); if (edge.dataMovementType!=DataMovementType.BROADCAST && to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && (to.isGlobalSort()||to.isSkewedJoin())) { // Use custom edge @@ -593,6 +633,8 @@ public class TezDagBuilder extends TezOp setOutputFormat(job); payloadConf.set("udf.import.list", serializedUDFImportList); payloadConf.set("exectype", "TEZ"); + payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL, pc.getExecType().isLocal()); + payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES, ObjectSerializer.serialize(pc.getLog4jProperties())); // Process stores LinkedList<POStore> stores = processStores(tezOp, payloadConf, job); @@ -611,11 +653,7 @@ public class TezDagBuilder extends TezOp payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists())); payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits())); inputPayLoad = new Configuration(payloadConf); - if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() instanceof DefaultIndexableLoader) { - inputPayLoad.set("pig.pigContext", serializedPigContext); - } } - payloadConf.set("pig.pigContext", serializedPigContext); if (tezOp.getSampleOperator() != null) { payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.getSampleOperator().getOperatorKey().toString()); @@ -689,7 +727,7 @@ public class TezDagBuilder extends TezOp PlanHelper.getPhysicalOperators(pred.plan, POLocalRearrangeTez.class); for (POLocalRearrangeTez lr : lrs) { if (lr.isConnectedToPackage() - && lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) { + && lr.containsOutputKey(tezOp.getOperatorKey().toString())) { localRearrangeMap.put((int) lr.getIndex(), inputKey); if (isVertexGroup) { isMergedInput = true; @@ -772,9 +810,25 @@ public class TezDagBuilder extends TezOp String vmPluginName = null; Configuration vmPluginConf = null; + boolean containScatterGather = false; + boolean containCustomPartitioner = false; + for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { + if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { + containScatterGather = true; + } + if (edge.partitionerClass != null) { + containCustomPartitioner = true; + } + } + + if(containScatterGather) { + vmPluginName = ShuffleVertexManager.class.getName(); + vmPluginConf = new Configuration(shuffleVertexManagerBaseConf); + } // Set the right VertexManagerPlugin if (tezOp.getEstimatedParallelism() != -1) { + boolean autoParallelism = false; if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { if (tezOp.getVertexParallelism()==-1 && ( tezOp.isGlobalSort() &&getPlan().getPredecessors(tezOp).size()==1|| @@ -783,33 +837,12 @@ public class TezDagBuilder extends TezOp // to decrease/increase parallelism of sorting vertex dynamically // based on the numQuantiles calculated by sample aggregation vertex vmPluginName = PartitionerDefinedVertexManager.class.getName(); + autoParallelism = true; log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); } } else { - boolean containScatterGather = false; - boolean containCustomPartitioner = false; - for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { - if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { - containScatterGather = true; - } - if (edge.partitionerClass!=null) { - containCustomPartitioner = true; - } - } if (containScatterGather && !containCustomPartitioner) { - vmPluginConf = (vmPluginConf == null) ? new Configuration(pigContextConf) : vmPluginConf; - // Use auto-parallelism feature of ShuffleVertexManager to dynamically - // reduce the parallelism of the vertex - if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) - && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) { - vmPluginName = PigGraceShuffleVertexManager.class.getName(); - tezOp.setUseGraceParallelism(true); - vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); - vmPluginConf.set("pig.pigContext", serializedPigContext); - } else { - vmPluginName = ShuffleVertexManager.class.getName(); - } - vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); + // For Intermediate reduce, set the bytes per reducer to be block size. long bytesPerReducer = intermediateTaskInputSize; // If there are store statements, use BYTES_PER_REDUCER_PARAM configured by user. @@ -818,8 +851,8 @@ public class TezDagBuilder extends TezOp // In Tez, numReducers=(map output size/bytesPerReducer) we need lower values to avoid skews in reduce // as map input sizes are mostly always high compared to map output. if (stores.size() > 0) { - if (vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { - bytesPerReducer = vmPluginConf.getLong( + if (pigContextConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) { + bytesPerReducer = pigContextConf.getLong( InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); } else if (tezOp.isGroupBy()) { @@ -828,10 +861,28 @@ public class TezDagBuilder extends TezOp bytesPerReducer = SHUFFLE_BYTES_PER_REDUCER_DEFAULT; } } + + // Use auto-parallelism feature of ShuffleVertexManager to dynamically + // reduce the parallelism of the vertex. Use PigGraceShuffleVertexManager + // instead of ShuffleVertexManager if pig.tez.grace.parallelism is turned on + if (payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true) + && !TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty() + && tezOp.getCrossKeys() == null) { + vmPluginName = PigGraceShuffleVertexManager.class.getName(); + tezOp.setUseGraceParallelism(true); + vmPluginConf.set("pig.tez.plan", getSerializedTezPlan()); + vmPluginConf.set(PigImplConstants.PIG_CONTEXT, serializedPigContext); + vmPluginConf.setLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, bytesPerReducer); + } + vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, bytesPerReducer); + autoParallelism = true; log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); } } + if (globalConf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM_DISABLE_DAG_RECOVERY, false) && autoParallelism) { + disableDAGRecovery = true; + } } if (tezOp.isLimit() && (vmPluginName == null || vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())|| vmPluginName.equals(ShuffleVertexManager.class.getName()))) { @@ -1409,22 +1460,12 @@ public class TezDagBuilder extends TezOp private void setOutputFormat(org.apache.hadoop.mapreduce.Job job) { // the OutputFormat we report to Hadoop is always PigOutputFormat which - // can be wrapped with LazyOutputFormat provided if it is supported by - // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set + // can be wrapped with LazyOutputFormat provided if PigConfiguration.PIG_OUTPUT_LAZY is set if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) { - try { - Class<?> clazz = PigContext - .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat"); - Method method = clazz.getMethod("setOutputFormatClass", - org.apache.hadoop.mapreduce.Job.class, Class.class); - method.invoke(null, job, PigOutputFormatTez.class); - } catch (Exception e) { - job.setOutputFormatClass(PigOutputFormatTez.class); - log.warn(PigConfiguration.PIG_OUTPUT_LAZY - + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used"); - } + LazyOutputFormat.setOutputFormatClass(job,PigOutputFormatTez.class); } else { job.setOutputFormatClass(PigOutputFormatTez.class); } } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Wed Feb 22 09:43:41 2017 @@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; +import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.UDFContext; import org.apache.pig.tools.pigstats.tez.TezPigScriptStats; import org.apache.tez.client.TezClient; @@ -51,7 +56,7 @@ import com.google.common.collect.Maps; */ public class TezJob implements Runnable { private static final Log log = LogFactory.getLog(TezJob.class); - private Configuration conf; + private TezConfiguration conf; private EnumSet<StatusGetOpts> statusGetOpts; private Map<String, LocalResource> requestAMResources; private ApplicationId appId; @@ -69,31 +74,71 @@ public class TezJob implements Runnable public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources, - int estimatedTotalParallelism) throws IOException { + TezOperPlan tezPlan) throws IOException { this.conf = conf; this.dag = dag; this.requestAMResources = requestAMResources; this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true); this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - tezJobConf = new TezJobConfig(estimatedTotalParallelism); + tezJobConf = new TezJobConfig(tezPlan); } static class TezJobConfig { private int estimatedTotalParallelism = -1; + private int maxOutputsinSingleVertex; + private int totalVertices = 0; - public TezJobConfig(int estimatedTotalParallelism) { - this.estimatedTotalParallelism = estimatedTotalParallelism; + public TezJobConfig(TezOperPlan tezPlan) throws VisitorException { + this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism(); + MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan); + finder.visit(); + this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex(); + this.totalVertices = finder.getTotalVertices(); } public int getEstimatedTotalParallelism() { return estimatedTotalParallelism; } - public void setEstimatedTotalParallelism(int estimatedTotalParallelism) { - this.estimatedTotalParallelism = estimatedTotalParallelism; + public int getMaxOutputsinSingleVertex() { + return maxOutputsinSingleVertex; } + public int getTotalVertices() { + return totalVertices; + } + + } + + private static class MaxOutputsFinder extends TezOpPlanVisitor { + + private int maxOutputsinSingleVertex = 1; + private int totalVertices = 0; + + public MaxOutputsFinder(TezOperPlan plan) { + super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + } + + public int getMaxOutputsinSingleVertex() { + return maxOutputsinSingleVertex; + } + + public int getTotalVertices() { + return totalVertices; + } + + @Override + public void visitTezOp(TezOperator tezOperator) throws VisitorException { + if (!tezOperator.isVertexGroup()) { + totalVertices++; + int outputs = tezOperator.outEdges.keySet().size(); + maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs; + } + } + + + } public DAG getDAG() { Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1783988&r1=1783987&r2=1783988&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Wed Feb 22 09:43:41 2017 @@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.net.URI; import java.util.HashMap; import java.util.Map; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.pig.PigException; +import org.apache.pig.backend.hadoop.PigATSClient; import org.apache.pig.backend.hadoop.executionengine.JobCreationException; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer; @@ -50,11 +52,12 @@ public class TezJobCompiler { private static final Log log = LogFactory.getLog(TezJobCompiler.class); private PigContext pigContext; - private TezConfiguration tezConf; + private Configuration conf; + private boolean disableDAGRecovery; public TezJobCompiler(PigContext pigContext, Configuration conf) throws IOException { this.pigContext = pigContext; - this.tezConf = new TezConfiguration(conf); + this.conf = conf; } public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, LocalResource> localResources) @@ -64,6 +67,7 @@ public class TezJobCompiler { TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlanNode.getTezOperPlan(), tezDag, localResources); dagBuilder.visit(); dagBuilder.avoidContainerReuseIfInputSplitInDisk(); + disableDAGRecovery = dagBuilder.shouldDisableDAGRecovery(); return tezDag; } @@ -85,6 +89,7 @@ public class TezJobCompiler { return job; } + @SuppressWarnings({ "rawtypes", "unchecked" }) private TezJob getJob(TezPlanContainerNode tezPlanNode, TezPlanContainer planContainer) throws JobCreationException { try { @@ -107,8 +112,34 @@ public class TezJobCompiler { } DAG tezDag = buildDAG(tezPlanNode, localResources); tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript())); + // set Tez caller context + // Reflection for the following code since it is only available since tez 0.8.1: + // CallerContext context = CallerContext.create(ATSService.CallerContext, ATSService.getPigAuditId(pigContext), + // ATSService.EntityType, ""); + // tezDag.setCallerContext(context); + Class callerContextClass = null; + try { + callerContextClass = Class.forName("org.apache.tez.client.CallerContext"); + } catch (ClassNotFoundException e) { + // If pre-Tez 0.8.1, skip setting CallerContext + } + if (callerContextClass != null) { + Method builderBuildMethod = callerContextClass.getMethod("create", String.class, + String.class, String.class, String.class); + Object context = builderBuildMethod.invoke(null, PigATSClient.CALLER_CONTEXT, + PigATSClient.getPigAuditId(pigContext), PigATSClient.ENTITY_TYPE, ""); + Method dagSetCallerContext = tezDag.getClass().getMethod("setCallerContext", + context.getClass()); + dagSetCallerContext.invoke(tezDag, context); + } log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism()); - return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism()); + TezConfiguration tezConf = new TezConfiguration(conf); + if (disableDAGRecovery + && tezConf.getBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, + TezConfiguration.DAG_RECOVERY_ENABLED_DEFAULT)) { + tezConf.setBoolean(TezConfiguration.DAG_RECOVERY_ENABLED, false); + } + return new TezJob(tezConf, tezDag, localResources, tezPlan); } catch (Exception e) { int errCode = 2017; String msg = "Internal error creating job configuration.";
