Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
 Fri Mar  4 18:17:39 2016
@@ -19,13 +19,21 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
+import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
@@ -35,10 +43,19 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator.VertexGroupInfo;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
+import org.apache.pig.builtin.AvroStorage;
+import org.apache.pig.builtin.JsonStorage;
+import org.apache.pig.builtin.OrcStorage;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.builtin.RoundRobinPartitioner;
+import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -63,8 +80,66 @@ import org.apache.tez.runtime.library.ou
  */
 public class UnionOptimizer extends TezOpPlanVisitor {
 
-    public UnionOptimizer(TezOperPlan plan) {
+    private static final Log LOG = LogFactory.getLog(UnionOptimizer.class);
+    private TezOperPlan tezPlan;
+    private static Set<String> builtinSupportedStoreFuncs = new 
HashSet<String>();
+    private List<String> supportedStoreFuncs;
+    private List<String> unsupportedStoreFuncs;
+
+    static {
+        builtinSupportedStoreFuncs.add(PigStorage.class.getName());
+        builtinSupportedStoreFuncs.add(JsonStorage.class.getName());
+        builtinSupportedStoreFuncs.add(OrcStorage.class.getName());
+        builtinSupportedStoreFuncs.add(HBaseStorage.class.getName());
+        builtinSupportedStoreFuncs.add(AvroStorage.class.getName());
+        
builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.AvroStorage");
+        
builtinSupportedStoreFuncs.add("org.apache.pig.piggybank.storage.avro.CSVExcelStorage");
+        builtinSupportedStoreFuncs.add(Storage.class.getName());
+    }
+
+    public UnionOptimizer(TezOperPlan plan, List<String> supportedStoreFuncs, 
List<String> unsupportedStoreFuncs) {
         super(plan, new ReverseDependencyOrderWalker<TezOperator, 
TezOperPlan>(plan));
+        tezPlan = plan;
+        this.supportedStoreFuncs = supportedStoreFuncs;
+        this.unsupportedStoreFuncs = unsupportedStoreFuncs;
+    }
+
+    public static boolean isOptimizable(TezOperator tezOp,
+            List<String> supportedStoreFuncs, List<String> 
unsupportedStoreFuncs)
+            throws VisitorException {
+        if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && 
tezOp.getRequestedParallelism() == 1) {
+            return false;
+        }
+        // Two vertices separately ranking with 1 to n and writing to output 
directly
+        // will make each rank repeat twice which is wrong. Rank always needs 
to be
+        // done from single vertex to have the counting correct.
+        if (tezOp.isRankCounter()) {
+            return false;
+        }
+        if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
+            List<POStoreTez> stores = 
PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+            for (POStoreTez store : stores) {
+                String name = store.getStoreFunc().getClass().getName();
+                if (unsupportedStoreFuncs != null
+                        && unsupportedStoreFuncs.contains(name)) {
+                    return false;
+                }
+                if (supportedStoreFuncs != null
+                        && !supportedStoreFuncs.contains(name)) {
+                    if (!builtinSupportedStoreFuncs.contains(name)) {
+                        
LOG.warn(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                                + " does not contain " + name
+                                + " and so disabling union optimization. There 
will be some performance degradation. "
+                                + "If your storefunc does not hardcode part 
file names and can work with multiple vertices writing to the output location,"
+                                + " run pig with -D"
+                                + 
PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS
+                                + "=<Comma separated list of fully qualified 
StoreFunc class names> to enable the optimization. Refer PIG-4691");
+                        return false;
+                    }
+                }
+            }
+        }
+        return true;
     }
 
     @Override
@@ -73,55 +148,104 @@ public class UnionOptimizer extends TezO
             return;
         }
 
-        if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && 
tezOp.getRequestedParallelism() == 1) {
+        if (!isOptimizable(tezOp, supportedStoreFuncs, unsupportedStoreFuncs)) 
{
             return;
         }
 
         TezOperator unionOp = tezOp;
-        String unionOpKey = unionOp.getOperatorKey().toString();
         String scope = unionOp.getOperatorKey().scope;
-        TezOperPlan tezPlan = getPlan();
+        PhysicalPlan unionOpPlan = unionOp.plan;
 
-        //TODO: PIG-3856 Handle replicated join. Replicate join input that was 
broadcast to union vertex
+        // TODO: PIG-3856 Handle replicated join and skewed join sample.
+        // Replicate join small table/skewed join sample that was broadcast to 
union vertex
         // now needs to be broadcast to all the union predecessors. How do we 
do that??
         // Wait for shared edge and do it or write multiple times??
-        // For now don't optimize
-        // Create a copy as disconnect while iterating modifies the original 
list
+        // For now don't optimize except in the case of Split where we need to 
write only once
+
+        Set<OperatorKey> uniqueUnionMembers = new 
HashSet<OperatorKey>(unionOp.getUnionMembers());
         List<TezOperator> predecessors = new 
ArrayList<TezOperator>(tezPlan.getPredecessors(unionOp));
-        if (predecessors.size() > unionOp.getVertexGroupMembers().size()) {
-            return;
+        List<TezOperator> successors = tezPlan.getSuccessors(unionOp) == null 
? null
+                : new ArrayList<TezOperator>(tezPlan.getSuccessors(unionOp));
+
+
+        if (successors != null && uniqueUnionMembers.size() > 1) {
+            for (TezOperator succ : successors) {
+                for (TezOperator pred : predecessors) {
+                    if (succ.inEdges.containsKey(pred.getOperatorKey())) {
+                        // Stop here, we cannot convert the node into vertex 
group
+                        // Otherwise, we will end up with a parallel edge 
between pred
+                        // and succ
+                        return;
+                    }
+                }
+            }
         }
+        if (predecessors.size() > unionOp.getUnionMembers().size()
+                && uniqueUnionMembers.size() != 1) {
+            return; // TODO: PIG-3856
+        }
+        if (uniqueUnionMembers.size() == 1) {
+            // We actually don't need VertexGroup in this case. The multiple
+            // sub-plans of Split can write to same MROutput or the Tez 
LogicalOutput
+            OperatorKey splitPredKey = uniqueUnionMembers.iterator().next();
+            TezOperator splitPredOp = tezPlan.getOperator(splitPredKey);
+            PhysicalPlan splitPredPlan = splitPredOp.plan;
+            if (splitPredPlan.getLeaves().get(0) instanceof POSplit) { //It 
has to be. But check anyways
+
+                try {
+                    connectUnionNonMemberPredecessorsToSplit(unionOp, 
splitPredOp, predecessors);
+
+                    // Remove POShuffledValueInputTez from union plan root
+                    unionOpPlan.remove(unionOpPlan.getRoots().get(0));
+                    // Clone union plan into split subplans
+                    for (int i=0; i < 
Collections.frequency(unionOp.getUnionMembers(), splitPredKey); i++ ) {
+                        cloneAndMergeUnionPlan(unionOp, splitPredOp);
+                    }
+                    copyOperatorProperties(splitPredOp, unionOp);
+                    tezPlan.disconnect(splitPredOp, unionOp);
 
-        PhysicalPlan unionOpPlan = unionOp.plan;
+                    connectSplitOpToUnionSuccessors(unionOp, splitPredOp, 
successors);
+                } catch (PlanException e) {
+                    throw new VisitorException(e);
+                }
 
-        // Union followed by Split followed by Store could have multiple stores
+                //Remove union operator from the plan
+                tezPlan.remove(unionOp);
+                return;
+            } else {
+                throw new VisitorException("Expected POSplit but found " + 
splitPredPlan.getLeaves().get(0));
+            }
+        }
+
+        // Create vertex group operator for each store. Union followed by Split
+        // followed by Store could have multiple stores
         List<POStoreTez> unionStoreOutputs = 
PlanHelper.getPhysicalOperators(unionOpPlan, POStoreTez.class);
         TezOperator[] storeVertexGroupOps = new 
TezOperator[unionStoreOutputs.size()];
-        List<TezOperator> succs = tezPlan.getSuccessors(unionOp);
-        // Create a copy as disconnect while iterating modifies the original 
list
-        List<TezOperator> successors = succs == null ? null : new 
ArrayList<TezOperator>(succs);
-        
         for (int i=0; i < storeVertexGroupOps.length; i++) {
             TezOperator existingVertexGroup = null;
             if (successors != null) {
                 for (TezOperator succ : successors) {
-                    if (succ.isVertexGroup() && 
succ.getVertexGroupInfo().getSFile().equals(unionStoreOutputs.get(i).getSFile()))
 {
+                    if (succ.isVertexGroup() && 
unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile()))
 {
                         existingVertexGroup = succ;
                     }
                 }
             }
             if (existingVertexGroup != null) {
                 storeVertexGroupOps[i] = existingVertexGroup;
+                
existingVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                
existingVertexGroup.getVertexGroupMembers().addAll(unionOp.getUnionMembers());
+                
existingVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
             } else {
                 storeVertexGroupOps[i] = new 
TezOperator(OperatorKey.genOpKey(scope));
                 storeVertexGroupOps[i].setVertexGroupInfo(new 
VertexGroupInfo(unionStoreOutputs.get(i)));
                 
storeVertexGroupOps[i].getVertexGroupInfo().setSFile(unionStoreOutputs.get(i).getSFile());
-                
storeVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+                storeVertexGroupOps[i].setVertexGroupMembers(new 
ArrayList<OperatorKey>(unionOp.getUnionMembers()));
                 tezPlan.add(storeVertexGroupOps[i]);
             }
         }
 
-        // Case of split, orderby, skewed join, rank, etc will have multiple 
outputs
+        // Create vertex group operator for each output. Case of split, 
orderby,
+        // skewed join, rank, etc will have multiple outputs
         List<TezOutput> unionOutputs = 
PlanHelper.getPhysicalOperators(unionOpPlan, TezOutput.class);
         // One TezOutput can write to multiple LogicalOutputs (POCounterTez, 
POValueOutputTez, etc)
         List<String> unionOutputKeys = new ArrayList<String>();
@@ -133,133 +257,344 @@ public class UnionOptimizer extends TezO
                 unionOutputKeys.add(key);
             }
         }
