Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Fri Mar  4 18:17:39 2016
@@ -22,16 +22,18 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
@@ -43,6 +45,10 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
@@ -52,18 +58,6 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBooleanWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingCharArrayWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDBAWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDateTimeWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDoubleWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingFloatWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingIntWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingLongWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -82,10 +76,10 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigWritableComparators;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -99,20 +93,21 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPOPackageAnnotator.LoRearrangeDiscoverer;
-import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POIdentityInOutTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POShuffleTezLoad;
-import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
-import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PartitionerDefinedVertexManager;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigGraceShuffleVertexManager;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigInputFormatTez;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigOutputFormatTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
-import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.util.TezUDFContextSeparator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.builtin.DefaultIndexableLoader;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -121,7 +116,9 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.impl.util.UDFContextSeparator.UDFType;
 import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.pig.tools.pigstats.tez.TezScriptState.TezDAGScriptInfo;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.DataSinkDescriptor;
@@ -136,6 +133,7 @@ import org.apache.tez.dag.api.InputIniti
 import org.apache.tez.dag.api.OutputCommitterDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
@@ -145,50 +143,142 @@ import org.apache.tez.dag.library.vertex
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
+import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
+import 
org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto.Builder;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRSplitsProto;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedKVInput;
 import org.apache.tez.runtime.library.input.OrderedGroupedMergedKVInput;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
 
 /**
  * A visitor to construct DAG out of Tez plan.
  */
 public class TezDagBuilder extends TezOpPlanVisitor {
-    private static final Log log = LogFactory.getLog(TezJobCompiler.class);
+    private static final Log log = LogFactory.getLog(TezDagBuilder.class);
+
+    private static long SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT = 384 * 1024 
* 1024L;
+    private static long SHUFFLE_BYTES_PER_REDUCER_DEFAULT = 256 * 1024 * 1024L;
 
     private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
+    private Configuration pigContextConf;
+    private FileSystem fs;
     private long intermediateTaskInputSize;
+    private Set<String> inputSplitInDiskVertices;
+    private TezUDFContextSeparator udfContextSeparator;
+
+    private String serializedTezPlan;
+    private String serializedPigContext;
+    private String serializedUDFImportList;
+
+    // Map corresponds to root vertices, reduce to intermediate and leaf vertices
+    private Resource mapTaskResource;
+    private Resource reduceTaskResource;
+    private Map<String, String> mapTaskEnv = new HashMap<String, String>();
+    private Map<String, String> reduceTaskEnv = new HashMap<String, String>();
+    private String mapTaskLaunchCmdOpts;
+    private String reduceTaskLaunchCmdOpts;
 
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pc;
-        this.globalConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         this.localResources = localResources;
         this.dag = dag;
+        this.inputSplitInDiskVertices = new HashSet<String>();
 
         try {
-            // Add credentials from binary token file and get tokens for namenodes
-            // specified in mapreduce.job.hdfs-servers
-            SecurityHelper.populateTokenCache(globalConf, 
dag.getCredentials());
+            initialize(pc);
+
+            udfContextSeparator = new TezUDFContextSeparator(plan,
+                    new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            udfContextSeparator.visit();
         } catch (IOException e) {
-            throw new RuntimeException("Error while fetching delegation 
tokens", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initialize(PigContext pc) throws IOException {
+
+        this.globalConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+        this.pigContextConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+
+        // Add credentials from binary token file and get tokens for namenodes
+        // specified in mapreduce.job.hdfs-servers
+        SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+
+        // All these classes are @InterfaceAudience.Private in Hadoop. Switch to Tez methods in TEZ-1012
+        // set the timestamps, public/private visibility of the archives and files
+        ClientDistributedCacheManager
+                .determineTimestampsAndCacheVisibilities(globalConf);
+        // get DelegationToken for each cached file
+        ClientDistributedCacheManager.getDelegationTokens(globalConf,
+                dag.getCredentials());
+        MRApps.setupDistributedCache(globalConf, this.localResources);
+        dag.addTaskLocalFiles(this.localResources);
+
+        int mapMemoryMB;
+        int reduceMemoryMB;
+        int mapVCores;
+        int reduceVCores;
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB) != 
null) {
+            mapMemoryMB = globalConf.getInt(
+                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT);
+            reduceMemoryMB = mapMemoryMB;
+        } else {
+            // If tez setting is not defined, try MR setting
+            mapMemoryMB = globalConf.getInt(MRJobConfig.MAP_MEMORY_MB,
+                    MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+            reduceMemoryMB = globalConf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
+                    MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
+        }
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES) != 
null) {
+            mapVCores = globalConf.getInt(
+                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT);
+            reduceVCores = mapVCores;
+        } else {
+            mapVCores = globalConf.getInt(MRJobConfig.MAP_CPU_VCORES,
+                    MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+            reduceVCores = globalConf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+                    MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+        }
+        mapTaskResource = Resource.newInstance(mapMemoryMB, mapVCores);
+        reduceTaskResource = Resource.newInstance(reduceMemoryMB, 
reduceVCores);
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) 
{
+            // If tez setting is not defined
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, 
true);
+        }
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) 
{
+            mapTaskLaunchCmdOpts = 
globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+            reduceTaskLaunchCmdOpts = mapTaskLaunchCmdOpts;
+        } else {
+            // If tez setting is not defined, try MR setting
+            mapTaskLaunchCmdOpts = 
MRHelpers.getJavaOptsForMRMapper(globalConf);
+            reduceTaskLaunchCmdOpts = 
MRHelpers.getJavaOptsForMRReducer(globalConf);
         }
 
         try {
-            intermediateTaskInputSize = 
HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), 
FileLocalizer.getTemporaryResourcePath(pc));
+            fs = FileSystem.get(globalConf);
+            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(fs, 
FileLocalizer.getTemporaryResourcePath(pc));
         } catch (Exception e) {
             log.warn("Unable to get the block size for temporary directory, 
defaulting to 128MB", e);
             intermediateTaskInputSize = 134217728L;
@@ -199,6 +289,51 @@ public class TezDagBuilder extends TezOp
                 globalConf.getLong(
                         InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                         InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+
+        serializedPigContext = ObjectSerializer.serialize(pc);
+        serializedUDFImportList = 
ObjectSerializer.serialize(PigContext.getPackageImportList());
+    }
+
+    private String getSerializedTezPlan() throws IOException {
+        if (serializedTezPlan == null) {
+            // Initialize lazy instead of constructor as this might not be needed
+            serializedTezPlan = ObjectSerializer.serialize(getPlan());
+        }
+        return serializedTezPlan;
+    }
+
+    // Hack to turn off relocalization till TEZ-2192 is fixed.
+    public void avoidContainerReuseIfInputSplitInDisk() throws IOException {
+        if (!inputSplitInDiskVertices.isEmpty()) {
+            // Create empty job.split file and add as resource to all other
+            // vertices that are not reading splits from disk so that their
+            // containers are not reused by vertices that read splits from disk
+            Path jobSplitFile = new Path(FileLocalizer.getTemporaryPath(pc),
+                    MRJobConfig.JOB_SPLIT);
+            FSDataOutputStream out = fs.create(jobSplitFile);
+            out.close();
+            log.info("Creating empty job.split in " + jobSplitFile);
+            FileStatus splitFileStatus = fs.getFileStatus(jobSplitFile);
+            LocalResource localResource = LocalResource.newInstance(
+                    ConverterUtils.getYarnUrlFromPath(jobSplitFile),
+                    LocalResourceType.FILE,
+                    LocalResourceVisibility.APPLICATION,
+                    splitFileStatus.getLen(),
+                    splitFileStatus.getModificationTime());
+            for (Vertex vertex : dag.getVertices()) {
+                if (!inputSplitInDiskVertices.contains(vertex.getName())) {
+                    if (vertex.getTaskLocalFiles().containsKey(
+                            MRJobConfig.JOB_SPLIT)) {
+                        throw new RuntimeException(
+                                "LocalResources already contains a"
+                                        + " resource named "
+                                        + MRJobConfig.JOB_SPLIT);
+                    }
+                    vertex.getTaskLocalFiles().put(MRJobConfig.JOB_SPLIT,
+                            localResource);
+                }
+            }
+        }
     }
 
     @Override
@@ -244,7 +379,7 @@ public class TezDagBuilder extends TezOp
                         if (tezOp.isVertexGroup()) {
                             groupMembers[i] = from;
                         } else {
-                            EdgeProperty prop = newEdge(pred, tezOp);
+                            EdgeProperty prop = newEdge(pred, tezOp, false);
                             Edge edge = Edge.create(from, to, prop);
                             dag.addEdge(edge);
                         }
@@ -262,7 +397,7 @@ public class TezDagBuilder extends TezOp
                 POStore store = tezOp.getVertexGroupInfo().getStore();
                 if (store != null) {
                     vertexGroup.addDataSink(store.getOperatorKey().toString(),
-                            new 
DataSinkDescriptor(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
+                            
DataSinkDescriptor.create(tezOp.getVertexGroupInfo().getStoreOutputDescriptor(),
                             
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()), 
dag.getCredentials()));
                 }
             }
@@ -272,7 +407,7 @@ public class TezDagBuilder extends TezOp
     private GroupInputEdge newGroupInputEdge(TezOperator fromOp,
             TezOperator toOp, VertexGroup from, Vertex to) throws IOException {
 
-        EdgeProperty edgeProperty = newEdge(fromOp, toOp);
+        EdgeProperty edgeProperty = newEdge(fromOp, toOp, true);
 
         String groupInputClass = 
ConcatenatedMergedKeyValueInput.class.getName();
 
@@ -284,8 +419,7 @@ public class TezDagBuilder extends TezOp
         }
 
         return GroupInputEdge.create(from, to, edgeProperty,
-                
InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload())
-                    
.setHistoryText(edgeProperty.getEdgeDestination().getHistoryText()));
+                
InputDescriptor.create(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
     }
 
     /**
@@ -296,7 +430,7 @@ public class TezDagBuilder extends TezOp
      * @return EdgeProperty
      * @throws IOException
      */
