Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Feb 24 08:19:42 2017 @@ -29,9 +29,14 @@ import java.util.Map; import java.util.Set; import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore; import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager; +import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez; import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.DependencyOrderWalker; +import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.OperatorPlan; import org.apache.pig.impl.plan.PlanException; @@ -160,100 +165,178 @@ public class TezPlanContainer extends Op return; } - TezOperator operToSegment = null; - List<TezOperator> succs = new ArrayList<TezOperator>(); + List<TezOperator> opersToSegment = null; try { // Split top down from root to leaves - SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan); + // Get list of operators closer to the root that can be segmented together + FirstLevelSegmentOperatorsFinder finder = new FirstLevelSegmentOperatorsFinder(tezOperPlan); finder.visit(); - operToSegment = finder.getOperatorToSegment(); + opersToSegment = finder.getOperatorsToSegment(); } catch (VisitorException e) { throw new PlanException(e); } + if (!opersToSegment.isEmpty()) { + Set<TezOperator> commonSplitterPredecessors = new HashSet<>(); + for (TezOperator operToSegment : opersToSegment) { + for (TezOperator succ : tezOperPlan.getSuccessors(operToSegment)) { + commonSplitterPredecessors + .addAll(getCommonSplitterPredecessors(tezOperPlan, + operToSegment, succ)); + } + } - if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) { - succs.addAll(tezOperPlan.getSuccessors(operToSegment)); - for (TezOperator succ : succs) { - tezOperPlan.disconnect(operToSegment, succ); - } - for (TezOperator succ : succs) { - try { - if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) { - // Has already been moved to a new plan by previous successor - // as part of dependency. It could have been further split. 
- // So walk the full plan to find the new plan and connect - TezOperatorFinder finder = new TezOperatorFinder(this, succ); - finder.visit(); - connect(planNode, finder.getPlanContainerNode()); - continue; + if (commonSplitterPredecessors.isEmpty()) { + List<TezOperator> allSuccs = new ArrayList<TezOperator>(); + // Disconnect all the successors and move them to a new plan + for (TezOperator operToSegment : opersToSegment) { + List<TezOperator> succs = new ArrayList<TezOperator>(); + succs.addAll(tezOperPlan.getSuccessors(operToSegment)); + allSuccs.addAll(succs); + for (TezOperator succ : succs) { + tezOperPlan.disconnect(operToSegment, succ); } - TezOperPlan newOperPlan = new TezOperPlan(); + } + TezOperPlan newOperPlan = new TezOperPlan(); + for (TezOperator succ : allSuccs) { tezOperPlan.moveTree(succ, newOperPlan); - TezPlanContainerNode newPlanNode = new TezPlanContainerNode( - generateNodeOperatorKey(), newOperPlan); - add(newPlanNode); - connect(planNode, newPlanNode); - split(newPlanNode); - if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) { - // On further split, the successor moved to a new plan container. - // Connect to that - TezOperatorFinder finder = new TezOperatorFinder(this, succ); - finder.visit(); - disconnect(planNode, newPlanNode); - connect(planNode, finder.getPlanContainerNode()); + } + TezPlanContainerNode newPlanNode = new TezPlanContainerNode( + generateNodeOperatorKey(), newOperPlan); + add(newPlanNode); + connect(planNode, newPlanNode); + split(newPlanNode); + } else { + // If there is a common splitter predecessor between operToSegment and the successor, + // we have to separate out that split to be able to segment. + // So we store the output of split to a temp store and then change the + // splittees to load from it. 
+ String scope = opersToSegment.get(0).getOperatorKey().getScope(); + for (TezOperator splitter : commonSplitterPredecessors) { + try { + List<TezOperator> succs = new ArrayList<TezOperator>(); + succs.addAll(tezOperPlan.getSuccessors(splitter)); + FileSpec fileSpec = TezCompiler.getTempFileSpec(pigContext); + POStore tmpStore = getTmpStore(scope, fileSpec); + // Replace POValueOutputTez with POStore + splitter.plan.remove(splitter.plan.getLeaves().get(0)); + splitter.plan.addAsLeaf(tmpStore); + splitter.segmentBelow = true; + splitter.setSplitter(false); + for (TezOperator succ : succs) { + // Replace POValueInputTez with POLoad + POLoad tmpLoad = getTmpLoad(scope, fileSpec); + succ.plan.replace(succ.plan.getRoots().get(0), tmpLoad); + } + } catch (Exception e) { + throw new PlanException(e); } - } catch (VisitorException e) { - throw new PlanException(e); } } split(planNode); } } - private static class SegmentOperatorFinder extends TezOpPlanVisitor { + private static class FirstLevelSegmentOperatorsFinder extends TezOpPlanVisitor { - private TezOperator operToSegment; + private List<TezOperator> opersToSegment = new ArrayList<>(); - public SegmentOperatorFinder(TezOperPlan plan) { + public FirstLevelSegmentOperatorsFinder(TezOperPlan plan) { super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan)); } - public TezOperator getOperatorToSegment() { - return operToSegment; + public List<TezOperator> getOperatorsToSegment() { + return opersToSegment; } @Override - public void visitTezOp(TezOperator tezOperator) throws VisitorException { - if (tezOperator.needSegmentBelow() && operToSegment == null) { - operToSegment = tezOperator; + public void visitTezOp(TezOperator tezOp) throws VisitorException { + if (tezOp.needSegmentBelow() && getPlan().getSuccessors(tezOp) != null) { + if (opersToSegment.isEmpty()) { + opersToSegment.add(tezOp); + } else { + // If the operator does not have dependency on previous + // operators chosen for segmenting then add it to the + // operators to be segmented together + if (!hasPredecessor(tezOp, opersToSegment)) { + opersToSegment.add(tezOp); + } + } } } - } - - private static class TezOperatorFinder extends TezPlanContainerVisitor { + /** + * Check if the tezOp has one of the opsToCheck as a predecessor. + * It can be a immediate predecessor or multiple levels up. 
+ */ + private boolean hasPredecessor(TezOperator tezOp, List<TezOperator> opsToCheck) { + List<TezOperator> predecessors = getPlan().getPredecessors(tezOp); + if (predecessors != null) { + for (TezOperator pred : predecessors) { + if (opersToSegment.contains(pred)) { + return true; + } else { + if (hasPredecessor(pred, opsToCheck)) { + return true; + } + } + } + } + return false; + } - private TezPlanContainerNode planContainerNode; - private TezOperator operatorToFind; + } - public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) { - super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan)); - this.operatorToFind = operatorToFind; + private Set<TezOperator> getCommonSplitterPredecessors(TezOperPlan plan, TezOperator operToSegment, TezOperator successor) { + Set<TezOperator> splitters1 = new HashSet<>(); + Set<TezOperator> splitters2 = new HashSet<>(); + Set<TezOperator> processedPredecessors = new HashSet<>(); + // Find predecessors which are splitters + fetchSplitterPredecessors(plan, operToSegment, processedPredecessors, splitters1); + if (!splitters1.isEmpty()) { + // For the successor, traverse rest of the plan below it and + // search the predecessors of its successors to find any predecessor that might be a splitter. + Set<TezOperator> allSuccs = new HashSet<>(); + getAllSuccessors(plan, successor, allSuccs); + processedPredecessors.clear(); + processedPredecessors.add(successor); + for (TezOperator succ : allSuccs) { + fetchSplitterPredecessors(plan, succ, processedPredecessors, splitters2); + } + // Find the common ones + splitters1.retainAll(splitters2); } + return splitters1; + } - public TezPlanContainerNode getPlanContainerNode() { - return planContainerNode; + private void fetchSplitterPredecessors(TezOperPlan plan, TezOperator tezOp, + Set<TezOperator> processedPredecessors, Set<TezOperator> splitters) { + List<TezOperator> predecessors = plan.getPredecessors(tezOp); + if (predecessors != null) { + for (TezOperator pred : predecessors) { + // Skip processing already processed predecessor to avoid loops + if (processedPredecessors.contains(pred)) { + continue; + } + if (pred.isSplitter()) { + splitters.add(pred); + } else if (!pred.needSegmentBelow()) { + processedPredecessors.add(pred); + fetchSplitterPredecessors(plan, pred, processedPredecessors, splitters); + } + } } + } - @Override - public void visitTezPlanContainerNode( - TezPlanContainerNode tezPlanContainerNode) - throws VisitorException { - if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) { - planContainerNode = tezPlanContainerNode; + private void getAllSuccessors(TezOperPlan plan, TezOperator tezOp, Set<TezOperator> allSuccs) { + List<TezOperator> successors = plan.getSuccessors(tezOp); + if (successors != null) { + for (TezOperator succ : successors) { + if (!allSuccs.contains(succ)) { + allSuccs.add(succ); + getAllSuccessors(plan, succ, allSuccs); + } } } - } private synchronized OperatorKey generateNodeOperatorKey() { @@ -267,6 +350,21 @@ public class TezPlanContainer extends Op scopeId = 0; } + private POLoad getTmpLoad(String scope, FileSpec fileSpec){ + POLoad ld = new POLoad(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); + ld.setPc(pigContext); + ld.setIsTmpLoad(true); + ld.setLFile(fileSpec); + return ld; + } + + private POStore getTmpStore(String scope, FileSpec fileSpec){ + POStore st = new POStore(new OperatorKey(scope, NodeIdGenerator.getGenerator().getNextNodeId(scope))); + 
st.setIsTmpStore(true);
+        st.setSFile(fileSpec);
+        return new POStoreTez(st);
+    }
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
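
Illustrative note (not part of the patch): the segmentation logic above cuts a shared splitter edge by materializing the splitter's output to a temporary file (getTmpStore) and having each splittee load it back (getTmpLoad), so the sub-plan below can be moved into a separate DAG. The following is a minimal, self-contained Java sketch of that idea only; Vertex, SplitterSegmentSketch and the temp path are hypothetical stand-ins, not Pig's TezOperator/TezOperPlan APIs.

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the idea behind TezPlanContainer.split(): a splitter feeding both
    // an operator that must be segmented and its successor is cut by storing the
    // splitter output to a temp file and loading it back in the new plan.
    public class SplitterSegmentSketch {

        static class Vertex {
            final String name;
            final List<Vertex> successors = new ArrayList<>();
            Vertex(String name) { this.name = name; }
            @Override public String toString() { return name; }
        }

        // Replace the direct edge splitter -> splittee with
        // splitter -> STORE(tmpFile) and LOAD(tmpFile) -> splittee,
        // so the splittee can be moved to an independent DAG.
        static Vertex segment(Vertex splitter, Vertex splittee, String tmpFile) {
            splitter.successors.remove(splittee);
            Vertex tmpStore = new Vertex("STORE " + tmpFile);
            splitter.successors.add(tmpStore);
            Vertex tmpLoad = new Vertex("LOAD " + tmpFile);
            tmpLoad.successors.add(splittee);
            return tmpLoad; // root of the new, independent sub-plan
        }

        public static void main(String[] args) {
            Vertex splitter = new Vertex("split");
            Vertex toSegment = new Vertex("scalar");  // operator that needs segmenting below
            Vertex successor = new Vertex("join");    // successor sharing the same splitter
            splitter.successors.add(toSegment);
            splitter.successors.add(successor);

            Vertex newRoot = segment(splitter, successor, "/tmp/temp-0001");
            System.out.println("plan 1: " + splitter + " -> " + splitter.successors);
            System.out.println("plan 2: " + newRoot + " -> " + newRoot.successors);
        }
    }
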
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Feb 24 08:19:42 2017 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.DistinctCombiner; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter; @@ -80,6 +81,9 @@ public class TezPrinter extends TezOpPla printer.setVerbose(isVerbose); printer.visit(); mStream.println(); + } else if (edgeDesc.needsDistinctCombiner()) { + mStream.println("# Combine plan on edge <" + inEdge + ">"); + mStream.println(DistinctCombiner.Combine.class.getName()); } } } Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/BloomPackager.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; + +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; +import org.apache.pig.builtin.BuildBloomBase; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; + +public class BloomPackager extends Packager { + + private static final long serialVersionUID = 1L; + + private boolean bloomCreatedInMap; + private int vectorSizeBytes; + private int numHash; + private int hashType; + private byte bloomKeyType; + private boolean isCombiner; + + private transient ByteArrayOutputStream baos; + private transient Iterator<Object> distinctKeyIter; + + public BloomPackager(boolean bloomCreatedInMap, int vectorSizeBytes, + int numHash, int hashType) { + super(); + this.bloomCreatedInMap = bloomCreatedInMap; + this.vectorSizeBytes = vectorSizeBytes; + this.numHash = numHash; + this.hashType = hashType; + } + + public void setBloomKeyType(byte keyType) { + bloomKeyType = keyType; + } + + public void setCombiner(boolean isCombiner) { + this.isCombiner = isCombiner; + } + + @Override + public void attachInput(Object key, DataBag[] bags, boolean[] readOnce) + throws ExecException { + this.key = key; + this.bags = bags; + this.readOnce = readOnce; + // Bag can be read directly and need not be materialized again + } + + @Override + public Result getNext() throws ExecException { + try { + if (bloomCreatedInMap) { + if (bags == null) { + return new Result(POStatus.STATUS_EOP, null); + } + // Same function for combiner and reducer + return combineBloomFilters(); + } else { + if (isCombiner) { + return getDistinctBloomKeys(); + } else { + if (bags == null) { + return new Result(POStatus.STATUS_EOP, null); + } + return createBloomFilter(); + } + } + } catch (IOException e) { + throw new ExecException("Error while constructing final bloom filter", e); + } + } + + private Result combineBloomFilters() throws IOException { + // We get a bag of bloom filters. combine them into one + Iterator<Tuple> iter = bags[0].iterator(); + Tuple tup = iter.next(); + DataByteArray bloomBytes = (DataByteArray) tup.get(0); + BloomFilter bloomFilter = BuildBloomBase.bloomIn(bloomBytes); + while (iter.hasNext()) { + tup = iter.next(); + bloomFilter.or(BuildBloomBase.bloomIn((DataByteArray) tup.get(0))); + } + + Object partition = key; + detachInput(); // Free up the key and bags reference + + return getSerializedBloomFilter(partition, bloomFilter, bloomBytes.get().length); + } + + private Result createBloomFilter() throws IOException { + // We get a bag of keys. Create a bloom filter from them + // First do distinct of the keys. Not using DistinctBag as memory should not be a problem. 
+ HashSet<Object> bloomKeys = new HashSet<>(); + Iterator<Tuple> iter = bags[0].iterator(); + while (iter.hasNext()) { + bloomKeys.add(iter.next().get(0)); + } + + Object partition = key; + detachInput(); // Free up the key and bags reference + + BloomFilter bloomFilter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + for (Object bloomKey: bloomKeys) { + Key k = new Key(DataType.toBytes(bloomKey, bloomKeyType)); + bloomFilter.add(k); + } + bloomKeys = null; + return getSerializedBloomFilter(partition, bloomFilter, vectorSizeBytes + 64); + + } + + private Result getSerializedBloomFilter(Object partition, + BloomFilter bloomFilter, int serializedSize) throws ExecException, + IOException { + if (baos == null) { + baos = new ByteArrayOutputStream(serializedSize); + } + baos.reset(); + DataOutputStream dos = new DataOutputStream(baos); + bloomFilter.write(dos); + dos.flush(); + + Tuple res = mTupleFactory.newTuple(2); + res.set(0, partition); + res.set(1, new DataByteArray(baos.toByteArray())); + + Result r = new Result(); + r.result = res; + r.returnStatus = POStatus.STATUS_OK; + return r; + } + + private Result getDistinctBloomKeys() throws ExecException { + if (distinctKeyIter == null) { + HashSet<Object> bloomKeys = new HashSet<>(); + Iterator<Tuple> iter = bags[0].iterator(); + while (iter.hasNext()) { + bloomKeys.add(iter.next().get(0)); + } + distinctKeyIter = bloomKeys.iterator(); + } + while (distinctKeyIter.hasNext()) { + Tuple res = mTupleFactory.newTuple(2); + res.set(0, key); + res.set(1, distinctKeyIter.next()); + + Result r = new Result(); + r.result = res; + r.returnStatus = POStatus.STATUS_OK; + return r; + } + distinctKeyIter = null; + return new Result(POStatus.STATUS_EOP, null); + } +} Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBloomFilterRearrangeTez.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.builtin.BuildBloomBase; +import org.apache.pig.classification.InterfaceAudience; +import org.apache.pig.classification.InterfaceStability; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValueReader; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class POBloomFilterRearrangeTez extends POLocalRearrangeTez implements TezInput { + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(POBloomFilterRearrangeTez.class); + private String inputKey; + private transient KeyValueReader reader; + private transient String cacheKey; + private int numBloomFilters; + private transient BloomFilter[] bloomFilters; + + public POBloomFilterRearrangeTez(POLocalRearrangeTez lr, int numBloomFilters) { + super(lr); + this.numBloomFilters = numBloomFilters; + } + + public void setInputKey(String inputKey) { + this.inputKey = inputKey; + } + + @Override + public String[] getTezInputs() { + return new String[] { inputKey }; + } + + @Override + public void replaceInput(String oldInputKey, String newInputKey) { + if (oldInputKey.equals(inputKey)) { + inputKey = newInputKey; + } + } + + @Override + public void addInputsToSkip(Set<String> inputsToSkip) { + cacheKey = "bloom-" + inputKey; + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + inputsToSkip.add(inputKey); + } + } + + @Override + public void attachInputs(Map<String, LogicalInput> inputs, + Configuration conf) throws ExecException { + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + bloomFilters = (BloomFilter[]) cacheValue; + return; + } + LogicalInput input = inputs.get(inputKey); + if (input == null) { + throw new ExecException("Input from vertex " + inputKey + " is missing"); + } + try { + reader = (KeyValueReader) input.getReader(); + LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader); + while (reader.next()) { + if (bloomFilters == null) { + bloomFilters = new BloomFilter[numBloomFilters]; + } + Tuple val = (Tuple) reader.getCurrentValue(); + int index = (int) val.get(0); + bloomFilters[index] = BuildBloomBase.bloomIn((DataByteArray) val.get(1)); + } + ObjectCache.getInstance().cache(cacheKey, bloomFilters); + } catch (Exception e) { + throw new ExecException(e); + } + } + + @Override + public Result getNextTuple() throws ExecException { + + // If there is no bloom filter, then it 
means right input was empty + // Skip processing + if (bloomFilters == null) { + return RESULT_EOP; + } + + while (true) { + res = super.getRearrangedTuple(); + try { + switch (res.returnStatus) { + case POStatus.STATUS_OK: + if (illustrator == null) { + Tuple result = (Tuple) res.result; + Byte index = (Byte) result.get(0); + + // Skip the record if key is not in the bloom filter + if (!isKeyInBloomFilter(result.get(1))) { + continue; + } + PigNullableWritable key = HDataType.getWritableComparableTypes(result.get(1), keyType); + NullableTuple val = new NullableTuple((Tuple)result.get(2)); + key.setIndex(index); + val.setIndex(index); + writer.write(key, val); + } else { + illustratorMarkup(res.result, res.result, 0); + } + continue; + case POStatus.STATUS_NULL: + continue; + case POStatus.STATUS_EOP: + case POStatus.STATUS_ERR: + default: + return res; + } + } catch (IOException ioe) { + int errCode = 2135; + String msg = "Received error from POBloomFilterRearrage function." + ioe.getMessage(); + throw new ExecException(msg, errCode, ioe); + } + } + } + + private boolean isKeyInBloomFilter(Object key) throws ExecException { + if (key == null) { + // Null values are dropped in a inner join and in the case of outer join, + // POBloomFilterRearrangeTez is only in the plan on the non outer relation. + // So just skip them + return false; + } + if (bloomFilters.length == 1) { + // Skip computing hashcode + Key k = new Key(DataType.toBytes(key, keyType)); + return bloomFilters[0].membershipTest(k); + } else { + int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + BloomFilter filter = bloomFilters[partition]; + if (filter != null) { + Key k = new Key(DataType.toBytes(key, keyType)); + return filter.membershipTest(k); + } + return false; + } + } + + @Override + public POBloomFilterRearrangeTez clone() throws CloneNotSupportedException { + return (POBloomFilterRearrangeTez) super.clone(); + } + + @Override + public String name() { + return getAliasString() + "BloomFilter Rearrange" + "[" + + DataType.findTypeName(resultType) + "]" + "{" + + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct + + ") - " + mKey.toString() + "\t<-\t " + inputKey + "\t->\t " + outputKey; + } + +} Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java?rev=1784237&view=auto ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java (added) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POBuildBloomRearrangeTez.java Fri Feb 24 08:19:42 2017 @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.bloom.BloomFilter; +import org.apache.hadoop.util.bloom.Key; +import org.apache.pig.PigConfiguration; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.HDataType; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.classification.InterfaceAudience; +import org.apache.pig.classification.InterfaceStability; +import org.apache.pig.data.DataByteArray; +import org.apache.pig.data.DataType; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.io.NullableIntWritable; +import org.apache.pig.impl.io.NullableTuple; +import org.apache.pig.impl.io.PigNullableWritable; +import org.apache.tez.runtime.api.LogicalOutput; +import org.apache.tez.runtime.library.api.KeyValueWriter; + +/** + * This operator writes out the key value for the hash join reduce operation similar to POLocalRearrangeTez. + * In addition, it also writes out the bloom filter constructed from the join keys + * in the case of bloomjoin map strategy or join keys themselves in case of reduce strategy. + * + * Using multiple bloom filters partitioned by the hash of the key allows for parallelism. + * It also allows us to have lower false positives with smaller vector sizes. 
+ * + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class POBuildBloomRearrangeTez extends POLocalRearrangeTez { + private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(POBuildBloomRearrangeTez.class); + + public static final String DEFAULT_BLOOM_STRATEGY = "map"; + public static final int DEFAULT_NUM_BLOOM_FILTERS_REDUCE = 11; + public static final int DEFAULT_NUM_BLOOM_HASH_FUNCTIONS = 3; + public static final String DEFAULT_BLOOM_HASH_TYPE = "murmur"; + public static final int DEFAULT_BLOOM_VECTOR_SIZE_BYTES = 1024 * 1024; + + private String bloomOutputKey; + private boolean skipNullKeys = false; + private boolean createBloomInMap; + private int numBloomFilters; + private int vectorSizeBytes; + private int numHash; + private int hashType; + + private transient BloomFilter[] bloomFilters; + private transient KeyValueWriter bloomWriter; + private transient PigNullableWritable nullKey; + private transient Tuple bloomValue; + private transient NullableTuple bloomNullableTuple; + + public POBuildBloomRearrangeTez(POLocalRearrangeTez lr, + boolean createBloomInMap, int numBloomFilters, int vectorSizeBytes, + int numHash, int hashType) { + super(lr); + this.createBloomInMap = createBloomInMap; + this.numBloomFilters = numBloomFilters; + this.vectorSizeBytes = vectorSizeBytes; + this.numHash = numHash; + this.hashType = hashType; + } + + public static int getNumBloomFilters(Configuration conf) { + if ("map".equals(conf.get(PigConfiguration.PIG_BLOOMJOIN_STRATEGY, DEFAULT_BLOOM_STRATEGY))) { + return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, 1); + } else { + return conf.getInt(PigConfiguration.PIG_BLOOMJOIN_NUM_FILTERS, DEFAULT_NUM_BLOOM_FILTERS_REDUCE); + } + } + + public void setSkipNullKeys(boolean skipNullKeys) { + this.skipNullKeys = skipNullKeys; + } + + public void setBloomOutputKey(String bloomOutputKey) { + this.bloomOutputKey = bloomOutputKey; + } + + @Override + public boolean containsOutputKey(String key) { + if(super.containsOutputKey(key)) { + return true; + } + return bloomOutputKey.equals(key); + } + + @Override + public String[] getTezOutputs() { + return new String[] { outputKey, bloomOutputKey }; + } + + @Override + public void replaceOutput(String oldOutputKey, String newOutputKey) { + if (oldOutputKey.equals(outputKey)) { + outputKey = newOutputKey; + } else if (oldOutputKey.equals(bloomOutputKey)) { + bloomOutputKey = newOutputKey; + } + } + + @Override + public void attachOutputs(Map<String, LogicalOutput> outputs, + Configuration conf) throws ExecException { + super.attachOutputs(outputs, conf); + LogicalOutput output = outputs.get(bloomOutputKey); + if (output == null) { + throw new ExecException("Output to vertex " + bloomOutputKey + " is missing"); + } + try { + bloomWriter = (KeyValueWriter) output.getWriter(); + LOG.info("Attached output to vertex " + bloomOutputKey + " : output=" + output + ", writer=" + bloomWriter); + } catch (Exception e) { + throw new ExecException(e); + } + bloomFilters = new BloomFilter[numBloomFilters]; + bloomValue = mTupleFactory.newTuple(1); + bloomNullableTuple = new NullableTuple(bloomValue); + } + + @Override + public Result getNextTuple() throws ExecException { + + PigNullableWritable key; + while (true) { + res = super.getRearrangedTuple(); + try { + switch (res.returnStatus) { + case POStatus.STATUS_OK: + if (illustrator == null) { + Tuple result = (Tuple) res.result; + Byte index = (Byte) result.get(0); + + Object keyObj = result.get(1); + if (keyObj 
!= null) { + key = HDataType.getWritableComparableTypes(keyObj, keyType); + // null keys cannot be part of bloom filter + // Since they are also dropped during join we can skip them + if (createBloomInMap) { + addKeyToBloomFilter(keyObj); + } else { + writeJoinKeyForBloom(keyObj); + } + } else if (skipNullKeys) { + // Inner join. So don't bother writing null key + continue; + } else { + if (nullKey == null) { + nullKey = HDataType.getWritableComparableTypes(keyObj, keyType); + } + key = nullKey; + } + + NullableTuple val = new NullableTuple((Tuple)result.get(2)); + key.setIndex(index); + val.setIndex(index); + writer.write(key, val); + } else { + illustratorMarkup(res.result, res.result, 0); + } + continue; + case POStatus.STATUS_NULL: + continue; + case POStatus.STATUS_EOP: + if (this.parentPlan.endOfAllInput && createBloomInMap) { + // In case of Split will get EOP after every record. + // So check for endOfAllInput + writeBloomFilters(); + } + case POStatus.STATUS_ERR: + default: + return res; + } + } catch (IOException ioe) { + int errCode = 2135; + String msg = "Received error from POBuildBloomRearrage function." + ioe.getMessage(); + throw new ExecException(msg, errCode, ioe); + } + } + } + + private void addKeyToBloomFilter(Object key) throws ExecException { + Key k = new Key(DataType.toBytes(key, keyType)); + if (bloomFilters.length == 1) { + if (bloomFilters[0] == null) { + bloomFilters[0] = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + } + bloomFilters[0].add(k); + } else { + int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + BloomFilter filter = bloomFilters[partition]; + if (filter == null) { + filter = new BloomFilter(vectorSizeBytes * 8, numHash, hashType); + bloomFilters[partition] = filter; + } + filter.add(k); + } + } + + private void writeJoinKeyForBloom(Object key) throws IOException { + int partition = (key.hashCode() & Integer.MAX_VALUE) % numBloomFilters; + bloomValue.set(0, key); + bloomWriter.write(new NullableIntWritable(partition), bloomNullableTuple); + } + + private void writeBloomFilters() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(vectorSizeBytes + 64); + for (int i = 0; i < bloomFilters.length; i++) { + if (bloomFilters[i] != null) { + DataOutputStream dos = new DataOutputStream(baos); + bloomFilters[i].write(dos); + dos.flush(); + bloomValue.set(0, new DataByteArray(baos.toByteArray())); + bloomWriter.write(new NullableIntWritable(i), bloomNullableTuple); + baos.reset(); + } + } + } + + @Override + public POBuildBloomRearrangeTez clone() throws CloneNotSupportedException { + return (POBuildBloomRearrangeTez) super.clone(); + } + + @Override + public String name() { + return getAliasString() + "BuildBloom Rearrange" + "[" + + DataType.findTypeName(resultType) + "]" + "{" + + DataType.findTypeName(keyType) + "}" + "(" + mIsDistinct + + ") - " + mKey.toString() + "\t->\t[ " + outputKey + ", " + bloomOutputKey +"]"; + } + +} Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original) +++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Feb 24 08:19:42 2017 @@ -56,6 +56,7 @@ public class POCounterStatsTez extends P private transient KeyValuesReader reader; private transient KeyValueWriter writer; private transient boolean finished = false; + private transient boolean hasNext = false; public POCounterStatsTez(OperatorKey k) { super(k); @@ -88,6 +89,7 @@ public class POCounterStatsTez extends P try { reader = (KeyValuesReader) input.getReader(); LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + reader); + hasNext = reader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -130,12 +132,13 @@ public class POCounterStatsTez extends P Integer key = null; Long value = null; // Read count of records per task - while (reader.next()) { + while (hasNext) { key = ((IntWritable)reader.getCurrentKey()).get(); for (Object val : reader.getCurrentValues()) { value = ((LongWritable)val).get(); counterRecords.put(key, value); } + hasNext = reader.next(); } // BinInterSedes only takes String for map key Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Feb 24 08:19:42 2017 @@ -19,6 +19,8 @@ package org.apache.pig.backend.hadoop.executionengine.tez.plan.operator; import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -101,9 +103,13 @@ public class POFRJoinTez extends POFRJoi LogicalInput input = inputs.get(key); if (!this.replInputs.contains(input)) { this.replInputs.add(input); - this.replReaders.add((KeyValueReader) input.getReader()); + KeyValueReader reader = (KeyValueReader) input.getReader(); + this.replReaders.add(reader); + log.info("Attached input from vertex " + key + " : input=" + input + ", reader=" + reader); } } + // Do not force fetch input by reading first record. Cases like MultiQuery_Union_4 have + // multiple POFRJoinTez loading same replicate input and will skip records } catch (Exception e) { throw new ExecException(e); } @@ -114,6 +120,7 @@ public class POFRJoinTez extends POFRJoi * * @throws ExecException */ + @SuppressWarnings("unchecked") @Override protected void setUpHashMap() throws ExecException { @@ -121,8 +128,8 @@ public class POFRJoinTez extends POFRJoi // where same POFRJoinTez occurs in different Split sub-plans Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); if (cacheValue != null) { - replicates = (TupleToMapKey[]) cacheValue; - log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey); + replicates = (List<Map<? extends Object, ? extends List<Tuple>>>) cacheValue; + log.info("Found " + (replicates.size() - 1) + " replication hash tables in Tez cache. 
cachekey=" + cacheKey); return; } @@ -148,7 +155,7 @@ public class POFRJoinTez extends POFRJoi long time1 = System.currentTimeMillis(); - replicates[fragment] = null; + replicates.set(fragment, null); int inputIdx = 0; // We need to adjust the index because the number of replInputs is // one less than the number of inputSchemas. The inputSchemas @@ -158,7 +165,12 @@ public class POFRJoinTez extends POFRJoi SchemaTupleFactory inputSchemaTupleFactory = inputSchemaTupleFactories[schemaIdx]; SchemaTupleFactory keySchemaTupleFactory = keySchemaTupleFactories[schemaIdx]; - TupleToMapKey replicate = new TupleToMapKey(4000, keySchemaTupleFactory); + Map<Object, ArrayList<Tuple>> replicate; + if (keySchemaTupleFactory == null) { + replicate = new HashMap<Object, ArrayList<Tuple>>(4000); + } else { + replicate = new TupleToMapKey(4000, keySchemaTupleFactory); + } POLocalRearrange lr = LRs[schemaIdx]; try { @@ -168,7 +180,8 @@ public class POFRJoinTez extends POFRJoi } PigNullableWritable key = (PigNullableWritable) replReaders.get(inputIdx).getCurrentKey(); - if (isKeyNull(key.getValueAsPigType())) continue; + Object keyValue = key.getValueAsPigType(); + if (isKeyNull(keyValue)) continue; NullableTuple val = (NullableTuple) replReaders.get(inputIdx).getCurrentValue(); // POFRJoin#getValueTuple() is reused to construct valTuple, @@ -176,27 +189,31 @@ public class POFRJoinTez extends POFRJoi // construct one here. Tuple retTuple = mTupleFactory.newTuple(3); retTuple.set(0, key.getIndex()); - retTuple.set(1, key.getValueAsPigType()); + retTuple.set(1, keyValue); retTuple.set(2, val.getValueAsPigType()); Tuple valTuple = getValueTuple(lr, retTuple); - Tuple keyTuple = mTupleFactory.newTuple(1); - keyTuple.set(0, key.getValueAsPigType()); - if (replicate.get(keyTuple) == null) { - replicate.put(keyTuple, new TuplesToSchemaTupleList(1, inputSchemaTupleFactory)); + ArrayList<Tuple> values = replicate.get(keyValue); + if (values == null) { + if (inputSchemaTupleFactory == null) { + values = new ArrayList<Tuple>(1); + } else { + values = new TuplesToSchemaTupleList(1, inputSchemaTupleFactory); + } + replicate.put(keyValue, values); } - replicate.get(keyTuple).add(valTuple); + values.add(valTuple); } } catch (IOException e) { throw new ExecException(e); } - replicates[schemaIdx] = replicate; + replicates.set(schemaIdx, replicate); inputIdx++; schemaIdx++; } long time2 = System.currentTimeMillis(); - log.info((replicates.length - 1) + " replication hash tables built. Time taken: " + (time2 - time1)); + log.info((replicates.size() - 1) + " replication hash tables built. Time taken: " + (time2 - time1)); ObjectCache.getInstance().cache(cacheKey, replicates); log.info("Cached replicate hash tables in Tez ObjectRegistry with vertex scope. 
cachekey=" + cacheKey); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Feb 24 08:19:42 2017 @@ -57,6 +57,7 @@ public class POIdentityInOutTez extends private transient KeyValuesReader shuffleReader; private transient boolean shuffleInput; private transient boolean finished = false; + private transient boolean hasNext = false; public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) { super(inputRearrange); @@ -95,9 +96,12 @@ public class POIdentityInOutTez extends Reader r = input.getReader(); if (r instanceof KeyValueReader) { reader = (KeyValueReader) r; + // Force input fetch + hasNext = reader.next(); } else { shuffleInput = true; shuffleReader = (KeyValuesReader) r; + hasNext = shuffleReader.next(); } LOG.info("Attached input from vertex " + inputKey + " : input=" + input + ", reader=" + r); } catch (Exception e) { @@ -127,7 +131,7 @@ public class POIdentityInOutTez extends return RESULT_EOP; } if (shuffleInput) { - while (shuffleReader.next()) { + while (hasNext) { Object curKey = shuffleReader.getCurrentKey(); Iterable<Object> vals = shuffleReader.getCurrentValues(); if (isSkewedJoin) { @@ -139,9 +143,10 @@ public class POIdentityInOutTez extends for (Object val : vals) { writer.write(curKey, val); } + hasNext = shuffleReader.next(); } } else { - while (reader.next()) { + while (hasNext) { if (isSkewedJoin) { NullablePartitionWritable wrappedKey = new NullablePartitionWritable( (PigNullableWritable) reader.getCurrentKey()); @@ -155,6 +160,7 @@ public class POIdentityInOutTez extends writer.write(reader.getCurrentKey(), reader.getCurrentValue()); } + hasNext = reader.next(); } } finished = true; Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Feb 24 08:19:42 2017 @@ -71,8 +71,8 @@ public class POLocalRearrangeTez extends } } - public String getOutputKey() { - return outputKey; + public boolean containsOutputKey(String key) { + return outputKey.equals(key); } public void setOutputKey(String outputKey) { @@ -122,6 +122,10 @@ public class POLocalRearrangeTez extends } } + protected Result getRearrangedTuple() throws ExecException { + return super.getNextTuple(); + } + @Override public Result getNextTuple() throws ExecException { res = super.getNextTuple(); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java Fri Feb 24 08:19:42 2017 @@ -51,6 +51,7 @@ public class PORankTez extends PORank im private transient Map<Integer, Long> counterOffsets; private transient Configuration conf; private transient boolean finished = false; + private transient Boolean hasFirstRecord; public PORankTez(PORank copy) { super(copy); @@ -100,6 +101,7 @@ public class PORankTez extends PORank im try { reader = (KeyValueReader) input.getReader(); LOG.info("Attached input from vertex " + tuplesInputKey + " : input=" + input + ", reader=" + reader); + hasFirstRecord = reader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -140,9 +142,18 @@ public class PORankTez extends PORank im Result inp = null; try { - while (reader.next()) { - inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); - return addRank(inp); + if (hasFirstRecord != null) { + if (hasFirstRecord) { + hasFirstRecord = null; + inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); + return addRank(inp); + } + hasFirstRecord = null; + } else { + while (reader.next()) { + inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue()); + return addRank(inp); + } } } catch (IOException e) { throw new ExecException(e); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java Fri Feb 24 08:19:42 2017 @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.WritableComparator; import org.apache.pig.backend.executionengine.ExecException; @@ -32,12 +34,16 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.AccumulativeTupleBuffer; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager; import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil; import 
org.apache.pig.data.AccumulativeBag; import org.apache.pig.data.DataBag; import org.apache.pig.data.InternalCachedBag; +import org.apache.pig.data.ReadOnceBag; import org.apache.pig.data.Tuple; import org.apache.pig.impl.io.NullableTuple; import org.apache.pig.impl.io.PigNullableWritable; @@ -48,6 +54,7 @@ import org.apache.tez.runtime.library.co public class POShuffleTezLoad extends POPackage implements TezInput { private static final long serialVersionUID = 1L; + private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class); protected List<String> inputKeys = new ArrayList<String>(); private boolean isSkewedJoin = false; @@ -61,6 +68,7 @@ public class POShuffleTezLoad extends PO private transient WritableComparator groupingComparator = null; private transient Configuration conf; private transient int accumulativeBatchSize; + private transient boolean readOnceOneBag; public POShuffleTezLoad(POPackage pack) { super(pack); @@ -101,7 +109,10 @@ public class POShuffleTezLoad extends PO // - Input key will be repeated, but index would be same within a TezInput if (!this.inputs.contains(input)) { this.inputs.add(input); - this.readers.add((KeyValuesReader)input.getReader()); + KeyValuesReader reader = (KeyValuesReader)input.getReader(); + this.readers.add(reader); + LOG.info("Attached input from vertex " + inputKey + + " : input=" + input + ", reader=" + reader); } } @@ -117,6 +128,13 @@ public class POShuffleTezLoad extends PO for (int i = 0; i < numTezInputs; i++) { finished[i] = !readers.get(i).next(); } + + this.readOnceOneBag = (numInputs == 1) + && (pkgr instanceof CombinerPackager + || pkgr instanceof LitePackager || pkgr instanceof BloomPackager); + if (readOnceOneBag) { + readOnce[0] = true; + } } catch (Exception e) { throw new ExecException(e); } @@ -187,43 +205,47 @@ public class POShuffleTezLoad extends PO } else { - for (int i = 0; i < numInputs; i++) { - bags[i] = new InternalCachedBag(numInputs); - } - - if (numTezInputs == 1) { - do { - Iterable<Object> vals = readers.get(0).getCurrentValues(); - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bags[index].add(tup); - } - finished[0] = !readers.get(0).next(); - if (finished[0]) { - break; - } - cur = readers.get(0).getCurrentKey(); - } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators + if (readOnceOneBag) { + bags[0] = new TezReadOnceBag(pkgr, min); } else { - for (int i = 0; i < numTezInputs; i++) { - if (!finished[i]) { - cur = readers.get(i).getCurrentKey(); - // We need to loop in case of Grouping Comparators - while (groupingComparator.compare(min, cur) == 0) { - Iterable<Object> vals = readers.get(i).getCurrentValues(); - for (Object val : vals) { - NullableTuple nTup = (NullableTuple) val; - int index = nTup.getIndex(); - Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); - bags[index].add(tup); - } - finished[i] = !readers.get(i).next(); - if (finished[i]) { - break; - } + for (int i = 0; i < numInputs; i++) { + bags[i] = new InternalCachedBag(numInputs); + } + + if (numTezInputs == 1) { + do { + Iterable<Object> vals = readers.get(0).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[0] = !readers.get(0).next(); + if (finished[0]) { + break; + } + cur = 
readers.get(0).getCurrentKey(); + } while (groupingComparator.compare(min, cur) == 0); // We need to loop in case of Grouping Comparators + } else { + for (int i = 0; i < numTezInputs; i++) { + if (!finished[i]) { cur = readers.get(i).getCurrentKey(); + // We need to loop in case of Grouping Comparators + while (groupingComparator.compare(min, cur) == 0) { + Iterable<Object> vals = readers.get(i).getCurrentValues(); + for (Object val : vals) { + NullableTuple nTup = (NullableTuple) val; + int index = nTup.getIndex(); + Tuple tup = pkgr.getValueTuple(keyWritable, nTup, index); + bags[index].add(tup); + } + finished[i] = !readers.get(i).next(); + if (finished[i]) { + break; + } + cur = readers.get(i).getCurrentKey(); + } } } } @@ -383,4 +405,74 @@ public class POShuffleTezLoad extends PO } + private class TezReadOnceBag extends ReadOnceBag { + + private static final long serialVersionUID = 1L; + private Iterator<Object> iter; + + public TezReadOnceBag(Packager pkgr, + PigNullableWritable currentKey) throws IOException { + this.pkgr = pkgr; + this.keyWritable = currentKey; + this.iter = readers.get(0).getCurrentValues().iterator(); + } + + @Override + public Iterator<Tuple> iterator() { + return new TezReadOnceBagIterator(); + } + + private class TezReadOnceBagIterator implements Iterator<Tuple> { + + @Override + public boolean hasNext() { + if (iter.hasNext()) { + return true; + } else { + try { + finished[0] = !readers.get(0).next(); + if (finished[0]) { + return false; + } + // Currently combiner is not being applied when secondary key(grouping comparator) is used + // But might change in future. So check if the next key is same and return its values + Object cur = readers.get(0).getCurrentKey(); + if (groupingComparator.compare(keyWritable, cur) == 0) { + iter = readers.get(0).getCurrentValues().iterator(); + // Key should at least have one value. But doing a check just for safety + if (iter.hasNext()) { + return true; + } else { + throw new RuntimeException("Unexpected. 
Key " + keyWritable + " does not have any values"); + } + } + return false; + } catch (IOException e) { + throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e); + } + } + } + + @Override + public Tuple next() { + NullableTuple ntup = (NullableTuple) iter.next(); + int index = ntup.getIndex(); + Tuple ret = null; + try { + ret = pkgr.getValueTuple(keyWritable, ntup, index); + } catch (ExecException e) { + throw new RuntimeException("ReadOnceBag failed to get value tuple : ", e); + } + return ret; + } + + @Override + public void remove() { + throw new RuntimeException("ReadOnceBag.iterator().remove() is not allowed"); + } + } + + } + + } Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java Fri Feb 24 08:19:42 2017 @@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext private transient Iterator<KeyValueReader> readers; private transient KeyValueReader currentReader; private transient Configuration conf; + private transient Boolean hasFirstRecord; public POShuffledValueInputTez(OperatorKey k) { super(k); @@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext } readers = readersList.iterator(); currentReader = readers.next(); + // Force input fetch + hasFirstRecord = currentReader.next(); } catch (Exception e) { throw new ExecException(e); } @@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext } do { - if (currentReader.next()) { + if (hasFirstRecord != null) { + if (hasFirstRecord) { + hasFirstRecord = null; + Tuple origTuple = (Tuple) currentReader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + return new Result(POStatus.STATUS_OK, copy); + } + hasFirstRecord = null; + } else if (currentReader.next()) { Tuple origTuple = (Tuple) currentReader.getCurrentValue(); Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); return new Result(POStatus.STATUS_OK, copy); Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff ============================================================================== --- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original) +++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 08:19:42 2017 @@ -60,6 +60,8 @@ public class POSimpleTezLoad extends POL private transient Configuration conf; private transient boolean finished = false; private transient TezCounter inputRecordCounter; + private transient boolean initialized; + private transient boolean noTupleCopy; public POSimpleTezLoad(OperatorKey k, LoadFunc loader) { super(k, loader); @@ -149,7 +151,13 @@ public class POSimpleTezLoad extends POL } else { Result res = new Result(); Tuple 
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java Fri Feb 24 08:19:42 2017
@@ -60,6 +60,8 @@ public class POSimpleTezLoad extends POL
     private transient Configuration conf;
     private transient boolean finished = false;
    private transient TezCounter inputRecordCounter;
+    private transient boolean initialized;
+    private transient boolean noTupleCopy;
 
     public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
         super(k, loader);
@@ -149,7 +151,13 @@ public class POSimpleTezLoad extends POL
         } else {
             Result res = new Result();
             Tuple next = (Tuple) reader.getCurrentValue();
-            res.result = next;
+            if (!initialized) {
+                noTupleCopy = mTupleFactory.newTuple(1).getClass().isInstance(next);
+                initialized = true;
+            }
+            // Some Loaders return implementations of DefaultTuple instead of BinSedesTuple
+            // In that case copy to BinSedesTuple
+            res.result = noTupleCopy ? next : mTupleFactory.newTupleNoCopy(next.getAll());
             res.returnStatus = POStatus.STATUS_OK;
             if (inputRecordCounter != null) {
                 inputRecordCounter.increment(1);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Fri Feb 24 08:19:42 2017
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
             throw new ExecException(e);
         }
 
-        // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
-        if (outputs.size() > 1) {
-            CounterGroup multiStoreGroup = processorContext.getCounters()
-                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            if (multiStoreGroup == null) {
-                processorContext.getCounters().addGroup(
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            }
-            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-            if (name != null) {
-                outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
-            }
+        // Even if there is a single hdfs output, we add multi store counter
+        // Makes it easier for user to see records for a particular store from
+        // the DAG counter
+        CounterGroup multiStoreGroup = processorContext.getCounters()
+                .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        if (multiStoreGroup == null) {
+            processorContext.getCounters().addGroup(
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                    MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        }
+        String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+        if (name != null) {
+            outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
         }
     }
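The POSimpleTezLoad hunk probes the very first tuple to decide, exactly once, whether records already carry the tuple class the factory produces; only records of a foreign class get copied afterwards. A self-contained sketch of that memoized type probe, with made-up Row/FactoryRow classes standing in for Pig's tuple implementations.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CopyOnlyWhenForeign {

    static class Row {                       // what an arbitrary loader may hand us
        final List<String> fields;
        Row(List<String> fields) { this.fields = fields; }
    }

    static class FactoryRow extends Row {    // what the rest of the pipeline expects
        FactoryRow(List<String> fields) { super(fields); }
    }

    private boolean initialized;
    private boolean noCopyNeeded;

    Row normalize(Row incoming) {
        if (!initialized) {
            // Probe exactly once, on the first record.
            noCopyNeeded = FactoryRow.class.isInstance(incoming);
            initialized = true;
        }
        return noCopyNeeded ? incoming : new FactoryRow(new ArrayList<>(incoming.fields));
    }

    public static void main(String[] args) {
        CopyOnlyWhenForeign handler = new CopyOnlyWhenForeign();
        Row fromLoader = new Row(Arrays.asList("a", "b"));
        System.out.println(handler.normalize(fromLoader).getClass().getSimpleName()); // FactoryRow
        // The cached decision is reused; no per-record probing after the first call.
        System.out.println(handler.normalize(fromLoader).getClass().getSimpleName()); // FactoryRow
    }
}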
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java Fri Feb 24 08:19:42 2017
@@ -57,6 +57,7 @@ public class POValueInputTez extends Phy
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
     private transient boolean hasNext;
+    private transient Boolean hasFirstRecord;
 
     public POValueInputTez(OperatorKey k) {
         super(k);
@@ -92,6 +93,8 @@ public class POValueInputTez extends Phy
             Reader r = input.getReader();
             if (r instanceof KeyValueReader) {
                 reader = (KeyValueReader) r;
+                // Force input fetch
+                hasFirstRecord = reader.next();
             } else {
                 shuffleInput = true;
                 shuffleReader = (KeyValuesReader) r;
@@ -118,10 +121,22 @@ public class POValueInputTez extends Phy
                 }
                 hasNext = shuffleReader.next();
             }
-        } else if (reader.next()) {
-            Tuple origTuple = (Tuple) reader.getCurrentValue();
-            Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
-            return new Result(POStatus.STATUS_OK, copy);
+        } else {
+            if (hasFirstRecord != null) {
+                if (hasFirstRecord) {
+                    hasFirstRecord = null;
+                    Tuple origTuple = (Tuple) reader.getCurrentValue();
+                    Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                    return new Result(POStatus.STATUS_OK, copy);
+                }
+                hasFirstRecord = null;
+            } else {
+                while (reader.next()) {
+                    Tuple origTuple = (Tuple) reader.getCurrentValue();
+                    Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                    return new Result(POStatus.STATUS_OK, copy);
+                }
+            }
         }
         finished = true;
         // For certain operators (such as STREAM), we could still have some work

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Feb 24 08:19:42 2017
@@ -69,6 +69,11 @@ public class CombinerOptimizer extends T
         }
 
         for (TezOperator from : predecessors) {
+            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
+            if (!combinePlan.isEmpty()) {
+                // Cases like bloom join have combine plan already set
+                continue;
+            }
             List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
@@ -77,7 +82,7 @@ public class CombinerOptimizer extends T
             POLocalRearrangeTez connectingLR = null;
             PhysicalPlan rearrangePlan = from.plan;
             for (POLocalRearrangeTez lr : rearranges) {
-                if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+                if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                     connectingLR = lr;
                     break;
                 }
@@ -90,7 +95,6 @@
 
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
-            PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan,
                     messageCollector, doMapAgg);
             if(!combinePlan.isEmpty()) {
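CombinerOptimizer (and SecondaryKeyOptimizerTez further below) now locate the connecting local rearrange with containsOutputKey(...) instead of comparing a single output key for equality, since a rearrange may feed more than one output edge. A rough stand-alone illustration of that membership lookup; the Rearrange class here is hypothetical, not Pig's operator.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class FindConnectingOperator {

    static class Rearrange {
        final String name;
        final Set<String> outputKeys;            // may hold more than one edge
        Rearrange(String name, String... keys) {
            this.name = name;
            this.outputKeys = new HashSet<>(Arrays.asList(keys));
        }
        boolean containsOutputKey(String key) {
            return outputKeys.contains(key);
        }
    }

    static Optional<Rearrange> findConnecting(List<Rearrange> candidates, String targetKey) {
        for (Rearrange r : candidates) {
            if (r.containsOutputKey(targetKey)) {   // membership, not equality
                return Optional.of(r);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<Rearrange> candidates = Arrays.asList(
                new Rearrange("lr1", "scope-12"),
                new Rearrange("lr2", "scope-20", "scope-21"));  // feeds two edges
        System.out.println(findConnecting(candidates, "scope-21").get().name); // lr2
        System.out.println(findConnecting(candidates, "scope-99").isPresent()); // false
    }
}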
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/LoaderProcessor.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -65,11 +66,6 @@ public class LoaderProcessor extends Tez
         this.jobConf.setBoolean("mapred.mapper.new-api", true);
         this.jobConf.setClass("mapreduce.inputformat.class",
                 PigInputFormat.class, InputFormat.class);
-        try {
-            this.jobConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        } catch (IOException e) {
-            throw new VisitorException(e);
-        }
     }
 
     /**
@@ -175,6 +171,7 @@ public class LoaderProcessor extends Tez
             // splits can be moved to if(loads) block below
             int parallelism = tezOp.getLoaderInfo().getInputSplitInfo().getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
+            tezOp.setTotalInputFilesSize(InputSizeReducerEstimator.getTotalInputFileSize(conf, lds, job));
         }
         return lds;
     }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Feb 24 08:19:42 2017
@@ -153,6 +153,8 @@ public class MultiQueryOptimizerTez exte
                 }
             }
             if (getPlan().getSuccessors(successor) != null) {
+                nonPackageInputSuccessors.clear();
+                toMergeSuccessors.clear();
                 for (TezOperator succSuccessor : getPlan().getSuccessors(successor)) {
                     if (succSuccessor.isUnion()) {
                         if (!(unionOptimizerOn &&
@@ -171,7 +173,13 @@ public class MultiQueryOptimizerTez exte
                                 continue;
                             }
                         }
-                        toMergeSuccessors.add(succSuccessor);
+                        if (TezCompilerUtil.isNonPackageInput(successor.getOperatorKey().toString(), succSuccessor)) {
+                            // Output goes to scalar or POFRJoinTez in the union operator
+                            // We need to ensure it is the only one to avoid parallel edges
+                            canMerge = canMerge ? nonPackageInputSuccessors.add(succSuccessor) : false;
+                        } else {
+                            toMergeSuccessors.add(succSuccessor);
+                        }
                         List<TezOperator> unionSuccessors = getPlan().getSuccessors(succSuccessor);
                         if (unionSuccessors != null) {
                             for (TezOperator unionSuccessor : unionSuccessors) {
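LoaderProcessor now records the total input file size on the vertex, which a size-based estimator can later turn into a parallelism value. A simplified sketch of such an estimate, assuming a ceil(totalBytes / bytesPerTask) rule with a cap; the constants and method names are illustrative, not Pig's shipped defaults.

public class SizeBasedParallelism {

    static int estimateParallelism(long totalInputBytes, long bytesPerTask, int maxTasks) {
        if (totalInputBytes <= 0) {
            return 1;                                   // always run at least one task
        }
        long tasks = (totalInputBytes + bytesPerTask - 1) / bytesPerTask;  // ceiling division
        return (int) Math.min(tasks, maxTasks);
    }

    public static void main(String[] args) {
        long gb = 1024L * 1024 * 1024;
        System.out.println(estimateParallelism(10 * gb, gb, 999));     // 10
        System.out.println(estimateParallelism(10 * gb + 1, gb, 999)); // 11
        System.out.println(estimateParallelism(0, gb, 999));           // 1
    }
}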
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Feb 24 08:19:42 2017
@@ -115,11 +115,16 @@ public class ParallelismSetter extends T
                 } else if (pc.defaultParallel != -1) {
                     parallelism = pc.defaultParallel;
                 }
+                if (parallelism == 0) {
+                    // We need to produce empty output file.
+                    // Even if user set PARALLEL 0, mapreduce has 1 reducer
+                    parallelism = 1;
+                }
 
                 boolean overrideRequestedParallelism = false;
                 if (parallelism != -1
                         && autoParallelismEnabled
-                        && tezOp.isIntermediateReducer() && !tezOp.isDontEstimateParallelism()
+                        && tezOp.isIntermediateReducer()
                         && tezOp.isOverrideIntermediateParallelism()) {
                     overrideRequestedParallelism = true;
                 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/SecondaryKeyOptimizerTez.java Fri Feb 24 08:19:42 2017
@@ -75,7 +75,7 @@ public class SecondaryKeyOptimizerTez ex
         POLocalRearrangeTez connectingLR = null;
         PhysicalPlan rearrangePlan = from.plan;
         for (POLocalRearrangeTez lr : rearranges) {
-            if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
+            if (lr.containsOutputKey(to.getOperatorKey().toString())) {
                 connectingLR = lr;
                 break;
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezEstimatedParallelismClearer.java Fri Feb 24 08:19:42 2017
@@ -30,6 +30,8 @@ public class TezEstimatedParallelismClea
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
-        tezOp.setEstimatedParallelism(-1);
+        if (!tezOp.isDontEstimateParallelism()) {
+            tezOp.setEstimatedParallelism(-1);
+        }
     }
 }
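The last two hunks adjust how parallelism values are handled: a requested PARALLEL 0 is bumped to 1 so an empty output file is still produced, and operators flagged as "don't estimate" keep their fixed estimate when estimates are cleared. A compact stand-alone sketch of both rules, with illustrative field and method names rather than Pig's.

public class ParallelismRules {

    static class Vertex {
        int requestedParallelism = -1;   // -1 means "not set"
        int estimatedParallelism = -1;
        boolean dontEstimateParallelism; // e.g. parallelism fixed by the operator itself
    }

    static int resolveRequested(int requested, int defaultParallel) {
        int parallelism = requested != -1 ? requested : defaultParallel;
        if (parallelism == 0) {
            // Even "PARALLEL 0" must still run one task to create empty output.
            parallelism = 1;
        }
        return parallelism;
    }

    static void clearEstimate(Vertex v) {
        if (!v.dontEstimateParallelism) {
            v.estimatedParallelism = -1;  // let the estimator recompute it later
        }
    }

    public static void main(String[] args) {
        System.out.println(resolveRequested(0, 4));      // 1
        System.out.println(resolveRequested(-1, 4));     // 4
        Vertex fixed = new Vertex();
        fixed.estimatedParallelism = 8;
        fixed.dontEstimateParallelism = true;
        clearEstimate(fixed);
        System.out.println(fixed.estimatedParallelism);  // 8 (kept)
    }
}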