-
-        // Create vertex group operator for each output
         TezOperator[] outputVertexGroupOps = new 
TezOperator[unionOutputKeys.size()];
         String[] newOutputKeys = new String[unionOutputKeys.size()];
         for (int i=0; i < outputVertexGroupOps.length; i++) {
             outputVertexGroupOps[i] = new 
TezOperator(OperatorKey.genOpKey(scope));
             outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
             
outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
-            
outputVertexGroupOps[i].setVertexGroupMembers(unionOp.getVertexGroupMembers());
+            outputVertexGroupOps[i].setVertexGroupMembers(new 
ArrayList<OperatorKey>(unionOp.getUnionMembers()));
             newOutputKeys[i] = 
outputVertexGroupOps[i].getOperatorKey().toString();
             tezPlan.add(outputVertexGroupOps[i]);
         }
 
+        // Change plan from Predecessors -> Union -> Successor(s) to
+        // Predecessors -> Vertex Group(s) -> Successor(s)
         try {
-
-             // Clone plan of union and merge it into the predecessor operators
              // Remove POShuffledValueInputTez from union plan root
             unionOpPlan.remove(unionOpPlan.getRoots().get(0));
-            for (OperatorKey predKey : unionOp.getVertexGroupMembers()) {
+
+            for (OperatorKey predKey : unionOp.getUnionMembers()) {
                 TezOperator pred = tezPlan.getOperator(predKey);
-                PhysicalPlan predPlan = pred.plan;
-                PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
-                // if predLeaf not POValueOutputTez
-                if (predLeaf instanceof POSplit) {
-                    // Find the subPlan that connects to the union operator
-                    predPlan = getUnionPredPlanFromSplit(predPlan, unionOpKey);
-                    predLeaf = predPlan.getLeaves().get(0);
-                }
-
-                PhysicalPlan clonePlan = unionOpPlan.clone();
-                //Clone changes the operator keys
-                List<POStoreTez> clonedUnionStoreOutputs = 
PlanHelper.getPhysicalOperators(clonePlan, POStoreTez.class);
-
-                // Remove POValueOutputTez from predecessor leaf
-                predPlan.remove(predLeaf);
-                boolean isEmptyPlan = predPlan.isEmpty();
-                if (!isEmptyPlan) {
-                    predLeaf = predPlan.getLeaves().get(0);
-                }
-                predPlan.merge(clonePlan);
-                if (!isEmptyPlan) {
-                    predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
-                }
-
-                // Connect predecessor to the storeVertexGroups
-                int i = 0;
-                for (TezOperator storeVertexGroup : storeVertexGroupOps) {
-                    
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
-                    //Set the output key of cloned POStore to that of the 
initial union POStore.
-                    clonedUnionStoreOutputs.get(i).setOutputKey(
-                            storeVertexGroup.getVertexGroupInfo().getStore()
-                                    .getOperatorKey().toString());
-                    
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
-                            storeVertexGroup.getOperatorKey());
-                    tezPlan.connect(pred, storeVertexGroup);
-                }
-
-                for (TezOperator outputVertexGroup : outputVertexGroupOps) {
-                    
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
-                    tezPlan.connect(pred, outputVertexGroup);
-                }
-
-                copyOperatorProperties(pred, unionOp);
-                tezPlan.disconnect(pred, unionOp);
-            }
-
-            List<TezOutput> valueOnlyOutputs = new ArrayList<TezOutput>();
-            for (TezOutput tezOutput : unionOutputs) {
-                if (tezOutput instanceof POValueOutputTez) {
-                    valueOnlyOutputs.add(tezOutput);
-                }
-            }
-            // Connect to outputVertexGroupOps
-            // Copy output edges of union -> successor to 
predecessor->successor, vertexgroup -> successor
-            // and connect vertexgroup -> successor in the plan.
-            for (Entry<OperatorKey, TezEdgeDescriptor> entry : 
unionOp.outEdges.entrySet()) {
-                TezOperator succOp = tezPlan.getOperator(entry.getKey());
-                // Case of union followed by union.
-                // unionOp.outEdges will not point to vertex group, but to its 
output.
-                // So find the vertex group if there is one.
-                TezOperator succOpVertexGroup = null;
-                for (TezOperator succ : successors) {
-                    if (succ.isVertexGroup()
-                            && succ.getVertexGroupInfo().getOutput()
-                                    
.equals(succOp.getOperatorKey().toString())) {
-                        succOpVertexGroup = succ;
-                        break;
-                    }
-                }
-                TezEdgeDescriptor edge = entry.getValue();
-                // Edge cannot be one to one as it will get input from two or
-                // more union predecessors. Change it to SCATTER_GATHER
-                if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
-                    edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                    edge.partitionerClass = RoundRobinPartitioner.class;
-                    edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
-                    edge.inputClassName = UnorderedKVInput.class.getName();
-                }
-                TezOperator vertexGroupOp = 
outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
-                for (OperatorKey predKey : 
vertexGroupOp.getVertexGroupMembers()) {
-                    TezOperator pred = tezPlan.getOperator(predKey);
-                    // Keep the output edge directly to successor
-                    // Don't need to keep output edge for vertexgroup
-                    pred.outEdges.put(entry.getKey(), edge);
-                    succOp.inEdges.put(predKey, edge);
-                    if (succOpVertexGroup != null) {
-                        succOpVertexGroup.getVertexGroupMembers().add(predKey);
-                        
succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
-                        // Connect directly to the successor vertex group
-                        tezPlan.disconnect(pred, vertexGroupOp);
-                        tezPlan.connect(pred, succOpVertexGroup);
+                PhysicalPlan clonePlan = cloneAndMergeUnionPlan(unionOp, pred);
+                connectPredecessorsToVertexGroups(unionOp, pred, clonePlan,
+                        storeVertexGroupOps, outputVertexGroupOps);
+            }
+
+            connectVertexGroupsToSuccessors(unionOp, successors,
+                    unionOutputKeys, outputVertexGroupOps);
+
+            replaceSuccessorInputsAndDisconnect(unionOp, successors, 
unionOutputKeys, newOutputKeys);
+
+            //Remove union operator from the plan
+            tezPlan.remove(unionOp);
+        } catch (VisitorException e) {
+            throw e;
+        }  catch (Exception e) {
+            throw new VisitorException(e);
+        }
+
+    }
+
+    /**
+     * Connect the predecessors of the union which are not members of the union
+     * (usually FRJoin replicated table or SkewedJoin sample) to the Split op
+     * which is the only member of the union. Disconnect those predecessors 
from the union.
+     *
+     * Replace the output keys of those predecessors with the split operator
+     * key instead of the union operator key.
+     *
+     * @param unionOp Union operator
+     * @param splitPredOp Split operator which is the only member of the union 
and its predecessor
+     * @param unionPredecessors Predecessors of the union including the split 
operator
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void connectUnionNonMemberPredecessorsToSplit(TezOperator unionOp,
+            TezOperator splitPredOp,
+            List<TezOperator> unionPredecessors) throws PlanException, 
VisitorException {
+        String unionOpKey = unionOp.getOperatorKey().toString();
+        OperatorKey splitPredKey = splitPredOp.getOperatorKey();
+        for (TezOperator pred : unionPredecessors) {
+
+            if (!pred.getOperatorKey().equals(splitPredKey)) { //Skip 
splitPredOp which is also a predecessor
+                // Get actual predecessors if predecessor is a vertex group
+                TezOperator predVertexGroup = null;
+                List<TezOperator> actualPreds = new ArrayList<TezOperator>();
+                if (pred.isVertexGroup()) {
+                    predVertexGroup = pred;
+                    for (OperatorKey opKey : pred.getVertexGroupMembers()) {
+                        // There should not be multiple levels of vertex 
group. So no recursion required.
+                        actualPreds.add(tezPlan.getOperator(opKey));
                     }
+                    tezPlan.disconnect(predVertexGroup, unionOp);
+                    tezPlan.connect(predVertexGroup, splitPredOp);
+                } else {
+                    actualPreds.add(pred);
                 }
-                if (succOpVertexGroup != null) {
-                    
succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
-                    
succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
-                    //Discard the new vertex group created
-                    tezPlan.remove(vertexGroupOp);
+
+                for (TezOperator actualPred : actualPreds) {
+
+                    TezCompilerUtil.replaceOutput(actualPred, unionOpKey, 
splitPredKey.toString());
+
+                    TezEdgeDescriptor edge = 
actualPred.outEdges.remove(unionOp.getOperatorKey());
+                    if (edge == null) {
+                        throw new VisitorException("Edge description is 
empty");
+                    }
+                    actualPred.outEdges.put(splitPredKey, edge);
+                    splitPredOp.inEdges.put(actualPred.getOperatorKey(), edge);
+                    if (predVertexGroup == null) {
+                        // Disconnect FRJoin table/SkewedJoin sample edge to
+                        // union op and connect to POSplit
+                        tezPlan.disconnect(actualPred, unionOp);
+                        tezPlan.connect(actualPred, splitPredOp);
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Connect the split operator to the successors of the union operators and 
update the edges.
+     * Also change the inputs of the successor from the union operator to the 
split operator.
+     *
+     * @param unionOp Union operator
+     * @param splitPredOp Split operator which is the only member of the union
+     * @param successors Successors of the union operator
+     * @throws PlanException
+     * @throws VisitorException
+     */
+    private void connectSplitOpToUnionSuccessors(TezOperator unionOp,
+            TezOperator splitPredOp, List<TezOperator> successors)
+            throws PlanException, VisitorException {
+        String unionOpKey = unionOp.getOperatorKey().toString();
+        String splitPredOpKey = splitPredOp.getOperatorKey().toString();
+        List<TezOperator> splitSuccessors = tezPlan.getSuccessors(splitPredOp);
+        if (successors != null) {
+            for (TezOperator succ : successors) {
+                TezOperator successorVertexGroup = null;
+                boolean removeSuccessorVertexGroup = false;
+                List<TezOperator> actualSuccs = new ArrayList<TezOperator>();
+                if (succ.isVertexGroup()) {
+                    successorVertexGroup = succ;
+                    if (tezPlan.getSuccessors(successorVertexGroup) != null) {
+                        // There should not be multiple levels of vertex 
group. So no recursion required.
+                        
actualSuccs.addAll(tezPlan.getSuccessors(successorVertexGroup));
+                    }
+                    int index = 
succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
+                    while (index > -1) {
+                        succ.getVertexGroupMembers().set(index, 
splitPredOp.getOperatorKey());
+                        index = 
succ.getVertexGroupMembers().indexOf(unionOp.getOperatorKey());
+                    }
+                    // Store vertex group
+                    POStore store = 
successorVertexGroup.getVertexGroupInfo().getStore();
+                    if (store != null) {
+                        //Clone changes the operator keys
+                        List<POStoreTez> storeOutputs = 
PlanHelper.getPhysicalOperators(splitPredOp.plan, POStoreTez.class);
+                        for (POStoreTez storeOut : storeOutputs) {
+                            if 
(storeOut.getOutputKey().equals(store.getOperatorKey().toString())) {
+                                
splitPredOp.addVertexGroupStore(storeOut.getOperatorKey(), 
successorVertexGroup.getOperatorKey());
+                            }
+                        }
+                    }
+                    tezPlan.disconnect(unionOp, successorVertexGroup);
+                    Set<OperatorKey> uniqueVertexGroupMembers = new 
HashSet<OperatorKey>(succ.getVertexGroupMembers());
+                    if (uniqueVertexGroupMembers.size() == 1) {
+                        //Only splitPredOp is member of the vertex group. Get 
rid of the vertex group
+                        removeSuccessorVertexGroup = true;
+                    } else {
+                        // Avoid connecting multiple times in case of union + 
self join
+                        if (splitSuccessors == null || 
!splitSuccessors.contains(successorVertexGroup)) {
+                            tezPlan.connect(splitPredOp, successorVertexGroup);
+                        }
+                    }
                 } else {
-                    tezPlan.connect(vertexGroupOp, succOp);
+                    actualSuccs.add(succ);
+                }
+
+                // Store vertex group
+                if (actualSuccs.isEmpty() && removeSuccessorVertexGroup) {
+                    
splitPredOp.removeVertexGroupStore(successorVertexGroup.getOperatorKey());
+                    tezPlan.remove(successorVertexGroup);
                 }
+
+                for (TezOperator actualSucc : actualSuccs) {
+
+                    TezCompilerUtil.replaceInput(actualSucc, unionOpKey, 
splitPredOpKey);
+
+                    TezEdgeDescriptor edge = 
actualSucc.inEdges.remove(unionOp.getOperatorKey());
+                    if (edge == null) {
+                        throw new VisitorException("Edge description is 
empty");
+                    }
+                    actualSucc.inEdges.put(splitPredOp.getOperatorKey(), edge);
+                    splitPredOp.outEdges.put(actualSucc.getOperatorKey(), 
edge);
+                    if (successorVertexGroup == null || 
removeSuccessorVertexGroup) {
+                        if (removeSuccessorVertexGroup) {
+                            // Changes plan from SplitOp -> Union -> 
VertexGroup -> Successor
+                            // to SplitOp -> Successor
+                            tezPlan.disconnect(successorVertexGroup, 
actualSucc);
+                            tezPlan.remove(successorVertexGroup);
+                            TezCompilerUtil.replaceInput(actualSucc, 
successorVertexGroup.getOperatorKey().toString(), splitPredOpKey);
+                        } else {
+                            // Changes plan from SplitOp -> Union -> Successor
+                            // to SplitOp -> Successor
+                            tezPlan.disconnect(unionOp, actualSucc);
+                        }
+                        // Avoid connecting multiple times in case of union + 
self join
+                        if (splitSuccessors == null || 
!splitSuccessors.contains(actualSucc)) {
+                            tezPlan.connect(splitPredOp, actualSucc);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Clone plan of union and merge it into the predecessor operator
+     *
+     * @param unionOp Union operator
+     * @param predOp Predecessor operator of union to which union plan should 
be merged to
+     */
+    private PhysicalPlan cloneAndMergeUnionPlan(TezOperator unionOp, 
TezOperator predOp) throws VisitorException {
+        try {
+            PhysicalPlan predPlan = predOp.plan;
+            PhysicalOperator predLeaf = predPlan.getLeaves().get(0);
+            // if predLeaf not POValueOutputTez
+            if (predLeaf instanceof POSplit) {
+                // Find the subPlan that connects to the union operator
+                predPlan = getUnionPredPlanFromSplit(predPlan, 
unionOp.getOperatorKey().toString());
+                predLeaf = predPlan.getLeaves().get(0);
+            }
+            PhysicalPlan clonePlan = unionOp.plan.clone();
+
+            // Remove POValueOutputTez from predecessor leaf
+            predPlan.remove(predLeaf);
+            boolean isEmptyPlan = predPlan.isEmpty();
+            if (!isEmptyPlan) {
+                predLeaf = predPlan.getLeaves().get(0);
             }
+            predPlan.merge(clonePlan);
+            if (!isEmptyPlan) {
+                predPlan.connect(predLeaf, clonePlan.getRoots().get(0));
+            }
+            return clonePlan;
         } catch (Exception e) {
             throw new VisitorException(e);
         }
+    }
 
+    /**
+     * Connects the unionOp predecessor to the store vertex groups and the 
output vertex groups
+     * and disconnects it from the unionOp.
+     *
+     * @param pred Predecessor of union which will be made part of the vertex 
group
+     * @param unionOp Union operator
+     * @param predClonedUnionPlan Cloned plan of the union merged to the 
predecessor
+     * @param storeVertexGroupOps Store vertex groups to connect to
+     * @param outputVertexGroupOps Tez LogicalOutput vertex groups to connect 
to
+     */
+    public void connectPredecessorsToVertexGroups(TezOperator unionOp,
+            TezOperator pred, PhysicalPlan predClonedUnionPlan,
+            TezOperator[] storeVertexGroupOps,
+            TezOperator[] outputVertexGroupOps) throws 
VisitorException,PlanException {
+
+        //Clone changes the operator keys
+        List<POStoreTez> clonedUnionStoreOutputs = 
PlanHelper.getPhysicalOperators(predClonedUnionPlan, POStoreTez.class);
+
+        // Connect predecessor to the storeVertexGroups
+        int i = 0;
+        for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+            
storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+            
pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
+                    storeVertexGroup.getOperatorKey());
+            tezPlan.connect(pred, storeVertexGroup);
+        }
+
+        for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+            
outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
+            tezPlan.connect(pred, outputVertexGroup);
+        }
+
+        copyOperatorProperties(pred, unionOp);
+        tezPlan.disconnect(pred, unionOp);
+    }
+
+    /**
+     * Connect vertexgroup operator to successor operator in the plan.
+     *
+     * Copy the output edge between the union operator and its successor onto each
+     * predecessor-to-successor pair. The predecessor output key and output edge 
point
+     * to the successor so that we have all the edge configuration, but the
+     * predecessors are connected to the vertex group in the plan.
+     *
+     * @param unionOp Union operator
+     * @param successors Successors of the union operator
+     * @param unionOutputKeys Output keys of union
+     * @param outputVertexGroupOps Tez LogicalOutput vertex groups 
corresponding to the output keys
+     *
+     * @throws PlanException
+     */
+    private void connectVertexGroupsToSuccessors(TezOperator unionOp,
+            List<TezOperator> successors, List<String> unionOutputKeys,
+            TezOperator[] outputVertexGroupOps) throws PlanException {
+        // Connect to outputVertexGroupOps
+        for (Entry<OperatorKey, TezEdgeDescriptor> entry : 
unionOp.outEdges.entrySet()) {
+            TezOperator succOp = tezPlan.getOperator(entry.getKey());
+            // Case of union followed by union.
+            // unionOp.outEdges will not point to vertex group, but to its 
output.
+            // So find the vertex group if there is one.
+            TezOperator succOpVertexGroup = null;
+            for (TezOperator succ : successors) {
+                if (succ.isVertexGroup()
+                        && succOp.getOperatorKey().toString()
+                                
.equals(succ.getVertexGroupInfo().getOutput())) {
+                    succOpVertexGroup = succ;
+                    break;
+                }
+            }
+            TezEdgeDescriptor edge = entry.getValue();
+            // Edge cannot be one to one as it will get input from two or
+            // more union predecessors. Change it to SCATTER_GATHER
+            if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
+                edge.dataMovementType = DataMovementType.SCATTER_GATHER;
+                edge.partitionerClass = RoundRobinPartitioner.class;
+                edge.outputClassName = 
UnorderedPartitionedKVOutput.class.getName();
+                edge.inputClassName = UnorderedKVInput.class.getName();
+            }
+            TezOperator vertexGroupOp = 
outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
+            for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {
+                TezOperator pred = tezPlan.getOperator(predKey);
+                // Keep the output edge directly to successor
+                // Don't need to keep output edge for vertexgroup
+                pred.outEdges.put(entry.getKey(), edge);
+                succOp.inEdges.put(predKey, edge);
+                if (succOpVertexGroup != null) {
+                    succOpVertexGroup.getVertexGroupMembers().add(predKey);
+                    succOpVertexGroup.getVertexGroupInfo().addInput(predKey);
+                    // Connect directly to the successor vertex group
+                    tezPlan.disconnect(pred, vertexGroupOp);
+                    tezPlan.connect(pred, succOpVertexGroup);
+                }
+            }
+            if (succOpVertexGroup != null) {
+                
succOpVertexGroup.getVertexGroupMembers().remove(unionOp.getOperatorKey());
+                
succOpVertexGroup.getVertexGroupInfo().removeInput(unionOp.getOperatorKey());
+                //Discard the new vertex group created
+                tezPlan.remove(vertexGroupOp);
+            } else {
+                tezPlan.connect(vertexGroupOp, succOp);
+            }
+        }
+    }
+
+    private void replaceSuccessorInputsAndDisconnect(TezOperator unionOp,
+            List<TezOperator> successors,
+            List<String> unionOutputKeys,
+            String[] newOutputKeys)
+            throws VisitorException {
         if (successors != null) {
+            String unionOpKey = unionOp.getOperatorKey().toString();
             // Successor inputs should now point to the vertex groups.
             for (TezOperator succ : successors) {
                 LinkedList<TezInput> inputs = 
PlanHelper.getPhysicalOperators(succ.plan, TezInput.class);
@@ -271,16 +606,27 @@ public class UnionOptimizer extends TezO
                         }
                     }
                 }
+
+                List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(succ.plan, POUserFunc.class);
+                for (POUserFunc userFunc : userFuncs) {
+                    if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                        TezInput tezInput = (TezInput)userFunc.getFunc();
+                        for (String inputKey : tezInput.getTezInputs()) {
+                            if (inputKey.equals(unionOpKey)) {
+                                tezInput.replaceInput(inputKey,
+                                        
newOutputKeys[unionOutputKeys.indexOf(succ.getOperatorKey().toString())]);
+                                
userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+                            }
+                        }
+                    }
+                }
+
                 tezPlan.disconnect(unionOp, succ);
             }
         }
