Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POPartitionRearrangeTez.java Fri Mar 4 18:17:39 2016 @@ -33,11 +33,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor; -import org.apache.pig.data.BagFactory; import org.apache.pig.data.DataBag; import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.io.NullablePartitionWritable; import org.apache.pig.impl.io.NullableTuple; @@ -57,13 +55,11 @@ public class POPartitionRearrangeTez ext private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(POPartitionRearrangeTez.class); - private static final TupleFactory tf = TupleFactory.getInstance(); - private static final BagFactory mBagFactory = BagFactory.getInstance(); // ReducerMap will store the tuple, max reducer index & min reducer index - private Map<Object, Pair<Integer, Integer>> reducerMap = Maps.newHashMap(); - private Integer totalReducers = -1; - private boolean inited = false; + private transient Map<Object, Pair<Integer, Integer>> reducerMap; + private transient Integer totalReducers; + private transient boolean inited; public POPartitionRearrangeTez(OperatorKey k) { this(k, -1); @@ -201,6 +197,8 @@ public class POPartitionRearrangeTez ext } Map<String, Object> distMap = null; + totalReducers = -1; + reducerMap = Maps.newHashMap(); if (PigProcessor.sampleMap != null) { // We've already collected sampleMap in PigProcessor distMap = PigProcessor.sampleMap; @@ -232,7 +230,7 @@ public class POPartitionRearrangeTez ext if (idxTuple.size() > 3) { // remove the last 2 fields of the tuple, i.e: minIndex // and maxIndex and store it in the reducer map - Tuple keyTuple = tf.newTuple(); + Tuple keyTuple = mTupleFactory.newTuple(); for (int i=0; i < idxTuple.size() - 2; i++) { keyTuple.append(idxTuple.get(i)); } @@ -255,4 +253,9 @@ public class POPartitionRearrangeTez ext cache.cache(reducerMapCacheKey, reducerMap); inited = true; } + + @Override + public POPartitionRearrangeTez clone() throws CloneNotSupportedException { + return (POPartitionRearrangeTez) super.clone(); + } }
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Mar 4 18:17:39 2016 @@ -50,6 +50,7 @@ public class PORankTez extends PORank im private transient KeyValueReader reader; private transient Map<Integer, Long> counterOffsets; private transient Configuration conf; + private transient boolean finished = false; public PORankTez(PORank copy) { super(copy); @@ -133,6 +134,9 @@ public class PORankTez extends PORank im @Override public Result getNextTuple() throws ExecException { + if (finished) { + return RESULT_EOP; + } Result inp = null; try { @@ -150,6 +154,7 @@ public class PORankTez extends PORank im if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) { this.parentPlan.endOfAllInput = true; } + finished = true; return RESULT_EOP; } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Mar 4 18:17:39 2016 @@ -50,15 +50,15 @@ public class POShuffleTezLoad extends PO private static final long serialVersionUID = 1L; protected List<String> inputKeys = new ArrayList<String>(); - protected List<LogicalInput> inputs = new ArrayList<LogicalInput>(); - protected List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>(); - - private boolean[] finished; - private boolean[] readOnce; - - private WritableComparator comparator = null; private boolean isSkewedJoin = false; + private transient List<LogicalInput> inputs; + private transient List<KeyValuesReader> readers; + private transient int numTezInputs; + private transient boolean[] finished; + private transient boolean[] readOnce; + private transient WritableComparator comparator = null; + private transient WritableComparator groupingComparator = null; private transient Configuration conf; private transient int accumulativeBatchSize; @@ -73,7 +73,7 @@ public class POShuffleTezLoad extends PO @Override public void replaceInput(String oldInputKey, String newInputKey) { - if (inputKeys.remove(oldInputKey)) { + while (inputKeys.remove(oldInputKey)) { inputKeys.add(newInputKey); } } @@ -86,33 +86,40 @@ public class POShuffleTezLoad extends PO public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException { this.conf = conf; - comparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf); + this.inputs = new ArrayList<LogicalInput>(); + this.readers = new ArrayList<KeyValuesReader>(); + this.comparator = (WritableComparator) ConfigUtils.getIntermediateInputKeyComparator(conf); + this.groupingComparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf); + this.accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize(); + try { - for (String key : inputKeys) { - LogicalInput input = inputs.get(key); - this.inputs.add(input); - this.readers.add((KeyValuesReader)input.getReader()); + for (String inputKey : inputKeys) { + LogicalInput input = inputs.get(inputKey); + // 1) Case of self join/cogroup/cross with Split - numTezInputs < numInputs/inputKeys + // - Same TezInput will contain multiple indexes in case of join + // 2) data unioned within Split - inputKeys > numInputs/numTezInputs + // - Input key will be repeated, but index would be same within a TezInput + if (!this.inputs.contains(input)) { + this.inputs.add(input); + this.readers.add((KeyValuesReader)input.getReader()); + } } - // We need to adjust numInputs because it's possible for both - // OrderedGroupedKVInput and non-OrderedGroupedKVInput to be attached - // to the same vertex. If so, we're only interested in - // OrderedGroupedKVInputs. So we ignore the others. - this.numInputs = this.inputs.size(); + this.numInputs = this.pkgr.getKeyInfo().size(); + this.numTezInputs = this.inputs.size(); readOnce = new boolean[numInputs]; for (int i = 0; i < numInputs; i++) { readOnce[i] = false; } - finished = new boolean[numInputs]; - for (int i = 0; i < numInputs; i++) { + finished = new boolean[numTezInputs]; + for (int i = 0; i < numTezInputs; i++) { finished[i] = !readers.get(i).next(); } } catch (Exception e) { throw new ExecException(e); } - accumulativeBatchSize = AccumulatorOptimizerUtil.getAccumulativeBatchSize(); } @Override @@ -128,17 +135,27 @@ public class POShuffleTezLoad extends PO boolean hasData = false; Object cur = null; PigNullableWritable min = null; - int minIndex = -1; try { - for (int i = 0; i < numInputs; i++) { - if (!finished[i]) { + if (numTezInputs == 1) { + if (!finished[0]) { hasData = true; - cur = readers.get(i).getCurrentKey(); - if (min == null || comparator.compare(min, cur) > 0) { - //Not a deep clone. Writable is referenced. - min = ((PigNullableWritable)cur).clone(); - minIndex = i; + cur = readers.get(0).getCurrentKey(); + // Just move to the next key without comparison + min = ((PigNullableWritable)cur).clone(); + } + } else { + for (int i = 0; i < numTezInputs; i++) { + if (!finished[i]) { + hasData = true; + cur = readers.get(i).getCurrentKey(); + // TODO: PIG-4652 should compare key bytes instead + // of deserialized objects when using BytesComparator + // for faster comparison + if (min == null || comparator.compare(min, cur) > 0) { + //Not a deep clone. Writable is referenced. + min = ((PigNullableWritable)cur).clone(); + } } } } @@ -153,7 +170,7 @@ public class POShuffleTezLoad extends PO if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) { this.parentPlan.endOfAllInput = true; } - return new Result(POStatus.STATUS_EOP, null); + return RESULT_EOP; } key = pkgr.getKey(min); @@ -164,7 +181,6 @@ public class POShuffleTezLoad extends PO if (isAccumulative()) { buffer.setCurrentKey(min); - buffer.setCurrentKeyIndex(minIndex); for (int i = 0; i < numInputs; i++) { bags[i] = new AccumulativeBag(buffer, i); } @@ -172,34 +188,45 @@ public class POShuffleTezLoad extends PO } else { for (int i = 0; i < numInputs; i++) { + bags[i] = new InternalCachedBag(numInputs); + } - DataBag bag = null; - - if (!finished[i]) { - cur = readers.get(i).getCurrentKey(); - // We need to loop in case of Grouping Comparators - while (comparator.compare(min, cur) == 0 - && (!min.isNull() || (min.isNull() && i == minIndex))) { - Iterable<Object> vals = readers.get(i).getCurrentValues(); - bag = bags[i] == null ? new InternalCachedBag(numInputs) : bags[i]; - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bag.add(tup); - } - bags[i] = bag; - finished[i] = !readers.get(i).next(); - if (finished[i]) { - break; - } + if (numTezInputs == 1) { + do { + Iterable<Object> vals = readers.get(0).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[0] = !readers.get(0).next(); + if (finished[0]) { + break; + } + cur = readers.get(0).getCurrentKey(); + } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators + } else { + for (int i = 0; i < numTezInputs; i++) { + if (!finished[i]) { cur = readers.get(i).getCurrentKey(); + // We need to loop in case of Grouping Comparators + while (groupingComparator.compare(min, cur) == 0) { + Iterable<Object> vals = readers.get(i).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[i] = !readers.get(i).next(); + if (finished[i]) { + break; + } + cur = readers.get(i).getCurrentKey(); + } } } - - if (bag == null) { - bags[i] = new InternalCachedBag(numInputs); - } } } @@ -240,7 +267,6 @@ public class POShuffleTezLoad extends PO private int batchSize; private List<Tuple>[] bags; private PigNullableWritable min; - private int minIndex; private boolean clearedCurrent = true; @SuppressWarnings("unchecked") @@ -261,19 +287,14 @@ public class POShuffleTezLoad extends PO clearedCurrent = false; } - public void setCurrentKeyIndex(int curKeyIndex) { - this.minIndex = curKeyIndex; - } - @Override public boolean hasNextBatch() { Object cur = null; try { - for (int i = 0; i < numInputs; i++) { + for (int i = 0; i < numTezInputs; i++) { if (!finished[i]) { cur = readers.get(i).getCurrentKey(); - if (comparator.compare(min, cur) == 0 - && (!min.isNull() || (min.isNull() && i == minIndex))) { + if (groupingComparator.compare(min, cur) == 0) { return true; } } @@ -292,15 +313,16 @@ public class POShuffleTezLoad extends PO bags[i].clear(); } try { - for (int i = 0; i < numInputs; i++) { + for (int i = 0; i < numTezInputs; i++) { if (!finished[i]) { cur = readers.get(i).getCurrentKey(); int batchCount = 0; - while (comparator.compare(min, cur) == 0 && (!min.isNull() || - min.isNull() && i==minIndex)) { + while (groupingComparator.compare(min, cur) == 0) { Iterator<Object> iter = readers.get(i).getCurrentValues().iterator(); while (iter.hasNext() && batchCount < batchSize) { - bags[i].add(pkgr.getValueTuple(keyWritable, (NullableTuple) iter.next(), i)); + NullableTuple nTup = (NullableTuple) iter.next(); + int index = nTup.getIndex(); + bags[index].add(pkgr.getValueTuple(keyWritable, nTup, index)); batchCount++; } if (batchCount == batchSize) { @@ -333,11 +355,10 @@ public class POShuffleTezLoad extends PO // early termination of accumulator Object cur = null; try { - for (int i = 0; i < numInputs; i++) { + for (int i = 0; i < numTezInputs; i++) { if (!finished[i]) { cur = readers.get(i).getCurrentKey(); - while (comparator.compare(min, cur) == 0 && (!min.isNull() || - min.isNull() && i==minIndex)) { + while (groupingComparator.compare(min, cur) == 0) { finished[i] = !readers.get(i).next(); if (finished[i]) { break; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Mar 4 18:17:39 2016 @@ -38,7 +38,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.tez.runtime.api.LogicalInput; @@ -57,7 +56,6 @@ public class POShuffledValueInputTez ext private transient boolean finished = false; private transient Iterator<KeyValueReader> readers; private transient KeyValueReader currentReader; - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); private transient Configuration conf; public POShuffledValueInputTez(OperatorKey k) { @@ -71,7 +69,7 @@ public class POShuffledValueInputTez ext @Override public void replaceInput(String oldInputKey, String newInputKey) { - if (inputKeys.remove(oldInputKey)) { + while (inputKeys.remove(oldInputKey)) { inputKeys.add(newInputKey); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Mar 4 18:17:39 2016 @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.pig.LoadFunc; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; @@ -31,28 +32,37 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezTaskConfigurable; import org.apache.pig.data.Tuple; import org.apache.pig.impl.PigImplConstants; -import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil; +import org.apache.tez.common.counters.CounterGroup; +import org.apache.tez.common.counters.TezCounter; import org.apache.tez.mapreduce.input.MRInput; import org.apache.tez.mapreduce.lib.MRReader; import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueReader; /** * POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput */ -public class POSimpleTezLoad extends POLoad implements TezInput { +public class POSimpleTezLoad extends POLoad implements TezInput, TezTaskConfigurable { private static final long serialVersionUID = 1L; + private String inputKey; - private MRInput input; - private KeyValueReader reader; + + private transient ProcessorContext processorContext; + private transient MRInput input; + private transient KeyValueReader reader; private transient Configuration conf; + private transient boolean finished = false; + private transient TezCounter inputRecordCounter; - public POSimpleTezLoad(OperatorKey k, FileSpec lfile) { - super(k, lfile); + public POSimpleTezLoad(OperatorKey k, LoadFunc loader) { + super(k, loader); } @Override @@ -68,6 +78,12 @@ public class POSimpleTezLoad extends POL } @Override + public void initialize(ProcessorContext processorContext) + throws ExecException { + this.processorContext = processorContext; + } + + @Override public void addInputsToSkip(Set<String> inputsToSkip) { } @@ -92,6 +108,22 @@ public class POSimpleTezLoad extends POL } catch (IOException e) { throw new ExecException(e); } + + // Multiple inputs - other broadcast input like replicate join table, order by sample. + // We use multi input counters to just get MRInput records count. + if (inputs.size() > 1) { + CounterGroup multiInputGroup = processorContext.getCounters() + .getGroup(MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP); + if (multiInputGroup == null) { + processorContext.getCounters().addGroup( + MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP, + MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP); + } + String name = MRPigStatsUtil.getMultiInputsCounterName(super.getLFile().getFileName(), 0); + if (name != null) { + inputRecordCounter = multiInputGroup.addCounter(name, name, 0); + } + } } /** @@ -102,22 +134,28 @@ public class POSimpleTezLoad extends POL @Override public Result getNextTuple() throws ExecException { try { - Result res = new Result(); + if (finished) { + return RESULT_EOP; + } if (!reader.next()) { - res.result = null; - res.returnStatus = POStatus.STATUS_EOP; // For certain operators (such as STREAM), we could still have some work // to do even after seeing the last input. These operators set a flag that // says all input has been sent and to run the pipeline one more time. if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, "false"))) { this.parentPlan.endOfAllInput = true; } + finished = true; + return RESULT_EOP; } else { + Result res = new Result(); Tuple next = (Tuple) reader.getCurrentValue(); res.result = next; res.returnStatus = POStatus.STATUS_OK; + if (inputRecordCounter != null) { + inputRecordCounter.increment(1); + } + return res; } - return res; } catch (IOException e) { throw new ExecException(e); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Mar 4 18:17:39 2016 @@ -43,10 +43,13 @@ import org.apache.tez.runtime.library.ap public class POStoreTez extends POStore implements TezOutput, TezTaskConfigurable { private static final long serialVersionUID = 1L; + + private String outputKey; + + private transient ProcessorContext processorContext; private transient MROutput output; private transient KeyValueWriter writer; - private String outputKey; - private TezCounter outputRecordCounter; + private transient TezCounter outputRecordCounter; public POStoreTez(OperatorKey k) { super(k); @@ -76,19 +79,7 @@ public class POStoreTez extends POStore @Override public void initialize(ProcessorContext processorContext) throws ExecException { - if (isMultiStore()) { - CounterGroup multiStoreGroup = processorContext.getCounters() - .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); - if (multiStoreGroup == null) { - processorContext.getCounters().addGroup( - MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, - MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); - } - String name = MRPigStatsUtil.getMultiStoreCounterName(this); - if (name != null) { - outputRecordCounter = multiStoreGroup.addCounter(name, name, 0); - } - } + this.processorContext = processorContext; } @Override @@ -110,6 +101,21 @@ public class POStoreTez extends POStore } catch (IOException e) { throw new ExecException(e); } + + // Multiple outputs - can be another store or other outputs (shuffle, broadcast) + if (outputs.size() > 1) { + CounterGroup multiStoreGroup = processorContext.getCounters() + .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); + if (multiStoreGroup == null) { + processorContext.getCounters().addGroup( + MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP, + MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP); + } + String name = MRPigStatsUtil.getMultiStoreCounterName(this); + if (name != null) { + outputRecordCounter = multiStoreGroup.addCounter(name, name, 0); + } + } } @Override @@ -121,9 +127,10 @@ public class POStoreTez extends POStore if (illustrator == null) { // PigOutputFormat.PigRecordWriter will call storeFunc.putNext writer.write(null, res.result); - } else + } else { illustratorMarkup(res.result, res.result, 0); - res = empty; + } + res = RESULT_EMPTY; if (outputRecordCounter != null) { outputRecordCounter.increment(1); @@ -143,4 +150,9 @@ public class POStoreTez extends POStore return res; } + @Override + public String name() { + return super.name() + (getOperatorKey().toString().equals(outputKey) ? "" : "\t->\t " +outputKey); + } + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Mar 4 18:17:39 2016 @@ -33,7 +33,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.tez.runtime.api.LogicalInput; @@ -58,7 +57,6 @@ public class POValueInputTez extends Phy private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; private transient boolean hasNext; - protected static final TupleFactory mTupleFactory = TupleFactory.getInstance(); public POValueInputTez(OperatorKey k) { super(k); @@ -120,12 +118,10 @@ public class POValueInputTez extends Phy } hasNext = shuffleReader.next(); } - } else { - if (reader.next()) { - Tuple origTuple = (Tuple)reader.getCurrentValue(); - Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); - return new Result(POStatus.STATUS_OK, copy); - } + } else if (reader.next()) { + Tuple origTuple = (Tuple) reader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + return new Result(POStatus.STATUS_OK, copy); } finished = true; // For certain operators (such as STREAM), we could still have some work Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueOutputTez.java Fri Mar 4 18:17:39 2016 @@ -39,7 +39,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezTaskConfigurable; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; import org.apache.tez.runtime.api.LogicalOutput; @@ -51,8 +50,8 @@ public class POValueOutputTez extends Ph private static final long serialVersionUID = 1L; private static final Log LOG = LogFactory.getLog(POValueOutputTez.class); - private static final TupleFactory tupleFactory = TupleFactory.getInstance(); - + private boolean scalarOutput; + private transient Object scalarValue; private boolean taskIndexWithRecordIndexAsKey; // TODO Change this to outputKey and write only once // when a shared edge support is available in Tez @@ -71,6 +70,14 @@ public class POValueOutputTez extends Ph super(k); } + public boolean isScalarOutput() { + return scalarOutput; + } + + public void setScalarOutput(boolean scalarOutput) { + this.scalarOutput = scalarOutput; + } + public boolean isTaskIndexWithRecordIndexAsKey() { return taskIndexWithRecordIndexAsKey; } @@ -96,8 +103,8 @@ public class POValueOutputTez extends Ph @Override public void replaceOutput(String oldOutputKey, String newOutputKey) { - if (outputKeys.remove(oldOutputKey)) { - outputKeys.add(oldOutputKey); + while (outputKeys.remove(oldOutputKey)) { + outputKeys.add(newOutputKey); } } @@ -149,14 +156,25 @@ public class POValueOutputTez extends Ph if (inp.returnStatus == POStatus.STATUS_NULL) { continue; } + if (scalarOutput) { + if (scalarValue == null) { + scalarValue = inp.result; + } else { + String msg = "Scalar has more than one row in the output. " + + "1st : " + scalarValue + ", 2nd :" + + inp.result + + " (common cause: \"JOIN\" then \"FOREACH ... GENERATE foo.bar\" should be \"foo::bar\" )"; + throw new ExecException(msg); + } + } + if (taskIndexWithRecordIndexAsKey) { + Tuple tuple = mTupleFactory.newTuple(2); + tuple.set(0, taskIndex); + tuple.set(1, count++); + key = tuple; + } for (KeyValueWriter writer : writers) { try { - if (taskIndexWithRecordIndexAsKey) { - Tuple tuple = tupleFactory.newTuple(2); - tuple.set(0, taskIndex); - tuple.set(1, count++); - key = tuple; - } writer.write(key, inp.result); } catch (IOException e) { throw new ExecException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Mar 4 18:17:39 2016 @@ -24,6 +24,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.pig.LoadFunc; @@ -47,13 +48,28 @@ import org.apache.pig.impl.util.UDFConte import org.apache.tez.mapreduce.hadoop.MRInputHelpers; public class LoaderProcessor extends TezOpPlanVisitor { - private Configuration conf; + + private static final Log LOG = LogFactory.getLog(LoaderProcessor.class); + + private TezOperPlan tezOperPlan; + private JobConf jobConf; private PigContext pc; - private static final Log log = LogFactory.getLog(LoaderProcessor.class); - public LoaderProcessor(TezOperPlan plan, PigContext pigContext) { + + public LoaderProcessor(TezOperPlan plan, PigContext pigContext) throws VisitorException { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + this.tezOperPlan = plan; this.pc = pigContext; - this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());; + this.jobConf = new JobConf(ConfigurationUtil.toConfiguration(pc.getProperties())); + // This ensures that the same credentials object is used by reference everywhere + this.jobConf.setCredentials(tezOperPlan.getCredentials()); + this.jobConf.setBoolean("mapred.mapper.new-api", true); + this.jobConf.setClass("mapreduce.inputformat.class", + PigInputFormat.class, InputFormat.class); + try { + this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc)); + } catch (IOException e) { + throw new VisitorException(e); + } } /** @@ -76,22 +92,25 @@ public class LoaderProcessor extends Tez ArrayList<String> inpSignatureLists = new ArrayList<String>(); ArrayList<Long> inpLimits = new ArrayList<Long>(); - Job job = Job.getInstance(conf); - conf = job.getConfiguration(); - conf.setBoolean("mapred.mapper.new-api", true); - conf.setClass("mapreduce.inputformat.class", - PigInputFormat.class, InputFormat.class); - conf.set("pig.pigContext", ObjectSerializer.serialize(pc)); List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan, POLoad.class); + Job job = Job.getInstance(jobConf); + Configuration conf = job.getConfiguration(); + if (lds != null && lds.size() > 0) { - for (POLoad ld : lds) { - LoadFunc lf = ld.getLoadFunc(); - lf.setLocation(ld.getLFile().getFileName(), job); + if (lds.size() == 1) { + for (POLoad ld : lds) { + LoadFunc lf = ld.getLoadFunc(); + lf.setLocation(ld.getLFile().getFileName(), job); - // Store the inp filespecs - inp.add(ld.getLFile()); + // Store the inp filespecs + inp.add(ld.getLFile()); + } + } else { + throw new VisitorException( + "There is more than one load for TezOperator " + + tezOp); } } @@ -114,7 +133,9 @@ public class LoaderProcessor extends Tez tezOp.plan.remove(ld); // Now add the input handling operator for the Tez backend // TODO: Move this upstream to the PhysicalPlan generation - POSimpleTezLoad tezLoad = new POSimpleTezLoad(ld.getOperatorKey(), ld.getLFile()); + POSimpleTezLoad tezLoad = new POSimpleTezLoad(ld.getOperatorKey(), ld.getLoadFunc()); + tezLoad.setLFile(ld.getLFile()); + tezLoad.setSignature(ld.getSignature()); tezLoad.setInputKey(ld.getOperatorKey().toString()); tezLoad.copyAliasFrom(ld); tezLoad.setCacheFiles(ld.getCacheFiles()); @@ -127,10 +148,10 @@ public class LoaderProcessor extends Tez UDFContext.getUDFContext().serialize(conf); conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList())); - conf.set("pig.inputs", ObjectSerializer.serialize(inp)); - conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets)); - conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists)); - conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits)); + conf.set(PigInputFormat.PIG_INPUTS, ObjectSerializer.serialize(inp)); + conf.set(PigInputFormat.PIG_INPUT_TARGETS, ObjectSerializer.serialize(inpTargets)); + conf.set(PigInputFormat.PIG_INPUT_SIGNATURES, ObjectSerializer.serialize(inpSignatureLists)); + conf.set(PigInputFormat.PIG_INPUT_LIMITS, ObjectSerializer.serialize(inpLimits)); String tmp; long maxCombinedSplitSize = 0; if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false")) @@ -139,7 +160,7 @@ public class LoaderProcessor extends Tez try { maxCombinedSplitSize = Long.parseLong(tmp); } catch (NumberFormatException e) { - log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size"); + LOG.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size"); } } if (maxCombinedSplitSize > 0) @@ -150,6 +171,10 @@ public class LoaderProcessor extends Tez // Not using MRInputAMSplitGenerator because delegation tokens are // fetched in FileInputFormat tezOp.getLoaderInfo().setInputSplitInfo(MRInputHelpers.generateInputSplitsToMem(conf, false, 0)); + // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input + // splits can be moved to if(loads) block below + int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks(); + tezOp.setRequestedParallelism(parallelism); } return lds; } @@ -159,6 +184,7 @@ public class LoaderProcessor extends Tez try { tezOp.getLoaderInfo().setLoads(processLoads(tezOp)); } catch (Exception e) { + e.printStackTrace(); throw new VisitorException(e); } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Mar 4 18:17:39 2016 @@ -24,17 +24,13 @@ import java.util.Map.Entry; import java.util.Set; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; -import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez; -import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.PlanException; @@ -42,8 +38,27 @@ import org.apache.pig.impl.plan.ReverseD import org.apache.pig.impl.plan.VisitorException; public class MultiQueryOptimizerTez extends TezOpPlanVisitor { - public MultiQueryOptimizerTez(TezOperPlan plan) { + + private boolean unionOptimizerOn; + private List<String> unionSupportedStoreFuncs; + private List<String> unionUnsupportedStoreFuncs; + + public MultiQueryOptimizerTez(TezOperPlan plan, boolean unionOptimizerOn, + List<String> unionSupportedStoreFuncs, + List<String> unionUnsupportedStoreFuncs) { super(plan, new ReverseDependencyOrderWalker<TezOperator, TezOperPlan>(plan)); + this.unionOptimizerOn = unionOptimizerOn; + this.unionSupportedStoreFuncs = unionSupportedStoreFuncs;; + this.unionUnsupportedStoreFuncs = unionUnsupportedStoreFuncs; + } + + private void addAllPredecessors(TezOperator tezOp, List<TezOperator> predsList) { + if (getPlan().getPredecessors(tezOp) != null) { + for (TezOperator pred : getPlan().getPredecessors(tezOp)) { + predsList.add(pred); + addAllPredecessors(pred, predsList); + } + } } @Override @@ -54,20 +69,67 @@ public class MultiQueryOptimizerTez exte } List<TezOperator> splittees = new ArrayList<TezOperator>(); + Set<TezOperator> mergedNonPackageInputSuccessors = new HashSet<TezOperator>(); List<TezOperator> successors = getPlan().getSuccessors(tezOp); for (TezOperator successor : successors) { + List<TezOperator> predecessors = new ArrayList<TezOperator>(getPlan().getPredecessors(successor)); + predecessors.remove(tezOp); + if (!predecessors.isEmpty()) { + // If has other dependency that conflicts with other splittees, don't merge into split + // For eg: self replicate join/skewed join + // But if replicate input is from a different operator allow it, but ensure + // that we don't have more than one input coming from that operator into the split + + // Check if other splittees or its predecessors (till the root) are not present in + // the predecessors (till the root) of this splittee. + // Need to check the whole predecessors hierarchy till root as the conflict + // could be multiple levels up + for (TezOperator predecessor : getPlan().getPredecessors(successor)) { + if (predecessor != tezOp) { + predecessors.add(predecessor); + addAllPredecessors(predecessor, predecessors); + } + } + List<TezOperator> toMergeSuccPredecessors = new ArrayList<TezOperator>(successors); + toMergeSuccPredecessors.remove(successor); + for (TezOperator splittee : splittees) { + for (TezOperator spliteePred : getPlan().getPredecessors(splittee)) { + if (spliteePred != tezOp) { + toMergeSuccPredecessors.add(spliteePred); + addAllPredecessors(spliteePred, toMergeSuccPredecessors); + } + } + } + if (predecessors.removeAll(toMergeSuccPredecessors)) { + continue; + } + } - // If has other dependency, don't merge into split, - if (getPlan().getPredecessors(successor).size()!=1) { + // Split contains right input of different skewed joins + if (successor.getSampleOperator() != null + && tezOp.getSampleOperator() != null + && !successor.getSampleOperator().equals( + tezOp.getSampleOperator())) { continue; } - // Detect diamond shape, we cannot merge it into split, since Tez - // does not handle double edge between vertexes - // TODO: PIG-3876 to handle this by writing to same edge + // Detect diamond shape into successor operator, we cannot merge it into split, + // since Tez does not handle double edge between vertexes + // Successor could be + // - union operator (if no union optimizer changing it to vertex group which supports multiple edges) + // - self replicate join, self skewed join or scalar + // - POPackage (Self hash joins can write to same output edge and is handled by POShuffleTezLoad) Set<TezOperator> mergedSuccessors = new HashSet<TezOperator>(); + // These successors should not be merged due to diamond shape + Set<TezOperator> toNotMergeSuccessors = new HashSet<TezOperator>(); + // These successors can be merged Set<TezOperator> toMergeSuccessors = new HashSet<TezOperator>(); + // These successors (Scalar, POFRJoinTez) can be merged if they are the only input. + // Only in case of POPackage(POShuffleTezLoad) multiple inputs can be handled from a Split + Set<TezOperator> nonPackageInputSuccessors = new HashSet<TezOperator>(); + boolean canMerge = true; + mergedSuccessors.addAll(successors); for (TezOperator splittee : splittees) { if (getPlan().getSuccessors(splittee) != null) { @@ -75,15 +137,62 @@ public class MultiQueryOptimizerTez exte } } if (getPlan().getSuccessors(successor) != null) { - toMergeSuccessors.addAll(getPlan().getSuccessors(successor)); + for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) { + if (succSuccessor.isUnion()) { + if (!(unionOptimizerOn && + UnionOptimizer.isOptimizable(succSuccessor, + unionSupportedStoreFuncs, + unionUnsupportedStoreFuncs))) { + toNotMergeSuccessors.add(succSuccessor); + } else { + toMergeSuccessors.add(succSuccessor); + List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor); + if (unionSuccessors != null) { + for (TezOperator unionSuccessor : unionSuccessors) { + if (TezCompilerUtil.isNonPackageInput(succSuccessor.getOperatorKey().toString(), unionSuccessor)) { + canMerge = canMerge ? nonPackageInputSuccessors.add(unionSuccessor) : false; + } else { + toMergeSuccessors.add(unionSuccessor); + } + } + } + } + } else if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) { + // Output goes to scalar or POFRJoinTez instead of POPackage + // POPackage/POShuffleTezLoad can handle multiple inputs from a Split. + // But if input is sent to any other operator like + // scalar, POFRJoinTez then we need to ensure it is the only one. + canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false; + } else { + toMergeSuccessors.add(succSuccessor); + } + } } - mergedSuccessors.retainAll(toMergeSuccessors); + + if (canMerge) { + if (!nonPackageInputSuccessors.isEmpty() || !mergedNonPackageInputSuccessors.isEmpty()) { + // If a non-POPackage input successor is already merged or + // if there is a POPackage and non-POPackage to be merged, + // then skip as it will become diamond shape + // For eg: POFRJoinTez+Scalar, POFRJoinTez/Scalar+POPackage + if (nonPackageInputSuccessors.removeAll(mergedSuccessors) + || toMergeSuccessors.removeAll(mergedNonPackageInputSuccessors) + || toMergeSuccessors.removeAll(nonPackageInputSuccessors)) { + continue; + } + } + } else { + continue; + } + + mergedSuccessors.retainAll(toNotMergeSuccessors); if (mergedSuccessors.isEmpty()) { // no shared edge after merge splittees.add(successor); + mergedNonPackageInputSuccessors.addAll(nonPackageInputSuccessors); } } - if (splittees.size()==0) { + if (splittees.size() == 0) { return; } @@ -136,42 +245,46 @@ public class MultiQueryOptimizerTez exte } } - static public void removeSplittee(TezOperPlan plan, TezOperator splitter, TezOperator splittee) throws PlanException { - if (plan.getSuccessors(splittee)!=null) { - List<TezOperator> succs = new ArrayList<TezOperator>(); - succs.addAll(plan.getSuccessors(splittee)); - plan.disconnect(splitter, splittee); + private void removeSplittee(TezOperPlan plan, TezOperator splitter, + TezOperator splittee) throws PlanException, VisitorException { + + plan.disconnect(splitter, splittee); + + String spliteeKey = splittee.getOperatorKey().toString(); + String splitterKey = splitter.getOperatorKey().toString(); + + if (plan.getPredecessors(splittee) != null) { + for (TezOperator pred : new ArrayList<TezOperator>(plan.getPredecessors(splittee))) { + TezEdgeDescriptor edge = pred.outEdges.remove(splittee.getOperatorKey()); + if (edge == null) { + throw new VisitorException("Edge description is empty"); + } + plan.disconnect(pred, splittee); + TezCompilerUtil.connectTezOpToNewSuccesor(plan, pred, splitter, edge, spliteeKey); + } + } + + if (plan.getSuccessors(splittee) != null) { + List<TezOperator> succs = new ArrayList<TezOperator>(plan.getSuccessors(splittee)); + List<TezOperator> splitterSuccs = plan.getSuccessors(splitter); for (TezOperator succTezOperator : succs) { TezEdgeDescriptor edge = succTezOperator.inEdges.get(splittee.getOperatorKey()); - splitter.outEdges.remove(splittee.getOperatorKey()); succTezOperator.inEdges.remove(splittee.getOperatorKey()); plan.disconnect(splittee, succTezOperator); - TezCompilerUtil.connect(plan, splitter, succTezOperator, edge); - try { - List<TezInput> inputs = PlanHelper.getPhysicalOperators(succTezOperator.plan, TezInput.class); - for (TezInput input : inputs) { - input.replaceInput(splittee.getOperatorKey().toString(), - splitter.getOperatorKey().toString()); - } - List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class); - for (POUserFunc userFunc : userFuncs) { - if (userFunc.getFunc() instanceof ReadScalarsTez) { - TezInput tezInput = (TezInput)userFunc.getFunc(); - tezInput.replaceInput(splittee.getOperatorKey().toString(), - splitter.getOperatorKey().toString()); - userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs()); - } - } - } catch (VisitorException e) { - throw new PlanException(e); + // Do not connect again in case of self join/cross/cogroup or union + if (splitterSuccs == null || !splitterSuccs.contains(succTezOperator)) { + TezCompilerUtil.connectTezOpToNewPredecessor(plan, succTezOperator, splitter, edge, null); } + TezCompilerUtil.replaceInput(succTezOperator, spliteeKey, splitterKey); + if (succTezOperator.isUnion()) { - int index = succTezOperator.getUnionPredecessors().indexOf(splittee.getOperatorKey()); - if (index > -1) { - succTezOperator.getUnionPredecessors().set(index, splitter.getOperatorKey()); + int index = succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey()); + while (index > -1) { + succTezOperator.getUnionMembers().set(index, splitter.getOperatorKey()); + index = succTezOperator.getUnionMembers().indexOf(splittee.getOperatorKey()); } } } @@ -179,7 +292,7 @@ public class MultiQueryOptimizerTez exte plan.remove(splittee); } - static public void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) { + private void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) { // Copy only map side properties. For eg: crossKeys. // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc if (subPlanOper.getCrossKeys() != null) { @@ -189,6 +302,11 @@ public class MultiQueryOptimizerTez exte } parentOper.copyFeatures(subPlanOper, null); + // For skewed join right input + if (subPlanOper.getSampleOperator() != null) { + parentOper.setSampleOperator(subPlanOper.getSampleOperator()); + } + if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) { parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism()); } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Mar 4 18:17:39 2016 @@ -17,7 +17,6 @@ */ package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer; -import java.util.LinkedList; import java.util.Map; import org.apache.commons.logging.Log; @@ -26,14 +25,11 @@ import org.apache.hadoop.conf.Configurat import org.apache.pig.PigConfiguration; import org.apache.pig.backend.executionengine.ExecException; import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; -import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan; import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper; -import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.PigImplConstants; @@ -81,25 +77,20 @@ public class ParallelismSetter extends T // Can only set parallelism here if the parallelism isn't derived from // splits int parallelism = -1; - boolean intermediateReducer = false; - LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class); - if (stores.size() <= 0) { - intermediateReducer = true; - } if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) { - // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input - // splits can be moved to if(loads) block below - parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks(); - tezOp.setRequestedParallelism(parallelism); + // requestedParallelism of Loader vertex is handled in LoaderProcessor + // propogate to vertexParallelism + tezOp.setVertexParallelism(tezOp.getRequestedParallelism()); + incrementTotalParallelism(tezOp, tezOp.getRequestedParallelism()); + return; } else { int prevParallelism = -1; boolean isOneToOneParallelism = false; - intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOp); for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) { if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) { TezOperator pred = mPlan.getOperator(entry.getKey()); - parallelism = pred.getEffectiveParallelism(); + parallelism = pred.getEffectiveParallelism(pc.defaultParallel); if (prevParallelism == -1) { prevParallelism = parallelism; } else if (prevParallelism != parallelism) { @@ -107,7 +98,12 @@ public class ParallelismSetter extends T + tezOp.getOperatorKey().toString() + " are not equal"); } tezOp.setRequestedParallelism(pred.getRequestedParallelism()); - tezOp.setEstimatedParallelism(pred.getEstimatedParallelism()); + // If tezOp.estimatedParallelism already set, don't override + // The only case is in PigGraceShuffleVertexManager, which + // set the estimated parallelism according to the output data size of the node + if (tezOp.getEstimatedParallelism()==-1) { + tezOp.setEstimatedParallelism(pred.getEstimatedParallelism()); + } isOneToOneParallelism = true; incrementTotalParallelism(tezOp, parallelism); parallelism = -1; @@ -122,7 +118,7 @@ public class ParallelismSetter extends T boolean overrideRequestedParallelism = false; if (parallelism != -1 && autoParallelismEnabled - && intermediateReducer + && tezOp.isIntermediateReducer() && !tezOp.isDontEstimateParallelism() && tezOp.isOverrideIntermediateParallelism()) { overrideRequestedParallelism = true; @@ -133,6 +129,9 @@ public class ParallelismSetter extends T // if it is intermediate reducer parallelism = estimator.estimateParallelism(mPlan, tezOp, conf); if (overrideRequestedParallelism) { + if (tezOp.getRequestedParallelism() != parallelism) { + LOG.info("Increased requested parallelism of " + tezOp.getOperatorKey() + " to " + parallelism); + } tezOp.setRequestedParallelism(parallelism); } else { tezOp.setEstimatedParallelism(parallelism); @@ -141,7 +140,12 @@ public class ParallelismSetter extends T parallelism = tezOp.getEstimatedParallelism(); } if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) { - if (!overrideRequestedParallelism) { + boolean additionalEdge = false; + if (tezOp.isGlobalSort() && getPlan().getPredecessors(tezOp).size() != 1 || + tezOp.isSkewedJoin() && getPlan().getPredecessors(tezOp).size() != 2) { + additionalEdge = true; + } + if (!overrideRequestedParallelism && !additionalEdge) { incrementTotalParallelism(tezOp, parallelism); // PartitionerDefinedVertexManager will determine parallelism. // So call setVertexParallelism with -1 @@ -156,12 +160,12 @@ public class ParallelismSetter extends T for (TezOperator pred : mPlan.getPredecessors(tezOp)) { if (pred.isSampleBasedPartitioner()) { for (TezOperator partitionerPred : mPlan.getPredecessors(pred)) { - if (partitionerPred.isSampleAggregation()) { - LOG.debug("Updating constant value to " + parallelism + " in " + partitionerPred.plan); - LOG.info("Increased requested parallelism of " + partitionerPred.getOperatorKey() + " to " + parallelism); + if (partitionerPred.isSampleAggregation() && partitionerPred.plan!=null) { + LOG.debug("Updating parallelism constant value to " + parallelism + " in " + partitionerPred.plan); ParallelConstantVisitor visitor = new ParallelConstantVisitor(partitionerPred.plan, parallelism); visitor.visit(); + partitionerPred.setNeedEstimatedQuantile(false); break; } } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Mar 4 18:17:39 2016 @@ -56,12 +56,13 @@ public class SecondaryKeyOptimizerTez ex return; } + // TODO: PIG-4685: SecondaryKeyOptimizerTez does not optimize cogroup // Current code does not handle more than one predecessors // even though it is possible. The problem is when we // process the first predecessor, we remove the foreach inner // operators from the reduce side, and the second predecessor // cannot see them - if (predecessors.size()>1) { + if (predecessors.size() > 1) { return; } TezOperator from = predecessors.get(0); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1733627&r1=1733626&r2=1733627&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Mar 4 18:17:39 2016 @@ -26,8 +26,10 @@ import org.apache.hadoop.conf.Configurat import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; @@ -42,7 +44,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez; import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez; -import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; +import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.plan.DepthFirstWalker; import org.apache.pig.impl.plan.OperatorKey; @@ -56,7 +58,7 @@ import org.apache.tez.dag.api.EdgeProper * * Since currently it is only possible to reduce the parallelism * estimation is exaggerated and will rely on Tez runtime to - * descrease the parallelism + * decrease the parallelism */ public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator { @@ -65,6 +67,14 @@ public class TezOperDependencyParallelis static final double DEFAULT_FILTER_FACTOR = 0.7; static final double DEFAULT_LIMIT_FACTOR = 0.1; + // Most of the cases distinct does not reduce much. + // So keeping it high at 0.9 + static final double DEFAULT_DISTINCT_FACTOR = 0.9; + + // Most of the cases aggregation can reduce by a lot. + // But keeping at 0.7 to take worst case scenarios into account + static final double DEFAULT_AGGREGATION_FACTOR = 0.7; + private PigContext pc; @Override @@ -79,15 +89,13 @@ public class TezOperDependencyParallelis return -1; } - boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper); - // TODO: If map opts and reduce opts are same estimate higher parallelism // for tasks based on the count of number of map tasks else be conservative as now maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM, PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM); // If parallelism is set explicitly, respect it - if (!intermediateReducer && tezOper.getRequestedParallelism()!=-1) { + if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) { return tezOper.getRequestedParallelism(); } @@ -111,7 +119,7 @@ public class TezOperDependencyParallelis // and sample/scalar (does not impact parallelism) if (entry.getValue().dataMovementType==DataMovementType.SCATTER_GATHER || entry.getValue().dataMovementType==DataMovementType.ONE_TO_ONE) { - double predParallelism = pred.getEffectiveParallelism(); + double predParallelism = pred.getEffectiveParallelism(pc.defaultParallel); if (predParallelism==-1) { throw new IOException("Cannot estimate parallelism for " + tezOper.getOperatorKey().toString() + ", effective parallelism for predecessor " + tezOper.getOperatorKey().toString() @@ -120,10 +128,8 @@ public class TezOperDependencyParallelis //For cases like Union we can just limit to sum of pred vertices parallelism boolean applyFactor = !tezOper.isUnion(); - if (pred.plan!=null && applyFactor) { // pred.plan can be null if it is a VertexGroup - TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString()); - parallelismFactorVisitor.visit(); - predParallelism = predParallelism * parallelismFactorVisitor.getFactor(); + if (!pred.isVertexGroup() && applyFactor) { + predParallelism = predParallelism * pred.getParallelismFactor(tezOper); } estimatedParallelism += predParallelism; } @@ -131,7 +137,7 @@ public class TezOperDependencyParallelis int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism); - if (intermediateReducer && tezOper.isOverrideIntermediateParallelism()) { + if (tezOper.isIntermediateReducer() && tezOper.isOverrideIntermediateParallelism()) { // Estimated reducers should not be more than the configured limit roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount); int userSpecifiedParallelism = pc.defaultParallel; @@ -150,6 +156,12 @@ public class TezOperDependencyParallelis roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount); } + if (roundedEstimatedParallelism == 0) { + throw new IOException("Estimated parallelism for " + + tezOper.getOperatorKey().toString() + + " is 0 which is unexpected"); + } + return roundedEstimatedParallelism; } @@ -157,7 +169,7 @@ public class TezOperDependencyParallelis List<TezOperator> preds = plan.getPredecessors(tezOper); for (TezOperator pred : preds) { if (pred.isVertexGroup()) { - for (OperatorKey unionPred : pred.getUnionPredecessors()) { + for (OperatorKey unionPred : pred.getVertexGroupMembers()) { if (unionPred.toString().equals(inputKey)) { return plan.getOperator(unionPred); } @@ -174,9 +186,24 @@ public class TezOperDependencyParallelis public static class TezParallelismFactorVisitor extends PhyPlanVisitor { private double factor = 1; private String outputKey; - public TezParallelismFactorVisitor(PhysicalPlan plan, String outputKey) { - super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan)); - this.outputKey = outputKey; + private TezOperator tezOp; + + public TezParallelismFactorVisitor(TezOperator tezOp, TezOperator successor) { + super(tezOp.plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(tezOp.plan)); + this.tezOp = tezOp; + this.outputKey = tezOp.getOperatorKey().toString(); + + if (successor != null) { + // Map side combiner + TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey()); + if (!edge.combinePlan.isEmpty()) { + if (successor.isDistinct()) { + factor = DEFAULT_DISTINCT_FACTOR; + } else { + factor = DEFAULT_AGGREGATION_FACTOR; + } + } + } } @Override @@ -194,11 +221,17 @@ public class TezOperDependencyParallelis @Override public void visitPOForEach(POForEach nfe) throws VisitorException { List<Boolean> flattens = nfe.getToBeFlattened(); + List<PhysicalPlan> inputPlans = nfe.getInputPlans(); boolean containFlatten = false; - for (boolean flatten : flattens) { - if (flatten) { - containFlatten = true; - break; + for (int i = 0; i < flattens.size(); i++) { + if (flattens.get(i)) { + PhysicalPlan inputPlan = inputPlans.get(i); + PhysicalOperator root = inputPlan.getRoots().get(0); + if (root instanceof POProject + && root.getResultType() == DataType.BAG) { + containFlatten = true; + break; + } } } if (containFlatten) { @@ -226,6 +259,12 @@ public class TezOperDependencyParallelis // JoinPackager is equivalent to a foreach flatten after shuffle if (pkg.getPkgr() instanceof JoinPackager) { factor *= DEFAULT_FLATTEN_FACTOR; + } else if (pkg.getPkgr() instanceof CombinerPackager) { + if (tezOp.isDistinct()) { + factor *= DEFAULT_DISTINCT_FACTOR; + } else { + factor *= DEFAULT_AGGREGATION_FACTOR; + } } }
