Author: rohini Date: Tue Oct 3 17:31:51 2017 New Revision: 1811015 URL: http://svn.apache.org/viewvc?rev=1811015&view=rev Log: PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin (satishsaley via rohini)
Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue Oct 3 17:31:51 2017 @@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley IMPROVEMENTS +PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin (satishsaley via rohini) + PIG-5306: REGEX_EXTRACT() logs every line that doesn't match (satishsaley via rohini) PIG-5298: Verify if org.mortbay.jetty is removable (nkollar via szita) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Tue Oct 3 17:31:51 2017 @@ -21,6 +21,7 @@ package org.apache.pig.backend.hadoop.ex import java.io.IOException; import java.io.ObjectInputStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.PriorityQueue; @@ -41,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; import org.apache.pig.data.DataBag; @@ -53,6 +55,7 @@ import org.apache.pig.impl.PigImplConsta import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.PlanException; import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.Pair; @@ -78,8 +81,6 @@ public class POMergeCogroup extends Phys // relation is also included in the count. private transient int relationCnt; - private transient TupleFactory mTupleFactory; - private String indexFileName; private FuncSpec idxFuncSpec; @@ -113,7 +114,19 @@ public class POMergeCogroup extends Phys for(int i=0; i < lrs.length; i++) LRs[i].setStripKeyFromValue(false); } - + + public POMergeCogroup(POMergeCogroup copy) { + super(copy); + this.sidFuncSpecs = copy.sidFuncSpecs; + this.sideFileSpecs = copy.sideFileSpecs; + this.LRs = copy.LRs; + this.indexFileName = copy.indexFileName; + this.idxFuncSpec = copy.idxFuncSpec; + this.loaderSignatures = copy.loaderSignatures; + this.endOfRecordMark = copy.endOfRecordMark; + this.counter = copy.counter; + } + // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL for Tez. // This is because: // For MR, we send EOP at the end of every record @@ -413,7 +426,7 @@ public class POMergeCogroup extends Phys } } - private List<Pair<Integer,Tuple>> readIndex() throws ExecException{ + protected List<Pair<Integer, Tuple>> readIndex() throws ExecException { // Assertions on index we are about to read: // We are reading index from a file through POLoad which will return tuples. @@ -438,20 +451,28 @@ public class POMergeCogroup extends Phys List<Pair<Integer,Tuple>> index = new ArrayList<Pair<Integer,Tuple>>(); for(Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld.getNextTuple()){ - - Tuple idxTuple = (Tuple)res.result; - int colCnt = idxTuple.size()-2; - Tuple keyTuple = mTupleFactory.newTuple(colCnt); - - for (int i=0; i< colCnt; i++) - keyTuple.set(i, idxTuple.get(i)); - - index.add(new Pair<Integer, Tuple>((Integer)idxTuple.get(colCnt+1), keyTuple)); + addTupleToIndex((Tuple) res.result, index); } return index; } + /** + * Separates out key tuple from given tuple and adds it to index. + * + * @param tuple + * @param index + * @throws ExecException + */ + protected void addTupleToIndex(Tuple tuple, List<Pair<Integer, Tuple>> index) throws ExecException { + int colCnt = tuple.size() - 2; + Tuple keyTuple = mTupleFactory.newTuple(colCnt); + for (int i = 0; i < colCnt; i++) { + keyTuple.set(i, tuple.get(i)); + } + index.add(new Pair<Integer, Tuple>((Integer) tuple.get(colCnt + 1), keyTuple)); + } + @SuppressWarnings("unchecked") private Comparable<Object> getFirstKeyOfNextSplit(final int curSplitIdx, final List<Pair<Integer,Tuple>> index) throws IOException{ @@ -548,7 +569,6 @@ public class POMergeCogroup extends Phys ClassNotFoundException, ExecException { is.defaultReadObject(); - mTupleFactory = TupleFactory.getInstance(); this.heap = new PriorityQueue<Tuple>(11, new Comparator<Tuple>() { @SuppressWarnings("unchecked") @@ -622,4 +642,21 @@ public class POMergeCogroup extends Phys public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { return null; } + + @Override + public POMergeCogroup clone() throws CloneNotSupportedException { + POMergeCogroup clone = (POMergeCogroup) super.clone(); + clone.sidFuncSpecs = new ArrayList<FuncSpec>(); + for (FuncSpec f : this.sidFuncSpecs) { + clone.sidFuncSpecs.add(f.clone()); + } + clone.sideFileSpecs = new ArrayList<String>(this.sideFileSpecs); + clone.LRs = new POLocalRearrange[this.LRs.length]; + for (int i = 0; i < this.LRs.length; i++) { + clone.LRs[i] = this.LRs[i].clone(); + } + clone.idxFuncSpec = this.idxFuncSpec.clone(); + clone.loaderSignatures = new ArrayList<String>(this.loaderSignatures); + return clone; + } } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java?rev=1811015&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java Tue Oct 3 17:31:51 2017 @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.util.Pair; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValueReader; + +public class POMergeCogroupTez extends POMergeCogroup implements TezInput { + + private static final Log LOG = LogFactory.getLog(POMergeJoinTez.class); + private static final long serialVersionUID = 1L; + private String inputKey; + private transient String cacheKey; + private transient KeyValueReader reader; + private transient List<Pair<Integer, Tuple>> index; + + public POMergeCogroupTez(OperatorKey k, List<PhysicalOperator> inpPOs, POLocalRearrange[] lrs, int parallel) { + super(k, inpPOs, lrs, parallel); + } + + public POMergeCogroupTez(POMergeCogroup copy) { + super(copy); + } + + public void setInputKey(String inputKey) { + this.inputKey = inputKey; + } + + @Override + public String[] getTezInputs() { + return new String[] { this.inputKey }; + } + + @Override + public void replaceInput(String oldInputKey, String newInputKey) { + this.inputKey = newInputKey; + } + + @Override + public void addInputsToSkip(Set<String> inputsToSkip) { + cacheKey = "mergecogrp-" + inputKey; + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + inputsToSkip.add(inputKey); + } + } + + @SuppressWarnings("unchecked") + @Override + public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException { + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + this.index = (LinkedList<Pair<Integer, Tuple>>) cacheValue; + return; + } + + LogicalInput input = inputs.get(inputKey); + if (input == null) { + throw new ExecException("Input from vertex " + inputKey + " is missing"); + } + + try { + input = inputs.get(this.inputKey); + reader = (KeyValueReader) input.getReader(); + LOG.info( + "Attached input from vertex " + this.inputKey + " : input=" + input + ", reader=" + reader); + index = new LinkedList<>(); + while (reader.next()) { + Tuple origTuple = (Tuple) reader.getCurrentValue(); + Tuple copyTuple = mTupleFactory.newTuple(origTuple.getAll()); + addTupleToIndex(copyTuple, index); + } + ObjectCache.getInstance().cache(cacheKey, this.index); + } + catch (Exception e) { + throw new ExecException(e); + } + } + + @Override + public String name() { + return super.name().replace("MergeCogroup", "MergeCogroupTez") + "\t<-\t " + this.inputKey; + } + + @Override + protected List<Pair<Integer, Tuple>> readIndex() { + return this.index; + } + + @Override + public POMergeCogroupTez clone() throws CloneNotSupportedException { + return (POMergeCogroupTez) super.clone(); + } +} \ No newline at end of file Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Tue Oct 3 17:31:51 2017 @@ -43,7 +43,6 @@ import org.apache.pig.data.SchemaTupleBa import org.apache.pig.data.SchemaTupleClassGenerator.GenContext; import org.apache.pig.data.SchemaTupleFactory; import org.apache.pig.data.Tuple; -import org.apache.pig.data.TupleFactory; import org.apache.pig.data.TupleMaker; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.builtin.DefaultIndexableLoader; @@ -55,6 +54,7 @@ import org.apache.pig.impl.plan.PlanExce import org.apache.pig.impl.plan.VisitorException; import org.apache.pig.impl.util.MultiMap; import org.apache.pig.newplan.logical.relational.LOJoin; +import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE; /** This operator implements merge join algorithm to do map side joins. * Currently, only two-way joins are supported. One input of join is identified as left @@ -82,21 +82,21 @@ public class POMergeJoin extends Physica //The Local Rearrange operators modeling the join key private POLocalRearrange[] LRs; - private transient LoadFunc rightLoader; + protected transient LoadFunc rightLoader; private OperatorKey opKey; - private Object prevLeftKey; + private transient Object prevLeftKey; - private Result prevLeftInp; + private transient Result prevLeftInp; - private Object prevRightKey = null; + private transient Object prevRightKey = null; - private Result prevRightInp; + private transient Result prevRightInp; //boolean denoting whether we are generating joined tuples in this getNext() call or do we need to read in more data. - private boolean doingJoin; + private transient boolean doingJoin; - private FuncSpec rightLoaderFuncSpec; + protected FuncSpec rightLoaderFuncSpec; private String rightInputFileName; @@ -113,17 +113,17 @@ public class POMergeJoin extends Physica private boolean noInnerPlanOnRightSide; - private Object curJoinKey; + private transient Object curJoinKey; - private Tuple curJoiningRightTup; + private transient Tuple curJoiningRightTup; private int counter; // # of tuples on left side with same key. - private int leftTupSize = -1; + private transient int leftTupSize; - private int rightTupSize = -1; + private transient int rightTupSize; - private int arrayListSize = 1024; + private static int ARRAY_LIST_SIZE = 1024; private LOJoin.JOINTYPE joinType; @@ -147,10 +147,9 @@ public class POMergeJoin extends Physica // Only for spark. // it means that current operator reaches at its end and the last left input was // added into 'leftTuples', ready for join. - private boolean leftInputConsumedInSpark = false; + private transient boolean leftInputConsumedInSpark = false; // This serves as the default TupleFactory - private transient TupleFactory mTupleFactory; /** * These TupleFactories are used for more efficient Tuple generation. This should @@ -185,6 +184,25 @@ public class POMergeJoin extends Physica this.mergedInputSchema = mergedInputSchema; } + public POMergeJoin(POMergeJoin copy) { + super(copy); + this.firstTime = copy.firstTime; + this.LRs = copy.LRs; + this.rightLoaderFuncSpec = copy.rightLoaderFuncSpec; + this.rightInputFileName = copy.rightInputFileName; + this.indexFile = copy.indexFile; + this.inpPlans = copy.inpPlans; + this.rightPipelineLeaf = copy.rightPipelineLeaf; + this.rightPipelineRoot = copy.rightPipelineRoot; + this.noInnerPlanOnRightSide = copy.noInnerPlanOnRightSide; + this.counter = copy.counter; + this.joinType = copy.joinType; + this.signature = copy.signature; + this.endOfRecordMark = copy.endOfRecordMark; + this.leftInputSchema = copy.leftInputSchema; + this.mergedInputSchema = copy.mergedInputSchema; + } + /** * Configures the Local Rearrange operators to get keys out of tuple. * @throws ExecException @@ -211,8 +229,6 @@ public class POMergeJoin extends Physica * This is a helper method that sets up all of the TupleFactory members. */ private void prepareTupleFactories() { - mTupleFactory = TupleFactory.getInstance(); - if (leftInputSchema != null) { leftTupleMaker = SchemaTupleBackend.newSchemaTupleFactory(leftInputSchema, false, GenContext.MERGE_JOIN); } @@ -241,7 +257,7 @@ public class POMergeJoin extends Physica * @return the list object to store Tuples in */ private TuplesToSchemaTupleList newLeftTupleArray() { - return new TuplesToSchemaTupleList(arrayListSize, leftTupleMaker); + return new TuplesToSchemaTupleList(ARRAY_LIST_SIZE, leftTupleMaker); } /** @@ -546,14 +562,8 @@ public class POMergeJoin extends Physica } } - private void seekInRightStream(Object firstLeftKey) throws IOException{ - rightLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec); - - // check if hadoop distributed cache is used - if (indexFile != null && rightLoader instanceof DefaultIndexableLoader) { - DefaultIndexableLoader loader = (DefaultIndexableLoader)rightLoader; - loader.setIndexFile(indexFile); - } + private void seekInRightStream(Object firstLeftKey) throws IOException { + rightLoader = getRightLoader(); // Pass signature of the loader to rightLoader // make a copy of the conf to use in calls to rightLoader. @@ -565,6 +575,23 @@ public class POMergeJoin extends Physica firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : mTupleFactory.newTuple(firstLeftKey)); } + /** + * Instantiate right loader + * + * @return + * @throws IOException + * @throws ExecException + */ + protected LoadFunc getRightLoader() throws ExecException, IOException { + LoadFunc loader = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec); + // check if hadoop distributed cache is used + if (indexFile != null && loader instanceof DefaultIndexableLoader) { + DefaultIndexableLoader defLoader = (DefaultIndexableLoader) loader; + defLoader.setIndexFile(indexFile); + } + return loader; + } + private Result getNextRightInp(Object leftKey) throws ExecException{ /* @@ -668,7 +695,6 @@ public class POMergeJoin extends Physica private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException, ExecException{ is.defaultReadObject(); - mTupleFactory = TupleFactory.getInstance(); } @@ -746,4 +772,26 @@ public class POMergeJoin extends Physica public POLocalRearrange[] getLRs() { return LRs; } + + @Override + public POMergeJoin clone() throws CloneNotSupportedException { + POMergeJoin clone = (POMergeJoin) super.clone(); + clone.LRs = new POLocalRearrange[this.LRs.length]; + for (int i = 0; i < this.LRs.length; i++) { + clone.LRs[i] = this.LRs[i].clone(); + } + clone.rightLoaderFuncSpec = this.rightLoaderFuncSpec.clone(); + clone.inpPlans = new MultiMap<PhysicalOperator, PhysicalPlan>(); + for (PhysicalOperator op : this.inpPlans.keySet()) { + PhysicalOperator cloneOp = op.clone(); + for (PhysicalPlan phyPlan : this.inpPlans.get(op)) { + clone.inpPlans.put(cloneOp, phyPlan.clone()); + } + } + clone.rightPipelineLeaf = this.rightPipelineLeaf.clone(); + clone.rightPipelineRoot = this.rightPipelineRoot.clone(); + clone.leftInputSchema = this.leftInputSchema.clone(); + clone.mergedInputSchema = this.mergedInputSchema.clone(); + return clone; + } } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java?rev=1811015&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java Tue Oct 3 17:31:51 2017 @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators; + +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.LoadFunc; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.PigContext; +import org.apache.pig.impl.builtin.TezIndexableLoader; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.library.api.KeyValueReader; + +public class POMergeJoinTez extends POMergeJoin implements TezInput { + + private static final Log LOG = LogFactory.getLog(POMergeJoinTez.class); + private static final long serialVersionUID = 1L; + private String inputKey; + private transient String cacheKey; + private transient KeyValueReader reader; + private LinkedList<Tuple> index; + + public POMergeJoinTez(POMergeJoin joinOp) { + super(joinOp); + } + + public void setInputKey(String inputKey) { + this.inputKey = inputKey; + } + + @Override + public String[] getTezInputs() { + return new String[] { this.inputKey }; + } + + @Override + public void replaceInput(String oldInputKey, String newInputKey) { + this.inputKey = newInputKey; + } + + @Override + public void addInputsToSkip(Set<String> inputsToSkip) { + cacheKey = "merge-" + inputKey; + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + inputsToSkip.add(inputKey); + } + } + + @SuppressWarnings("unchecked") + @Override + public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException { + Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey); + if (cacheValue != null) { + this.index = (LinkedList<Tuple>) cacheValue; + rightLoader = getRightLoader(); + return; + } + + LogicalInput input = inputs.get(inputKey); + if (input == null) { + throw new ExecException("Input from vertex " + inputKey + " is missing"); + } + + try { + reader = (KeyValueReader) input.getReader(); + LOG.info( + "Attached input from vertex " + this.inputKey + " : input=" + input + ", reader=" + reader); + this.index = new LinkedList<Tuple>(); + while (reader.next()) { + Tuple origTuple = (Tuple) reader.getCurrentValue(); + Tuple copy = mTupleFactory.newTuple(origTuple.getAll()); + this.index.add(copy); + } + ObjectCache.getInstance().cache(cacheKey, this.index); + rightLoader = getRightLoader(); + } + catch (Exception e) { + throw new ExecException(e); + } + } + + @Override + public String name() { + return super.name().replace("MergeJoin", "MergeJoinTez") + "\t<-\t " + this.inputKey; + } + + @Override + protected LoadFunc getRightLoader() throws ExecException { + LoadFunc loader = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec); + if (loader instanceof TezIndexableLoader) { + ((TezIndexableLoader) loader).setIndex(index); + } + return loader; + } + + @Override + public POMergeJoinTez clone() throws CloneNotSupportedException { + return (POMergeJoinTez) super.clone(); + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Tue Oct 3 17:31:51 2017 @@ -72,7 +72,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroupTez; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoinTez; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample; @@ -112,9 +114,9 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; -import org.apache.pig.impl.builtin.DefaultIndexableLoader; import org.apache.pig.impl.builtin.GetMemNumRows; import org.apache.pig.impl.builtin.PartitionSkewedKeys; +import org.apache.pig.impl.builtin.TezIndexableLoader; import org.apache.pig.impl.io.FileLocalizer; import org.apache.pig.impl.io.FileSpec; import org.apache.pig.impl.io.NullableIntWritable; @@ -925,7 +927,9 @@ public class TezCompiler extends PhyPlan } @Override - public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws VisitorException { + public void visitMergeCoGroup(POMergeCogroup poCoGroup) throws VisitorException { + + POMergeCogroupTez poCoGrp = new POMergeCogroupTez(poCoGroup); if(compiledInputs.length < 2){ int errCode=2251; String errMsg = "Merge Cogroup work on two or more relations." + @@ -998,43 +1002,53 @@ public class TezCompiler extends PhyPlan // Create new map-reduce operator for indexing job and then configure it. TezOperator indexerTezOp = getTezOp(); - FileSpec idxFileSpec = getIndexingJob(indexerTezOp, baseMROp, poCoGrp.getLRInnerPlansOf(0)); - poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec()); - poCoGrp.setIndexFileName(idxFileSpec.getFileName()); + configureIndexerOp(indexerTezOp, baseMROp, poCoGrp); baseMROp.plan.addAsLeaf(poCoGrp); + baseMROp.markMergeCogroup(); for (FuncSpec funcSpec : funcSpecs) baseMROp.UDFs.add(funcSpec.toString()); - phyToTezOpMap.put(poCoGrp,baseMROp); + phyToTezOpMap.put(poCoGrp, baseMROp); // Going forward, new operators should be added in baseMRop. To make // sure, reset curMROp. curTezOp = baseMROp; } - catch (ExecException e){ - throw new TezCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e); + catch (ExecException e) { + throw new TezCompilerException(e.getDetailedMessage(), e.getErrorCode(), e.getErrorSource(), e); } - catch (TezCompilerException mrce){ - throw(mrce); + catch (TezCompilerException mrce) { + throw (mrce); } catch (CloneNotSupportedException e) { throw new TezCompilerException(e); } - catch(PlanException e){ + catch (PlanException e) { int errCode = 2034; String msg = "Error compiling operator " + poCoGrp.getClass().getCanonicalName(); throw new TezCompilerException(msg, errCode, PigException.BUG, e); } - catch (IOException e){ + catch (IOException e) { int errCode = 3000; String errMsg = "IOException caught while compiling POMergeCoGroup"; - throw new TezCompilerException(errMsg, errCode,e); + throw new TezCompilerException(errMsg, errCode, e); } } - // Sets up the indexing job for map-side cogroups. - private FileSpec getIndexingJob(TezOperator indexerTezOp, - final TezOperator baseTezOp, final List<PhysicalPlan> mapperLRInnerPlans) + /** + * Sets up the indexing vertex for map-side cogroups. + * + * @param indexerTezOp + * @param baseTezOp + * @param poCoGrp + * @throws TezCompilerException + * @throws PlanException + * @throws ExecException + * @throws IOException + * @throws CloneNotSupportedException + */ + private void configureIndexerOp(TezOperator indexerTezOp, + final TezOperator baseTezOp, final POMergeCogroupTez poCoGrp) throws TezCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException { // First replace loader with MergeJoinIndexer. @@ -1054,7 +1068,7 @@ public class TezCompiler extends PhyPlan String[] indexerArgs = new String[6]; indexerArgs[0] = funcSpec.toString(); - indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans); + indexerArgs[1] = ObjectSerializer.serialize((Serializable) poCoGrp.getLRInnerPlansOf(0)); indexerArgs[3] = baseLoader.getSignature(); indexerArgs[4] = baseLoader.getOperatorKey().scope; indexerArgs[5] = Boolean.toString(false); // we care for nulls. @@ -1076,6 +1090,7 @@ public class TezCompiler extends PhyPlan indexerArgs[2] = ObjectSerializer.serialize(phyPlan); POLoad idxJobLoader = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope))); + idxJobLoader.copyAliasFrom(baseLoader); idxJobLoader.setPc(pigContext); idxJobLoader.setIsTmpLoad(true); idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(), @@ -1092,19 +1107,19 @@ public class TezCompiler extends PhyPlan tezPlan.add(indexAggrOper); tezPlan.add(indexerTezOp); TezCompilerUtil.simpleConnectTwoVertex(tezPlan, indexerTezOp, indexAggrOper, scope, nig); - TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp); - indexAggrOper.segmentBelow = true; - indexerTezOp.setRequestedParallelism(1); // we need exactly one reducer for indexing job. indexerTezOp.setDontEstimateParallelism(true); - POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(pigContext); - st.setSFile(strFile); - indexAggrOper.plan.addAsLeaf(st); + // Convert the index as a broadcast input + POValueOutputTez indexAggrOperOutput = new POValueOutputTez(OperatorKey.genOpKey(scope)); + indexAggrOper.plan.addAsLeaf(indexAggrOperOutput); + indexAggrOperOutput.addOutputKey(baseTezOp.getOperatorKey().toString()); + indexAggrOper.markIndexer(); indexAggrOper.setClosed(true); - - return strFile; + TezEdgeDescriptor edge = new TezEdgeDescriptor(); + TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp, edge); + poCoGrp.setInputKey(indexAggrOper.getOperatorKey().toString()); } /** Since merge-join works on two inputs there are exactly two TezOper predecessors identified as left and right. @@ -1118,27 +1133,23 @@ public class TezCompiler extends PhyPlan * in physical plan, that is yanked and set as inner plans of joinOp. * 2) LeftTezOper: add the Join operator in it. * - * We also need to segment the DAG into two, because POMergeJoin depends on the index file which loads with - * DefaultIndexableLoader. It is possible to convert the index as a broadcast input, but that is costly - * because a lot of logic is built into DefaultIndexableLoader. We can revisit it later. */ @Override public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException { - try{ - if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){ + if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2) { int errCode=1101; throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode); } curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0)); - TezOperator rightTezOpr = null; TezOperator rightTezOprAggr = null; - if(curTezOp.equals(compiledInputs[0])) + if (curTezOp.equals(compiledInputs[0])) { rightTezOpr = compiledInputs[1]; - else + } else { rightTezOpr = compiledInputs[0]; + } // We will first operate on right side which is indexer job. // First yank plan of the compiled right input and set that as an inner plan of right operator. @@ -1220,9 +1231,11 @@ public class TezCompiler extends PhyPlan } } } else { + joinOp = new POMergeJoinTez(joinOp); LoadFunc loadFunc = rightLoader.getLoadFunc(); //Replacing POLoad with indexer is disabled for 'merge-sparse' joins. While - //this feature would be useful, the current implementation of DefaultIndexableLoader + // this feature would be useful, the current implementation of + // DefaultIndexableLoader //is not designed to handle multiple calls to seekNear. Specifically, it rereads the entire index //for each call. Some refactoring of this class is required - and then the check below could be removed. if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) { @@ -1265,32 +1278,37 @@ public class TezCompiler extends PhyPlan rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job. rightTezOprAggr.setDontEstimateParallelism(true); - POStore st = TezCompilerUtil.getStore(scope, nig); - FileSpec strFile = getTempFileSpec(pigContext); - st.setSFile(strFile); - rightTezOprAggr.plan.addAsLeaf(st); - rightTezOprAggr.setClosed(true); - rightTezOprAggr.segmentBelow = true; - - // set up the DefaultIndexableLoader for the join operator - String[] defaultIndexableLoaderArgs = new String[5]; - defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); - defaultIndexableLoaderArgs[1] = strFile.getFileName(); - defaultIndexableLoaderArgs[2] = strFile.getFuncSpec().toString(); - defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope; - defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName(); - joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs))); + // Convert the index as a broadcast input + POValueOutputTez rightTezOprAggrOutput = new POValueOutputTez(OperatorKey.genOpKey(scope)); + rightTezOprAggr.plan.addAsLeaf(rightTezOprAggrOutput); + rightTezOprAggrOutput.addOutputKey(curTezOp.getOperatorKey().toString()); + + TezEdgeDescriptor edge = new TezEdgeDescriptor(); + TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST); + TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp, edge); + + ((POMergeJoinTez) joinOp).setInputKey(rightTezOprAggr.getOperatorKey().toString()); + // set up the TezIndexableLoader for the join operator + String[] tezIndexableLoaderArgs = new String[3]; + tezIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString(); + tezIndexableLoaderArgs[1] = joinOp.getOperatorKey().scope; + tezIndexableLoaderArgs[2] = origRightLoaderFileSpec.getFileName(); + joinOp.setRightLoaderFuncSpec( + (new FuncSpec(TezIndexableLoader.class.getName(), tezIndexableLoaderArgs))); joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName()); - - joinOp.setIndexFile(strFile.getFileName()); udfs.add(origRightLoaderFileSpec.getFuncSpec().toString()); } + if(joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) { + curTezOp.markMergeSparseJoin(); + } else { + curTezOp.markMergeJoin(); + } // We are done with right side. Lets work on left now. // Join will be materialized in leftTezOper. - if(!curTezOp.isClosed()) // Life is easy + if (!curTezOp.isClosed()) {// Life is easy curTezOp.plan.addAsLeaf(joinOp); - + } else{ int errCode = 2022; String msg = "Input plan has been closed. This is unexpected while compiling."; @@ -1298,14 +1316,14 @@ public class TezCompiler extends PhyPlan } if(rightTezOprAggr != null) { rightTezOprAggr.markIndexer(); - // We want to ensure indexing job runs prior to actual join job. So, connect them in order. - TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp); } + phyToTezOpMap.put(joinOp, curTezOp); // no combination of small splits as there is currently no way to guarantee the sortness // of the combined splits. curTezOp.noCombineSmallSplits(); curTezOp.UDFs.addAll(udfs); + } catch(PlanException e){ int errCode = 2034; @@ -2661,5 +2679,6 @@ public class TezCompiler extends PhyPlan private TezOperator getTezOp() { return new TezOperator(OperatorKey.genOpKey(scope)); } + } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Tue Oct 3 17:31:51 2017 @@ -185,7 +185,13 @@ public class TezOperator extends Operato // Indicate if this job constructs bloom filter BUILDBLOOM, // Indicate if this job applies bloom filter - FILTERBLOOM; + FILTERBLOOM, + // Indicate if this job is a merge join job + MERGE_JOIN, + // Indicate if this job is a merge sparse join job + MERGE_SPARSE_JOIN, + // Indicate if this job is a merge cogroup job + MERGE_COGROUP; }; // Features in the job/vertex. Mostly will be only one feature. @@ -393,6 +399,14 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.COGROUP.ordinal()); } + public boolean isMergeCogroup() { + return feature.get(OPER_FEATURE.MERGE_COGROUP.ordinal()); + } + + public void markMergeCogroup() { + feature.set(OPER_FEATURE.MERGE_COGROUP.ordinal()); + } + public boolean isRegularJoin() { return feature.get(OPER_FEATURE.HASHJOIN.ordinal()); } @@ -473,6 +487,22 @@ public class TezOperator extends Operato feature.set(OPER_FEATURE.FILTERBLOOM.ordinal()); } + public boolean isMergeJoin() { + return feature.get(OPER_FEATURE.MERGE_JOIN.ordinal()); + } + + public void markMergeJoin() { + feature.set(OPER_FEATURE.MERGE_JOIN.ordinal()); + } + + public boolean isMergeSparseJoin() { + return feature.get(OPER_FEATURE.MERGE_SPARSE_JOIN.ordinal()); + } + + public void markMergeSparseJoin() { + feature.set(OPER_FEATURE.MERGE_SPARSE_JOIN.ordinal()); + } + public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) { for (OPER_FEATURE opf : OPER_FEATURE.values()) { if (excludeFeatures != null && excludeFeatures.contains(opf)) { Modified: pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original) +++ pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Tue Oct 3 17:31:51 2017 @@ -71,8 +71,8 @@ public class DefaultIndexableLoader exte private LoadFunc loader; // Index is modeled as FIFO queue and LinkedList implements java Queue interface. - private LinkedList<Tuple> index; - private FuncSpec rightLoaderFuncSpec; + protected LinkedList<Tuple> index; + protected FuncSpec rightLoaderFuncSpec; private String scope; private Tuple dummyTuple = null; @@ -94,6 +94,9 @@ public class DefaultIndexableLoader exte this.inpLocation = inputLocation; } + public DefaultIndexableLoader() { + } + @SuppressWarnings("unchecked") @Override public void seekNear(Tuple keys) throws IOException{ @@ -113,16 +116,8 @@ public class DefaultIndexableLoader exte // there are multiple Join keys, the tuple itself represents // the join key Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys); - POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec))); - - Properties props = ConfigurationUtil.getLocalFSProperties(); - PigContext pc = new PigContext(ExecType.LOCAL, props); - ld.setPc(pc); - index = new LinkedList<Tuple>(); - for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple()) - index.offer((Tuple) res.result); - + loadIndex(); Tuple prevIdxEntry = null; Tuple matchedEntry; @@ -193,6 +188,22 @@ public class DefaultIndexableLoader exte initRightLoader(splitsAhead); } + /** + * Set indices as LinkedList from index file + * + * @throws ExecException + */ + protected void loadIndex() throws ExecException { + POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec))); + + Properties props = ConfigurationUtil.getLocalFSProperties(); + PigContext pc = new PigContext(ExecType.LOCAL, props); + ld.setPc(pc); + index = new LinkedList<Tuple>(); + for (Result res = ld.getNextTuple(); res.returnStatus != POStatus.STATUS_EOP; res = ld.getNextTuple()) + index.offer((Tuple) res.result); + } + private void initRightLoader(int [] splitsToBeRead) throws IOException{ Properties properties = (Properties) ObjectSerializer .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props")); Added: pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java?rev=1811015&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java (added) +++ pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java Tue Oct 3 17:31:51 2017 @@ -0,0 +1,34 @@ +package org.apache.pig.impl.builtin; + +import java.util.LinkedList; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.FuncSpec; +import org.apache.pig.backend.executionengine.ExecException; +import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput; +import org.apache.pig.data.Tuple; +import org.apache.tez.runtime.api.LogicalInput; + +public class TezIndexableLoader extends DefaultIndexableLoader { + + public TezIndexableLoader(String loaderFuncSpec, String scope, String inputLocation) { + super(loaderFuncSpec, null, null, scope, inputLocation); + } + + @Override + public void loadIndex() throws ExecException { + // no op + } + + /** + * Loads indices from provided LinkedList + * + * @param index + * @throws ExecException + */ + public void setIndex(LinkedList<Tuple> index) throws ExecException { + this.index = index; + } +} Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java (original) +++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Tue Oct 3 17:31:51 2017 @@ -290,6 +290,15 @@ public class TezScriptState extends Scri if (tezOp.isLimit() || tezOp.isLimitAfterSort()) { feature.set(PIG_FEATURE.LIMIT.ordinal()); } + if (tezOp.isMergeJoin()) { + feature.set(PIG_FEATURE.MERGE_JOIN.ordinal()); + } + if (tezOp.isMergeSparseJoin()) { + feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal()); + } + if (tezOp.isMergeCogroup()) { + feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal()); + } try { new FeatureVisitor(tezOp.plan, feature).visit(); } catch (VisitorException e) { Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld?rev=1811015&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld (added) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld Tue Oct 3 17:31:51 2017 @@ -0,0 +1,43 @@ +#-------------------------------------------------- +# There are 1 DAGs in the session +#-------------------------------------------------- +#-------------------------------------------------- +# TEZ DAG plan: pig-0_scope-0 +#-------------------------------------------------- +Tez vertex scope-24 -> Tez vertex scope-32, +Tez vertex scope-32 -> Tez vertex scope-22, +Tez vertex scope-22 + +Tez vertex scope-24 +# Plan on vertex +Local Rearrange[tuple]{tuple}(false) - scope-34 -> scope-32 +| | +| Project[tuple][*] - scope-33 +| +|---a: Load(file:///tmp/input1:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh 3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9fVlsHVeaXnERrd2SvKglWRJl0drMqrtwEWWNxqYo0qKbW5N0L1LacvFWXbLE2lxVl7yU4c6Mg06CyaATBBO400jcQZKHeWggQD8NkDxMFmBeMsiCDhAkCDAYzGvykgAzyCDo/P9/zqk6VXXqkrSNcbfpe+ue/fz/96/n1C/+p3YkjrSlINo0zNBsbdlG6GwaG2Zr2/YtY8u0giA07K7d6iRO4Nv+puNDka292GmZ7oK5Z0dG6Jp+bKzwZyvwTWP/9PVr/U+0U9DQcnvadef9sJMsaEeC0DPDRLu+AJ3WWKc16LTmeKFbg27c2mLHTZxFM7zfjbS3CkPDUtSlsRzakZkEUb7HowvacW8uCrxZa9OOP9F+pPVBn9637b040V5eeG7umLwTaH9Be8lbsM0dG347I/224MQJ/DjoLYfUxMCCNuStBkFC3wYXtNPeWtBOit2cpKfrgfzsqCd/74aw2tdUU8J+DTFxPp3+fmxh0IMnNAqqfRaHyYo/NuMt+O3IS//tX//b1z/+TwNa/5x23A1Ma85swcLMa8eSrciOtwLX6obvvkdtntw9Cn/P4Mcujelo6Yc+6GVk32WHFZX2+QOt37FgoeNWENqJdo4tJpTfrK0lkeNv3u+KWV1JeDHo5gdfh/Ai2zXxR9MVYwIyXJ4Lo lloLxvbySfaWSeeNSN3b92OPMc3E9t6oh3b9oNdf815YT/RTgBFrjmWvey7e/PaS34wn9hevKCdz9HuShS07DiGySTaK9IEHwaBa5s+EMxxh8ohQxChPNVedeL14KE9ByNNbN+2pqPI3Eu0/qdPgFyCkH5ctWM74eVP4BoDSbaB8hJt+WmRSfgC1dgC1QoLVMstUE3wpFie+9A+0AMQ2fpeiDTf//QhUHYMjXtmotWVHOkGm1mLrGhtjf6DE046oWsvmtt2lGjDxfqWmZi19bQEsvPC19nw4nxye3zEbLU6HuAN7cF0kmAf1rz2amR/0oHJ2taKGZmua7tO7D3UjmcLAWRruo5JW3YSvlADiXapejow8yEqJfDglAujNTft9chs4VLcLNYNbV9Ai1QQ2jkbRA5MFaYYtIiaRZMvBZ1E6uF4aEa2T7SVaCtfiywIsGsyYMM4BiIkiEdfq+FVWlLc5mv7wke2dwRxgCaJdkNJgFitJqEOIAmuySlN+zX80020PhNx8ZUMF4nJEMO7v/2fL//DPzL/8YDWN68NxsDrhEJ9u4Mc5r77zePPAuBvNrkhABqAAC/Exx9oR1zHcxJ4Bhz/YfgIGgZ6c+ccFzDzmnL2TlDDn9dCuwW71B+2FHxGBVeczZnAT+wuSq5jsbMJONcB2kei7uLf14mUTvCxXcAvp/ka4ucrYciE3dXcCv3wm2TYkeUCsU//5I+Xzv7X//fTfm0ANgiZaF4bCtptWJ4cV7Jtg38HaajdEHf84dcZGqNVSYI91E5GNqyYv5bAwgH4DzGEyAuz5Y3ndgtIPOwPaenwHw3GclVF8E5giL3LJHo/6ARteLpkejZHnKPtjt/CUol2obi3c/wn6BJGghXfqdVqiRfWCIAa0Pf5Qt+iStYn6C/HWq4Zx2mnT7WjqCJMR5vA9a88LYvrBe1lBqRQgsE91nsVR/Gg0F8CyGGsw59FJkVngs0o6IQjjzqetzcTAOS2EnPDtZ EF7Ii27opquTIKzrblzLz2smW3TdgIAd9PtJOOD22HbFjIYP5sF1DC8Qn95123EydEbwvaOaIh21prRU6Y4G4IhU40uxBsLtg7NtDCt+S1B7E3/rxGv9xnhYGGrJnA80zfEm2c4Y9xxVPshudHkfZQtCg2dJb/hNITtiR5Jsuel6lbUDSAYRIHEfl1SS3Nnt9nAsH00hmBiBiiJ5nASEvjk7sL2ok4vwhQ6Ey87YQhLM+WE66YyRZ73g2JrPuQsM9nsAqia9u2uNI5/odLf+2M+4d/CfgN+hPIXtCMliPY366sVGqStomtvS7tO03VoBWuxy9dvdr/7yYHoFi4C6i0NAw1hrTB+aW55byaqlXpr7kfDg0Oif0CSPgF4pIrdiijQg1E2oOv1KCqLWChgYY+BoN8NVvcbG/vnfuPL8LJ3/wpScajnErZHnZzYg53ghjrlX/58lvXPvizPziQAXCU1onEQFdA/gz7rf9d/K3/1/DvSKKd3jMj32ibjqu3gU5Ba2+bLgCzdhm05ci2Oi3beB5sGJ0NGLcZbxu2j4PB9h4nWo1qQ+dBJ2oBm/qg9ESGZ3b1VgDMboM+ppth6Dpc64HmG3X4J9FeA9jc2ANQMWBBDFju1nbc8WDNJhrNRJtWtmuBKN6zLfyvvUkN6kkA2wMFvWDHdHUHcCXCDx72NMZ6+g1qK3E8GyWPHkMJByblIklaG3r2A6ylDU2Zlt7C/ddRk0i0Y436+NTE3Uls6kI7NuIx02gFvm8TFlC7ASqSRyZYd7eydWu5DihzRtzZAHXAQFSH5tPFAMW8AeUnOGnFQFmRk+wZ214satp+K9oLYRGNbXvPoFEBCYZOBGrU0fGxZp11aWRdxluddhv6yUaob9t2qAP87NiqvfMDK11fqJQAvtqRLp4gYeF6BB0fptjfzM8P6GLLwWWDsaFtBBVyO3B0anKcD3GUOgNSkMZqelmPiBMGqU3G9gYoe1Dleml8oWd7OpGKzmZ iJdpgEnVgl+6oN3kLYNyF+cjzwOl/WKRvMQ8LWFu3HFDuGyOfqsccJybw/iaW+qzG69WwHswyK4i8AuIJ0CMytmzTTbZo4NISAclMssV5KzcUUYv3Y0RBkBg0pOGRTzmxgGZgUP+sZo2XhaaUjBPZrWDHhunxZeMUMJIfrgGyhBMpPWOChrqtKMhKfJZoV4ExpjdQHrdI8qztgSnmMZJHqZ9ob0rAyicBdVxEYoPweA56Og2PrI60QkOTnH6uqDvYMiPe/kVl+4/NCNst8ySHJ+TgEHSXrMfBiSIX56icKiuo6VoFWgIUsscwir5GoulZuQLHAm1jF4wHABNbNrLcYN1oTsDqCZCIXYQCgqoYULvFhwVKX6JNlZcgBRWsiOv2ENUkMK/WqAUmQABMzpd4DXoBqZNoZ2p20qptRxsT6aM7vPGtJAkNs5NswUg5pkGZCKYexGnhiyOfdmLkgcCzP+MGZvrjaxxPPfSIATEBVgLm4pJ+BxgUJERsf0I0FAdRwuk4smiJEu0lkiSEf2rm57sF+n60lxcNgw2qd80JWxlBEGCy0ikF4lA+T7ST8ZixAbS6zUTC0cm7jfoUgFsORWi/ccwwVr41nDjqpdW1nHhbZ7CgF3GBRjjUaLK5MflsWp4Df1tAoH13AJsOJdJCUPmARcvgga3URGEV4Aoc4cSpe7Bng1ONe00mv2HuMGrL7kocO9BoTiXaJVgwtrD6LtCfrYeIaQkXqUcmJybGJhPtphKskCbAOBOjR/rV3qoYfEwwUIs8KptotwvbgcQDGgZ0AcOzSc0BoQG/xBwCVXKJUwTycZ52YPJN2pSrW6bxIghQsiJQo7sQdRGmChhMOQe8Pg2rkBP4fWMg70urbHa6YgvjbBjPUlHOuPtmmbtZWWONFXzMZF2ivX0QWYuKbYfphR/l4E6tSGR6zlBjilHmOQmT8nLlYh4MCSftCOfWJQKBym+oKZgBAzbzaaKd4OjAAP6GEuDx97Wx 6UwkAFcryFgp/u4UQZIM2tjwSbncsZkaiD+jE3WsXl2DFdylZW8jUKUkcEcBMbAMRFhgBRiZBol9jAPU14qLx/Ro6MNv2a6k+xqk+1JHHwPzllVTkDkwXSfeEl3wMd2t5nMud+ChLhwpeivs6BxyBxq48XfU9AWy2NK3HdcVjKCzVfg2iC8lo3NQsyz4ISYeVxbbCuLEN0F6vDNVHwMGOgszBT0ApiWjNBDxdeBKjqle4DuACMyuSPmXb8vnxMC0kc/tBHYR+AEWjxCCORLEok72UpFxB1ux0fEjexP0A3hE9kk263fU2iB+Znia2ktstA7alNmMYKf0g0jaECq2QHoCj7z6eH19pfbs8fLa+nsLyzPTC/gp0Zp5kipzOFheFOLrwAAE3tSq8CYFV/6Zww8AQjzmE+NkshLn8eNEe/9Q8grINkrooV6wxvjCMvCIgeBsi/MoVeQjf788clYBdziS64kNYNXn/UXalrWZxTUmTt4uU1TskjnVpv124pSnBWVV6CK4g3phBxNyHQ3FDnG4dqM0Ky5B+bhTRjlZN+h/wA8IGHeVXCMRbYxziJIN20xkNYgPeLpE48AOurm5GQmciUBB1jMazdoAGAx8C4FLB9X2lkysLaDMWGdRD2AqN9i1Lf6VOv4doErquNSZjfyl5xlX9IQ1/yjRXi+pjWEn2hR27T2lNSVp8US8fscD9gFDZhMXNQYhTQiMzodvZQ2ghgP4Z+y0UF3GDjogwVXsFAEQ4BgdFivQoYTthQnzWaDAAPQcQ/R8maFny81b4xf56jEwgVFGXkFclXR6WEowfj3Ea750n3QctAsIWHktXbG9gOrATKBpgWq/VbDx+1CpLfaEhAub94L64Q2f45vAqgvh3pwAvW5SKSZxMUM0aF3LxN/MCDoHBQ3lJRVItFduBeR5n0FGfkAPb1fqCknCVOVjk/XxKW4J1LOtYYE10gDZx3YQeSaz/HDL4YOFu3KnjBag2qalHjHP3 AwrfK3saUFpngldoVBNKJmSqatCdOhhh8Qz/JDf6dfLvYRgVKDzbGxiEij0kop19Dw9lTQ68rAwHzh5XGFhyxocmvZJ0ApcdKCtL6ztAGOfRR2Gdk4WT4BYNyX5H4Y6+blIFebsH7omrXrZ3SAxZoGKgc4rFd0Pgo11VmuRreC8jx7LhweHXIa1hukH/p4XdGIBTVze3zs44ZpJwsnWQOUEoKMFYx+uxMD8Dl9rx5l5Ggfujk0+TFDz4j0PZrEtNJCpSvtEB5ULNwadrVyb1DMjhOr/jClLyKUbQEZkd4JW8VrZmIIiOTWhqK/ETO5h4xt2G7HQ2URVj+splzLJjy5dHdRIXbh0sciiINkSDqEXDUjtSnlIvqenJaB6Qn4AmA3naz4hZlAONSebjfHxnHtFIjEYTgygio9iCvsB/OJo+RpXOK+2LHjObI8LaucSFAAmKNJMBHZY4AHwExVyPxla4Cdr8KzWYb8D8ChdTKmtwnQS6IYMYKPS9Sc8kKIit4TzDiwSZHzlWixWh9ox05fqlQy3aIbrsPsj8N9lqvyQVl1Y7nmyFFCFOBPb6BpMxArraspi3j+YjAHyKoYB2KT6cQitVdqkTG4xT03e1gBQu1Lcj45jGbIldwQoRTjzhf8QHTLIn6wgH7U6uIHJMVbHzdR2dCA6LQrjtXprnqU4ibBSsAoOFcNDAPSkQczwhytMkYBnK9jPHogHCbYz07KN/oYYzDX2pOu5JVsyDmFNXMaozLPB9FjxHF0zDeb/HFFj6q69gZueqqI3uM1WBt/MaGtMTVV6xssUIFAFCOBtJTPDwEHJsZCXxbZyy+BzMezqLeMUv3q4fZLqi50Su7MmfspNUbayQL4QVaeWvAXlYvLs1ieEs6Za6y9EYCbqRZADyWLpfpA4bSHkGGNgKhrQjVBIXimoay1Cif7GRE6pyXn3ETOyYPPzRDufV4y5x5qcgo16c7zkfhNbBvwZtDdMDKMKQ4TLwWEalS 9p8p0QI5txQV6eB5WMSxjX2TDMHdNxhbLzcXHpq2F/C7ZVaPHvKUllN4i2de4vRFsng2O20fhs13Qk30ZT7bQwUWkOQOF0Wjq6HbEVeVJ5VORMK8wRCVncgiv6joJeWOhNDEFnJqondv6tEo5SyoeQoGmoYQAIMtGu5/c4lRmy0owN/4/MG0b+faYK4To7rVRhwklHjoXQeNSCxd0KHtwH0Mh3gckJOPMWcxTSujl+K+KrdEXtJM4C1mc5BqWPPgPNVkkQKWwdIdwBrULegQ76cDJEUiolXOby6nez6mxJkW3oE7c1QLsHRma0BwYnD0MKCmRgBZIH1IMWWgFWhKYvqOh+EksO1zFu3IxU62folkIzHXGoWn/mtgyK9BmpQu2g4eN0SH0TOeW/aKvwFJ5WELK0lL4psY2y/mf7O/rulpNgqia0ufTB9Hennz1eXpwdfTz9aHl55dnM8uLi8lLu0eNHc2uFMktzzx7Nr4rvP5heZTVyYCTRQIpAoNsh4KPNjvvxL4BoChCJM6eJ47wBKyfrxUbJ6IXVRNLrRDyXgffEFXBFjAkxGio6lkstcxy5XN6Fsmf9StFL8GJb0oPEWl8rDdIgBwcRd2qlfigcKUy3X4cG1jI3C/x6i0FNpu/AmD2gYrSZWDs684ugdxys0oslt0yal5Jox0FBvzs+NTY5frcymtsGo4kp3FeUCjf+PpeEGNGVdSAuKSUdiD8hHYjFyvgOSPSn5yJ+RcfYt4VHXSWbyL1pSP7NC7l4EMvh1V9gZzBxHvTUt8DEGi9xwa4D1sBurGfRmdTfXtj7C1I81DVf7AEQc1TmSHmjxJGeHW0iNoGkzvaCCxJ9P/BiJIN6GZ/oR9myp/RHWrbwOVBCHPNqHlufffKMXNDC+5sz/Ry/082mrLO0KtSEeUAlRvM+0VOR1onN1L+nKwWC8CXLoUM0u7DS74kQbA8nqSAHYUtzHritrPdiW1dSzee5XBwJdWS8BjbacOE jwh7V+hWY5MiFCcCOpzOxzCOlg+P1e5NCUqiEPtCbE1g67hJqYWDZlMJvz3c8FD5p1BMkz4WKqLW3waHwapVSkydJvTQuxiIUFC2pr4hO50EThd6IiCUTHn8C8po+ZC4UTAsGCY25ArRBNiESlWOs6BLVQeQk2puL0wtAmc+mV2eXpp8tTn//wUjxSdFXJXiBfNNtDHx3eSYAz0FAv64E9Kj8cKas9gih4wbGhts2hQ2UscEKkGBUfIIfdOZ3ernWiaPahuPXWPEqBinbrTl/4Rtl8DC7sg3xK+FoK4XxPD3yWCIeeoEpEgZbpEekvObDBah7UVKY8JCWs6vSYaaS2ihz2fVKOdiyHeyVB2QwNdcHHLe7aFen7j4yT5Fh7gxHu4l2holB2Uf2qspHpnAu5P2L2OW/ESZBOT4FFMhtJhbeyDHvwF2Mi1+K0KXHQ4ufdExo2i3k7b1ZiTBxZoz2dKJ+p2N3bNQCJet1XL0dSPwEEYhWtl/maO33Ab1Ulq/AmuGCto84E4fIsnKS09SB7dqCPXSu5OciLWizaBQK/xostiN0/bbTRY83UgZf2/38WiJDkYvDC4W5Zf4Vzv0XU7dlzuYSEvUvSmBttlx9x7F3ddxarW8YTF+uVaFlk/d6vsQzU3OeksKgmZhiQSjUA1O2aOyz2CJJIb/gUY78cjmOiG8pCPejHDqF+loxFHxLwbnldFo2zr5TIilJ5iWWNlAkQ6A0ULQMYWrrmUWEoMy80QYwl5SlA1jUrCuy/VhiBdPjKepRckc8PFx0RzLxwiBw01D2h7mAVQ8nZxkAL0jSJj8rTpqFbWJWNnNESaiOfHLngM4Thou1rAx+jOdBHioifiSrA+YuYF43jEvCv60E1a4QTHHKV31WxiledTkEeRK703Y8k0Qz9IyCcKOVNT5o2aXSqmyXssrN09tUWvfVArZ1fJ/EC7A7FefLfbVEpkVXaRa1v4sO0VekDUSfJzIcp4mJaq+t9Flv mSFI3QSogrCfeRQbqgnvq257gose53wiEmyKjM2blA4qtrnGHteeb7H8mSyFK9HuH34YW44dYXBtD2Ml3GrCVpSJ99waRSK+WPQVq+FMpvupnPdmv2Xmuy4WeqBuwDqfoQkS7oXcM38M82+eLS8t/KBIOKn4YUofZ+T/nnOjyMkEpfQBTmcXFTYe98LitC5XRS3AYslbRmSisrjU0Sx/cmxfzQXEZ+JgVI7lOjmpL+GqrOjysD4vjMkGsVInocLo0V0jx+GHQETfszfmqLbwGVXGFkACcp5KtMvqoin33dw/UNGoN1EWTMn+AunsCg+s6CJATs7ZfFrxQJMnh1YmbaFxzJXEW8p9UmeYo9n0BgIGt6qYMgb6IehxqTLx93I0J6QJwj4QHeMwTkTnWU51yd7kVljzYFEjP8gCR1wq3VCvcUX+5XmVEh8EoBAeq4lnoD+qZa2c8LV4uHBOOpzM9bS6iARo87QvntvMjFQ3CLzURv2J8AiocwIzRGMDe9h7YHIjaZ4Jb2uWN1VSMstm2SkEx12nnXDn2ZQ6U5ZKxDyKsobfluiznDt7Z7/cqcz1Bdw31hifuIvp3td7pXLlc7ea1XZGK/Awy5TCbJjrqktpq2joojoppVMLB6vazUvMJ7j/lJC9E/U65fEqsZl42bMtB3EyOwj0m4c+CPRMbkl4IOXNLmT+sxObUigbdXtxuOsN5VjLU2vU683Ks0QUI2DnMdFPJlS/66rcv8K+0ZE9clBcrk6spoV6U52kr/teGkZTeTlYlgcbHYpHytmnFBJoghIV9nCrf1IJ8srkKTCxHdcy6BxlEKKxf1T/vteFVQUyHy2qq1SM+TJF/obpbgbw2xYUP7r2eLqxsrr0vkC4agMxn1Z/OrV4OSAMl1mTDnl+p+O0ttcox+s1+fkWpjnzPFGvE8H/izucKhawvkU74ee5eIUsEarDbbdLGoDvZV69otX1RASwe/jXcU+lkOnPhZZxwJCpt2FbV hozNXplZHO/Vv6ozxhx0b3KUfJkqnjPb4npkZGGw2Y4NzCBLUwe6BAPehnZoz1MD2Ye192cF0TaNEl5RI78JSiVuUgCHfg5txtErvWO6e8BtLwT7bYsszLnbRNdNLB8st6SUz2ls7g8zFFeURY/Zgoey/bSebqXwbQcTEI4mzv/g0QlsvCr9E8MgErOdVAX3vJtQL0AFiDY3KMhoJRCs1cEkJlAu1XmGqzJTvvTibZFVj515cFcV7OuQEbWK3wDvr2rs2QwhRltF51tqfknUiolC/BamgNXdfAJC/79RHt0cMPIh20iUALatnh3OobbRcjnjaIK9dw0Y2IJu4uh4xkaisq7W/ZVXZL1VhbIp8g+/oGmWPwelPJS1kKWaUta+9Dc9PzC7KMqKQvUa8d0+4SUvoMpfFzeSTqsw0+jmpYJPGWx5EKQjeONe+Nj9fHKiDgoCYi5qYw80WjeJSnZeCffAXlGcH4K395NddtI8ywjUjJAgfDGemj9+2Sdvl2MxCFL59MIfBv2Cz6H3NUlkg7RWxyj2z/JHwY1+IUC0HgF7yQg5czIWqPK6TnQi5KRoU5svlLhj1OeTrhbaeYUZAMBy9A9TgKYWp6KAphQEAsabR7kgHVencKavkgiLx9BOszho6ZohhN+CigSrH0kLPAqESppsPlTom8pzCF+tNAo4dLEQY5OVsUo7x5C/gJwbwDCclfBa0yjc22cim67jDZUoX86kWGwqG1fItSEHkpwYZjQ6FwV6XCYPGDIiQ/gtiLAFW1jplcxFQor3DzUEjMRTuYwGK+IZKPDpDTcP0BCearlitRyiXC/XUq6S7P48ec/A3UXs5Qrs67f7HH6I6X9a5z2c5sj0T368ofLJ6FKzvRxljp6SbluuZOdr7dJxFT4H9LjPQBA5OA00tj154r7MvbJ0fdQg0SxJD5c7nmZQaLNo4POT1xn450t/O1BK+w8iE2E8HgU1iB8EINgj0cBnFv2A3+UKY0P9k aBXTYAquARtvfgLWjr3QMrje3IBprCDFhu+kAxTxFnIkMZIZ7BCTd6OHd5BqYrgbCI011i14XoZe56UzrlQkc2yFW/8cIJm+iRi0yUB6d4TINpNsBEvXxbFDWBPhwLtgXvX5kkehiuPr6TxTU/LhgSdIUM3e2nyyfZ0PvWzZC2XnmSNU2xdnwU0sYOhjMQqQaXlpdmS2E3zPXnWUN4gejIp+1UhM6tfdb7kpbCMjBEkONSr6aaYdl7oSvsSaLKXqe1jR5cXeFAnBIO+Uo4LZOOgmYsypk32rbfQmaLwToVeTMSai2nZ9APQ5Bnc9YA0/zfUmsvS3aCibrrvHCV07uA1QUD5CNlzh2BTpam+VHuNJaMF+VofCkcqvQ/TeadNHKl0lGoyjO4BkbvVWehpg9uXIjVinV+S45wUswf3GtJTgruulyYmV0VTaZ3EVxX2gDF2FjmxLo3hcGxc9Qf6qJR2BLD0iuGhcUe0/cfwNcVdM+vrswk2niFybeJl3K2O66OVKwy+yJxLUQuaMiG3ONM15XqK2BYxn/fvSrhCEoWmPrIp39ZdYYBWIkM+Hzs9n6Fczwj+l53cTUrlN7qOp+luWi9skKZ3PpJVfYcX0pkkfhwGvi9umiz8sYO6ThcORJO+8LNPeblZEcC8GorMCdnv7Jljo2iigyYBmx756Onz4xp/Ympv6jr9374VH9P/vppfbQ5MfHZu09HfvjuSBle8maL2oYm2a84AoMwL51eboNCYyNQ0hFF/L0Gv0uBZ0pBznJ0B8dJYJN2VqlS3lSLc4nm9IPhiGgi8yIvsBzOdf6DHJJ5syLBoJBQkzqJq6gIF+Ff5VJlBfADCPsxtpU/WdgYa9TvNlmGZuWavN0jlAyWPmF8tswDDaOUSV5x/I+v5Ae97mLYaRpbsfGYNbBGa5uFdLI1XGMbpbr3TdIeqIvEBNuN+azoyp8jeLqnrr4gCA+tIkToGP2hMMAxurcU79TIMusUCdMlW/e TThDhDYCnaTi4Ye80G1ONnAWEq0tRTx7/Td0cJVRiel8JlTBC/prkPMForjBiT+c1uUSb+eqeOoppcM+ZH2wE1l4u16d6x3kec5q7/GvhQqm63mrDjhPdboM1IjI4rlVxqMSb9w/ImyWWJG4cPcypZK7pPv6qd4AwP2rpEpD9r5sACtkKMIB16v3Z9dGV5bX10cez04/woAdW6e2lPSlbAAm7zJfu5NW+pUgu5jLsSk9Lul51WEw+n1gV/i+ojDy1XxwY4zejCZ/+ZTJo95nhpZJDpHTTxUjZfLXpMqJiHOhXIjJWPbXDudrqRbmdBT5z3kY5/ImXAxm9qUx1OuB32ckfdqUQAXsYOH5SvF2o3tPBy+/hRvkU2ewsBR3aLt19p6fXEuhpLZ3Vks/RVzpHRDqoUAQlNzQ6ESf3TeCBJXCZRZZLP8Z7WTEJ971eKlvx4lU2D57skz/cOHqgxMb0Gs8RBecU00Y+U51lzBJdTUvWnX6enl/qcTpB6bSeFG4zxQ1qdC+0zuLJxzCejJLRE0pHzuVLLyPQU1fdz3JZpuyUIWPQvOOeCba3Kw0/uiuThXVSrz1mx2F4gF2+ERsdz0TvER0/Gqg3m0XbpMBLZHngjTw3qiPLRYb/WZaDj6tC/iO/7Wx2IsYRqb9lqG40xwoHS2Wb1/JjxmmwDEjj/b+TO5C47y0uPDi+OjuzvPpI8H9VuCHvEDfSOwPmDhBVxpB5O9aBL4Ho0KAjv7ilR3ExO/jGQTyUsSuOdVXGuzgnnaVzXxHl0QnL7ZfKS2tEKRj/96BjHNOaXHF0v1rmThQY0/AHL5bH5MmvWH3VbmNoIFe76gzW/ob1nZ53ERTMyAPdSIB2JN012y5eaVN9P6v60PRg3bgLjS0ezM8LShE9R7cJX5bM25vdvQfN3mNZjdKRv5Ticxn1v5Vov3HIQ1OgHOm7JiYomdE2zWCsfRB1aouCPrDEb39fXxVvW9G/5yRbo3ShP6gaeAf6 6HSrZYfJKHsJBEBP7xsQxf25PW9W5GEc7T+IC45zt0KzF/awqwd5MBELf6EyWHZKV0hzLrtczjAuHL/9WKh+MswXrylgqi+5nPFFMqB4+JtMzfixGLvSdVHWl0cr3ECu2fHpKIFCSWzse5eb5PHXGXQO+nR19cSh7+xi1s3tWzdyl3bh09u34ulFIAMcGb6L4sGn9c9u367M/ZIsTzrW2AkVrudLpZOn7MwwSC8gyZHYjN13sPn3RofpM1th8Q39++8B1irXh11B9C0OHOgbotw17iiKVbdUtIIwnwt5fGJscupu/R5dbnqWly+Zvderjkk4Ie5o3HEwX/Ds9OxabWZ9tbYUrIhzEif48WdEMe7DuNxzx0CT63Ed9Ae+Q3kzH/pO933iNJ5Bg+w8BxyPAQe1r830sutC8ahyzmC4rgyPY9BmD//uOBbSzLsVxievN8M1CdtaXZzjbaxgEytpC9d7XkKfCytdVtw2QbdypwZT/q6hTNXI8lHTXECh1/Dp3trf65BbnjPFdwDw540qrUXSqMTyCQu+UbGITEGUq2bLplfcDs9PWZfuyTiQqdbI7vEO2Y31kkkSVcXRKy9aySPuOfn0G1Pfe94mF7ErpITtULjpl3yhJtC7b6HHRgR6zDZmziXhbscRrZ/OZ7ICPJBXi9tw3K6jJcj5aPdTV3nr16viP5bd5YmwG1wwjlRf1ptxX6L1TZUuIMrIN7aTTphiq2/bWT7l5fRGdnFvO57ZEJYOmIX0fT8fVEF6/X7VCzxUZjgujcmuGimcFB4co5duXOn5lgI6jqM3xaGGShftcOGAEGbSpdNBUd3zdG0aIZOu/Ff7chjWsXhA5OzAmDbtooHQOFAOPH3S2WE17te5UNI/8LwEc7uerEnPE+1qLprKU6LYpafAAZsxP+h/qfruJywycCo9YctORqZ3m2Ao4d19LhovqCqUVivOD0lnIK/3sgsOhUUYO7yqfpkG0xZopa5Wv0aDH4MYO 6RKLfwzfw78WnkyvXwLtXz/ppgGShiwvBCtX3o0Ozf94cK6SKEo6bH0PhTCeMzKxTNpFyrdj1xpuKjQxjOvSd8XPe+Mk8UbWgRpWg5s4qXSmRk7ijDKzK6VA3MGbztT+yGkG6xfeb4V8wusV2enFxaN9YVHynANrpniFgl0bKovgaS8A1tQcJwz/Hz1PWzilTYVJ96ulG+PQm20wOw3S4kd3ga9qE7tur1zgBiLHsOcnS4gJNNPpw4W6ZZMGwmsOwLJlK63KnRGbDihvCBbvkdlT3H1eq1XBiXeYMryWXMXmYKG9qp8+012R9fFMi87gbif60aPl62Q91yy4icnReCkWt1lN93RqyvU2ge/Doq8cfCYH3k6LZgidxD+FXY1AGZhbXX8bfmImbHfKSw36pQDv/tecVNMhkJv22NxdFmR63DwBMFm8ZgS3WPgBZbT3tPZQvT9bs7VpU5nuFOVz50+KqZdXyzzILbIiONyJdBTr3SrINkDQldgmBWXOVdae/iIxy8L6/9FJfVIp8QKWiR7g4l4c9NI7+hg7gqQM5I1yK+G7m82qt6z8GK7/EYGo8cbGSrectRh1xYr7u3CCyvSu3LVL3riRb4L/8Hru+4dzBCpDocd7qVumGuYdyn0VZ8eQI80UjaLEXCXtPwejza3j/Uk0FlqP4uDSFn9j0t2Z+EiRWEIlKWOIEW0OfmSTlQFa6V6M7ze2uPp5sTkPNW7WRW+s+yNziYXaNnZ8jfLqf0gc4Tzkpud04d+ucIqya/pMJxhLUm36IAM55fWpzqJzcPY3ew1tRq9ppa/CBIfDNGDIQBjvPcID7DzK5AAa+kj4T97cKQW03+P0Q/s88tSIXwCkvSI+ayh17vld+Pi95viHbDpa2nxw+1+/q7z/vR97fhqVYO/KPyPh6Pf+i//6P/+L3p15hEgvA6+ljhtv5/a76fv7OlQjxe1f603qEO/3OUnv8F4JQrQa5d7ifFpJ+aP14NZ33qiHU fwYdlGT7TXwvTV6A/NzeU2vSQ7foLv3sZX6K7Bc9emh/DzvHaULPGZwF3QXmoFbsfDwMVr0otO0xc447ukN77pGc6Wnslv5yy/Kvmi9KrkZii/JlnQX3HfNU3xJmWJEOYBcYABzv3pP/nnf/7bf2uqH19PzQkh0s5k5ZbIKvubv/gHl0/83p/8HXp3aJ7ev6E3WavpYMaMJSIYfKKdasGTpSBZIj/BgnYCrBHXWuOvjx8rvulW5HyyAiPi61xW6b707mOckg7fAdBd9urcM9JbiR+C9X4f92bgk9zeXJL25nj5BdZYIdqPdxOtrytvIaxmo7Ca+09FWqiH2mBCr5U/YdnMvOb+vJML2qCfvfZ5KOYrd7X3yt3vhsdBIOARHDrFCXaTNdyOAm94xdkcpjEMp69oHvgbdBKsGXa7FajRj6X+Ov75Ef6J2RqpVu5HPZv4UdpORO101bA81Pv1vYf6WYI+GoF6u7taeCDwxNJBjpre+CY4HZvdxU9d5U+f5nq8vD/9fnEA+t3L0S/W+vFx/Pu38c8/OxBF/Bz/fIl/fsp6VY3ly55NfJm28wW181dPEV+qFxAootuJtP6nT773d4fvvfoHP/xTgaX90gaKQULJ5acL3+R77+//09HXN/7P5r//ZdptykBf0rge/uJ/n/+LoaPrf5IWOA7UULF+6rkzUjr1V7rmGizs/wdHHuul','a_1-0','scope','false')) - scope-31 +Tez vertex scope-32 +# Plan on vertex +POValueOutputTez - scope-38 -> [scope-22] +| +|---New For Each(true)[bag] - scope-37 + | | + | Project[tuple][1] - scope-36 + | + |---Package(Packager)[tuple]{tuple} - scope-35 +Tez vertex scope-22 +# Plan on vertex +c: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-21 +| +|---c: MergeCogroupTez[tuple] - scope-20 <- scope-32 + | + |---a: New For Each(false,false)[bag] - scope-7 + | | + | Cast[int] - scope-2 + | | + | |---Project[bytearray][0] - scope-1 + | | + | Cast[int] - scope-5 + | | + | |---Project[bytearray][1] - scope-4 + | + |---a: Load(file:///tmp/input1:org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader) - scope-0 \ No newline at end of file Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld?rev=1811015&view=auto ============================================================================== --- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld (added) +++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld Tue Oct 3 17:31:51 2017 @@ -0,0 +1,51 @@ +#-------------------------------------------------- +# There are 1 DAGs in the session +#-------------------------------------------------- +#-------------------------------------------------- +# TEZ DAG plan: pig-0_scope-0 +#-------------------------------------------------- +Tez vertex scope-30 -> Tez vertex scope-37, +Tez vertex scope-37 -> Tez vertex scope-29, +Tez vertex scope-29 + +Tez vertex scope-30 +# Plan on vertex +Local Rearrange[tuple]{tuple}(false) - scope-39 -> scope-37 +| | +| Project[tuple][*] - scope-38 +| +|---b: Load(file:///tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJH izAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9V01sG0UUftk4ieukP2nTVgiFOjTApfKWn0NRiiAusTC4cVRboDqqlPF67Gy7uzOdnU3XPSB6gQMSXEACCSSEOPbEiSviwIUeQEJCnKre4QISCAmVN7O79tpxk6IEfFh7Zt6+3+9983z7F5jwBawy0SkQTqxNWuB2p9Ak1jXqtQqbpMUYL9CQWoG0mUe9ju2hyGbXty3iVEiXigJ3iOcX1uK9NVxB9BkzwGjAQVRUbS87TtnjgazABOMu4RJOV9CoGRk10ahpu9wx0YxjXgwcaV8kfCkU8MSQa0pKmyxUORVEMjFoMVuBnFsSzF1pdah/Hd6EMbTpvka7voTDlatki8RGUH8FptwKJVsUz46kziq2L/Ew41a5VjFegUn3EmNSrzIVOOTWWFsOm5nRu3WW3su66XXIMdsLo0JSdgtJ4HE4hqE0ZFzc0V7ot2eVm5H4K8TfxLOJqZ+//ubExvfjYJQg5zDSKhELE1OGA3JTUH+TOa2Qv/iS1jlzI4vPI+pnqH3KbjsYQyuLu6YdM5qq86tg2C1MtG8xTiUcjZKJ8h2zJoXtdZbCJKpFGYuhmct7AZ6gDlGHxEl8QhhWS0ysoL6+bzMNmLX9FSKcbp0K1/aIpK0GHLjmsRtezb5JGzCNiKzZLVr1nG4ZpjxWltT1K3ByALtrglnU9zEYCcdSARYZcyjxEDA5W 8uphtBAWYc526+zIi2hp5J6tLUsBOlKMNYbCBfG9eEl6lMZy0+rHCMk24g8CdX14SaJE2RGCTKHEmQOJMhMejJJzxLqRzwgyOpdrjBvrBcR2T4qd4mEsyM70mGdvsZI1KzpLxWwDLhDL5JrVEjID7/fIpKY9Z6EaufKXgo+HM9AjSeIZQUu8o2uwbKUykarDHOCXg8wWNpaI4I4DnVs3y1Crp8IhC1xbKJLNoMLrUDCow8OByOf1FIJHxx00FvSoXVBLJWKp4bf5dRLqCUliHpmmbAxVAyRWRrNicopFsiUhRwngnoaWxLW9gQLTdhmmrDRj3GhAPHynhRf0ilVZV7YlT76tdMUh2wi4cmRAFSvmSnWQSZROTkIcB8/oYSxJueKGY/1mVG3mWLx8NYP8x9/Sz4dh7EyZHzsds1DYzcyMdFd2U9ALlaHirn83p3V2Z/+/siAcTSvQFKGSdZuY8MPoC7k9+OPWj4W6niKe3EtqkWKoYswI6gMhFeTRAZIbpNRBwySdbV5lVpYQm7EdG30rhx1Xoi57k5evPXjJ3/9itXDztsiTqDyqmM5pV7S6TX0Otqd3OGu2dMlgHaFImXmpS8BpGoVRz/8yQYcsv14G29kD2+AHNuiQl2X6jo4znvsXiSdalv3ud9Q9KGyVMN9h+pNPC5D1pdEyAvMwUa1mBO4HnbP8dQI0UOgaofmfke4sm2vHyqEKtUndMKn4+28WhyKOuYZrtolKRXoUkEYb51O6g79avabJQWEsidph4qj9z774o9b75wzVH/FQBBwpC+3GrhNKt6+/eH89Ad331Xe8f7sMaj99f3HwQXip0CQwdvBwp1VJlcpxbJXYLptU6dVi2/AZ4cJCLuIBcKikcBisiz1X0LqzLYDz6pxhbdHhhWU4iMlJihx1H3TmzWj6aEr6ZIuWWGgZAupkuVSBYvzpZZmuG1brxcSMkFuDNOVxSQ/PZTk3SNM5a8IGakvzOkW9S1hc1WO+N rMeMSl8e/eSHFq54Qiz+TwnwAJJOvgdCTUZJZv41SdX7M7ee1DPpJUipcVKZ4c8j9JcH9kxmH9gOUQ319NHFqHrJqHl0UH0XBsfftsWoHD0dSAEn17cyH+TRm+yZqBjTO6V0AHa6gTr3LspvBBo7Shts/qWqnHC+pxPhxVy/M7qjB7es5rPeG2/h3BsjA86v+r4xRHa7ujARgCfyiWV/tnBvD9+H5Qklo/p36FI4/ODVg8vXtHXX6Ijro50FHqx4WcepbUg+rZpLwLIhralnpooG2MRMTGjiou9/RsaD3/PyI2RicQEREGQv3DeeP9/PNzX125l5C+kSpg4iRKRn9y9m0GW/r8zInm753vvuyZ7bXNhvarePu3k39OZut3ewI5RMPIEsyHOyVgXo+hO73432QfMMX/ACAlM/I=','b_1-1','scope','true')) - scope-8 +Tez vertex scope-37 +# Plan on vertex +POValueOutputTez - scope-43 -> [scope-29] +| +|---New For Each(true)[bag] - scope-42 + | | + | Project[tuple][1] - scope-41 + | + |---Package(Packager)[tuple]{tuple} - scope-40 +Tez vertex scope-29 +# Plan on vertex +d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-28 +| +|---d: New For Each(false,false,false)[bag] - scope-27 + | | + | Project[int][0] - scope-21 + | | + | Project[int][1] - scope-23 + | | + | Project[int][3] - scope-25 + | + |---c: MergeJoinTez[tuple] - scope-18 <- scope-37 + | + |---a: New For Each(false,false)[bag] - scope-7 + | | + | Cast[int] - scope-2 + | | + | |---Project[bytearray][0] - scope-1 + | | + | Cast[int] - scope-5 + | | + | |---Project[bytearray][1] - scope-4 + | + |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0 \ No newline at end of file Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1811015&r1=1811014&r2=1811015&view=diff ============================================================================== --- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original) +++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Tue Oct 3 17:31:51 2017 @@ -49,6 +49,8 @@ import org.apache.pig.impl.io.FileLocali import org.apache.pig.impl.plan.NodeIdGenerator; import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat; import org.apache.pig.test.Util; +import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader; +import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader; import org.apache.pig.test.utils.TestHelper; import org.junit.AfterClass; import org.junit.Before; @@ -269,6 +271,30 @@ public class TestTezCompiler { } @Test + public void testMergeJoin() throws Exception { + String query = + "a = load 'file:///tmp/input1' as (x:int, y:int);" + + "b = load 'file:///tmp/input2' as (x:int, z:int);" + + "c = join a by x, b by x using 'merge';" + + "d = foreach c generate a::x as x, y, z;" + + "store d into 'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld"); + } + + @Test + public void testMergeCogroup() throws Exception { + String query = + "a = load 'file:///tmp/input1' using "+ DummyCollectableLoader.class.getName() +"() as (x:int, y:int);" + + "b = load 'file:///tmp/input2' using " + DummyIndexableLoader.class.getName()+"() as (x:int, z:int);" + + "c = cogroup a by x, b by x using 'merge';" + + "store c into 'file:///tmp/pigoutput';"; + + run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld"); + } + + + @Test public void testBloomJoin() throws Exception { String query = "a = load 'file:///tmp/input1' as (x, y:int);" + @@ -1429,6 +1455,7 @@ public class TestTezCompiler { String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim(); String compiledPlanClean = Util.standardizeNewline(compiledPlan).trim(); + assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)), TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean))); }