-
-        //Remove union operator from the plan
-        tezPlan.remove(unionOp);
-
     }
 
-    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) 
{
+    private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) 
throws VisitorException {
         pred.UDFs.addAll(unionOp.UDFs);
         pred.scalars.addAll(unionOp.scalars);
         // Copy only map side properties. For eg: crossKeys.
@@ -292,6 +638,17 @@ public class UnionOptimizer extends TezO
             }
         }
         pred.copyFeatures(unionOp, Arrays.asList(new 
OPER_FEATURE[]{OPER_FEATURE.UNION}));
+
+        // For skewed join right input
+        if (unionOp.getSampleOperator() !=  null) {
+            if (pred.getSampleOperator() == null) {
+                pred.setSampleOperator(unionOp.getSampleOperator());
+            } else if 
(!pred.getSampleOperator().equals(unionOp.getSampleOperator())) {
+                throw new VisitorException("Conflicting sample operators "
+                        + pred.getSampleOperator().toString() + " and "
+                        + unionOp.getSampleOperator().toString());
+            }
+        }
     }
 
     public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, 
String unionOpKey) throws VisitorException {

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/udf/ReadScalarsTez.java
 Fri Mar  4 18:17:39 2016
@@ -67,9 +67,10 @@ public class ReadScalarsTez extends Eval
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException {
         String cacheKey = "scalar-" + inputKey;
-        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
-        if (cacheValue != null) {
-            t = (Tuple) cacheValue;
+        String cacheKeyPresent = "scalar-present" + inputKey;
+        
+        if (ObjectCache.getInstance().retrieve(cacheKeyPresent) != null) {
+            t = (Tuple)ObjectCache.getInstance().retrieve(cacheKey);
             return;
         }
         input = inputs.get(inputKey);
@@ -84,7 +85,8 @@ public class ReadScalarsTez extends Eval
                 if (reader.next()) {
                     String msg = "Scalar has more than one row in the output. "
                             + "1st : " + first + ", 2nd :"
-                            + reader.getCurrentValue();
+                            + reader.getCurrentValue()
+                            + " (common cause: \"JOIN\" then \"FOREACH ... 
GENERATE foo.bar\" should be \"foo::bar\" )";
                     throw new ExecException(msg);
                 }
             } else {
@@ -94,12 +96,16 @@ public class ReadScalarsTez extends Eval
         } catch (Exception e) {
             throw new ExecException(e);
         }
+        ObjectCache.getInstance().cache(cacheKeyPresent, Boolean.TRUE);
         ObjectCache.getInstance().cache(cacheKey, t);
         log.info("Cached scalar in Tez ObjectRegistry with vertex scope. 
cachekey=" + cacheKey);
     }
 
     @Override
     public Object exec(Tuple input) throws IOException {
+        if (t == null) {
+            return null;
+        }
         int pos = (Integer) input.get(0);
         Object obj = t.get(pos);
         return obj;

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
 Fri Mar  4 18:17:39 2016
@@ -24,6 +24,8 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
+import org.apache.tez.dag.api.EdgeProperty;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
@@ -68,7 +70,7 @@ public class PartitionerDefinedVertexMan
     }
 
     @Override
-    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) 
throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
         if (isParallelismSet) {
@@ -86,14 +88,14 @@ public class PartitionerDefinedVertexMan
             if (dynamicParallelism!=currentParallelism) {
                 LOG.info("Pig Partitioner Defined Vertex Manager: reset 
parallelism to " + dynamicParallelism
                         + " from " + currentParallelism);
-                Map<String, EdgeManagerPluginDescriptor> edgeManagers =
-                    new HashMap<String, EdgeManagerPluginDescriptor>();
-                for(String vertex : 
getContext().getInputVertexEdgeProperties().keySet()) {
-                    EdgeManagerPluginDescriptor edgeManagerDescriptor =
-                            
EdgeManagerPluginDescriptor.create(ScatterGatherEdgeManager.class.getName());
-                    edgeManagers.put(vertex, edgeManagerDescriptor);
+                Map<String, EdgeProperty> edgeManagers = new HashMap<String, 
EdgeProperty>();
+                for(Map.Entry<String,EdgeProperty> entry : 
getContext().getInputVertexEdgeProperties().entrySet()) {
+                    EdgeProperty edge = entry.getValue();
+                    edge = 
EdgeProperty.create(DataMovementType.SCATTER_GATHER, edge.getDataSourceType(), 
edge.getSchedulingType(),
+                            edge.getEdgeSource(), edge.getEdgeDestination());
+                    edgeManagers.put(entry.getKey(), edge);
                 }
-                getContext().setVertexParallelism(dynamicParallelism, null, 
edgeManagers, null);
+                getContext().reconfigureVertex(dynamicParallelism, null, 
edgeManagers);
             }
         }
     }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigOutputFormatTez.java
 Fri Mar  4 18:17:39 2016
@@ -20,21 +20,34 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputCommitter;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.UDFContext;
 
 public class PigOutputFormatTez extends PigOutputFormat {
 
+
+    @Override
+    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
+            TaskAttemptContext taskattemptcontext) throws IOException,
+            InterruptedException {
+        resetUDFContextForThreadReuse();
+        return super.getRecordWriter(taskattemptcontext);
+    }
+
     @Override
     public OutputCommitter getOutputCommitter(
             TaskAttemptContext taskattemptcontext) throws IOException,
             InterruptedException {
+        resetUDFContextForThreadReuse();
         setupUdfEnvAndStores(taskattemptcontext);
 
         // we return an instance of PigOutputCommitterTez (PIG-4202) to Hadoop 
- this instance
@@ -44,6 +57,21 @@ public class PigOutputFormatTez extends
                 reduceStores);
     }
 
