Author: rohini
Date: Tue Oct  3 17:31:51 2017
New Revision: 1811015

URL: http://svn.apache.org/viewvc?rev=1811015&view=rev
Log:
PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin 
(satishsaley via rohini)

Added:
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java
    pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java
    
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
    pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Oct  3 17:31:51 2017
@@ -26,6 +26,8 @@ PIG-5282: Upgade to Java 8 (satishsaley
  
 IMPROVEMENTS
 
+PIG-4120: Broadcast the index file in case of POMergeCoGroup and POMergeJoin 
(satishsaley via rohini)
+
 PIG-5306: REGEX_EXTRACT() logs every line that doesn't match (satishsaley via 
rohini)
 
 PIG-5298: Verify if org.mortbay.jetty is removable (nkollar via szita)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
 Tue Oct  3 17:31:51 2017
@@ -21,6 +21,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.PriorityQueue;
@@ -41,6 +42,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataBag;
@@ -53,6 +55,7 @@ import org.apache.pig.impl.PigImplConsta
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
 
@@ -78,8 +81,6 @@ public class POMergeCogroup extends Phys
     // relation is also included in the count.
     private transient int relationCnt;
 
-    private transient TupleFactory mTupleFactory;
-
     private String indexFileName;
 
     private FuncSpec idxFuncSpec; 
@@ -113,7 +114,19 @@ public class POMergeCogroup extends Phys
         for(int i=0; i < lrs.length; i++)
             LRs[i].setStripKeyFromValue(false);
     }
-    
+
+    public POMergeCogroup(POMergeCogroup copy) {
+        super(copy);
+        this.sidFuncSpecs = copy.sidFuncSpecs;
+        this.sideFileSpecs = copy.sideFileSpecs;
+        this.LRs = copy.LRs;
+        this.indexFileName = copy.indexFileName;
+        this.idxFuncSpec = copy.idxFuncSpec;
+        this.loaderSignatures = copy.loaderSignatures;
+        this.endOfRecordMark = copy.endOfRecordMark;
+        this.counter = copy.counter;
+    }
+
     // Set to POStatus.STATUS_EOP (default) for MR and POStatus.STATUS_NULL 
for Tez.
     // This is because:
     // For MR, we send EOP at the end of every record
@@ -413,7 +426,7 @@ public class POMergeCogroup extends Phys
         }
     }
 