-    private EdgeProperty newEdge(TezOperator from, TezOperator to)
+    private EdgeProperty newEdge(TezOperator from, TezOperator to, boolean 
isMergedInput)
             throws IOException {
         TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
         PhysicalPlan combinePlan = edge.combinePlan;
@@ -304,9 +438,11 @@ public class TezDagBuilder extends TezOp
         InputDescriptor in = InputDescriptor.create(edge.inputClassName);
         OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
 
-        Configuration conf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        Configuration conf = new Configuration(pigContextConf);
+
         if (!combinePlan.isEmpty()) {
-            addCombiner(combinePlan, to, conf);
+            udfContextSeparator.serializeUDFContextForEdge(conf, from, to, 
UDFType.USERFUNC);
+            addCombiner(combinePlan, to, conf, isMergedInput);
         }
 
         List<POLocalRearrangeTez> lrs = 
PlanHelper.getPhysicalOperators(from.plan,
@@ -315,7 +451,7 @@ public class TezDagBuilder extends TezOp
         for (POLocalRearrangeTez lr : lrs) {
             if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
-                setIntermediateOutputKeyValue(keyType, conf, to, 
lr.isConnectedToPackage());
+                setIntermediateOutputKeyValue(keyType, conf, to, 
lr.isConnectedToPackage(), isMergedInput);
                 // In case of secondary key sort, main key type is the actual key type
                 conf.set("pig.reduce.key.type", 
Byte.toString(lr.getMainKeyType()));
                 break;
@@ -341,9 +477,9 @@ public class TezDagBuilder extends TezOp
         }
 
         conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        conf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
+        conf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
+        conf.set("pig.pigContext", serializedPigContext);
+        conf.set("udf.import.list", serializedUDFImportList);
 
         if(to.isGlobalSort() || to.isLimitAfterSort()){
             conf.set("pig.sortOrder",
@@ -371,34 +507,33 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        conf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
-
-        MRToTezHelper.processMRSettings(conf, globalConf);
+        in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
+        out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
-        String historyString = convertToHistoryText("", conf);
-        
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
-        
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)).setHistoryText(historyString);
-
-        if (edge.dataMovementType!=DataMovementType.BROADCAST && 
to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
+        if (edge.dataMovementType!=DataMovementType.BROADCAST && 
to.getEstimatedParallelism()!=-1 && to.getVertexParallelism()==-1 && 
(to.isGlobalSort()||to.isSkewedJoin())) {
             // Use custom edge
             return EdgeProperty.create((EdgeManagerPluginDescriptor)null,
                     edge.dataSourceType, edge.schedulingType, out, in);
             }
 
+        if (to.isUseGraceParallelism()) {
+            // Put datamovement to null to prevent vertex "to" from starting. It will be started by PigGraceShuffleVertexManager
+            return EdgeProperty.create((EdgeManagerPluginDescriptor)null, 
edge.dataSourceType,
+                    edge.schedulingType, out, in);
+        }
         return EdgeProperty.create(edge.dataMovementType, edge.dataSourceType,
                 edge.schedulingType, out, in);
     }
 
     private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
-            Configuration conf) throws IOException {
+            Configuration conf, boolean isMergedInput) throws IOException {
         POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
         POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
                 .getLeaves().get(0);
-        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, 
pkgTezOp);
+        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, 
pkgTezOp, true, isMergedInput);
 
         LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
-                combinePlan, pkgTezOp, combPack);
+                combinePlan, null, pkgTezOp, combPack);
         lrDiscoverer.visit();
 
         combinePlan.remove(combPack);
@@ -406,10 +541,6 @@ public class TezDagBuilder extends TezOp
                 MRCombiner.class.getName());
         conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
                 PigCombiner.Combine.class.getName());
-        conf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        conf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
         conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
         conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
         conf.set("pig.map.keytype", ObjectSerializer
@@ -422,7 +553,7 @@ public class TezDagBuilder extends TezOp
                 tezOp.getProcessorName());
 
         // Pass physical plans to vertex as user payload.
-        JobConf payloadConf = new 
JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
+        JobConf payloadConf = new JobConf(pigContextConf);
 
         // We do this so that dag.getCredentials(), job.getCredentials(),
         // job.getConfiguration().getCredentials() all reference the same Credentials object
@@ -432,6 +563,38 @@ public class TezDagBuilder extends TezOp
         @SuppressWarnings("deprecation")
         Job job = new Job(payloadConf);
         payloadConf = (JobConf) job.getConfiguration();