+    public static void resetUDFContextForThreadReuse() {
+        // On the Tez AM, MROutput OutputCommitters are initialized and 
setupJob
+        // called on them in a loop in the same thread.
+        // commitJob/abortJob can be called from any thread based on events 
received from vertices
+
+        // On the Tez tasks, it initializes different inputs/outputs in 
different Initializer threads
+        // by submitting them to a thread pool. Even though 
threadpoolsize=numInputs+numOutputs
+        // a thread can be reused.
+
+        // Since the UDFContext deserialized from an input or output payload contains
+        // information only for that input or output (to reduce payload sizes), we 
need to
+        // ensure it is deserialized every time before use in a thread to get 
the right one.
+        UDFContext.getUDFContext().reset();
+    }
+
     public static class PigOutputCommitterTez extends PigOutputCommitter {
 
         public PigOutputCommitterTez(TaskAttemptContext context,
@@ -54,39 +82,35 @@ public class PigOutputFormatTez extends
 
         @Override
         public void setupJob(JobContext context) throws IOException {
-            cleanupForContainerReuse();
+            resetUDFContextForThreadReuse();
             try {
                 super.setupJob(context);
             } finally {
-                cleanupForContainerReuse();
+                resetUDFContextForThreadReuse();
             }
 
         }
 
         @Override
         public void commitJob(JobContext context) throws IOException {
-            cleanupForContainerReuse();
+            resetUDFContextForThreadReuse();
             try {
                 super.commitJob(context);
             } finally {
-                cleanupForContainerReuse();
+                resetUDFContextForThreadReuse();
             }
         }
 
         @Override
         public void abortJob(JobContext context, State state)
                 throws IOException {
-            cleanupForContainerReuse();
+            resetUDFContextForThreadReuse();
             try {
                 super.abortJob(context, state);
             } finally {
-                cleanupForContainerReuse();
+                resetUDFContextForThreadReuse();
             }
         }
 
-        private void cleanupForContainerReuse() {
-            UDFContext.getUDFContext().reset();
-        }
-
     }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 Fri Mar  4 18:17:39 2016
@@ -30,13 +30,17 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -57,6 +61,7 @@ import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
 import org.apache.tez.runtime.api.Event;
@@ -116,6 +121,11 @@ public class PigProcessor extends Abstra
         // Reset static variables cleared for avoiding OOM.
         new JVMReuseImpl().cleanupStaticData();
 
+        // Set an empty reporter for now. Once we go to Tez 0.8
+        // which adds support for mapreduce like progress (TEZ-808),
+        // we need to call progress on Tez API
+        PhysicalOperator.setReporter(new ProgressableReporter());
+
         UserPayload payload = getContext().getUserPayload();
         conf = TezUtils.createConfFromUserPayload(payload);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
@@ -124,6 +134,22 @@ public class PigProcessor extends Abstra
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, 
getContext().getUniqueIdentifier());
+
+        // For compatibility with mapreduce. Some users use these configs in 
their UDF
+        // Copied logic from the tez class - 
org.apache.tez.mapreduce.output.MROutput
+        // Currently isMapperOutput is always false. Setting it to true 
produces empty output with MROutput
+        boolean isMapperOutput = conf.getBoolean(MRConfig.IS_MAP_PROCESSOR, 
false);
+        TaskAttemptID taskAttemptId = 
org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl
+                
.createMockTaskAttemptID(getContext().getApplicationId().getClusterTimestamp(),
+                    getContext().getTaskVertexIndex(), 
getContext().getApplicationId().getId(),
+                    getContext().getTaskIndex(), 
getContext().getTaskAttemptNumber(), isMapperOutput);
+        conf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+        conf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+        conf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+        conf.setInt(JobContext.TASK_PARTITION,
+              taskAttemptId.getTaskID().getId());
+        conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
         conf.set(PigConstants.TASK_INDEX, 
Integer.toString(getContext().getTaskIndex()));
         UDFContext.getUDFContext().addJobConf(conf);
         UDFContext.getUDFContext().deserialize();