-    private List<Pair<Integer,Tuple>> readIndex() throws ExecException{
+    protected List<Pair<Integer, Tuple>> readIndex() throws ExecException {
 
         // Assertions on index we are about to read:
         // We are reading index from a file through POLoad which will return 
tuples.
@@ -438,20 +451,28 @@ public class POMergeCogroup extends Phys
         List<Pair<Integer,Tuple>> index = new ArrayList<Pair<Integer,Tuple>>();
 
         for(Result res = ld.getNextTuple(); res.returnStatus != 
POStatus.STATUS_EOP; res = ld.getNextTuple()){
-
-            Tuple  idxTuple = (Tuple)res.result;
-            int colCnt = idxTuple.size()-2;
-            Tuple keyTuple = mTupleFactory.newTuple(colCnt);
-
-            for (int i=0; i< colCnt; i++)
-                keyTuple.set(i, idxTuple.get(i));
-
-            index.add(new Pair<Integer, 
Tuple>((Integer)idxTuple.get(colCnt+1), keyTuple));
+            addTupleToIndex((Tuple) res.result, index);
         }
 
         return index;
     }
 
+    /**
+     * Copies all but the last two fields of {@code tuple} into a new key tuple
+     * and appends a (last field, key tuple) pair to {@code index}.
+     * @param tuple source tuple; its last field is cast to Integer, its second-to-last field is not used here
+     * @param index list that receives the new pair
+     * @throws ExecException declared by the signature (presumably raised by Tuple#get/set - confirm)
+     */
+    protected void addTupleToIndex(Tuple tuple, List<Pair<Integer, Tuple>> 
index) throws ExecException {
+        int colCnt = tuple.size() - 2;
+        Tuple keyTuple = mTupleFactory.newTuple(colCnt);
+        for (int i = 0; i < colCnt; i++) {
+            keyTuple.set(i, tuple.get(i));
+        }
+        index.add(new Pair<Integer, Tuple>((Integer) tuple.get(colCnt + 1), 
keyTuple));
+    }
+
     @SuppressWarnings("unchecked")
     private Comparable<Object> getFirstKeyOfNextSplit(final int curSplitIdx, 
final List<Pair<Integer,Tuple>> index) throws IOException{
 
@@ -548,7 +569,6 @@ public class POMergeCogroup extends Phys
     ClassNotFoundException, ExecException {
 
         is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
         this.heap = new PriorityQueue<Tuple>(11, new Comparator<Tuple>() {
 
             @SuppressWarnings("unchecked")
@@ -622,4 +642,30 @@ public class POMergeCogroup extends Phys
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
+
+    /**
+     * Returns a copy of this operator in which the side-loader func specs, side
+     * file names, local rearranges, index func spec and loader signatures are
+     * cloned or copied into new containers instead of being shared.
+     *
+     * @return the cloned operator
+     * @throws CloneNotSupportedException if a nested clone() call fails
+     */
+    @Override
+    public POMergeCogroup clone() throws CloneNotSupportedException {
+        POMergeCogroup clone = (POMergeCogroup) super.clone();
+        clone.sidFuncSpecs = new ArrayList<FuncSpec>();
+        for (FuncSpec f : this.sidFuncSpecs) {
+            clone.sidFuncSpecs.add(f.clone());
+        }
+        clone.sideFileSpecs = new ArrayList<String>(this.sideFileSpecs);
+        clone.LRs = new POLocalRearrange[this.LRs.length];
+        for (int i = 0; i < this.LRs.length; i++) {
+            clone.LRs[i] = this.LRs[i].clone();
+        }
+        // idxFuncSpec may never have been set on this instance; avoid an NPE in that case.
+        clone.idxFuncSpec = this.idxFuncSpec == null ? null : this.idxFuncSpec.clone();
+        clone.loaderSignatures = new ArrayList<String>(this.loaderSignatures);
+        return clone;
+    }
 }

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java?rev=1811015&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroupTez.java
 Tue Oct  3 17:31:51 2017
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Tez variant of {@link POMergeCogroup} that builds its index from a Tez
+ * logical input instead of loading it through the base class readIndex().
+ * The index is kept in {@link ObjectCache} under a key derived from the input
+ * key, and the input is skipped when a cached index already exists.
+ */
+public class POMergeCogroupTez extends POMergeCogroup implements TezInput {
+
+    private static final Log LOG = LogFactory.getLog(POMergeCogroupTez.class);
+    private static final long serialVersionUID = 1L;
+    private String inputKey;
+    private transient String cacheKey;
+    private transient KeyValueReader reader;
+    private transient List<Pair<Integer, Tuple>> index;
+
+    public POMergeCogroupTez(OperatorKey k, List<PhysicalOperator> inpPOs, POLocalRearrange[] lrs, int parallel) {
+        super(k, inpPOs, lrs, parallel);
+    }
+
+    public POMergeCogroupTez(POMergeCogroup copy) {
+        super(copy);
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { this.inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        this.inputKey = newInputKey;
+    }
+
+    /**
+     * Computes the cache key for this input and, when an index is already
+     * cached under that key, adds the input to {@code inputsToSkip}.
+     */
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "mergecogrp-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    /**
+     * Takes the index from the object cache when present; otherwise reads every
+     * value of the input registered under {@code inputKey}, copies it, adds it
+     * to the index via {@code addTupleToIndex} and caches the result. Uses the
+     * cache key computed in {@link #addInputsToSkip(Set)}.
+     *
+     * @throws ExecException if the input is missing or reading it fails
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            this.index = (List<Pair<Integer, Tuple>>) cacheValue;
+            return;
+        }
+
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info(
+                    "Attached input from vertex " + this.inputKey + " : input=" + input + ", reader=" + reader);
+            index = new LinkedList<>();
+            while (reader.next()) {
+                // Index a fresh copy of the tuple (presumably the reader reuses its value object - confirm).
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copyTuple = mTupleFactory.newTuple(origTuple.getAll());
+                addTupleToIndex(copyTuple, index);
+            }
+            ObjectCache.getInstance().cache(cacheKey, this.index);
+        }
+        catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return super.name().replace("MergeCogroup", "MergeCogroupTez") + "\t<-\t " + this.inputKey;
+    }
+
+    /** Returns the index built in attachInputs() instead of loading it through POLoad. */
+    @Override
+    protected List<Pair<Integer, Tuple>> readIndex() {
+        return this.index;
+    }
+
+    @Override
+    public POMergeCogroupTez clone() throws CloneNotSupportedException {
+        return (POMergeCogroupTez) super.clone();
+    }
+}
\ No newline at end of file

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
 Tue Oct  3 17:31:51 2017
@@ -43,7 +43,6 @@ import org.apache.pig.data.SchemaTupleBa
 import org.apache.pig.data.SchemaTupleClassGenerator.GenContext;
 import org.apache.pig.data.SchemaTupleFactory;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.data.TupleMaker;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -55,6 +54,7 @@ import org.apache.pig.impl.plan.PlanExce
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
 
 /** This operator implements merge join algorithm to do map side joins.
  *  Currently, only two-way joins are supported. One input of join is 
identified as left
@@ -82,21 +82,21 @@ public class POMergeJoin extends Physica
     //The Local Rearrange operators modeling the join key
     private POLocalRearrange[] LRs;
 
-    private transient LoadFunc rightLoader;
+    protected transient LoadFunc rightLoader;
     private OperatorKey opKey;
 
-    private Object prevLeftKey;
+    private transient Object prevLeftKey;
 
-    private Result prevLeftInp;
+    private transient Result prevLeftInp;
 
-    private Object prevRightKey = null;
+    private transient Object prevRightKey = null;
 
-    private Result prevRightInp;
+    private transient Result prevRightInp;
 
     //boolean denoting whether we are generating joined tuples in this 
getNext() call or do we need to read in more data.
-    private boolean doingJoin;
+    private transient boolean doingJoin;
 
-    private FuncSpec rightLoaderFuncSpec;
+    protected FuncSpec rightLoaderFuncSpec;
 
     private String rightInputFileName;
 
@@ -113,17 +113,17 @@ public class POMergeJoin extends Physica
 
     private boolean noInnerPlanOnRightSide;
 
-    private Object curJoinKey;
+    private transient Object curJoinKey;
 
-    private Tuple curJoiningRightTup;
+    private transient Tuple curJoiningRightTup;
 
     private int counter; // # of tuples on left side with same key.
 
-    private int leftTupSize = -1;
+    private transient int leftTupSize;
 
-    private int rightTupSize = -1;
+    private transient int rightTupSize;
 
-    private int arrayListSize = 1024;
+    private static final int ARRAY_LIST_SIZE = 1024; // size passed to each new TuplesToSchemaTupleList
 
     private LOJoin.JOINTYPE joinType;
 
@@ -147,10 +147,9 @@ public class POMergeJoin extends Physica
     // Only for spark.
     // it means that current operator reaches at its end and the last left 
input was
     // added into 'leftTuples', ready for join.
-    private boolean leftInputConsumedInSpark = false;
+    private transient boolean leftInputConsumedInSpark = false;
 
     // This serves as the default TupleFactory
-    private transient TupleFactory mTupleFactory;
 
     /**
      * These TupleFactories are used for more efficient Tuple generation. This 
should
@@ -185,6 +184,25 @@ public class POMergeJoin extends Physica
         this.mergedInputSchema = mergedInputSchema;
     }
 
+    public POMergeJoin(POMergeJoin copy) {
+        super(copy);
+        this.firstTime = copy.firstTime;
+        this.LRs = copy.LRs;
+        this.rightLoaderFuncSpec = copy.rightLoaderFuncSpec;
+        this.rightInputFileName = copy.rightInputFileName;
+        this.indexFile = copy.indexFile;
+        this.inpPlans = copy.inpPlans;
+        this.rightPipelineLeaf = copy.rightPipelineLeaf;
+        this.rightPipelineRoot = copy.rightPipelineRoot;
+        this.noInnerPlanOnRightSide = copy.noInnerPlanOnRightSide;
+        this.counter = copy.counter;
+        this.joinType = copy.joinType;
+        this.signature = copy.signature;
+        this.endOfRecordMark = copy.endOfRecordMark;
+        this.leftInputSchema = copy.leftInputSchema;
+        this.mergedInputSchema = copy.mergedInputSchema;
+    }
+
     /**
      * Configures the Local Rearrange operators to get keys out of tuple.
      * @throws ExecException
@@ -211,8 +229,6 @@ public class POMergeJoin extends Physica
      * This is a helper method that sets up all of the TupleFactory members.
      */
     private void prepareTupleFactories() {
-        mTupleFactory = TupleFactory.getInstance();
-
         if (leftInputSchema != null) {
             leftTupleMaker = 
SchemaTupleBackend.newSchemaTupleFactory(leftInputSchema, false, 
GenContext.MERGE_JOIN);
         }
@@ -241,7 +257,7 @@ public class POMergeJoin extends Physica
      * @return the list object to store Tuples in
      */
     private TuplesToSchemaTupleList newLeftTupleArray() {
-        return new TuplesToSchemaTupleList(arrayListSize, leftTupleMaker);
+        return new TuplesToSchemaTupleList(ARRAY_LIST_SIZE, leftTupleMaker);
     }
 
     /**
@@ -546,14 +562,8 @@ public class POMergeJoin extends Physica
         }
     }
 
-    private void seekInRightStream(Object firstLeftKey) throws IOException{
-        rightLoader = 
(LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
-
-        // check if hadoop distributed cache is used
-        if (indexFile != null && rightLoader instanceof 
DefaultIndexableLoader) {
-            DefaultIndexableLoader loader = 
(DefaultIndexableLoader)rightLoader;
-            loader.setIndexFile(indexFile);
-        }
+    private void seekInRightStream(Object firstLeftKey) throws IOException {
+        rightLoader = getRightLoader();
 
         // Pass signature of the loader to rightLoader
         // make a copy of the conf to use in calls to rightLoader.
@@ -565,6 +575,23 @@ public class POMergeJoin extends Physica
                 firstLeftKey instanceof Tuple ? (Tuple)firstLeftKey : 
mTupleFactory.newTuple(firstLeftKey));
     }
 
+    /**
+     * Instantiates the right-side loader from {@code rightLoaderFuncSpec}; when {@code indexFile}
+     * is set and the loader is a DefaultIndexableLoader, the index file is passed on to it.
+     * @return the newly instantiated loader
+     * @throws IOException declared by the signature; no visible statement throws it directly
+     * @throws ExecException declared by the signature; no visible statement throws it directly
+     */
+    protected LoadFunc getRightLoader() throws ExecException, IOException {
+        LoadFunc loader = (LoadFunc) 
PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+        // check if hadoop distributed cache is used
+        if (indexFile != null && loader instanceof DefaultIndexableLoader) {
+            DefaultIndexableLoader defLoader = (DefaultIndexableLoader) loader;
+            defLoader.setIndexFile(indexFile);
+        }
+        return loader;
+    }
+
     private Result getNextRightInp(Object leftKey) throws ExecException{
 
         /*
@@ -668,7 +695,6 @@ public class POMergeJoin extends Physica
     private void readObject(ObjectInputStream is) throws IOException, 
ClassNotFoundException, ExecException{
 
         is.defaultReadObject();
-        mTupleFactory = TupleFactory.getInstance();
     }
 
 
@@ -746,4 +772,45 @@ public class POMergeJoin extends Physica
     public POLocalRearrange[] getLRs() {
         return LRs;
     }
+
+    /**
+     * Returns a copy of this operator with its local rearranges, right loader
+     * func spec, input plans, right pipeline operators and schemas cloned.
+     * Optional members that are null here are left as super.clone() produced them.
+     *
+     * @return the cloned operator
+     * @throws CloneNotSupportedException if a nested clone() call fails
+     */
+    @Override
+    public POMergeJoin clone() throws CloneNotSupportedException {
+        POMergeJoin clone = (POMergeJoin) super.clone();
+        clone.LRs = new POLocalRearrange[this.LRs.length];
+        for (int i = 0; i < this.LRs.length; i++) {
+            clone.LRs[i] = this.LRs[i].clone();
+        }
+        clone.rightLoaderFuncSpec = this.rightLoaderFuncSpec.clone();
+        clone.inpPlans = new MultiMap<PhysicalOperator, PhysicalPlan>();
+        for (PhysicalOperator op : this.inpPlans.keySet()) {
+            PhysicalOperator cloneOp = op.clone();
+            for (PhysicalPlan phyPlan : this.inpPlans.get(op)) {
+                clone.inpPlans.put(cloneOp, phyPlan.clone());
+            }
+        }
+        // leftInputSchema is null-checked in prepareTupleFactories, so it is optional;
+        // the merged schema and the right pipeline operators are presumably optional too
+        // (see noInnerPlanOnRightSide), so clone each of them only when present.
+        if (this.rightPipelineLeaf != null) {
+            clone.rightPipelineLeaf = this.rightPipelineLeaf.clone();
+        }
+        if (this.rightPipelineRoot != null) {
+            clone.rightPipelineRoot = this.rightPipelineRoot.clone();
+        }
+        if (this.leftInputSchema != null) {
+            clone.leftInputSchema = this.leftInputSchema.clone();
+        }
+        if (this.mergedInputSchema != null) {
+            clone.mergedInputSchema = this.mergedInputSchema.clone();
+        }
+        return clone;
+    }
 }

Added: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java?rev=1811015&view=auto
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java
 (added)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoinTez.java
 Tue Oct  3 17:31:51 2017
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.ObjectCache;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.builtin.TezIndexableLoader;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * Tez variant of {@link POMergeJoin} that reads the index tuples from a Tez
+ * logical input and hands them to a {@link TezIndexableLoader} in memory.
+ * The index is kept in {@link ObjectCache} under a key derived from the input
+ * key, and the input is skipped when a cached index already exists.
+ */
+public class POMergeJoinTez extends POMergeJoin implements TezInput {
+
+    private static final Log LOG = LogFactory.getLog(POMergeJoinTez.class);
+    private static final long serialVersionUID = 1L;
+    private String inputKey;
+    private transient String cacheKey;
+    private transient KeyValueReader reader;
+    // Runtime-only state: filled from the cache or the input in attachInputs().
+    private transient LinkedList<Tuple> index;
+
+    public POMergeJoinTez(POMergeJoin joinOp) {
+        super(joinOp);
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+
+    @Override
+    public String[] getTezInputs() {
+        return new String[] { this.inputKey };
+    }
+
+    @Override
+    public void replaceInput(String oldInputKey, String newInputKey) {
+        this.inputKey = newInputKey;
+    }
+
+    /**
+     * Computes the cache key for this input and, when an index is already
+     * cached under that key, adds the input to {@code inputsToSkip}.
+     */
+    @Override
+    public void addInputsToSkip(Set<String> inputsToSkip) {
+        cacheKey = "merge-" + inputKey;
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            inputsToSkip.add(inputKey);
+        }
+    }
+
+    /**
+     * Takes the index from the object cache when present; otherwise reads every
+     * value of the input registered under {@code inputKey}, copies it into the
+     * index and caches the index. In both cases the right loader is then
+     * instantiated through {@link #getRightLoader()}.
+     *
+     * @throws ExecException if the input is missing or reading it fails
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf) throws ExecException {
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
+            this.index = (LinkedList<Tuple>) cacheValue;
+            rightLoader = getRightLoader();
+            return;
+        }
+
+        LogicalInput input = inputs.get(inputKey);
+        if (input == null) {
+            throw new ExecException("Input from vertex " + inputKey + " is missing");
+        }
+
+        try {
+            reader = (KeyValueReader) input.getReader();
+            LOG.info(
+                    "Attached input from vertex " + this.inputKey + " : input=" + input + ", reader=" + reader);
+            this.index = new LinkedList<Tuple>();
+            while (reader.next()) {
+                // Store a fresh copy of the tuple (presumably the reader reuses its value object - confirm).
+                Tuple origTuple = (Tuple) reader.getCurrentValue();
+                Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
+                this.index.add(copy);
+            }
+            ObjectCache.getInstance().cache(cacheKey, this.index);
+            rightLoader = getRightLoader();
+        }
+        catch (Exception e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public String name() {
+        return super.name().replace("MergeJoin", "MergeJoinTez") + "\t<-\t " + this.inputKey;
+    }
+
+    /**
+     * Instantiates the right loader from {@code rightLoaderFuncSpec} and, when it is a
+     * TezIndexableLoader, hands it the in-memory index; no index file is set here.
+     */
+    @Override
+    protected LoadFunc getRightLoader() throws ExecException {
+        LoadFunc loader = (LoadFunc) PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec);
+        if (loader instanceof TezIndexableLoader) {
+            ((TezIndexableLoader) loader).setIndex(index);
+        }
+        return loader;
+    }
+
+    @Override
+    public POMergeJoinTez clone() throws CloneNotSupportedException {
+        return (POMergeJoinTez) super.clone();
+    }
+}

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
 Tue Oct  3 17:31:51 2017
@@ -72,7 +72,9 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroupTez;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoinTez;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPoissonSample;
@@ -112,9 +114,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+import org.apache.pig.impl.builtin.TezIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.NullableIntWritable;
@@ -925,7 +927,9 @@ public class TezCompiler extends PhyPlan
     }
 
     @Override
-    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws 
VisitorException {
+    public void visitMergeCoGroup(POMergeCogroup poCoGroup) throws 
VisitorException {
+
+        POMergeCogroupTez poCoGrp = new POMergeCogroupTez(poCoGroup);
         if(compiledInputs.length < 2){
             int errCode=2251;
             String errMsg = "Merge Cogroup work on two or more relations." +
@@ -998,43 +1002,53 @@ public class TezCompiler extends PhyPlan
 
             // Create new map-reduce operator for indexing job and then 
configure it.
             TezOperator indexerTezOp = getTezOp();
-            FileSpec idxFileSpec = getIndexingJob(indexerTezOp, baseMROp, 
poCoGrp.getLRInnerPlansOf(0));
-            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
-            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
+            configureIndexerOp(indexerTezOp, baseMROp, poCoGrp);
 
             baseMROp.plan.addAsLeaf(poCoGrp);
+            baseMROp.markMergeCogroup();
             for (FuncSpec funcSpec : funcSpecs)
                 baseMROp.UDFs.add(funcSpec.toString());
 
-            phyToTezOpMap.put(poCoGrp,baseMROp);
+            phyToTezOpMap.put(poCoGrp, baseMROp);
             // Going forward, new operators should be added in baseMRop. To 
make
             // sure, reset curMROp.
             curTezOp = baseMROp;
         }
-        catch (ExecException e){
-           throw new 
TezCompilerException(e.getDetailedMessage(),e.getErrorCode(),e.getErrorSource(),e);
+        catch (ExecException e) {
+            throw new TezCompilerException(e.getDetailedMessage(), 
e.getErrorCode(), e.getErrorSource(), e);
         }
-        catch (TezCompilerException mrce){
-            throw(mrce);
+        catch (TezCompilerException mrce) {
+            throw (mrce);
         }
         catch (CloneNotSupportedException e) {
             throw new TezCompilerException(e);
         }
-        catch(PlanException e){
+        catch (PlanException e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + 
poCoGrp.getClass().getCanonicalName();
             throw new TezCompilerException(msg, errCode, PigException.BUG, e);
         }
-        catch (IOException e){
+        catch (IOException e) {
             int errCode = 3000;
             String errMsg = "IOException caught while compiling 
POMergeCoGroup";
-            throw new TezCompilerException(errMsg, errCode,e);
+            throw new TezCompilerException(errMsg, errCode, e);
         }
     }
 
-    // Sets up the indexing job for map-side cogroups.
-    private FileSpec getIndexingJob(TezOperator indexerTezOp,
-            final TezOperator baseTezOp, final List<PhysicalPlan> 
mapperLRInnerPlans)
+    /**
+     * Sets up the indexing vertex for map-side cogroups.
+     * 
+     * @param indexerTezOp
+     * @param baseTezOp
+     * @param poCoGrp
+     * @throws TezCompilerException
+     * @throws PlanException
+     * @throws ExecException
+     * @throws IOException
+     * @throws CloneNotSupportedException
+     */
+    private void configureIndexerOp(TezOperator indexerTezOp,
+            final TezOperator baseTezOp, final POMergeCogroupTez poCoGrp)
         throws TezCompilerException, PlanException, ExecException, 
IOException, CloneNotSupportedException {
 
         // First replace loader with  MergeJoinIndexer.
@@ -1054,7 +1068,7 @@ public class TezCompiler extends PhyPlan
 
         String[] indexerArgs = new String[6];
         indexerArgs[0] = funcSpec.toString();
-        indexerArgs[1] = 
ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
+        indexerArgs[1] = ObjectSerializer.serialize((Serializable) 
poCoGrp.getLRInnerPlansOf(0));
         indexerArgs[3] = baseLoader.getSignature();
         indexerArgs[4] = baseLoader.getOperatorKey().scope;
         indexerArgs[5] = Boolean.toString(false); // we care for nulls.
@@ -1076,6 +1090,7 @@ public class TezCompiler extends PhyPlan
         indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
 
         POLoad idxJobLoader = new POLoad(new 
OperatorKey(scope,nig.getNextNodeId(scope)));
+        idxJobLoader.copyAliasFrom(baseLoader);
         idxJobLoader.setPc(pigContext);
         idxJobLoader.setIsTmpLoad(true);
         idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
@@ -1092,19 +1107,19 @@ public class TezCompiler extends PhyPlan
         tezPlan.add(indexAggrOper);
         tezPlan.add(indexerTezOp);
         TezCompilerUtil.simpleConnectTwoVertex(tezPlan, indexerTezOp, 
indexAggrOper, scope, nig);
-        TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp);
-        indexAggrOper.segmentBelow = true;
-
         indexerTezOp.setRequestedParallelism(1); // we need exactly one 
reducer for indexing job.
         indexerTezOp.setDontEstimateParallelism(true);
 
-        POStore st = TezCompilerUtil.getStore(scope, nig);
-        FileSpec strFile = getTempFileSpec(pigContext);
-        st.setSFile(strFile);
-        indexAggrOper.plan.addAsLeaf(st);
+        // Convert the index into a broadcast input
+        POValueOutputTez indexAggrOperOutput = new 
POValueOutputTez(OperatorKey.genOpKey(scope));
+        indexAggrOper.plan.addAsLeaf(indexAggrOperOutput);
+        
indexAggrOperOutput.addOutputKey(baseTezOp.getOperatorKey().toString());
+        indexAggrOper.markIndexer();
         indexAggrOper.setClosed(true);
-
-        return strFile;
+        TezEdgeDescriptor edge = new TezEdgeDescriptor();
+        TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
+        TezCompilerUtil.connect(tezPlan, indexAggrOper, baseTezOp, edge);
+        poCoGrp.setInputKey(indexAggrOper.getOperatorKey().toString());
     }
 
     /** Since merge-join works on two inputs there are exactly two TezOper 
predecessors identified  as left and right.
@@ -1118,27 +1133,23 @@ public class TezCompiler extends PhyPlan
      *                  in physical plan, that is yanked and set as inner 
plans of joinOp.
      *  2) LeftTezOper:  add the Join operator in it.
      *
-     *  We also need to segment the DAG into two, because POMergeJoin depends 
on the index file which loads with
-     *  DefaultIndexableLoader. It is possible to convert the index as a 
broadcast input, but that is costly
-     *  because a lot of logic is built into DefaultIndexableLoader. We can 
revisit it later.
      */
     @Override
     public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
-
         try{
-            if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
+            if (compiledInputs.length != 2 || joinOp.getInputs().size() != 2) {
                 int errCode=1101;
                 throw new TezCompilerException("Merge Join must have exactly 
two inputs. Found : "+compiledInputs.length, errCode);
             }
 
             curTezOp = phyToTezOpMap.get(joinOp.getInputs().get(0));
-
             TezOperator rightTezOpr = null;
             TezOperator rightTezOprAggr = null;
-            if(curTezOp.equals(compiledInputs[0]))
+            if (curTezOp.equals(compiledInputs[0])) {
                 rightTezOpr = compiledInputs[1];
-            else
+            } else {
                 rightTezOpr = compiledInputs[0];
+            }
 
             // We will first operate on right side which is indexer job.
             // First yank plan of the compiled right input and set that as an 
inner plan of right operator.
@@ -1220,9 +1231,11 @@ public class TezCompiler extends PhyPlan
                     }
                 }
             } else {
+                joinOp = new POMergeJoinTez(joinOp);
                 LoadFunc loadFunc = rightLoader.getLoadFunc();
                 //Replacing POLoad with indexer is disabled for 'merge-sparse' 
joins.  While
-                //this feature would be useful, the current implementation of 
DefaultIndexableLoader
+                // this feature would be useful, the current implementation of
+                // DefaultIndexableLoader
                 //is not designed to handle multiple calls to seekNear.  
Specifically, it rereads the entire index
                 //for each call.  Some refactoring of this class is required - 
and then the check below could be removed.
                 if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
@@ -1265,32 +1278,37 @@ public class TezCompiler extends PhyPlan
                 rightTezOprAggr.setRequestedParallelism(1); // we need exactly 
one task for indexing job.
                 rightTezOprAggr.setDontEstimateParallelism(true);
 
-                POStore st = TezCompilerUtil.getStore(scope, nig);
-                FileSpec strFile = getTempFileSpec(pigContext);
-                st.setSFile(strFile);
-                rightTezOprAggr.plan.addAsLeaf(st);
-                rightTezOprAggr.setClosed(true);
-                rightTezOprAggr.segmentBelow = true;
-
-                // set up the DefaultIndexableLoader for the join operator
-                String[] defaultIndexableLoaderArgs = new String[5];
-                defaultIndexableLoaderArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
-                defaultIndexableLoaderArgs[1] = strFile.getFileName();
-                defaultIndexableLoaderArgs[2] = 
strFile.getFuncSpec().toString();
-                defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
-                defaultIndexableLoaderArgs[4] = 
origRightLoaderFileSpec.getFileName();
-                joinOp.setRightLoaderFuncSpec((new 
FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
+                // Convert the index into a broadcast input
+                POValueOutputTez rightTezOprAggrOutput = new 
POValueOutputTez(OperatorKey.genOpKey(scope));
+                rightTezOprAggr.plan.addAsLeaf(rightTezOprAggrOutput);
+                
rightTezOprAggrOutput.addOutputKey(curTezOp.getOperatorKey().toString());
+
+                TezEdgeDescriptor edge = new TezEdgeDescriptor();
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge, 
DataMovementType.BROADCAST);
+                TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp, 
edge);
+
+                ((POMergeJoinTez) 
joinOp).setInputKey(rightTezOprAggr.getOperatorKey().toString());
+                // set up the TezIndexableLoader for the join operator
+                String[] tezIndexableLoaderArgs = new String[3];
+                tezIndexableLoaderArgs[0] = 
origRightLoaderFileSpec.getFuncSpec().toString();
+                tezIndexableLoaderArgs[1] = joinOp.getOperatorKey().scope;
+                tezIndexableLoaderArgs[2] = 
origRightLoaderFileSpec.getFileName();
+                joinOp.setRightLoaderFuncSpec(
+                        (new FuncSpec(TezIndexableLoader.class.getName(), 
tezIndexableLoaderArgs)));
                 
joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
-
-                joinOp.setIndexFile(strFile.getFileName());
                 udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
             }
 
+            if(joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+                curTezOp.markMergeSparseJoin();
+            } else {
+                curTezOp.markMergeJoin();
+            }
             // We are done with right side. Lets work on left now.
             // Join will be materialized in leftTezOper.
-            if(!curTezOp.isClosed()) // Life is easy
+            if (!curTezOp.isClosed()) {// Life is easy
                 curTezOp.plan.addAsLeaf(joinOp);
-
+            }
             else{
                 int errCode = 2022;
                 String msg = "Input plan has been closed. This is unexpected 
while compiling.";
@@ -1298,14 +1316,14 @@ public class TezCompiler extends PhyPlan
             }
             if(rightTezOprAggr != null) {
                 rightTezOprAggr.markIndexer();
-                // We want to ensure indexing job runs prior to actual join 
job. So, connect them in order.
-                TezCompilerUtil.connect(tezPlan, rightTezOprAggr, curTezOp);
             }
+
             phyToTezOpMap.put(joinOp, curTezOp);
             // no combination of small splits as there is currently no way to 
guarantee the sortness
             // of the combined splits.
             curTezOp.noCombineSmallSplits();
             curTezOp.UDFs.addAll(udfs);
+
         }
         catch(PlanException e){
             int errCode = 2034;
@@ -2661,5 +2679,6 @@ public class TezCompiler extends PhyPlan
     private TezOperator getTezOp() {
         return new TezOperator(OperatorKey.genOpKey(scope));
     }
+
 }
 

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
 Tue Oct  3 17:31:51 2017
@@ -185,7 +185,13 @@ public class TezOperator extends Operato
         // Indicate if this job constructs bloom filter
         BUILDBLOOM,
         // Indicate if this job applies bloom filter
-        FILTERBLOOM;
+        FILTERBLOOM,
+        // Indicate if this job is a merge join job
+        MERGE_JOIN,
+        // Indicate if this job is a merge sparse join job
+        MERGE_SPARSE_JOIN,
+        // Indicate if this job is a merge cogroup job
+        MERGE_COGROUP;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -393,6 +399,14 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.COGROUP.ordinal());
     }
 
+    public boolean isMergeCogroup() {
+        return feature.get(OPER_FEATURE.MERGE_COGROUP.ordinal());
+    }
+
+    public void markMergeCogroup() {
+        feature.set(OPER_FEATURE.MERGE_COGROUP.ordinal());
+    }
+
     public boolean isRegularJoin() {
         return feature.get(OPER_FEATURE.HASHJOIN.ordinal());
     }
@@ -473,6 +487,22 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.FILTERBLOOM.ordinal());
     }
 