+        //TODO: Investigate. Setting as map writes empty output.
+        //payloadConf.setBoolean(MRConfig.IS_MAP_PROCESSOR, tezOp.isUseMRMapSettings());
+        payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
+        payloadConf.setBoolean(MRConfiguration.REDUCER_NEW_API, true);
+        payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
+                PigInputFormatTez.class, InputFormat.class);
+        setOutputFormat(job);
+        payloadConf.set("udf.import.list", serializedUDFImportList);
+        payloadConf.set("exectype", "TEZ");
+
+        // Process stores
+        LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
+
+        Configuration inputPayLoad = null;
+        Configuration outputPayLoad = null;
+
+        if (!stores.isEmpty()) {
+            outputPayLoad = new Configuration(payloadConf);
+            outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
+                    ObjectSerializer.serialize(new ArrayList<POStore>()));
+        }
+
+        if (!(tezOp.getLoaderInfo().getLoads().isEmpty())) {
+            payloadConf.set(PigInputFormat.PIG_INPUTS, 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
+            payloadConf.set(PigInputFormat.PIG_INPUT_SIGNATURES, 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
+            payloadConf.set(PigInputFormat.PIG_INPUT_LIMITS, 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
+            inputPayLoad = new Configuration(payloadConf);
+            if (tezOp.getLoaderInfo().getLoads().get(0).getLoadFunc() 
instanceof DefaultIndexableLoader) {
+                inputPayLoad.set("pig.pigContext", serializedPigContext);
+            }
+        }
+        payloadConf.set("pig.pigContext", serializedPigContext);
 
         if (tezOp.getSampleOperator() != null) {
             payloadConf.set(PigProcessor.SAMPLE_VERTEX, 
tezOp.getSampleOperator().getOperatorKey().toString());
@@ -445,7 +608,7 @@ public class TezDagBuilder extends TezOp
             // usually followed by limit other than store. But would benefit
             // cases like skewed join followed by group by.
             if (tezOp.getSortOperator().getEstimatedParallelism() != -1
-                    && 
TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+                    && tezOp.getSortOperator().isIntermediateReducer()) {
                 payloadConf.setLong(
                         InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                         intermediateTaskInputSize);
@@ -453,20 +616,6 @@ public class TezDagBuilder extends TezOp
 
         }
 
-        payloadConf.set("pig.inputs", 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
-        payloadConf.set("pig.inpSignatures", 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
-        payloadConf.set("pig.inpLimits", 
ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
-        // Process stores
-        LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
-
-        payloadConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-        payloadConf.set("udf.import.list",
-                ObjectSerializer.serialize(PigContext.getPackageImportList()));
-        payloadConf.set("exectype", "TEZ");
-        payloadConf.setBoolean(MRConfiguration.MAPPER_NEW_API, true);
-        payloadConf.setClass(MRConfiguration.INPUTFORMAT_CLASS,
-                PigInputFormat.class, InputFormat.class);
-
         // Set parent plan for all operators in the Tez plan.
         new PhyPlanSetter(tezOp.plan).visit();
 
@@ -492,23 +641,27 @@ public class TezDagBuilder extends TezOp
             byte keyType = pack.getPkgr().getKeyType();
             tezOp.plan.remove(pack);
             payloadConf.set("pig.reduce.package", 
ObjectSerializer.serialize(pack));
-            setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
+
             POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
             if (tezOp.isSkewedJoin()) {
                 newPack.setSkewedJoins(true);
             }
             tezOp.plan.add(newPack);
 
+            boolean isMergedInput = false;
             // Set input keys for POShuffleTezLoad. This is used to identify
             // the inputs that are attached to the POShuffleTezLoad in the
             // backend.
             Map<Integer, String> localRearrangeMap = new TreeMap<Integer, 
String>();
+            TezOperator from = null;
             for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
                 if (tezOp.getSampleOperator() != null && 
tezOp.getSampleOperator() == pred) {
                     // skip sample vertex input
                 } else {
                     String inputKey = pred.getOperatorKey().toString();
+                    boolean isVertexGroup = false;
                     if (pred.isVertexGroup()) {
+                        isVertexGroup = true;
                         pred = 
mPlan.getOperator(pred.getVertexGroupMembers().get(0));
                     }
                     LinkedList<POLocalRearrangeTez> lrs =
@@ -517,6 +670,10 @@ public class TezDagBuilder extends TezOp
                         if (lr.isConnectedToPackage()
                                 && 
lr.getOutputKey().equals(tezOp.getOperatorKey().toString())) {
                             localRearrangeMap.put((int) lr.getIndex(), 
inputKey);
+                            if (isVertexGroup) {
+                                isMergedInput = true;
+                            }
+                            from = pred;
                         }
                     }
                 }
@@ -531,36 +688,19 @@ public class TezDagBuilder extends TezOp
                 }
             }
 
-            setIntermediateOutputKeyValue(pack.getPkgr().getKeyType(), 
payloadConf, tezOp);
-        } else if (roots.size() == 1 && roots.get(0) instanceof 
POIdentityInOutTez) {
-            POIdentityInOutTez identityInOut = (POIdentityInOutTez) 
roots.get(0);
-            // TODO Need to fix multiple input key mapping
-            TezOperator identityInOutPred = null;
-            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                if (!pred.isSampleAggregation()) {
-                    identityInOutPred = pred;
-                    break;
-                }
-            }
-            
identityInOut.setInputKey(identityInOutPred.getOperatorKey().toString());
-        } else if (roots.size() == 1 && roots.get(0) instanceof 
POValueInputTez) {
-            POValueInputTez valueInput = (POValueInputTez) roots.get(0);
-
-            LinkedList<String> scalarInputs = new LinkedList<String>();
-            for (POUserFunc userFunc : 
PlanHelper.getPhysicalOperators(tezOp.plan, POUserFunc.class) ) {
-                if (userFunc.getFunc() instanceof ReadScalarsTez) {
-                    
scalarInputs.add(((ReadScalarsTez)userFunc.getFunc()).getTezInputs()[0]);
-                }
-            }
-            // Make sure we don't find the scalar
-            for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                if (!scalarInputs.contains(pred.getOperatorKey().toString())) {
-                    valueInput.setInputKey(pred.getOperatorKey().toString());
-                    break;
-                }
+            //POShuffleTezLoad accesses the comparator setting
+            selectKeyComparator(keyType, payloadConf, tezOp, isMergedInput);
+
+            if (tezOp.isUseSecondaryKey()) {
+                TezEdgeDescriptor edge = 
tezOp.inEdges.get(from.getOperatorKey());
+                // Currently only PigSecondaryKeyGroupingComparator is used in 
POShuffleTezLoad.
+                // When PIG-4685: SecondaryKeyOptimizerTez does not optimize 
cogroup is fixed
+                // in future, PigSecondaryKeyComparator will have to be used 
and that will require this.
+                payloadConf.set("pig.secondarySortOrder", ObjectSerializer
+                        .serialize(edge.getSecondarySortOrder()));
             }
+
         }
-        setOutputFormat(job);
 
         // set parent plan in all operators. currently the parent plan is 
really
         // used only when POStream, POSplit are present in the plan
@@ -570,9 +710,7 @@ public class TezDagBuilder extends TezOp
         payloadConf.set(PigProcessor.PLAN,
                 ObjectSerializer.serialize(tezOp.plan));
 
-        UDFContext.getUDFContext().serialize(payloadConf);
-
-        MRToTezHelper.processMRSettings(payloadConf, globalConf);
+        udfContextSeparator.serializeUDFContext(payloadConf, tezOp);
 
         if (!pc.inIllustrator) {
             for (POStore store : stores) {
@@ -604,66 +742,190 @@ public class TezDagBuilder extends TezOp
 
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = 
TezUtils.createUserPayloadFromConf(payloadConf);
-        
procDesc.setUserPayload(userPayload).setHistoryText(convertToHistoryText(tezOp.getOperatorKey().toString(),
 payloadConf));
+        TezDAGScriptInfo dagScriptInfo = 
TezScriptState.get().getDAGScriptInfo(dag.getName());
+        String alias = dagScriptInfo.getAlias(tezOp);
+        String aliasLocation = dagScriptInfo.getAliasLocation(tezOp);
+        String features = dagScriptInfo.getPigFeatures(tezOp);
+        String vertexInfo = aliasLocation + " (" + features + ")" ;
+        
procDesc.setUserPayload(userPayload).setHistoryText(TezUtils.convertToHistoryText(vertexInfo,
 payloadConf));
+
+        String vmPluginName = null;
+        Configuration vmPluginConf = null;
+
+        // Set the right VertexManagerPlugin
+        if (tezOp.getEstimatedParallelism() != -1) {
+            if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+                if (tezOp.getVertexParallelism()==-1 && (
+                        tezOp.isGlobalSort() 
&&getPlan().getPredecessors(tezOp).size()==1||
+                        tezOp.isSkewedJoin() 
&&getPlan().getPredecessors(tezOp).size()==2)) {
+                    // Set VertexManagerPlugin to 
PartitionerDefinedVertexManager, which is able
+                    // to decrease/increase parallelism of sorting vertex 
dynamically
+                    // based on the numQuantiles calculated by sample 
aggregation vertex
+                    vmPluginName = 
PartitionerDefinedVertexManager.class.getName();
+                    log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
+                }
+            } else {
+                boolean containScatterGather = false;
+                boolean containCustomPartitioner = false;
+                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+                    if (edge.dataMovementType == 
DataMovementType.SCATTER_GATHER) {
+                        containScatterGather = true;
+                    }
+                    if (edge.partitionerClass!=null) {
+                        containCustomPartitioner = true;
+                    }
+                }
+                if (containScatterGather && !containCustomPartitioner) {
+                    vmPluginConf = (vmPluginConf == null) ? new 
Configuration(pigContextConf) : vmPluginConf;
+                    // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
+                    // reduce the parallelism of the vertex
+                    if 
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
+                            && 
!TezOperPlan.getGrandParentsForGraceParallelism(getPlan(), tezOp).isEmpty()) {
+                        vmPluginName = 
PigGraceShuffleVertexManager.class.getName();
+                        tezOp.setUseGraceParallelism(true);
+                        vmPluginConf.set("pig.tez.plan", 
getSerializedTezPlan());
+                        vmPluginConf.set("pig.pigContext", 
serializedPigContext);
+                    } else {
+                        vmPluginName = ShuffleVertexManager.class.getName();
+                    }
+                    
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
+                    // For Intermediate reduce, set the bytes per reducer to 
be block size.
+                    long bytesPerReducer = intermediateTaskInputSize;
+                    // If there are store statements, use 
BYTES_PER_REDUCER_PARAM configured by user.
+                    // If not as default use 384MB for group bys and 256 MB 
for joins. Not using
+                    // default 1G as that value was suited for mapreduce logic 
where numReducers=(map input size/bytesPerReducer).
+                    // In Tez, numReducers=(map output size/bytesPerReducer) 
we need lower values to avoid skews in reduce
+                    // as map input sizes are mostly always high compared to 
map output.
+                    if (stores.size() > 0) {
+                        if 
(vmPluginConf.get(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM) != null) {
+                            bytesPerReducer = vmPluginConf.getLong(
+                                            
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                            
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+                        } else if (tezOp.isGroupBy()) {
+                            bytesPerReducer = 
SHUFFLE_BYTES_PER_REDUCER_GROUPBY_DEFAULT;
+                        } else {
+                            bytesPerReducer = 
SHUFFLE_BYTES_PER_REDUCER_DEFAULT;
+                        }
+                    }
+                    
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
 bytesPerReducer);
+                    log.info("Set auto parallelism for vertex " + 
tezOp.getOperatorKey().toString());
+                }
+            }
+        }
+        if (tezOp.isLimit() && (vmPluginName == null || 
vmPluginName.equals(PigGraceShuffleVertexManager.class.getName())||
+                vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
+            if 
(tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName()))
 {
+                // Setting SRC_FRACTION to 0.00001 so that even if there are 
100K source tasks,
+                // limit job starts when 1 source task finishes.
+                // If limit is part of a group by or join because their 
parallelism is 1,
+                // we should leave the configuration with the defaults.
+                vmPluginConf = (vmPluginConf == null) ? new 
Configuration(pigContextConf) : vmPluginConf;
+                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 "0.00001");
+                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
 "0.00001");
+                log.info("Set " + 
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 
for limit vertex " + tezOp.getOperatorKey().toString());
+            }
+        }
 