@@ -161,7 +187,6 @@ public class PigProcessor extends Abstra
 
     @Override
     public void close() throws Exception {
-
         execPlan = null;
         fileOutputs = null;
         leaf = null;
@@ -173,6 +198,8 @@ public class PigProcessor extends Abstra
         // The Reporter and Context objects hold TezProcessorContextImpl
         // which holds input and its sort buffers which are huge.
         new JVMReuseImpl().cleanupStaticData();
+        // Do only in close() and not initialize().
+        UDFContext.staticDataCleanup();
     }
 
     @Override
@@ -193,8 +220,22 @@ public class PigProcessor extends Abstra
                 leaf = leaves.get(0);
             }
 
+            LOG.info("Aliases being processed per job phase 
(AliasName[line,offset]): " + conf.get("pig.alias.location"));
+
             runPipeline(leaf);
 
+            if (Boolean.valueOf(conf.get(JobControlCompiler.END_OF_INP_IN_MAP, 
"false"))
+                    && !execPlan.endOfAllInput) {
+                // If there is a stream in the pipeline or if this map job 
belongs to merge-join we could
+                // potentially have more to process - so let's
+                // set the flag stating that all map input has been sent
+                // already and then let's run the pipeline one more time
+                // This will result in nothing happening in the case
+                // where there is no stream or it is not a merge-join in the 
pipeline
+                execPlan.endOfAllInput = true;
+                runPipeline(leaf);
+            }
+
             // Calling EvalFunc.finish()
             UDFFinishVisitor finisher = new UDFFinishVisitor(execPlan,
                     new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
@@ -207,13 +248,15 @@ public class PigProcessor extends Abstra
                 throw new VisitorException(msg, errCode, PigException.BUG, e);
             }
 
-            while (!getContext().canCommit()) {
-                Thread.sleep(100);
-            }
-            for (MROutput fileOutput : fileOutputs){
-                fileOutput.flush();
-                if (fileOutput.isCommitRequired()) {
-                    fileOutput.commit();
+            if (!fileOutputs.isEmpty()) {
+                while (!getContext().canCommit()) {
+                    Thread.sleep(100);
+                }
+                for (MROutput fileOutput : fileOutputs){
+                    fileOutput.flush();
+                    if (fileOutput.isCommitRequired()) {
+                        fileOutput.commit();
+                    }
                 }
             }
 
@@ -233,8 +276,8 @@ public class PigProcessor extends Abstra
                 getContext().sendEvents(events);
             }
         } catch (Exception e) {
-            abortOutput();
             LOG.error("Encountered exception while processing: ", e);
+            abortOutput();
             throw e;
         }
     }