+    public boolean isMergeJoin() {
+        return feature.get(OPER_FEATURE.MERGE_JOIN.ordinal());
+    }
+
+    public void markMergeJoin() {
+        feature.set(OPER_FEATURE.MERGE_JOIN.ordinal());
+    }
+
+    public boolean isMergeSparseJoin() {
+        return feature.get(OPER_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+    }
+
+    public void markMergeSparseJoin() {
+        feature.set(OPER_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> 
excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {

Modified: pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java 
(original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Tue 
Oct  3 17:31:51 2017
@@ -71,8 +71,8 @@ public class DefaultIndexableLoader exte
 
     private LoadFunc loader;
     // Index is modeled as FIFO queue and LinkedList implements java Queue 
interface.
-    private LinkedList<Tuple> index;
-    private FuncSpec rightLoaderFuncSpec;
+    protected LinkedList<Tuple> index;
+    protected FuncSpec rightLoaderFuncSpec;
 
     private String scope;
     private Tuple dummyTuple = null;
@@ -94,6 +94,9 @@ public class DefaultIndexableLoader exte
         this.inpLocation = inputLocation;
     }
 
+    public DefaultIndexableLoader() {
+    }
+
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
@@ -113,16 +116,8 @@ public class DefaultIndexableLoader exte
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
-        POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new 
FuncSpec(indexFileLoadFuncSpec)));
-
-        Properties props = ConfigurationUtil.getLocalFSProperties();
-        PigContext pc = new PigContext(ExecType.LOCAL, props);
-        ld.setPc(pc);
-        index = new LinkedList<Tuple>();
-        for(Result 
res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple())
-            index.offer((Tuple) res.result);
-
 
+        loadIndex();
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
 
@@ -193,6 +188,22 @@ public class DefaultIndexableLoader exte
         initRightLoader(splitsAhead);
     }
 
+    /**
+     * Loads the index tuples from the index file into a LinkedList.
+     *
+     * @throws ExecException
+     */
+    protected void loadIndex() throws ExecException {
+        POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new 
FuncSpec(indexFileLoadFuncSpec)));
+
+        Properties props = ConfigurationUtil.getLocalFSProperties();
+        PigContext pc = new PigContext(ExecType.LOCAL, props);
+        ld.setPc(pc);
+        index = new LinkedList<Tuple>();
+        for (Result res = ld.getNextTuple(); res.returnStatus != 
POStatus.STATUS_EOP; res = ld.getNextTuple())
+            index.offer((Tuple) res.result);
+    }
+
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
         Properties properties = (Properties) ObjectSerializer
                 
.deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));

Added: pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java?rev=1811015&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java (added)
+++ pig/trunk/src/org/apache/pig/impl/builtin/TezIndexableLoader.java Tue Oct  
3 17:31:51 2017
@@ -0,0 +1,34 @@
+package org.apache.pig.impl.builtin;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.data.Tuple;
+import org.apache.tez.runtime.api.LogicalInput;
+
+public class TezIndexableLoader extends DefaultIndexableLoader {
+
+    public TezIndexableLoader(String loaderFuncSpec, String scope, String 
inputLocation) {
+        super(loaderFuncSpec, null, null, scope, inputLocation);
+    }
+
+    @Override
+    public void loadIndex() throws ExecException {
+        // No-op: the index is supplied via setIndex() rather than read from a file.
+    }
+
+    /**
+     * Sets the index to the provided LinkedList.
+     *
+     * @param index
+     * @throws ExecException
+     */
+    public void setIndex(LinkedList<Tuple> index) throws ExecException {
+            this.index = index;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java 
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezScriptState.java Tue Oct 
 3 17:31:51 2017
@@ -290,6 +290,15 @@ public class TezScriptState extends Scri
                 if (tezOp.isLimit() || tezOp.isLimitAfterSort()) {
                     feature.set(PIG_FEATURE.LIMIT.ordinal());
                 }
+                if (tezOp.isMergeJoin()) {
+                    feature.set(PIG_FEATURE.MERGE_JOIN.ordinal());
+                }
+                if (tezOp.isMergeSparseJoin()) {
+                    feature.set(PIG_FEATURE.MERGE_SPARSE_JOIN.ordinal());
+                }
+                if (tezOp.isMergeCogroup()) {
+                    feature.set(PIG_FEATURE.MERGE_COGROUP.ordinal());
+                }
                 try {
                     new FeatureVisitor(tezOp.plan, feature).visit();
                 } catch (VisitorException e) {

Added: 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld?rev=1811015&view=auto
==============================================================================
--- 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld 
(added)
+++ 
pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld 
Tue Oct  3 17:31:51 2017
@@ -0,0 +1,43 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-24    ->      Tez vertex scope-32,
+Tez vertex scope-32    ->      Tez vertex scope-22,
+Tez vertex scope-22
+
+Tez vertex scope-24
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-34        ->       scope-32
+|   |
+|   Project[tuple][*] - scope-33
+|
+|---a: 
Load(file:///tmp/input1:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh
 
3ilO1GLJNhT5TGq6OnpUsGZC3tJHizAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9fVlsHVeaXnERrd2SvKglWRJl0drMqrtwEWWNxqYo0qKbW5N0L1LacvFWXbLE2lxVl7yU4c6Mg06CyaATBBO400jcQZKHeWggQD8NkDxMFmBeMsiCDhAkCDAYzGvykgAzyCDo/P9/zqk6VXXqkrSNcbfpe+ue/fz/96/n1C/+p3YkjrSlINo0zNBsbdlG6GwaG2Zr2/YtY8u0giA07K7d6iRO4Nv+puNDka292GmZ7oK5Z0dG6Jp+bKzwZyvwTWP/9PVr/U+0U9DQcnvadef9sJMsaEeC0DPDRLu+AJ3WWKc16LTmeKFbg27c2mLHTZxFM7zfjbS3CkPDUtSlsRzakZkEUb7HowvacW8uCrxZa9OOP9F+pPVBn9637b040V5eeG7umLwTaH9Be8lbsM0dG347I/224MQJ/DjoLYfUxMCCNuStBkFC3wYXtNPeWtBOit2cpKfrgfzsqCd/74aw2tdUU8J+DTFxPp3+fmxh0IMnNAqqfRaHyYo/NuMt+O3IS//tX//b1z/+TwNa/5x23A1Ma85swcLMa8eSrciOtwLX6obvvkdtntw9Cn/P4Mcujelo6Yc+6GVk32WHFZX2+QOt37FgoeNWENqJdo4tJpTfrK0lkeNv3u+KWV1JeDHo5gdfh/Ai2zXxR9MVYwIyXJ4Lo
 
lloLxvbySfaWSeeNSN3b92OPMc3E9t6oh3b9oNdf815YT/RTgBFrjmWvey7e/PaS34wn9hevKCdz9HuShS07DiGySTaK9IEHwaBa5s+EMxxh8ohQxChPNVedeL14KE9ByNNbN+2pqPI3Eu0/qdPgFyCkH5ctWM74eVP4BoDSbaB8hJt+WmRSfgC1dgC1QoLVMstUE3wpFie+9A+0AMQ2fpeiDTf//QhUHYMjXtmotWVHOkGm1mLrGhtjf6DE046oWsvmtt2lGjDxfqWmZi19bQEsvPC19nw4nxye3zEbLU6HuAN7cF0kmAf1rz2amR/0oHJ2taKGZmua7tO7D3UjmcLAWRruo5JW3YSvlADiXapejow8yEqJfDglAujNTft9chs4VLcLNYNbV9Ai1QQ2jkbRA5MFaYYtIiaRZMvBZ1E6uF4aEa2T7SVaCtfiywIsGsyYMM4BiIkiEdfq+FVWlLc5mv7wke2dwRxgCaJdkNJgFitJqEOIAmuySlN+zX80020PhNx8ZUMF4nJEMO7v/2fL//DPzL/8YDWN68NxsDrhEJ9u4Mc5r77zePPAuBvNrkhABqAAC/Exx9oR1zHcxJ4Bhz/YfgIGgZ6c+ccFzDzmnL2TlDDn9dCuwW71B+2FHxGBVeczZnAT+wuSq5jsbMJONcB2kei7uLf14mUTvCxXcAvp/ka4ucrYciE3dXcCv3wm2TYkeUCsU//5I+Xzv7X//fTfm0ANgiZaF4bCtptWJ4cV7Jtg38HaajdEHf84dcZGqNVSYI91E5GNqyYv5bAwgH4DzGEyAuz5Y3ndgtIPOwPaenwHw3GclVF8E5giL3LJHo/6ARteLpkejZHnKPtjt/CUol2obi3c/wn6BJGghXfqdVqiRfWCIAa0Pf5Qt+iStYn6C/HWq4Zx2mnT7WjqCJMR5vA9a88LYvrBe1lBqRQgsE91nsVR/Gg0F8CyGGsw59FJkVngs0o6IQjjzqetzcTAOS2EnPDtZ
 
EF7Ii27opquTIKzrblzLz2smW3TdgIAd9PtJOOD22HbFjIYP5sF1DC8Qn95123EydEbwvaOaIh21prRU6Y4G4IhU40uxBsLtg7NtDCt+S1B7E3/rxGv9xnhYGGrJnA80zfEm2c4Y9xxVPshudHkfZQtCg2dJb/hNITtiR5Jsuel6lbUDSAYRIHEfl1SS3Nnt9nAsH00hmBiBiiJ5nASEvjk7sL2ok4vwhQ6Ey87YQhLM+WE66YyRZ73g2JrPuQsM9nsAqia9u2uNI5/odLf+2M+4d/CfgN+hPIXtCMliPY366sVGqStomtvS7tO03VoBWuxy9dvdr/7yYHoFi4C6i0NAw1hrTB+aW55byaqlXpr7kfDg0Oif0CSPgF4pIrdiijQg1E2oOv1KCqLWChgYY+BoN8NVvcbG/vnfuPL8LJ3/wpScajnErZHnZzYg53ghjrlX/58lvXPvizPziQAXCU1onEQFdA/gz7rf9d/K3/1/DvSKKd3jMj32ibjqu3gU5Ba2+bLgCzdhm05ci2Oi3beB5sGJ0NGLcZbxu2j4PB9h4nWo1qQ+dBJ2oBm/qg9ESGZ3b1VgDMboM+ppth6Dpc64HmG3X4J9FeA9jc2ANQMWBBDFju1nbc8WDNJhrNRJtWtmuBKN6zLfyvvUkN6kkA2wMFvWDHdHUHcCXCDx72NMZ6+g1qK3E8GyWPHkMJByblIklaG3r2A6ylDU2Zlt7C/ddRk0i0Y436+NTE3Uls6kI7NuIx02gFvm8TFlC7ASqSRyZYd7eydWu5DihzRtzZAHXAQFSH5tPFAMW8AeUnOGnFQFmRk+wZ214satp+K9oLYRGNbXvPoFEBCYZOBGrU0fGxZp11aWRdxluddhv6yUaob9t2qAP87NiqvfMDK11fqJQAvtqRLp4gYeF6BB0fptjfzM8P6GLLwWWDsaFtBBVyO3B0anKcD3GUOgNSkMZqelmPiBMGqU3G9gYoe1Dleml8oWd7OpGKzmZ
 
iJdpgEnVgl+6oN3kLYNyF+cjzwOl/WKRvMQ8LWFu3HFDuGyOfqsccJybw/iaW+qzG69WwHswyK4i8AuIJ0CMytmzTTbZo4NISAclMssV5KzcUUYv3Y0RBkBg0pOGRTzmxgGZgUP+sZo2XhaaUjBPZrWDHhunxZeMUMJIfrgGyhBMpPWOChrqtKMhKfJZoV4ExpjdQHrdI8qztgSnmMZJHqZ9ob0rAyicBdVxEYoPweA56Og2PrI60QkOTnH6uqDvYMiPe/kVl+4/NCNst8ySHJ+TgEHSXrMfBiSIX56icKiuo6VoFWgIUsscwir5GoulZuQLHAm1jF4wHABNbNrLcYN1oTsDqCZCIXYQCgqoYULvFhwVKX6JNlZcgBRWsiOv2ENUkMK/WqAUmQABMzpd4DXoBqZNoZ2p20qptRxsT6aM7vPGtJAkNs5NswUg5pkGZCKYexGnhiyOfdmLkgcCzP+MGZvrjaxxPPfSIATEBVgLm4pJ+BxgUJERsf0I0FAdRwuk4smiJEu0lkiSEf2rm57sF+n60lxcNgw2qd80JWxlBEGCy0ikF4lA+T7ST8ZixAbS6zUTC0cm7jfoUgFsORWi/ccwwVr41nDjqpdW1nHhbZ7CgF3GBRjjUaLK5MflsWp4Df1tAoH13AJsOJdJCUPmARcvgga3URGEV4Aoc4cSpe7Bng1ONe00mv2HuMGrL7kocO9BoTiXaJVgwtrD6LtCfrYeIaQkXqUcmJybGJhPtphKskCbAOBOjR/rV3qoYfEwwUIs8KptotwvbgcQDGgZ0AcOzSc0BoQG/xBwCVXKJUwTycZ52YPJN2pSrW6bxIghQsiJQo7sQdRGmChhMOQe8Pg2rkBP4fWMg70urbHa6YgvjbBjPUlHOuPtmmbtZWWONFXzMZF2ivX0QWYuKbYfphR/l4E6tSGR6zlBjilHmOQmT8nLlYh4MCSftCOfWJQKBym+oKZgBAzbzaaKd4OjAAP6GEuDx97Wx
 
6UwkAFcryFgp/u4UQZIM2tjwSbncsZkaiD+jE3WsXl2DFdylZW8jUKUkcEcBMbAMRFhgBRiZBol9jAPU14qLx/Ro6MNv2a6k+xqk+1JHHwPzllVTkDkwXSfeEl3wMd2t5nMud+ChLhwpeivs6BxyBxq48XfU9AWy2NK3HdcVjKCzVfg2iC8lo3NQsyz4ISYeVxbbCuLEN0F6vDNVHwMGOgszBT0ApiWjNBDxdeBKjqle4DuACMyuSPmXb8vnxMC0kc/tBHYR+AEWjxCCORLEok72UpFxB1ux0fEjexP0A3hE9kk263fU2iB+Znia2ktstA7alNmMYKf0g0jaECq2QHoCj7z6eH19pfbs8fLa+nsLyzPTC/gp0Zp5kipzOFheFOLrwAAE3tSq8CYFV/6Zww8AQjzmE+NkshLn8eNEe/9Q8grINkrooV6wxvjCMvCIgeBsi/MoVeQjf788clYBdziS64kNYNXn/UXalrWZxTUmTt4uU1TskjnVpv124pSnBWVV6CK4g3phBxNyHQ3FDnG4dqM0Ky5B+bhTRjlZN+h/wA8IGHeVXCMRbYxziJIN20xkNYgPeLpE48AOurm5GQmciUBB1jMazdoAGAx8C4FLB9X2lkysLaDMWGdRD2AqN9i1Lf6VOv4doErquNSZjfyl5xlX9IQ1/yjRXi+pjWEn2hR27T2lNSVp8US8fscD9gFDZhMXNQYhTQiMzodvZQ2ghgP4Z+y0UF3GDjogwVXsFAEQ4BgdFivQoYTthQnzWaDAAPQcQ/R8maFny81b4xf56jEwgVFGXkFclXR6WEowfj3Ea750n3QctAsIWHktXbG9gOrATKBpgWq/VbDx+1CpLfaEhAub94L64Q2f45vAqgvh3pwAvW5SKSZxMUM0aF3LxN/MCDoHBQ3lJRVItFduBeR5n0FGfkAPb1fqCknCVOVjk/XxKW4J1LOtYYE10gDZx3YQeSaz/HDL4YOFu3KnjBag2qalHjHP3
 
AwrfK3saUFpngldoVBNKJmSqatCdOhhh8Qz/JDf6dfLvYRgVKDzbGxiEij0kop19Dw9lTQ68rAwHzh5XGFhyxocmvZJ0ApcdKCtL6ztAGOfRR2Gdk4WT4BYNyX5H4Y6+blIFebsH7omrXrZ3SAxZoGKgc4rFd0Pgo11VmuRreC8jx7LhweHXIa1hukH/p4XdGIBTVze3zs44ZpJwsnWQOUEoKMFYx+uxMD8Dl9rx5l5Ggfujk0+TFDz4j0PZrEtNJCpSvtEB5ULNwadrVyb1DMjhOr/jClLyKUbQEZkd4JW8VrZmIIiOTWhqK/ETO5h4xt2G7HQ2URVj+splzLJjy5dHdRIXbh0sciiINkSDqEXDUjtSnlIvqenJaB6Qn4AmA3naz4hZlAONSebjfHxnHtFIjEYTgygio9iCvsB/OJo+RpXOK+2LHjObI8LaucSFAAmKNJMBHZY4AHwExVyPxla4Cdr8KzWYb8D8ChdTKmtwnQS6IYMYKPS9Sc8kKIit4TzDiwSZHzlWixWh9ox05fqlQy3aIbrsPsj8N9lqvyQVl1Y7nmyFFCFOBPb6BpMxArraspi3j+YjAHyKoYB2KT6cQitVdqkTG4xT03e1gBQu1Lcj45jGbIldwQoRTjzhf8QHTLIn6wgH7U6uIHJMVbHzdR2dCA6LQrjtXprnqU4ibBSsAoOFcNDAPSkQczwhytMkYBnK9jPHogHCbYz07KN/oYYzDX2pOu5JVsyDmFNXMaozLPB9FjxHF0zDeb/HFFj6q69gZueqqI3uM1WBt/MaGtMTVV6xssUIFAFCOBtJTPDwEHJsZCXxbZyy+BzMezqLeMUv3q4fZLqi50Su7MmfspNUbayQL4QVaeWvAXlYvLs1ieEs6Za6y9EYCbqRZADyWLpfpA4bSHkGGNgKhrQjVBIXimoay1Cif7GRE6pyXn3ETOyYPPzRDufV4y5x5qcgo16c7zkfhNbBvwZtDdMDKMKQ4TLwWEalS
 
9p8p0QI5txQV6eB5WMSxjX2TDMHdNxhbLzcXHpq2F/C7ZVaPHvKUllN4i2de4vRFsng2O20fhs13Qk30ZT7bQwUWkOQOF0Wjq6HbEVeVJ5VORMK8wRCVncgiv6joJeWOhNDEFnJqondv6tEo5SyoeQoGmoYQAIMtGu5/c4lRmy0owN/4/MG0b+faYK4To7rVRhwklHjoXQeNSCxd0KHtwH0Mh3gckJOPMWcxTSujl+K+KrdEXtJM4C1mc5BqWPPgPNVkkQKWwdIdwBrULegQ76cDJEUiolXOby6nez6mxJkW3oE7c1QLsHRma0BwYnD0MKCmRgBZIH1IMWWgFWhKYvqOh+EksO1zFu3IxU62folkIzHXGoWn/mtgyK9BmpQu2g4eN0SH0TOeW/aKvwFJ5WELK0lL4psY2y/mf7O/rulpNgqia0ufTB9Hennz1eXpwdfTz9aHl55dnM8uLi8lLu0eNHc2uFMktzzx7Nr4rvP5heZTVyYCTRQIpAoNsh4KPNjvvxL4BoChCJM6eJ47wBKyfrxUbJ6IXVRNLrRDyXgffEFXBFjAkxGio6lkstcxy5XN6Fsmf9StFL8GJb0oPEWl8rDdIgBwcRd2qlfigcKUy3X4cG1jI3C/x6i0FNpu/AmD2gYrSZWDs684ugdxys0oslt0yal5Jox0FBvzs+NTY5frcymtsGo4kp3FeUCjf+PpeEGNGVdSAuKSUdiD8hHYjFyvgOSPSn5yJ+RcfYt4VHXSWbyL1pSP7NC7l4EMvh1V9gZzBxHvTUt8DEGi9xwa4D1sBurGfRmdTfXtj7C1I81DVf7AEQc1TmSHmjxJGeHW0iNoGkzvaCCxJ9P/BiJIN6GZ/oR9myp/RHWrbwOVBCHPNqHlufffKMXNDC+5sz/Ry/082mrLO0KtSEeUAlRvM+0VOR1onN1L+nKwWC8CXLoUM0u7DS74kQbA8nqSAHYUtzHritrPdiW1dSzee5XBwJdWS8BjbacOE
 
jwh7V+hWY5MiFCcCOpzOxzCOlg+P1e5NCUqiEPtCbE1g67hJqYWDZlMJvz3c8FD5p1BMkz4WKqLW3waHwapVSkydJvTQuxiIUFC2pr4hO50EThd6IiCUTHn8C8po+ZC4UTAsGCY25ArRBNiESlWOs6BLVQeQk2puL0wtAmc+mV2eXpp8tTn//wUjxSdFXJXiBfNNtDHx3eSYAz0FAv64E9Kj8cKas9gih4wbGhts2hQ2UscEKkGBUfIIfdOZ3ernWiaPahuPXWPEqBinbrTl/4Rtl8DC7sg3xK+FoK4XxPD3yWCIeeoEpEgZbpEekvObDBah7UVKY8JCWs6vSYaaS2ihz2fVKOdiyHeyVB2QwNdcHHLe7aFen7j4yT5Fh7gxHu4l2holB2Uf2qspHpnAu5P2L2OW/ESZBOT4FFMhtJhbeyDHvwF2Mi1+K0KXHQ4ufdExo2i3k7b1ZiTBxZoz2dKJ+p2N3bNQCJet1XL0dSPwEEYhWtl/maO33Ab1Ulq/AmuGCto84E4fIsnKS09SB7dqCPXSu5OciLWizaBQK/xostiN0/bbTRY83UgZf2/38WiJDkYvDC4W5Zf4Vzv0XU7dlzuYSEvUvSmBttlx9x7F3ddxarW8YTF+uVaFlk/d6vsQzU3OeksKgmZhiQSjUA1O2aOyz2CJJIb/gUY78cjmOiG8pCPejHDqF+loxFHxLwbnldFo2zr5TIilJ5iWWNlAkQ6A0ULQMYWrrmUWEoMy80QYwl5SlA1jUrCuy/VhiBdPjKepRckc8PFx0RzLxwiBw01D2h7mAVQ8nZxkAL0jSJj8rTpqFbWJWNnNESaiOfHLngM4Thou1rAx+jOdBHioifiSrA+YuYF43jEvCv60E1a4QTHHKV31WxiledTkEeRK703Y8k0Qz9IyCcKOVNT5o2aXSqmyXssrN09tUWvfVArZ1fJ/EC7A7FefLfbVEpkVXaRa1v4sO0VekDUSfJzIcp4mJaq+t9Flv
 
mSFI3QSogrCfeRQbqgnvq257gose53wiEmyKjM2blA4qtrnGHteeb7H8mSyFK9HuH34YW44dYXBtD2Ml3GrCVpSJ99waRSK+WPQVq+FMpvupnPdmv2Xmuy4WeqBuwDqfoQkS7oXcM38M82+eLS8t/KBIOKn4YUofZ+T/nnOjyMkEpfQBTmcXFTYe98LitC5XRS3AYslbRmSisrjU0Sx/cmxfzQXEZ+JgVI7lOjmpL+GqrOjysD4vjMkGsVInocLo0V0jx+GHQETfszfmqLbwGVXGFkACcp5KtMvqoin33dw/UNGoN1EWTMn+AunsCg+s6CJATs7ZfFrxQJMnh1YmbaFxzJXEW8p9UmeYo9n0BgIGt6qYMgb6IehxqTLx93I0J6QJwj4QHeMwTkTnWU51yd7kVljzYFEjP8gCR1wq3VCvcUX+5XmVEh8EoBAeq4lnoD+qZa2c8LV4uHBOOpzM9bS6iARo87QvntvMjFQ3CLzURv2J8AiocwIzRGMDe9h7YHIjaZ4Jb2uWN1VSMstm2SkEx12nnXDn2ZQ6U5ZKxDyKsobfluiznDt7Z7/cqcz1Bdw31hifuIvp3td7pXLlc7ea1XZGK/Awy5TCbJjrqktpq2joojoppVMLB6vazUvMJ7j/lJC9E/U65fEqsZl42bMtB3EyOwj0m4c+CPRMbkl4IOXNLmT+sxObUigbdXtxuOsN5VjLU2vU683Ks0QUI2DnMdFPJlS/66rcv8K+0ZE9clBcrk6spoV6U52kr/teGkZTeTlYlgcbHYpHytmnFBJoghIV9nCrf1IJ8srkKTCxHdcy6BxlEKKxf1T/vteFVQUyHy2qq1SM+TJF/obpbgbw2xYUP7r2eLqxsrr0vkC4agMxn1Z/OrV4OSAMl1mTDnl+p+O0ttcox+s1+fkWpjnzPFGvE8H/izucKhawvkU74ee5eIUsEarDbbdLGoDvZV69otX1RASwe/jXcU+lkOnPhZZxwJCpt2FbV
 
hozNXplZHO/Vv6ozxhx0b3KUfJkqnjPb4npkZGGw2Y4NzCBLUwe6BAPehnZoz1MD2Ye192cF0TaNEl5RI78JSiVuUgCHfg5txtErvWO6e8BtLwT7bYsszLnbRNdNLB8st6SUz2ls7g8zFFeURY/Zgoey/bSebqXwbQcTEI4mzv/g0QlsvCr9E8MgErOdVAX3vJtQL0AFiDY3KMhoJRCs1cEkJlAu1XmGqzJTvvTibZFVj515cFcV7OuQEbWK3wDvr2rs2QwhRltF51tqfknUiolC/BamgNXdfAJC/79RHt0cMPIh20iUALatnh3OobbRcjnjaIK9dw0Y2IJu4uh4xkaisq7W/ZVXZL1VhbIp8g+/oGmWPwelPJS1kKWaUta+9Dc9PzC7KMqKQvUa8d0+4SUvoMpfFzeSTqsw0+jmpYJPGWx5EKQjeONe+Nj9fHKiDgoCYi5qYw80WjeJSnZeCffAXlGcH4K395NddtI8ywjUjJAgfDGemj9+2Sdvl2MxCFL59MIfBv2Cz6H3NUlkg7RWxyj2z/JHwY1+IUC0HgF7yQg5czIWqPK6TnQi5KRoU5svlLhj1OeTrhbaeYUZAMBy9A9TgKYWp6KAphQEAsabR7kgHVencKavkgiLx9BOszho6ZohhN+CigSrH0kLPAqESppsPlTom8pzCF+tNAo4dLEQY5OVsUo7x5C/gJwbwDCclfBa0yjc22cim67jDZUoX86kWGwqG1fItSEHkpwYZjQ6FwV6XCYPGDIiQ/gtiLAFW1jplcxFQor3DzUEjMRTuYwGK+IZKPDpDTcP0BCearlitRyiXC/XUq6S7P48ec/A3UXs5Qrs67f7HH6I6X9a5z2c5sj0T368ofLJ6FKzvRxljp6SbluuZOdr7dJxFT4H9LjPQBA5OA00tj154r7MvbJ0fdQg0SxJD5c7nmZQaLNo4POT1xn450t/O1BK+w8iE2E8HgU1iB8EINgj0cBnFv2A3+UKY0P9k
 
aBXTYAquARtvfgLWjr3QMrje3IBprCDFhu+kAxTxFnIkMZIZ7BCTd6OHd5BqYrgbCI011i14XoZe56UzrlQkc2yFW/8cIJm+iRi0yUB6d4TINpNsBEvXxbFDWBPhwLtgXvX5kkehiuPr6TxTU/LhgSdIUM3e2nyyfZ0PvWzZC2XnmSNU2xdnwU0sYOhjMQqQaXlpdmS2E3zPXnWUN4gejIp+1UhM6tfdb7kpbCMjBEkONSr6aaYdl7oSvsSaLKXqe1jR5cXeFAnBIO+Uo4LZOOgmYsypk32rbfQmaLwToVeTMSai2nZ9APQ5Bnc9YA0/zfUmsvS3aCibrrvHCV07uA1QUD5CNlzh2BTpam+VHuNJaMF+VofCkcqvQ/TeadNHKl0lGoyjO4BkbvVWehpg9uXIjVinV+S45wUswf3GtJTgruulyYmV0VTaZ3EVxX2gDF2FjmxLo3hcGxc9Qf6qJR2BLD0iuGhcUe0/cfwNcVdM+vrswk2niFybeJl3K2O66OVKwy+yJxLUQuaMiG3ONM15XqK2BYxn/fvSrhCEoWmPrIp39ZdYYBWIkM+Hzs9n6Fczwj+l53cTUrlN7qOp+luWi9skKZ3PpJVfYcX0pkkfhwGvi9umiz8sYO6ThcORJO+8LNPeblZEcC8GorMCdnv7Jljo2iigyYBmx756Onz4xp/Ympv6jr9374VH9P/vppfbQ5MfHZu09HfvjuSBle8maL2oYm2a84AoMwL51eboNCYyNQ0hFF/L0Gv0uBZ0pBznJ0B8dJYJN2VqlS3lSLc4nm9IPhiGgi8yIvsBzOdf6DHJJ5syLBoJBQkzqJq6gIF+Ff5VJlBfADCPsxtpU/WdgYa9TvNlmGZuWavN0jlAyWPmF8tswDDaOUSV5x/I+v5Ae97mLYaRpbsfGYNbBGa5uFdLI1XGMbpbr3TdIeqIvEBNuN+azoyp8jeLqnrr4gCA+tIkToGP2hMMAxurcU79TIMusUCdMlW/e
 
TThDhDYCnaTi4Ye80G1ONnAWEq0tRTx7/Td0cJVRiel8JlTBC/prkPMForjBiT+c1uUSb+eqeOoppcM+ZH2wE1l4u16d6x3kec5q7/GvhQqm63mrDjhPdboM1IjI4rlVxqMSb9w/ImyWWJG4cPcypZK7pPv6qd4AwP2rpEpD9r5sACtkKMIB16v3Z9dGV5bX10cez04/woAdW6e2lPSlbAAm7zJfu5NW+pUgu5jLsSk9Lul51WEw+n1gV/i+ojDy1XxwY4zejCZ/+ZTJo95nhpZJDpHTTxUjZfLXpMqJiHOhXIjJWPbXDudrqRbmdBT5z3kY5/ImXAxm9qUx1OuB32ckfdqUQAXsYOH5SvF2o3tPBy+/hRvkU2ewsBR3aLt19p6fXEuhpLZ3Vks/RVzpHRDqoUAQlNzQ6ESf3TeCBJXCZRZZLP8Z7WTEJ971eKlvx4lU2D57skz/cOHqgxMb0Gs8RBecU00Y+U51lzBJdTUvWnX6enl/qcTpB6bSeFG4zxQ1qdC+0zuLJxzCejJLRE0pHzuVLLyPQU1fdz3JZpuyUIWPQvOOeCba3Kw0/uiuThXVSrz1mx2F4gF2+ERsdz0TvER0/Gqg3m0XbpMBLZHngjTw3qiPLRYb/WZaDj6tC/iO/7Wx2IsYRqb9lqG40xwoHS2Wb1/JjxmmwDEjj/b+TO5C47y0uPDi+OjuzvPpI8H9VuCHvEDfSOwPmDhBVxpB5O9aBL4Ho0KAjv7ilR3ExO/jGQTyUsSuOdVXGuzgnnaVzXxHl0QnL7ZfKS2tEKRj/96BjHNOaXHF0v1rmThQY0/AHL5bH5MmvWH3VbmNoIFe76gzW/ob1nZ53ERTMyAPdSIB2JN012y5eaVN9P6v60PRg3bgLjS0ezM8LShE9R7cJX5bM25vdvQfN3mNZjdKRv5Ticxn1v5Vov3HIQ1OgHOm7JiYomdE2zWCsfRB1aouCPrDEb39fXxVvW9G/5yRbo3ShP6gaeAf6
 
6HSrZYfJKHsJBEBP7xsQxf25PW9W5GEc7T+IC45zt0KzF/awqwd5MBELf6EyWHZKV0hzLrtczjAuHL/9WKh+MswXrylgqi+5nPFFMqB4+JtMzfixGLvSdVHWl0cr3ECu2fHpKIFCSWzse5eb5PHXGXQO+nR19cSh7+xi1s3tWzdyl3bh09u34ulFIAMcGb6L4sGn9c9u367M/ZIsTzrW2AkVrudLpZOn7MwwSC8gyZHYjN13sPn3RofpM1th8Q39++8B1irXh11B9C0OHOgbotw17iiKVbdUtIIwnwt5fGJscupu/R5dbnqWly+Zvderjkk4Ie5o3HEwX/Ds9OxabWZ9tbYUrIhzEif48WdEMe7DuNxzx0CT63Ed9Ae+Q3kzH/pO933iNJ5Bg+w8BxyPAQe1r830sutC8ahyzmC4rgyPY9BmD//uOBbSzLsVxievN8M1CdtaXZzjbaxgEytpC9d7XkKfCytdVtw2QbdypwZT/q6hTNXI8lHTXECh1/Dp3trf65BbnjPFdwDw540qrUXSqMTyCQu+UbGITEGUq2bLplfcDs9PWZfuyTiQqdbI7vEO2Y31kkkSVcXRKy9aySPuOfn0G1Pfe94mF7ErpITtULjpl3yhJtC7b6HHRgR6zDZmziXhbscRrZ/OZ7ICPJBXi9tw3K6jJcj5aPdTV3nr16viP5bd5YmwG1wwjlRf1ptxX6L1TZUuIMrIN7aTTphiq2/bWT7l5fRGdnFvO57ZEJYOmIX0fT8fVEF6/X7VCzxUZjgujcmuGimcFB4co5duXOn5lgI6jqM3xaGGShftcOGAEGbSpdNBUd3zdG0aIZOu/Ff7chjWsXhA5OzAmDbtooHQOFAOPH3S2WE17te5UNI/8LwEc7uerEnPE+1qLprKU6LYpafAAZsxP+h/qfruJywycCo9YctORqZ3m2Ao4d19LhovqCqUVivOD0lnIK/3sgsOhUUYO7yqfpkG0xZopa5Wv0aDH4MYO
 
6RKLfwzfw78WnkyvXwLtXz/ppgGShiwvBCtX3o0Ozf94cK6SKEo6bH0PhTCeMzKxTNpFyrdj1xpuKjQxjOvSd8XPe+Mk8UbWgRpWg5s4qXSmRk7ijDKzK6VA3MGbztT+yGkG6xfeb4V8wusV2enFxaN9YVHynANrpniFgl0bKovgaS8A1tQcJwz/Hz1PWzilTYVJ96ulG+PQm20wOw3S4kd3ga9qE7tur1zgBiLHsOcnS4gJNNPpw4W6ZZMGwmsOwLJlK63KnRGbDihvCBbvkdlT3H1eq1XBiXeYMryWXMXmYKG9qp8+012R9fFMi87gbif60aPl62Q91yy4icnReCkWt1lN93RqyvU2ge/Doq8cfCYH3k6LZgidxD+FXY1AGZhbXX8bfmImbHfKSw36pQDv/tecVNMhkJv22NxdFmR63DwBMFm8ZgS3WPgBZbT3tPZQvT9bs7VpU5nuFOVz50+KqZdXyzzILbIiONyJdBTr3SrINkDQldgmBWXOVdae/iIxy8L6/9FJfVIp8QKWiR7g4l4c9NI7+hg7gqQM5I1yK+G7m82qt6z8GK7/EYGo8cbGSrectRh1xYr7u3CCyvSu3LVL3riRb4L/8Hru+4dzBCpDocd7qVumGuYdyn0VZ8eQI80UjaLEXCXtPwejza3j/Uk0FlqP4uDSFn9j0t2Z+EiRWEIlKWOIEW0OfmSTlQFa6V6M7ze2uPp5sTkPNW7WRW+s+yNziYXaNnZ8jfLqf0gc4Tzkpud04d+ucIqya/pMJxhLUm36IAM55fWpzqJzcPY3ew1tRq9ppa/CBIfDNGDIQBjvPcID7DzK5AAa+kj4T97cKQW03+P0Q/s88tSIXwCkvSI+ayh17vld+Pi95viHbDpa2nxw+1+/q7z/vR97fhqVYO/KPyPh6Pf+i//6P/+L3p15hEgvA6+ljhtv5/a76fv7OlQjxe1f603qEO/3OUnv8F4JQrQa5d7ifFpJ+aP14NZ33qiHU
 
fwYdlGT7TXwvTV6A/NzeU2vSQ7foLv3sZX6K7Bc9emh/DzvHaULPGZwF3QXmoFbsfDwMVr0otO0xc447ukN77pGc6Wnslv5yy/Kvmi9KrkZii/JlnQX3HfNU3xJmWJEOYBcYABzv3pP/nnf/7bf2uqH19PzQkh0s5k5ZbIKvubv/gHl0/83p/8HXp3aJ7ev6E3WavpYMaMJSIYfKKdasGTpSBZIj/BgnYCrBHXWuOvjx8rvulW5HyyAiPi61xW6b707mOckg7fAdBd9urcM9JbiR+C9X4f92bgk9zeXJL25nj5BdZYIdqPdxOtrytvIaxmo7Ca+09FWqiH2mBCr5U/YdnMvOb+vJML2qCfvfZ5KOYrd7X3yt3vhsdBIOARHDrFCXaTNdyOAm94xdkcpjEMp69oHvgbdBKsGXa7FajRj6X+Ov75Ef6J2RqpVu5HPZv4UdpORO101bA81Pv1vYf6WYI+GoF6u7taeCDwxNJBjpre+CY4HZvdxU9d5U+f5nq8vD/9fnEA+t3L0S/W+vFx/Pu38c8/OxBF/Bz/fIl/fsp6VY3ly55NfJm28wW181dPEV+qFxAootuJtP6nT773d4fvvfoHP/xTgaX90gaKQULJ5acL3+R77+//09HXN/7P5r//ZdptykBf0rge/uJ/n/+LoaPrf5IWOA7UULF+6rkzUjr1V7rmGizs/wdHHuul','a_1-0','scope','false'))
 - scope-31
+Tez vertex scope-32
+# Plan on vertex
+POValueOutputTez - scope-38    ->       [scope-22]
+|
+|---New For Each(true)[bag] - scope-37
+    |   |
+    |   Project[tuple][1] - scope-36
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-35
+Tez vertex scope-22
+# Plan on vertex
+c: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-21
+|
+|---c: MergeCogroupTez[tuple] - scope-20       <-       scope-32
+    |
+    |---a: New For Each(false,false)[bag] - scope-7
+        |   |
+        |   Cast[int] - scope-2
+        |   |
+        |   |---Project[bytearray][0] - scope-1
+        |   |
+        |   Cast[int] - scope-5
+        |   |
+        |   |---Project[bytearray][1] - scope-4
+        |
+        |---a: Load(file:///tmp/input1:org.apache.pig.test.TestMapSideCogroup$DummyCollectableLoader) - scope-0
\ No newline at end of file

Added: pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld?rev=1811015&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld (added)
+++ pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld Tue Oct  3 17:31:51 2017
@@ -0,0 +1,51 @@
+#--------------------------------------------------
+# There are 1 DAGs in the session
+#--------------------------------------------------
+#--------------------------------------------------
+# TEZ DAG plan: pig-0_scope-0
+#--------------------------------------------------
+Tez vertex scope-30    ->      Tez vertex scope-37,
+Tez vertex scope-37    ->      Tez vertex scope-29,
+Tez vertex scope-29
+
+Tez vertex scope-30
+# Plan on vertex
+Local Rearrange[tuple]{tuple}(false) - scope-39        ->       scope-37
+|   |
+|   Project[tuple][*] - scope-38
+|
+|---b: 
Load(file:///tmp/input2:org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer('org.apache.pig.builtin.PigStorage','eNqtVb9v00AUfk2aNoRSWqAsVQfEr81ekTpAC60IuE1Eu5CJV/vqGM6+6/lcHAakLjCwwsCAxMDYfwIxMMPIhNiZGeHdOWncFIRE8WD53rt7P7573+f971BLFZx9iLvoZDrizpJS2POiVOd7nxdef8Q3VRhrwngaPWG5BICxx+PmTYfWhQodlOh3mSOj0NlC/xFLAqeLgRDSYTnzKaJIWBJGCW3p9tLIR+5hjylHckxSp923tWkFxTNWgUoHTlGg1vYS581EZtqDmpAxSg0XPUrqFkldSupGseSuKdxdy7iO1lAu5gouj5RmdtmUTksyhVqowxnrHjTiVSXilSBk6Q48hSrljO+yXqrhtGfQ6Seh+B5Mxh7DXUa+mZLPgEbO8bglbYiaBxPxPSG0XU14MB1viG09mmbKWjdF2VaPy+tcEtoXfteSvbFB4/12KhUYM1WQxVZhT88OL/g2pl3y1Sa/vP9w/sGnKlRWocEFBqvoEzBNOKG7iqVdwYNcXr9RBH1cp9eM+cptTY2+Y+rAYSbi0l9hJ0RL93wHKlFAQKe+kEzDmQJM2h+6G1pFSbiYD7qa1f1tlOb+cQaPppi6S8k9qInGsNVW4iHz9bC2iQ5MR2nfTJeRBB1oiF2mDFKMFnNSCd8ESsJlDFvbm5nkLO3AOYpOF7JBds6skdxNqKcalb4pOE2PL3gWJzQ9c6XpOeCdmd+t/93hyhHbsFWghN5xEg5YfDT0VAdq6PtZTIyODJOXtDY5gqbBaScjCrGgjQo5ZzxK42VoFPBt9iSjwUAeoZ23aVrYABrmRyUgQI2uRZrYN2F3DRh3ilO1GLJNhT5TGq6OnpUsGZC3tJH
 
izAoVUavUovDRdD4IOSkyXcrQkKhYoo2caGiPhu/D6BYwuiMwuodgdK0kumVJpDqqysjMrWMFvmchNXN14a8EHd6dFRHiq4YrvxVdc8wt8Zq4ajA5DfCTnoakx4JU6Me41Q5ptGj5OLNW9FLSkGWYUkxnKtnQqLOUJqCYoMNy0toyPKYSKwNBgYPihn+0QiXNAaeZaBYydebb23c/9p5fq5ifYG0XecYIxpnhvvUs3mLq2f6rhZMvv74wZLIJ8nw0vFnO/1E8rdfClx8BzRrqh04e0eN/doPMfwFKyneZ','eNq9V01sG0UUftk4ieukP2nTVgiFOjTApfKWn0NRiiAusTC4cVRboDqqlPF67Gy7uzOdnU3XPSB6gQMSXEACCSSEOPbEiSviwIUeQEJCnKre4QISCAmVN7O79tpxk6IEfFh7Zt6+3+9983z7F5jwBawy0SkQTqxNWuB2p9Ak1jXqtQqbpMUYL9CQWoG0mUe9ju2hyGbXty3iVEiXigJ3iOcX1uK9NVxB9BkzwGjAQVRUbS87TtnjgazABOMu4RJOV9CoGRk10ahpu9wx0YxjXgwcaV8kfCkU8MSQa0pKmyxUORVEMjFoMVuBnFsSzF1pdah/Hd6EMbTpvka7voTDlatki8RGUH8FptwKJVsUz46kziq2L/Ew41a5VjFegUn3EmNSrzIVOOTWWFsOm5nRu3WW3su66XXIMdsLo0JSdgtJ4HE4hqE0ZFzc0V7ot2eVm5H4K8TfxLOJqZ+//ubExvfjYJQg5zDSKhELE1OGA3JTUH+TOa2Qv/iS1jlzI4vPI+pnqH3KbjsYQyuLu6YdM5qq86tg2C1MtG8xTiUcjZKJ8h2zJoXtdZbCJKpFGYuhmct7AZ6gDlGHxEl8QhhWS0ysoL6+bzMNmLX9FSKcbp0K1/aIpK0GHLjmsRtezb5JGzCNiKzZLVr1nG4ZpjxWltT1K3ByALtrglnU9zEYCcdSARYZcyjxEDA5W
 
8uphtBAWYc526+zIi2hp5J6tLUsBOlKMNYbCBfG9eEl6lMZy0+rHCMk24g8CdX14SaJE2RGCTKHEmQOJMhMejJJzxLqRzwgyOpdrjBvrBcR2T4qd4mEsyM70mGdvsZI1KzpLxWwDLhDL5JrVEjID7/fIpKY9Z6EaufKXgo+HM9AjSeIZQUu8o2uwbKUykarDHOCXg8wWNpaI4I4DnVs3y1Crp8IhC1xbKJLNoMLrUDCow8OByOf1FIJHxx00FvSoXVBLJWKp4bf5dRLqCUliHpmmbAxVAyRWRrNicopFsiUhRwngnoaWxLW9gQLTdhmmrDRj3GhAPHynhRf0ilVZV7YlT76tdMUh2wi4cmRAFSvmSnWQSZROTkIcB8/oYSxJueKGY/1mVG3mWLx8NYP8x9/Sz4dh7EyZHzsds1DYzcyMdFd2U9ALlaHirn83p3V2Z/+/siAcTSvQFKGSdZuY8MPoC7k9+OPWj4W6niKe3EtqkWKoYswI6gMhFeTRAZIbpNRBwySdbV5lVpYQm7EdG30rhx1Xoi57k5evPXjJ3/9itXDztsiTqDyqmM5pV7S6TX0Otqd3OGu2dMlgHaFImXmpS8BpGoVRz/8yQYcsv14G29kD2+AHNuiQl2X6jo4znvsXiSdalv3ud9Q9KGyVMN9h+pNPC5D1pdEyAvMwUa1mBO4HnbP8dQI0UOgaofmfke4sm2vHyqEKtUndMKn4+28WhyKOuYZrtolKRXoUkEYb51O6g79avabJQWEsidph4qj9z774o9b75wzVH/FQBBwpC+3GrhNKt6+/eH89Ad331Xe8f7sMaj99f3HwQXip0CQwdvBwp1VJlcpxbJXYLptU6dVi2/AZ4cJCLuIBcKikcBisiz1X0LqzLYDz6pxhbdHhhWU4iMlJihx1H3TmzWj6aEr6ZIuWWGgZAupkuVSBYvzpZZmuG1brxcSMkFuDNOVxSQ/PZTk3SNM5a8IGakvzOkW9S1hc1WO+N
 
rMeMSl8e/eSHFq54Qiz+TwnwAJJOvgdCTUZJZv41SdX7M7ee1DPpJUipcVKZ4c8j9JcH9kxmH9gOUQ319NHFqHrJqHl0UH0XBsfftsWoHD0dSAEn17cyH+TRm+yZqBjTO6V0AHa6gTr3LspvBBo7Shts/qWqnHC+pxPhxVy/M7qjB7es5rPeG2/h3BsjA86v+r4xRHa7ujARgCfyiWV/tnBvD9+H5Qklo/p36FI4/ODVg8vXtHXX6Ijro50FHqx4WcepbUg+rZpLwLIhralnpooG2MRMTGjiou9/RsaD3/PyI2RicQEREGQv3DeeP9/PNzX125l5C+kSpg4iRKRn9y9m0GW/r8zInm753vvuyZ7bXNhvarePu3k39OZut3ewI5RMPIEsyHOyVgXo+hO73432QfMMX/ACAlM/I=','b_1-1','scope','true'))
 - scope-8
+Tez vertex scope-37
+# Plan on vertex
+POValueOutputTez - scope-43    ->       [scope-29]
+|
+|---New For Each(true)[bag] - scope-42
+    |   |
+    |   Project[tuple][1] - scope-41
+    |
+    |---Package(Packager)[tuple]{tuple} - scope-40
+Tez vertex scope-29
+# Plan on vertex
+d: Store(file:///tmp/pigoutput:org.apache.pig.builtin.PigStorage) - scope-28
+|
+|---d: New For Each(false,false,false)[bag] - scope-27
+    |   |
+    |   Project[int][0] - scope-21
+    |   |
+    |   Project[int][1] - scope-23
+    |   |
+    |   Project[int][3] - scope-25
+    |
+    |---c: MergeJoinTez[tuple] - scope-18      <-       scope-37
+        |
+        |---a: New For Each(false,false)[bag] - scope-7
+            |   |
+            |   Cast[int] - scope-2
+            |   |
+            |   |---Project[bytearray][0] - scope-1
+            |   |
+            |   Cast[int] - scope-5
+            |   |
+            |   |---Project[bytearray][1] - scope-4
+            |
+            |---a: Load(file:///tmp/input1:org.apache.pig.builtin.PigStorage) - scope-0
\ No newline at end of file

Modified: pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java?rev=1811015&r1=1811014&r2=1811015&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezCompiler.java Tue Oct  3 17:31:51 2017
@@ -49,6 +49,8 @@ import org.apache.pig.impl.io.FileLocali
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.TestMultiQueryBasic.DummyStoreWithOutputFormat;
 import org.apache.pig.test.Util;
+import org.apache.pig.test.TestMapSideCogroup.DummyCollectableLoader;
+import org.apache.pig.test.TestMapSideCogroup.DummyIndexableLoader;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -269,6 +271,30 @@ public class TestTezCompiler {
     }
 
     @Test
+    public void testMergeJoin() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' as (x:int, z:int);" +
+                "c = join a by x, b by x using 'merge';" +
+                "d = foreach c generate a::x as x, y, z;" +
+                "store d into 'file:///tmp/pigoutput';";
+        // run() is defined outside this hunk; presumably it checks the compiled plan against this golden file.
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeJoin-1.gld");
+    }
+
+    @Test
+    public void testMergeCogroup() throws Exception {
+        String query =
+                "a = load 'file:///tmp/input1' using "+ DummyCollectableLoader.class.getName() +"() as (x:int, y:int);" +
+                "b = load 'file:///tmp/input2' using " + DummyIndexableLoader.class.getName()+"() as (x:int, z:int);" +
+                "c = cogroup a by x, b by x using 'merge';" +
+                "store c into 'file:///tmp/pigoutput';";
+        // run() is defined outside this hunk; presumably it checks the compiled plan against this golden file.
+        run(query, "test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MergeCogroup-1.gld");
+    }
+
+
+    @Test
     public void testBloomJoin() throws Exception {
         String query =
                 "a = load 'file:///tmp/input1' as (x, y:int);" +
@@ -1429,6 +1455,7 @@ public class TestTezCompiler {
 
         String goldenPlanClean = Util.standardizeNewline(goldenPlan).trim();
         String compiledPlanClean = 
Util.standardizeNewline(compiledPlan).trim();
+
         
assertEquals(TestHelper.sortUDFs(Util.removeSignature(goldenPlanClean)),
                 TestHelper.sortUDFs(Util.removeSignature(compiledPlanClean)));
     }


Reply via email to