-        Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), 
procDesc, tezOp.getVertexParallelism(),
-                tezOp.isUseMRMapSettings() ? 
MRHelpers.getResourceForMRMapper(globalConf) : 
MRHelpers.getResourceForMRReducer(globalConf));
+        int parallel = tezOp.getVertexParallelism();
+        if (tezOp.isUseGraceParallelism()) {
+            parallel = -1;
+        }
+        Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource : 
reduceTaskResource;
 
-        Map<String, String> taskEnv = new HashMap<String, String>();
-        MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, 
tezOp.isUseMRMapSettings());
-        vertex.setTaskEnvironment(taskEnv);
+        Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), 
procDesc, parallel, resource);
 
-        // All these classes are @InterfaceAudience.Private in Hadoop. Switch 
to Tez methods in TEZ-1012
-        // set the timestamps, public/private visibility of the archives and 
files
-        ClientDistributedCacheManager
-                .determineTimestampsAndCacheVisibilities(globalConf);
-        // get DelegationToken for each cached file
-        ClientDistributedCacheManager.getDelegationTokens(globalConf,
-                job.getCredentials());
-        MRApps.setupDistributedCache(globalConf, localResources);
-        vertex.addTaskLocalFiles(localResources);
+        if (tezOp.isUseMRMapSettings()) {
+            vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts);
+            vertex.setTaskEnvironment(mapTaskEnv);
+        } else {
+            vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts);
+            vertex.setTaskEnvironment(reduceTaskEnv);
+        }
 
-        vertex.setTaskLaunchCmdOpts(tezOp.isUseMRMapSettings() ? 
MRHelpers.getJavaOptsForMRMapper(globalConf)
-                : MRHelpers.getJavaOptsForMRReducer(globalConf));
+        MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(), 
globalConf);
 
         log.info("For vertex - " + tezOp.getOperatorKey().toString()
                 + ": parallelism=" + tezOp.getVertexParallelism()
                 + ", memory=" + vertex.getTaskResource().getMemory()
                 + ", java opts=" + vertex.getTaskLaunchCmdOpts()
                 );
-
+        log.info("Processing aliases: " + alias);
+        log.info("Detailed locations: " + aliasLocation);
+        log.info("Pig features in the vertex: " + features);
         // Right now there can only be one of each of these. Will need to be
         // more generic when there can be more.
         for (POLoad ld : tezOp.getLoaderInfo().getLoads()) {
 
             // TODO: These should get the globalConf, or a merged version that
             // keeps settings like pig.maxCombinedSplitSize
-            
vertex.setLocationHint(VertexLocationHint.create(tezOp.getLoaderInfo().getInputSplitInfo().getTaskLocationHints()));
+            Builder userPayLoadBuilder = 
MRRuntimeProtos.MRInputUserPayloadProto.newBuilder();
+
+            InputSplitInfo inputSplitInfo = 
tezOp.getLoaderInfo().getInputSplitInfo();
+            Map<String, LocalResource> additionalLocalResources = null;
+            int spillThreshold = payloadConf
+                    
.getInt(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD,
+                            
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT);
+
+            // Currently inputSplitInfo is always InputSplitInfoMem at this 
point
+            if (inputSplitInfo instanceof InputSplitInfoMem) {
+                MRSplitsProto splitsProto = inputSplitInfo.getSplitsProto();
+                int splitsSerializedSize = splitsProto.getSerializedSize();
+                if(splitsSerializedSize > spillThreshold) {
+                    inputPayLoad.setBoolean(
+                            
org.apache.tez.mapreduce.hadoop.MRJobConfig.MR_TEZ_SPLITS_VIA_EVENTS,
+                            false);
+                    // Write splits to disk
+                    Path inputSplitsDir = FileLocalizer.getTemporaryPath(pc);
+                    log.info("Writing input splits to " + inputSplitsDir
+                            + " for vertex " + vertex.getName()
+                            + " as the serialized size in memory is "
+                            + splitsSerializedSize + ". Configured "
+                            + 
PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD
+                            + " is " + spillThreshold);
+                    inputSplitInfo = MRToTezHelper.writeInputSplitInfoToDisk(
+                            (InputSplitInfoMem)inputSplitInfo, inputSplitsDir, 
payloadConf, fs);
+                    additionalLocalResources = new HashMap<String, 
LocalResource>();
+                    MRToTezHelper.updateLocalResourcesForInputSplits(
+                            fs, inputSplitInfo,
+                            additionalLocalResources);
+                    inputSplitInDiskVertices.add(vertex.getName());
+                } else {
+                    // Send splits via RPC to AM
+                    userPayLoadBuilder.setSplits(splitsProto);
+                }
+                //Free up memory
+                tezOp.getLoaderInfo().setInputSplitInfo(null);
+            }
+
+            udfContextSeparator.serializeUDFContext(inputPayLoad, tezOp, 
UDFType.LOADFUNC);
+            
userPayLoadBuilder.setConfigurationBytes(TezUtils.createByteStringFromConf(inputPayLoad));
+
+            
vertex.setLocationHint(VertexLocationHint.create(inputSplitInfo.getTaskLocationHints()));
             vertex.addDataSource(ld.getOperatorKey().toString(),
                     
DataSourceDescriptor.create(InputDescriptor.create(MRInput.class.getName())
-                          
.setUserPayload(UserPayload.create(MRRuntimeProtos.MRInputUserPayloadProto.newBuilder()
-                          
.setConfigurationBytes(TezUtils.createByteStringFromConf(payloadConf))
-                          
.setSplits(tezOp.getLoaderInfo().getInputSplitInfo().getSplitsProto()).build().toByteString().asReadOnlyByteBuffer()))
-                          .setHistoryText(convertToHistoryText("", 
payloadConf)),
-                    
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()), 
dag.getCredentials()));
+                            
.setUserPayload(UserPayload.create(userPayLoadBuilder.build().toByteString().asReadOnlyByteBuffer())),
+                    
InputInitializerDescriptor.create(MRInputSplitDistributor.class.getName()),
+                    inputSplitInfo.getNumTasks(),
+                    dag.getCredentials(),
+                    null,
+                    additionalLocalResources));
         }
 