@@ -243,7 +286,7 @@ public class PigProcessor extends Abstra
         for (MROutput fileOutput : fileOutputs){
             try {
                 fileOutput.abort();
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.error("Encountered exception while aborting output", e);
             }
         }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/SkewedPartitionerTez.java
 Fri Mar  4 18:17:39 2016
@@ -17,73 +17,14 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
-import java.util.Iterator;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SkewedPartitioner;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.builtin.PartitionSkewedKeys;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezRuntimeUtil;
 
 public class SkewedPartitionerTez extends SkewedPartitioner {
-    private static final Log LOG = 
LogFactory.getLog(SkewedPartitionerTez.class);
 
     @Override
     protected void init() {
-
-        Map<String, Object> distMap = null;
-        if (PigProcessor.sampleMap != null) {
-            // We've collected sampleMap in PigProcessor
-            distMap = PigProcessor.sampleMap;
-        } else {
-            LOG.info("Key distribution map is empty");
-            inited = true;
-            return;
-        }
-
-        long start = System.currentTimeMillis();
-
-        try {
-            // The distMap is structured as (key, min, max) where min, max
-            // being the index of the reducers
-            DataBag partitionList = (DataBag) 
distMap.get(PartitionSkewedKeys.PARTITION_LIST);
-            totalReducers = Integer.valueOf("" + 
distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
-            Iterator<Tuple> it = partitionList.iterator();
-            while (it.hasNext()) {
-                Tuple idxTuple = it.next();
-                Integer maxIndex = (Integer) idxTuple.get(idxTuple.size() - 1);
-                Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
-                // Used to replace the maxIndex with the number of reducers
-                if (maxIndex < minIndex) {
-                    maxIndex = totalReducers + maxIndex;
-                }
-
-                Tuple keyT;
-                // if the join is on more than 1 key
-                if (idxTuple.size() > 3) {
-                    // remove the last 2 fields of the tuple, i.e: minIndex 
and maxIndex and store
-                    // it in the reducer map
-                    Tuple keyTuple = tf.newTuple();
-                    for (int i=0; i < idxTuple.size() - 2; i++) {
-                        keyTuple.append(idxTuple.get(i));
-                    }
-                    keyT = keyTuple;
-                } else {
-                    keyT = tf.newTuple(1);
-                    keyT.set(0,idxTuple.get(0));
-                }
-                // number of reducers
-                Integer cnt = maxIndex - minIndex;
-                // 1 is added to account for the 0 index
-                reducerMap.put(keyT, new Pair<Integer, Integer>(minIndex, 
cnt));
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        LOG.info("Initialized SkewedPartitionerTez. Time taken: " + 
(System.currentTimeMillis() - start));
+        reducerMap = TezRuntimeUtil.readReduceMapFromSample(tf);
         inited = true;
     }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 Fri Mar  4 18:17:39 2016
@@ -17,7 +17,10 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -25,30 +28,82 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobSubmissionFiles;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.split.JobSplitWriter;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 
 @InterfaceAudience.Private
 public class MRToTezHelper {
 
     private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
+    private static final String JOB_SPLIT_RESOURCE_NAME = 
MRJobConfig.JOB_SPLIT;
+    private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = 
MRJobConfig.JOB_SPLIT_METAINFO;
+
+    private static Map<String, String> mrAMParamToTezAMParamMap = new 
HashMap<String, String>();
+    private static Map<String, String> mrMapParamToTezVertexParamMap = new 
HashMap<String, String>();
+    private static Map<String, String> mrReduceParamToTezVertexParamMap = new 
HashMap<String, String>();
 
     private static List<String> mrSettingsToRetain = new ArrayList<String>();
 
+    private static List<String> mrSettingsToRemove = new ArrayList<String>();
+
     private MRToTezHelper() {
     }
 
     static {
+        populateMRToTezParamsMap();
         populateMRSettingsToRetain();
+        populateMRSettingsToRemove();
+    }
+
+    private static void populateMRToTezParamsMap() {
+
+        //AM settings
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_VMEM_MB, 
TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_CPU_VCORES, 
TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_MAX_ATTEMPTS, 
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS);
+        mrAMParamToTezAMParamMap.put(MRConfiguration.JOB_CREDENTIALS_BINARY, 
TezConfiguration.TEZ_CREDENTIALS_PATH);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, 
TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION);
+
+        //Map settings
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_MAX_ATTEMPTS, 
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_SPECULATIVE, 
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
+        //TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 
0.8
+        mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", 
"tez.am.vertex.max-task-concurrency");
+        //TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 
0.8
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
+
+        //Reduce settings
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_MAX_ATTEMPTS, 
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, 
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
+        
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", 
"tez.am.vertex.max-task-concurrency");
+        
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", 
"tez.am.vertex.max-task-concurrency");
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
     }
 
     private static void populateMRSettingsToRetain() {
@@ -70,25 +125,71 @@ public class MRToTezHelper {
         
mrSettingsToRetain.add(FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER);
     }
 
+    private static void populateMRSettingsToRemove() {
+
+        // FileInputFormat.listStatus() on a task can cause job failure when 
run from Oozie
+        mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_SIZES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILES_SIZES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_VISIBILITIES);
+        mrSettingsToRemove.add(MRJobConfig.CLASSPATH_FILES);
+    }
+
+    private static void removeUnwantedSettings(Configuration tezConf, boolean 
isAMConf) {
+
+        // It is good to clean up as many of the inapplicable settings as 
possible.
+        // Tez has configs set in multiple places: AM, DAG, Vertex, 
VertexManager
+        // Plugin, Tasks (Processor, Edge, every input and output, combiner).
+        // If the conf size is bigger, it places heavy pressure on AM memory and 
is
+        // inefficient while sending over RPC to tasks.
+
+        for (String mrSetting : mrSettingsToRemove) {
+            tezConf.unset(mrSetting);
+        }
+
+        Iterator<Entry<String, String>> iter = new 
Configuration(tezConf).iterator();
+        while (iter.hasNext()) {
+            String key = iter.next().getKey();
+            if (!isAMConf) {
+                // Keep the settings in the AM conf to be able to connect back to 
the
+                // Oozie launcher job and look at the parameter values passed,
+                // but get rid of them for the others
+                if (key.startsWith("oozie.")) {
+                    tezConf.unset(key);
+                    continue;
+                }
+            }
+            if (key.startsWith("dfs.datanode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("dfs.namenode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("yarn.nodemanager")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.jobhistory")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.jobtracker")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.tasktracker")) {
+                tezConf.unset(key);
+            }
+        }
+    }
+
     public static TezConfiguration getDAGAMConfFromMRConf(
             Configuration tezConf) {
 
         // Set Tez parameters based on MR parameters.
         TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-        Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
-                .getMRToDAGParamMap();
 
-        for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
-            if (dagAMConf.get(entry.getKey()) != null) {
-                dagAMConf.set(entry.getValue(), dagAMConf.get(entry.getKey()));
-                dagAMConf.unset(entry.getKey());
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
-                            + " to Tez key: " + entry.getValue()
-                            + " with value " + 
dagAMConf.get(entry.getValue()));
-                }
-            }
-        }
+
+        convertMRToTezConf(dagAMConf, dagAMConf, 
DeprecatedKeys.getMRToDAGParamMap());
+        convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
         String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
         if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