+        // Union within a split can have multiple stores writing to same output
+        Set<String> uniqueStoreOutputs = new HashSet<String>();
         for (POStore store : stores) {
 
-            ArrayList<POStore> emptyList = new ArrayList<POStore>();
             ArrayList<POStore> singleStore = new ArrayList<POStore>();
             singleStore.add(store);
 
-            Configuration outputPayLoad = new Configuration(payloadConf);
-            outputPayLoad.set(JobControlCompiler.PIG_MAP_STORES,
-                    ObjectSerializer.serialize(emptyList));
-            outputPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
+            Configuration outPayLoad = new Configuration(outputPayLoad);
+            udfContextSeparator.serializeUDFContext(outPayLoad, tezOp, store);
+            outPayLoad.set(JobControlCompiler.PIG_REDUCE_STORES,
                     ObjectSerializer.serialize(singleStore));
 
             OutputDescriptor storeOutDescriptor = OutputDescriptor.create(
                     MROutput.class.getName()).setUserPayload(TezUtils
-                    .createUserPayloadFromConf(outputPayLoad))
-                    .setHistoryText(convertToHistoryText("", outputPayLoad));
+                    .createUserPayloadFromConf(outPayLoad));
             if (tezOp.getVertexGroupStores() != null) {
                 OperatorKey vertexGroupKey = 
tezOp.getVertexGroupStores().get(store.getOperatorKey());
                 if (vertexGroupKey != null) {
@@ -672,10 +934,14 @@ public class TezDagBuilder extends TezOp
                     continue;
                 }
             }
-            vertex.addDataSink(store.getOperatorKey().toString(),
-                    new DataSinkDescriptor(storeOutDescriptor,
-                    
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
-                    dag.getCredentials()));
+            String outputKey = ((POStoreTez) store).getOutputKey();
+            if (!uniqueStoreOutputs.contains(outputKey)) {
+                vertex.addDataSink(outputKey.toString(),
+                        DataSinkDescriptor.create(storeOutDescriptor,
+                        
OutputCommitterDescriptor.create(MROutputCommitter.class.getName()),
+                        dag.getCredentials()));
+                uniqueStoreOutputs.add(outputKey);
+            }
         }
 
         // LoadFunc and StoreFunc add delegation tokens to Job Credentials in
@@ -686,62 +952,6 @@ public class TezDagBuilder extends TezOp
             new PigOutputFormat().checkOutputSpecs(job);
         }
 
-        String vmPluginName = null;
-        Configuration vmPluginConf = null;
-
-        // Set the right VertexManagerPlugin
-        if (tezOp.getEstimatedParallelism() != -1) {
-            if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
-                // Set VertexManagerPlugin to PartitionerDefinedVertexManager, 
which is able
-                // to decrease/increase parallelism of sorting vertex 
dynamically
-                // based on the numQuantiles calculated by sample aggregation 
vertex
-                vmPluginName = PartitionerDefinedVertexManager.class.getName();
-                log.info("Set VertexManagerPlugin to 
PartitionerDefinedParallelismVertexManager for vertex " + 
tezOp.getOperatorKey().toString());
-            } else {
-                boolean containScatterGather = false;
-                boolean containCustomPartitioner = false;
-                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
-                    if (edge.dataMovementType == 
DataMovementType.SCATTER_GATHER) {
-                        containScatterGather = true;
-                    }
-                    if (edge.partitionerClass!=null) {
-                        containCustomPartitioner = true;
-                    }
-                }
-                if (containScatterGather && !containCustomPartitioner) {
-                    // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
-                    // reduce the parallelism of the vertex
-                    vmPluginName = ShuffleVertexManager.class.getName();
-                    vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
-                    
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL,
 true);
-                    if (stores.size() <= 0) {
-                        // Intermediate reduce. Set the bytes per reducer to 
be block size.
-                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                        intermediateTaskInputSize);
-                    } else if 
(vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
-                                    
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
-                        
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
-                                
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                                        
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
-                    }
-                    log.info("Set auto parallelism for vertex " + 
tezOp.getOperatorKey().toString());
-                }
-            }
-        }
-        if (tezOp.isLimit() && (vmPluginName == null || 
vmPluginName.equals(ShuffleVertexManager.class.getName()))) {
-            if 
(tezOp.inEdges.values().iterator().next().inputClassName.equals(UnorderedKVInput.class.getName()))
 {
-                // Setting SRC_FRACTION to 0.00001 so that even if there are 
100K source tasks,
-                // limit job starts when 1 source task finishes.
-                // If limit is part of a group by or join because their 
parallelism is 1,
-                // we should leave the configuration with the defaults.
-                vmPluginName = ShuffleVertexManager.class.getName();
-                vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
-                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 "0.00001");
-                
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
 "0.00001");
-                log.info("Set " + 
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 
for limit vertex " + tezOp.getOperatorKey().toString());
-            }
-        }
         // else if(tezOp.isLimitAfterSort())
         // TODO: PIG-4049 If standalone Limit we need a new VertexManager or 
new input
         // instead of ShuffledMergedInput. For limit part of the sort (order 
by parallel 1) itself
@@ -750,8 +960,7 @@ public class TezDagBuilder extends TezOp
         if (vmPluginName != null) {
             VertexManagerPluginDescriptor vmPluginDescriptor = 
VertexManagerPluginDescriptor.create(vmPluginName);
             if (vmPluginConf != null) {
-                
vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf))
-                    .setHistoryText(convertToHistoryText(vmPluginName, 
vmPluginConf));
+                
vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
             }
             vertex.setVertexManagerPlugin(vmPluginDescriptor);
         }
@@ -828,14 +1037,9 @@ public class TezDagBuilder extends TezOp
         return stores;
     }
 
-    private void setIntermediateOutputKeyValue(byte keyType, Configuration 
conf, TezOperator tezOp)
-            throws JobCreationException, ExecException {
-        setIntermediateOutputKeyValue(keyType, conf, tezOp, true);
-    }
-
     @SuppressWarnings("rawtypes")
     private void setIntermediateOutputKeyValue(byte keyType, Configuration 
conf, TezOperator tezOp,
-            boolean isConnectedToPackage) throws JobCreationException, 
ExecException {
+            boolean isConnectedToPackage, boolean isMergedInput) throws 
JobCreationException, ExecException {
         if (tezOp != null && tezOp.isUseSecondaryKey() && 
isConnectedToPackage) {
             conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS,
                     NullableTuple.class.getName());
@@ -853,12 +1057,86 @@ public class TezDagBuilder extends TezOp
                 NullableTuple.class.getName());
         conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS,
                 MRPartitioner.class.getName());
-        selectOutputComparator(keyType, conf, tezOp);
+        selectKeyComparator(keyType, conf, tezOp, isMergedInput);
     }
 