@@ -108,70 +209,144 @@ public class MRToTezHelper {
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
-        int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
-                MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
-        int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
-                MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, ""
-                + amMemMB);
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
-                + amCores);
-
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
                 tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
                 tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
-                + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
-                        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
-
-        if (tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY) != null) {
-            dagAMConf.setIfUnset(TezConfiguration.TEZ_CREDENTIALS_PATH,
-                    tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY));
-        }
+        // Hardcoding at AM level instead of setting per vertex till TEZ-2710 
is available
+        
dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 
"0.5");
 
-        //TODO: Strip out all MR settings
+        removeUnwantedSettings(dagAMConf, true);
 
         return dagAMConf;
     }
 
     /**
+     * Set config with Scope.Vertex in TezConfiguration on the vertex
+     *
+     * @param vertex Vertex on which config is to be set
+     * @param isMapVertex Whether map or reduce vertex, i.e. root or 
intermediate/leaf vertex
+     * @param conf Config that contains the tez or equivalent mapreduce 
settings.
+     */
+    public static void setVertexConfig(Vertex vertex, boolean isMapVertex,
+            Configuration conf) {
+        Map<String, String> configMapping = isMapVertex ? 
mrMapParamToTezVertexParamMap
+                : mrReduceParamToTezVertexParamMap;
+        for (Entry<String, String> dep : configMapping.entrySet()) {
+
+            String value = conf.get(dep.getValue(), conf.get(dep.getKey()));
+            if (value != null) {
+                vertex.setConf(dep.getValue(), value);
+                LOG.debug("Setting " + dep.getValue() + " to " + value
+                        + " for the vertex " + vertex.getName());
+            }
+        }
+    }
+
+    /**
      * Process the mapreduce configuration settings and
      *    - copy as is the still required ones (like those used by 
FileInputFormat/FileOutputFormat)
      *    - convert and set equivalent tez runtime settings
      *    - handle compression related settings
      *
-     * @param conf Configuration on which the mapreduce settings will have to 
be transferred
+     * @param tezConf Configuration on which the mapreduce settings will have 
to be transferred
      * @param mrConf Configuration that contains mapreduce settings
      */
-    public static void processMRSettings(Configuration conf, Configuration 
mrConf) {
+    public static void processMRSettings(Configuration tezConf, Configuration 
mrConf) {
         for (String mrSetting : mrSettingsToRetain) {
             if (mrConf.get(mrSetting) != null) {
-                conf.set(mrSetting, mrConf.get(mrSetting));
+                tezConf.set(mrSetting, mrConf.get(mrSetting));
             }
         }
-        JobControlCompiler.configureCompression(conf);
-        convertMRToTezRuntimeConf(conf, mrConf);
+        JobControlCompiler.configureCompression(tezConf);
+        convertMRToTezConf(tezConf, mrConf, 
DeprecatedKeys.getMRToTezRuntimeParamMap());
+        removeUnwantedSettings(tezConf, false);
     }
 
     /**
      * Convert MR settings to Tez settings and set on conf.
      *
-     * @param conf  Configuration on which MR equivalent Tez settings should 
be set
+     * @param tezConf  Configuration on which MR equivalent Tez settings 
should be set
      * @param mrConf Configuration that contains MR settings
+     * @param mrToTezConfigMapping  Mapping of MR config to equivalent Tez 
config
      */
-    private static void convertMRToTezRuntimeConf(Configuration conf, 
Configuration mrConf) {
-        for (Entry<String, String> dep : 
DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+    private static void convertMRToTezConf(Configuration tezConf, 
Configuration mrConf, Map<String, String> mrToTezConfigMapping) {
+        for (Entry<String, String> dep : mrToTezConfigMapping.entrySet()) {
             if (mrConf.get(dep.getKey()) != null) {
-                conf.unset(dep.getKey());
-                LOG.info("Setting " + dep.getValue() + " to "
-                        + mrConf.get(dep.getKey()) + " from MR setting "
-                        + dep.getKey());
-                conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey()));
+                if (tezConf.get(dep.getValue()) == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Setting " + dep.getValue() + " to "
+                                + mrConf.get(dep.getKey()) + " from MR setting 
"
+                                + dep.getKey());
+                    }
+                    tezConf.set(dep.getValue(), mrConf.get(dep.getKey()));
+                }
+                tezConf.unset(dep.getKey());
             }
         }
     }
 
+    /**
+     * Write input splits (job.split and job.splitmetainfo) to disk
+     */
+    public static InputSplitInfoDisk writeInputSplitInfoToDisk(
+            InputSplitInfoMem infoMem, Path inputSplitsDir, JobConf jobConf,
+            FileSystem fs) throws IOException, InterruptedException {
+
+        InputSplit[] splits = infoMem.getNewFormatSplits();
+        JobSplitWriter.createSplitFiles(inputSplitsDir, jobConf, fs, splits);
+
+        return new InputSplitInfoDisk(
+                JobSubmissionFiles.getJobSplitFile(inputSplitsDir),
+                JobSubmissionFiles.getJobSplitMetaFile(inputSplitsDir),
+                splits.length, infoMem.getTaskLocationHints(),
+                jobConf.getCredentials());
+    }
+
+    /**
+     * Exact copy of a private method from 
org.apache.tez.mapreduce.hadoop.MRInputHelpers
+     *
+     * Update provided localResources collection with the required local
+     * resources needed by MapReduce tasks with respect to Input splits.
+     *
+     * @param fs Filesystem instance to access status of splits related files
+     * @param inputSplitInfo Information on location of split files
+     * @param localResources LocalResources collection to be updated
+     * @throws IOException
+     */
+    public static void updateLocalResourcesForInputSplits(
+        FileSystem fs,
+        InputSplitInfo inputSplitInfo,
+        Map<String, LocalResource> localResources) throws IOException {
+      if (localResources.containsKey(JOB_SPLIT_RESOURCE_NAME)) {
+        throw new RuntimeException("LocalResources already contains a"
+            + " resource named " + JOB_SPLIT_RESOURCE_NAME);
+      }
+      if (localResources.containsKey(JOB_SPLIT_METAINFO_RESOURCE_NAME)) {
+        throw new RuntimeException("LocalResources already contains a"
+            + " resource named " + JOB_SPLIT_METAINFO_RESOURCE_NAME);
+      }
+
+      FileStatus splitFileStatus =
+          fs.getFileStatus(inputSplitInfo.getSplitsFile());
+      FileStatus metaInfoFileStatus =
+          fs.getFileStatus(inputSplitInfo.getSplitsMetaInfoFile());
+      localResources.put(JOB_SPLIT_RESOURCE_NAME,
+          LocalResource.newInstance(
+              
ConverterUtils.getYarnUrlFromPath(inputSplitInfo.getSplitsFile()),
+              LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION,
+              splitFileStatus.getLen(), 
splitFileStatus.getModificationTime()));
+      localResources.put(JOB_SPLIT_METAINFO_RESOURCE_NAME,
+          LocalResource.newInstance(
+              ConverterUtils.getYarnUrlFromPath(
+                  inputSplitInfo.getSplitsMetaInfoFile()),
+              LocalResourceType.FILE,
+              LocalResourceVisibility.APPLICATION,
+              metaInfoFileStatus.getLen(),
+              metaInfoFileStatus.getModificationTime()));
+    }
+
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
 Fri Mar  4 18:17:39 2016
@@ -19,13 +19,14 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.lang.ArrayUtils;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -37,6 +38,9 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -123,19 +127,96 @@ public class TezCompilerUtil {
 
     static public void connect(TezOperPlan plan, TezOperator from, TezOperator 
to, TezEdgeDescriptor edge) throws PlanException {
         plan.connect(from, to);
-        if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) {
-            PhysicalOperator leaf = from.plan.getLeaves().get(0);
-            // It could be POStoreTez incase of sampling job in order by
-            if (leaf instanceof POLocalRearrangeTez) {
-                POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
-                lr.setOutputKey(to.getOperatorKey().toString());
-            }
-        }
+
         // Add edge descriptors to old and new operators
         to.inEdges.put(from.getOperatorKey(), edge);
         from.outEdges.put(to.getOperatorKey(), edge);
     }
 
+    static public void connectTezOpToNewPredecessor(TezOperPlan plan,
+            TezOperator tezOp, TezOperator newPredecessor,
+            TezEdgeDescriptor edge, String oldInputKey) throws PlanException {
+        plan.connect(newPredecessor, tezOp);
+        // Add edge descriptors to old and new operators
+        tezOp.inEdges.put(newPredecessor.getOperatorKey(), edge);
+        newPredecessor.outEdges.put(tezOp.getOperatorKey(), edge);
+
+        if (oldInputKey != null) {
+            replaceInput(tezOp, oldInputKey, 
newPredecessor.getOperatorKey().toString());
+        }
+    }
+
+    public static void replaceInput(TezOperator tezOp, String oldInputKey,
+            String newInputKey) throws PlanException {
+        try {
+            List<TezInput> inputs = 
PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
+            for (TezInput input : inputs) {
+                input.replaceInput(oldInputKey, newInputKey);
+            }
+            List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+            for (POUserFunc userFunc : userFuncs) {
+                if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                    TezInput input = (TezInput)userFunc.getFunc();
+                    input.replaceInput(oldInputKey, newInputKey);
+                    userFunc.getFuncSpec().setCtorArgs(input.getTezInputs());
+                }
+            }
+        } catch (VisitorException e) {
+            throw new PlanException(e);
+        }
+    }
+
+    static public void connectTezOpToNewSuccesor(TezOperPlan plan,
+            TezOperator tezOp, TezOperator newSuccessor,
+            TezEdgeDescriptor edge, String oldOutputKey) throws PlanException {
+        plan.connect(tezOp, newSuccessor);
+        // Add edge descriptors to old and new operators
+        newSuccessor.inEdges.put(tezOp.getOperatorKey(), edge);
+        tezOp.outEdges.put(newSuccessor.getOperatorKey(), edge);
+
+        if (oldOutputKey != null) {
+            replaceOutput(tezOp, oldOutputKey, 
newSuccessor.getOperatorKey().toString());
+        }
+    }
+
+    public static void replaceOutput(TezOperator tezOp, String oldOutputKey,
+            String newOutputKey) throws PlanException {
+        try {
+            List<TezOutput> tezOutputs = 
PlanHelper.getPhysicalOperators(tezOp.plan,
+                    TezOutput.class);
+            for (TezOutput tezOut : tezOutputs) {
+                if (ArrayUtils.contains(tezOut.getTezOutputs(), oldOutputKey)) 
{
+                    tezOut.replaceOutput(oldOutputKey, newOutputKey);
+                }
+            }
+        } catch (VisitorException e) {
+            throw new PlanException(e);
+        }
+    }
+
+    public static boolean isNonPackageInput(String inputKey, TezOperator 
tezOp) throws PlanException {
+        try {
+            List<TezInput> inputs = 
PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
+            for (TezInput input : inputs) {
+                if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
+                    return true;
+                }
+            }
+            List<POUserFunc> userFuncs = 
PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class);
+            for (POUserFunc userFunc : userFuncs) {
+                if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                    TezInput input = (TezInput)userFunc.getFunc();
+                    if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        } catch (VisitorException e) {
+            throw new PlanException(e);
+        }
+    }
+
     static public POForEach getForEach(POProject project, int rp, String 
scope, NodeIdGenerator nig) {
         PhysicalPlan forEachPlan = new PhysicalPlan();
         forEachPlan.add(project);
@@ -192,19 +273,4 @@ public class TezCompilerUtil {
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);
     }
 
-    /**
-     * Returns true if there are no loads or stores in a TezOperator.
-     * To be called only after LoaderProcessor is called
-     */
-    static public boolean isIntermediateReducer(TezOperator tezOper) throws 
VisitorException {
-        boolean intermediateReducer = false;
-        LinkedList<POStore> stores = 
PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
-        // Not map and not final reducer
-        if (stores.size() <= 0 &&
-                (tezOper.getLoaderInfo().getLoads() == null || 
tezOper.getLoaderInfo().getLoads().size() <= 0)) {
-            intermediateReducer = true;
-        }
-        return intermediateReducer;
-    }
-
 }


Reply via email to