-    private static Class<? extends WritableComparator> 
comparatorForKeyType(byte keyType, boolean hasOrderBy)
+    private static Class<? extends WritableComparator> getRawBytesComparator(
+            byte keyType) throws JobCreationException {
+
+        // These comparators only compare bytes and we will use them except for
+        // order by for faster sorting.
+        // This ordering is good enough to be fed to reducer (POShuffleTezLoad)
+        // which will use the full comparator (GroupingComparator) for correct
+        // sorting and grouping.
+        // TODO: PIG-4652. Till Tez exposes a way to get bytes of keys being 
compared,
+        // we can use this only for groupby and distinct which are single 
inputs in
+        // POShuffleTezLoad and not join which has multiple inputs.
+
+        switch (keyType) {
+        case DataType.BOOLEAN:
+            return PigWritableComparators.PigBooleanRawBytesComparator.class;
+
+        case DataType.INTEGER:
+            return PigWritableComparators.PigIntRawBytesComparator.class;
+
+        case DataType.BIGINTEGER:
+            return 
PigWritableComparators.PigBigIntegerRawBytesComparator.class;
+
+        case DataType.BIGDECIMAL:
+            return 
PigWritableComparators.PigBigDecimalRawBytesComparator.class;
+
+        case DataType.LONG:
+            return PigWritableComparators.PigLongRawBytesComparator.class;
+
+        case DataType.FLOAT:
+            return PigWritableComparators.PigFloatRawBytesComparator.class;
+
+        case DataType.DOUBLE:
+            return PigWritableComparators.PigDoubleRawBytesComparator.class;
+
+        case DataType.DATETIME:
+            return PigWritableComparators.PigDateTimeRawBytesComparator.class;
+
+        case DataType.CHARARRAY:
+            return PigWritableComparators.PigTextRawBytesComparator.class;
+
+        case DataType.BYTEARRAY:
+            return PigWritableComparators.PigBytesRawBytesComparator.class;
+
+        case DataType.MAP:
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        case DataType.TUPLE:
+            return PigWritableComparators.PigTupleSortBytesComparator.class;
+
+        case DataType.BAG:
+            errCode = 1068;
+            msg = "Using Bag as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        default:
+            errCode = 2036;
+            msg = "Unhandled key type " + DataType.findTypeName(keyType);
+            throw new JobCreationException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    private static Class<? extends WritableComparator> getRawComparator(byte 
keyType)
             throws JobCreationException {
 
+        // These are full comparators used in order by jobs and as 
GroupingComparator in
+        // POShuffleTezLoad for other operations.
+
+        // Mapreduce uses PigGrouping<DataType>WritableComparator for 
non-orderby jobs.
+        // In Tez, we will use the raw comparators itself on the reduce side 
as well as it is
+        // now fixed to handle nulls for different indexes.
+        // Also PigGrouping<DataType>WritableComparator use 
PigNullablePartitionWritable.compareTo
+        // which is not that efficient for cases like tuple where tuple is 
iterated for null checking
+        // instead of taking advantage of 
TupleRawComparator.hasComparedTupleNull().
+        // Also skips multi-query index checking
+
         switch (keyType) {
         case DataType.BOOLEAN:
             return PigBooleanRawComparator.class;
@@ -888,11 +1166,7 @@ public class TezDagBuilder extends TezOp
             return PigTextRawComparator.class;
 
         case DataType.BYTEARRAY:
-            //if (hasOrderBy) {
-                return PigBytesRawComparator.class;
-            //} else {
-            //    return PigDBAWritableComparator.class;
-            //}
+            return PigBytesRawComparator.class;
 
         case DataType.MAP:
             int errCode = 1068;
@@ -900,14 +1174,7 @@ public class TezDagBuilder extends TezOp
             throw new JobCreationException(msg, errCode, PigException.INPUT);
 
         case DataType.TUPLE:
-            //TODO: PigTupleWritableComparator gives wrong results with 
cogroup in
-            //Checkin_2 and few other e2e tests. But MR has 
PigTupleWritableComparator
-            //Investigate the difference later
-            //if (hasOrderBy) {
-                return PigTupleSortComparator.class;
-            //} else {
-            //    return PigTupleWritableComparator.class;
-            //}
+            return PigTupleSortComparator.class;
 
         case DataType.BAG:
             errCode = 1068;
@@ -921,39 +1188,40 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    private static Class<? extends WritableComparator> 
getGroupingComparatorForKeyType(byte keyType)
+    private static Class<? extends WritableComparator> 
getRawBytesComparatorForSkewedJoin(byte keyType)
             throws JobCreationException {
 
+        // Extended Raw Bytes Comparators for SkewedJoin which unwrap the 
NullablePartitionWritable
         switch (keyType) {
         case DataType.BOOLEAN:
-            return PigGroupingBooleanWritableComparator.class;
+            return 
PigWritableComparators.PigBooleanRawBytesPartitionComparator.class;
 
         case DataType.INTEGER:
-            return PigGroupingIntWritableComparator.class;
+            return 
PigWritableComparators.PigIntRawBytesPartitionComparator.class;
 
         case DataType.BIGINTEGER:
-            return PigGroupingBigIntegerWritableComparator.class;
+            return 
PigWritableComparators.PigBigIntegerRawBytesPartitionComparator.class;
 
         case DataType.BIGDECIMAL:
-            return PigGroupingBigDecimalWritableComparator.class;
+            return 
PigWritableComparators.PigBigDecimalRawBytesPartitionComparator.class;
 
         case DataType.LONG:
-            return PigGroupingLongWritableComparator.class;
+            return 
PigWritableComparators.PigLongRawBytesPartitionComparator.class;
 
         case DataType.FLOAT:
-            return PigGroupingFloatWritableComparator.class;
+            return 
PigWritableComparators.PigFloatRawBytesPartitionComparator.class;
 
         case DataType.DOUBLE:
-            return PigGroupingDoubleWritableComparator.class;
+            return 
PigWritableComparators.PigDoubleRawBytesPartitionComparator.class;
 
         case DataType.DATETIME:
-            return PigGroupingDateTimeWritableComparator.class;
+            return 
PigWritableComparators.PigDateTimeRawBytesPartitionComparator.class;
 
         case DataType.CHARARRAY:
-            return PigGroupingCharArrayWritableComparator.class;
+            return 
PigWritableComparators.PigTextRawBytesPartitionComparator.class;
 
         case DataType.BYTEARRAY:
-            return PigGroupingDBAWritableComparator.class;
+            return 
PigWritableComparators.PigBytesRawBytesPartitionComparator.class;
 
         case DataType.MAP:
             int errCode = 1068;
@@ -961,7 +1229,7 @@ public class TezDagBuilder extends TezOp
             throw new JobCreationException(msg, errCode, PigException.INPUT);
 
         case DataType.TUPLE:
-            return PigGroupingTupleWritableComparator.class;
+            return 
PigWritableComparators.PigTupleSortBytesPartitionComparator.class;
 
         case DataType.BAG:
             errCode = 1068;
@@ -975,30 +1243,121 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    void selectOutputComparator(byte keyType, Configuration conf, TezOperator 
tezOp)
+    private static Class<? extends WritableComparator> 
getRawComparatorForSkewedJoin(byte keyType)
+            throws JobCreationException {
+
+        // Extended Raw Comparators for SkewedJoin which unwrap the NullablePartitionWritable
+        switch (keyType) {
+        case DataType.BOOLEAN:
+            return 
PigWritableComparators.PigBooleanRawPartitionComparator.class;
+
+        case DataType.INTEGER:
+            return PigWritableComparators.PigIntRawPartitionComparator.class;
+
+        case DataType.BIGINTEGER:
+            return 
PigWritableComparators.PigBigIntegerRawPartitionComparator.class;
+
+        case DataType.BIGDECIMAL:
+            return 
PigWritableComparators.PigBigDecimalRawPartitionComparator.class;
+
+        case DataType.LONG:
+            return PigWritableComparators.PigLongRawPartitionComparator.class;
+
+        case DataType.FLOAT:
+            return PigWritableComparators.PigFloatRawPartitionComparator.class;
+
+        case DataType.DOUBLE:
+            return 
PigWritableComparators.PigDoubleRawPartitionComparator.class;
+
+        case DataType.DATETIME:
+            return 
PigWritableComparators.PigDateTimeRawPartitionComparator.class;
+
+        case DataType.CHARARRAY:
+            return PigWritableComparators.PigTextRawPartitionComparator.class;
+
+        case DataType.BYTEARRAY:
+            return PigWritableComparators.PigBytesRawPartitionComparator.class;
+
+        case DataType.MAP:
+            int errCode = 1068;
+            String msg = "Using Map as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        case DataType.TUPLE:
+            return 
PigWritableComparators.PigTupleSortPartitionComparator.class;
+
+        case DataType.BAG:
+            errCode = 1068;
+            msg = "Using Bag as key not supported.";
+            throw new JobCreationException(msg, errCode, PigException.INPUT);
+
+        default:
+            errCode = 2036;
+            msg = "Unhandled key type " + DataType.findTypeName(keyType);
+            throw new JobCreationException(msg, errCode, PigException.BUG);
+        }
+    }
+
+    void selectKeyComparator(byte keyType, Configuration conf, TezOperator 
tezOp, boolean isMergedInput)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
         // TODO: Group comparators as in JobControlCompiler
-        if (tezOp != null && tezOp.isUseSecondaryKey()) {
+        if (tezOp == null) {
+            return;
+        }
+        if (tezOp.isUseSecondaryKey()) {
             conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
                     PigSecondaryKeyComparator.class.getName());
             setGroupingComparator(conf, 
PigSecondaryKeyGroupComparator.class.getName());
         } else {
-            if (tezOp != null && tezOp.isSkewedJoin()) {
-                // TODO: PigGroupingPartitionWritableComparator only used as Group comparator in MR.
-                // What should be TEZ_RUNTIME_KEY_COMPARATOR_CLASS if same as MR?
-                
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-                        
PigGroupingPartitionWritableComparator.class.getName());
-                setGroupingComparator(conf, 
PigGroupingPartitionWritableComparator.class.getName());
+            // If it is not a merged input (OrderedGroupedMergedKVInput) from union then
+            // use bytes only comparator. This is temporary till PIG-4652 is done
+            if (!isMergedInput && (tezOp.isGroupBy() || tezOp.isDistinct())) {
+                conf.setClass(
+                        
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparator(keyType), RawComparator.class);
+            } else if (tezOp.isSkewedJoin()) {
+                
conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawComparatorForSkewedJoin(keyType), 
RawComparator.class);
             } else {
-                boolean hasOrderby = hasOrderby(tezOp);
                 conf.setClass(
                         
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
-                        comparatorForKeyType(keyType, hasOrderby), 
RawComparator.class);
-                if (!hasOrderby) {
-                    setGroupingComparator(conf, 
getGroupingComparatorForKeyType(keyType).getName());
-                }
+                        getRawComparator(keyType), RawComparator.class);
+            }
+
+            // Comparators now
+            //             groupby/distinct : Comparator - RawBytesComparator
+            // groupby/distinct after union : Comparator - RawComparator
+            //                      orderby : Comparator - RawComparator
+            //                  skewed join : Comparator - RawPartitionComparator
+            //           Rest (other joins) : Comparator - RawComparator
+
+            //TODO: In PIG-4652: After Tez support for exposing key bytes
+            //    groupby/distinct : Comparator - RawBytesComparator. No grouping comparator required.
+            //             orderby : Comparator - RawComparator. No grouping comparator required.
+            //         skewed join : Comparator - RawBytesPartitionComparator, GroupingComparator - RawPartitionComparator
+            //  Rest (other joins) : Comparator - RawBytesComparator, GroupingComparator - RawComparator
+
+            /*
+            if (tezOp.isSkewedJoin()) {
+                
conf.setClass(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparatorForSkewedJoin(keyType), 
RawComparator.class);
+                setGroupingComparator(conf,  
getRawComparatorForSkewedJoin(keyType).getName());
+            } else if (tezOp.isGroupBy() || tezOp.isDistinct()) {
+                conf.setClass(
+                        
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparator(keyType), RawComparator.class);
+            } else if (hasOrderby(tezOp)) {
+                conf.setClass(
+                        
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawComparator(keyType), RawComparator.class);
+            } else {
+                conf.setClass(
+                        
TezRuntimeConfiguration.TEZ_RUNTIME_KEY_COMPARATOR_CLASS,
+                        getRawBytesComparator(keyType), RawComparator.class);
+                setGroupingComparator(conf, 
getRawComparator(keyType).getName());
             }
+            */
         }
     }
 
@@ -1016,6 +1375,7 @@ public class TezDagBuilder extends TezOp
         return hasOrderBy;
     }
 
+
     private void setGroupingComparator(Configuration conf, String 
comparatorClass) {
         // In MR - job.setGroupingComparatorClass() or MRJobConfig.GROUP_COMPARATOR_CLASS
         // TODO: Check why tez-mapreduce ReduceProcessor use two different tez
@@ -1046,27 +1406,4 @@ public class TezDagBuilder extends TezOp
             job.setOutputFormatClass(PigOutputFormatTez.class);
         }
     }
-
-    // Borrowed from TezUtils.convertToHistoryText since it is not part of Tez 0.5.2
-    public static String convertToHistoryText(String description, 
Configuration conf) throws IOException {
-        // Add a version if this serialization is changed
-        JSONObject jsonObject = new JSONObject();
-        try {
-            if (description != null && !description.isEmpty()) {
-                jsonObject.put("desc", description);
-        }
-        if (conf != null) {
-            JSONObject confJson = new JSONObject();
-            Iterator<Entry<String, String>> iter = conf.iterator();
-            while (iter.hasNext()) {
-                Entry<String, String> entry = iter.next();
-                confJson.put(entry.getKey(), entry.getValue());
-            }
-            jsonObject.put("config", confJson);
-        }
-        } catch (JSONException e) {
-            throw new IOException("Error when trying to convert 
description/conf to JSON", e);
-        }
-        return jsonObject.toString();
-    }
 }

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
 Fri Mar  4 18:17:39 2016
@@ -223,10 +223,16 @@ public class TezJob implements Runnable
 
     private class DAGStatusReporter extends TimerTask {
 
+        private String prevDAGStatus;
+
         @Override
         public void run() {
             if (dagStatus == null) return;
-            log.info("DAG Status: " + dagStatus.toString());
+            String currDAGStatus = dagStatus.toString();
+            if (!currDAGStatus.equals(prevDAGStatus)) {
+                log.info("DAG Status: " + currDAGStatus);
+                prevDAGStatus = currDAGStatus;
+            }
         }
     }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
 Fri Mar  4 18:17:39 2016
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.pig.PigException;
@@ -36,8 +35,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainer;
 import 
org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPlanContainerNode;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 
 /**
  * This is compiler class that takes a TezOperPlan and converts it into a
@@ -58,9 +60,10 @@ public class TezJobCompiler {
     public DAG buildDAG(TezPlanContainerNode tezPlanNode, Map<String, 
LocalResource> localResources)
             throws IOException, YarnException {
         DAG tezDag = DAG.create(tezPlanNode.getOperatorKey().toString());
-        tezDag.setCredentials(new Credentials());
+        tezDag.setCredentials(tezPlanNode.getTezOperPlan().getCredentials());
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, 
tezPlanNode.getTezOperPlan(), tezDag, localResources);
         dagBuilder.visit();
+        dagBuilder.avoidContainerReuseIfInputSplitInDisk();
         return tezDag;
     }
 
@@ -92,20 +95,19 @@ public class TezJobCompiler {
             String shipFiles = 
pigContext.getProperties().getProperty("pig.streaming.ship.files");
             if (shipFiles != null) {
                 for (String file : shipFiles.split(",")) {
-                    TezResourceManager.getInstance().addTezResource(new 
File(file).toURI());
+                    TezResourceManager.getInstance().addTezResource(new 
File(file.trim()).toURI());
                 }
             }
             String cacheFiles = 
pigContext.getProperties().getProperty("pig.streaming.cache.files");
             if (cacheFiles != null) {
-                for (String file : cacheFiles.split(",")) {
-                    // Do new URI() before passing to Path constructor else it encodes # when there is symlink
-                    TezResourceManager.getInstance().addTezResource(new 
Path(new URI(file.trim())).toUri());
-                }
+                addCacheResources(cacheFiles.split(","));
             }
             for (Map.Entry<String, LocalResource> entry : 
localResources.entrySet()) {
                 log.info("Local resource: " + entry.getKey());
             }
             DAG tezDag = buildDAG(tezPlanNode, localResources);
+            tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
+            log.info("Total estimated parallelism is " + 
tezPlan.getEstimatedTotalParallelism());
             return new TezJob(tezConf, tezDag, localResources, 
tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;
@@ -113,5 +115,33 @@ public class TezJobCompiler {
             throw new JobCreationException(msg, errCode, PigException.BUG, e);
         }
     }
+
+    private void addCacheResources(String[] fileNames) throws Exception {
+        for (String fileName : fileNames) {
+            fileName = fileName.trim();
+            if (fileName.length() > 0) {
+                URI resourceURI = new URI(fileName);
+                String fragment = resourceURI.getFragment();
+
+                Path remoteFsPath = new Path(resourceURI);
+                String resourceName = (fragment != null && fragment.length() > 
0) ? fragment : remoteFsPath.getName();
+                TezResourceManager.getInstance().addTezResource(resourceName, 
remoteFsPath);
+            }
+        }
+    }
+
+    private String createDagInfo(String script) throws IOException {
+        String dagInfo;
+        try {
+            JSONObject jsonObject = new JSONObject();
+            jsonObject.put("context", "Pig");
+            jsonObject.put("description", script);
+            dagInfo = jsonObject.toString();
+        } catch (JSONException e) {
+            throw new IOException("Error when trying to convert Pig script to 
JSON", e);
+        }
+        log.debug("DagInfo: " + dagInfo);
+        return dagInfo;
+    }
 }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
 Fri Mar  4 18:17:39 2016
@@ -19,7 +19,9 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.io.PrintStream;
+import java.lang.reflect.Field;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,8 +34,9 @@ import java.util.concurrent.ThreadFactor
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.counters.Limits;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
@@ -112,10 +115,32 @@ public class TezLauncher extends Launche
         if (pc.getExecType().isLocal()) {
             pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, 
"true");
             
pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH,
 "true");
-            pc.getProperties().setProperty("tez.ignore.lib.uris", "true");
-            pc.getProperties().setProperty("tez.am.dag.scheduler.class", 
DAGSchedulerNaturalOrderControlled.class.getName());
+            
pc.getProperties().setProperty(TezConfiguration.TEZ_IGNORE_LIB_URIS, "true");
+            
pc.getProperties().setProperty(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, 
DAGSchedulerNaturalOrderControlled.class.getName());
         }
         Configuration conf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+        // Make sure MR counter does not exceed limit
+        if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX) != null) {
+            
conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, Math.max(
+                    
conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, 0),
+                    conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX, 0)));
+        }
+        if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS) != null) {
+            
conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, 
Math.max(
+                    
conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, 0),
+                    conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 0)));
+        }
+
+        // This is hacky, but Limits cannot be initialized twice
+        try {
+            Field f = Limits.class.getDeclaredField("isInited");
+            f.setAccessible(true);
+            f.setBoolean(null, false);
+            Limits.init(conf);
+        } catch (Throwable e) {
+            log.warn("Error when setting counter limit: " + e.getMessage());
+        }
+
         if (pc.defaultParallel == -1 && 
!conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {
             pc.defaultParallel = 1;
         }
@@ -124,9 +149,16 @@ public class TezLauncher extends Launche
         TezResourceManager tezResourceManager = 
TezResourceManager.getInstance();
         tezResourceManager.init(pc, conf);
 
-        Path stagingDir = tezResourceManager.getStagingDir();
-        log.info("Tez staging directory is " + stagingDir.toString());
-        conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir.toString());
+        String stagingDir = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
+        String resourcesDir = tezResourceManager.getResourcesDir().toString();
+        if (stagingDir == null) {
+            // If not set in tez-site.xml, use Pig's tez resources directory as staging directory
+            // instead of TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT
+            stagingDir = resourcesDir;
+            conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, resourcesDir);
+        }
+        log.info("Tez staging directory is " + stagingDir + " and resources 
directory is " + resourcesDir);
+
 
         List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
 
@@ -403,11 +435,27 @@ public class TezLauncher extends Launche
             skOptimizer.visit();
         }
 
+        boolean isUnionOpt = 
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
+        List<String> supportedStoreFuncs = null;
+        String unionSupported = 
conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS);
+        if (unionSupported != null && unionSupported.trim().length() > 0) {
+            supportedStoreFuncs = Arrays
+                    .asList(StringUtils.split(unionSupported.trim()));
+        }
+        List<String> unionUnsupportedStoreFuncs = null;
+        String unionUnSupported = 
conf.get(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS);
+        if (unionUnSupported != null && unionUnSupported.trim().length() > 0) {
+            unionUnsupportedStoreFuncs = Arrays
+                    .asList(StringUtils.split(unionUnSupported.trim()));
+        }
+
         boolean isMultiQuery = 
conf.getBoolean(PigConfiguration.PIG_OPT_MULTIQUERY, true);
         if (isMultiQuery) {
             // reduces the number of TezOpers in the Tez plan generated
             // by multi-query (multi-store) script.
-            MultiQueryOptimizerTez mqOptimizer = new 
MultiQueryOptimizerTez(tezPlan);
+            MultiQueryOptimizerTez mqOptimizer = new MultiQueryOptimizerTez(
+                    tezPlan, isUnionOpt, supportedStoreFuncs,
+                    unionUnsupportedStoreFuncs);
             mqOptimizer.visit();
         }
 
@@ -419,9 +467,8 @@ public class TezLauncher extends Launche
         }
 
         // Use VertexGroup in Tez
-        boolean isUnionOpt = 
conf.getBoolean(PigConfiguration.PIG_TEZ_OPT_UNION, true);
         if (isUnionOpt) {
-            UnionOptimizer uo = new UnionOptimizer(tezPlan);
+            UnionOptimizer uo = new UnionOptimizer(tezPlan, 
supportedStoreFuncs, unionUnsupportedStoreFuncs);
             uo.visit();
         }
 

Modified: 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: 
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
 (original)
+++ 
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
 Fri Mar  4 18:17:39 2016
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
+import static 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.getFromCache;
+
 import java.io.IOException;
 import java.net.URI;
 import java.util.HashMap;
@@ -33,13 +35,14 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 
 public class TezResourceManager {
     private static TezResourceManager instance = null;
     private boolean inited = false;
-    private Path stagingDir;
+    private Path resourcesDir;
     private FileSystem remoteFs;
     private Configuration conf;
     private PigContext pigContext;
@@ -54,7 +57,7 @@ public class TezResourceManager {
 
     public void init(PigContext pigContext, Configuration conf) throws 
IOException {
         if (!inited) {
-            this.stagingDir = 
FileLocalizer.getTemporaryResourcePath(pigContext);
+            this.resourcesDir = 
FileLocalizer.getTemporaryResourcePath(pigContext);
             this.remoteFs = FileSystem.get(conf);
             this.conf = conf;
             this.pigContext = pigContext;
@@ -62,8 +65,8 @@ public class TezResourceManager {
         }
     }
 
-    public Path getStagingDir() {
-        return stagingDir;
+    public Path getResourcesDir() {
+        return resourcesDir;
     }
 
     // Add files from the source FS as local resources. The resource name will
@@ -79,7 +82,19 @@ public class TezResourceManager {
 
             // Ship the local resource to the staging directory on the remote FS
             if (!pigContext.getExecType().isLocal() && 
uri.toString().startsWith("file:")) {
-                Path remoteFsPath = remoteFs.makeQualified(new 
Path(stagingDir, resourceName));
+                boolean cacheEnabled =
+                        
conf.getBoolean(PigConfiguration.PIG_USER_CACHE_ENABLED, false);
+
+                if(cacheEnabled){
+                    Path pathOnDfs = getFromCache(pigContext, conf, 
uri.toURL());
+                    if(pathOnDfs != null) {
+                        resources.put(resourceName, pathOnDfs);
+                        return pathOnDfs;
+                    }
+
+                }
+
+                Path remoteFsPath = remoteFs.makeQualified(new 
Path(resourcesDir, resourceName));
                 remoteFs.copyFromLocalFile(resourcePath, remoteFsPath);
                 remoteFs.setReplication(remoteFsPath, 
(short)conf.getInt(Job.SUBMIT_REPLICATION, 3));
                 resources.put(resourceName, remoteFsPath);
@@ -115,7 +130,8 @@ public class TezResourceManager {
             // The resource name will be symlinked to the resource path in the
             // container's working directory.
             Path resourcePath = resources.get(resourceName);
-            FileStatus fstat = remoteFs.getFileStatus(resourcePath);
+            FileSystem fileSystem = resourcePath.getFileSystem(conf);
+            FileStatus fstat = fileSystem.getFileStatus(resourcePath);
 
             LocalResource tezResource = LocalResource.newInstance(
                     ConverterUtils.getYarnUrlFromPath(fstat.getPath()),



Reply via email to