Author: rohini
Date: Mon Jan 11 21:18:06 2016
New Revision: 1724131

URL: http://svn.apache.org/viewvc?rev=1724131&view=rev
Log:
PIG-4411: Support for vertex level configuration like speculative execution 
(rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
    pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
    pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 21:18:06 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-4411: Support for vertex level configuration like speculative execution 
(rohini)
+
 PIG-4775: Better default values for shuffle bytes per reducer (rohini)
 
 PIG-4753: Pigmix should have option to delete outputs after completing the 
tests (mitdesai via rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
 Mon Jan 11 21:18:06 2016
@@ -172,6 +172,7 @@ public class TezDagBuilder extends TezOp
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
+    private Configuration pigContextConf;
     private FileSystem fs;
     private long intermediateTaskInputSize;
     private Set<String> inputSplitInDiskVertices;
@@ -181,21 +182,98 @@ public class TezDagBuilder extends TezOp
     private String serializedPigContext;
     private String serializedUDFImportList;
 
+    // Map corresponds to root vertices, reduce to intermediate and leaf 
vertices
+    private Resource mapTaskResource;
+    private Resource reduceTaskResource;
+    private Map<String, String> mapTaskEnv = new HashMap<String, String>();
+    private Map<String, String> reduceTaskEnv = new HashMap<String, String>();
+    private String mapTaskLaunchCmdOpts;
+    private String reduceTaskLaunchCmdOpts;
+
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pc;
-        this.globalConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         this.localResources = localResources;
         this.dag = dag;
         this.inputSplitInDiskVertices = new HashSet<String>();
 
         try {
-            // Add credentials from binary token file and get tokens for 
namenodes
-            // specified in mapreduce.job.hdfs-servers
-            SecurityHelper.populateTokenCache(globalConf, 
dag.getCredentials());
+            initialize(pc);
+
+            udfContextSeparator = new TezUDFContextSeparator(plan,
+                    new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+            udfContextSeparator.visit();
         } catch (IOException e) {
-            throw new RuntimeException("Error while fetching delegation 
tokens", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initialize(PigContext pc) throws IOException {
+
+        this.globalConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+        this.pigContextConf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+
+        // Add credentials from binary token file and get tokens for namenodes
+        // specified in mapreduce.job.hdfs-servers
+        SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+
+        // All these classes are @InterfaceAudience.Private in Hadoop. Switch 
to Tez methods in TEZ-1012
+        // set the timestamps, public/private visibility of the archives and 
files
+        ClientDistributedCacheManager
+                .determineTimestampsAndCacheVisibilities(globalConf);
+        // get DelegationToken for each cached file
+        ClientDistributedCacheManager.getDelegationTokens(globalConf,
+                dag.getCredentials());
+        MRApps.setupDistributedCache(globalConf, this.localResources);
+        dag.addTaskLocalFiles(this.localResources);
+
+        int mapMemoryMB;
+        int reduceMemoryMB;
+        int mapVCores;
+        int reduceVCores;
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB) != 
null) {
+            mapMemoryMB = globalConf.getInt(
+                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT);
+            reduceMemoryMB = mapMemoryMB;
+        } else {
+            // If tez setting is not defined, try MR setting
+            mapMemoryMB = globalConf.getInt(MRJobConfig.MAP_MEMORY_MB,
+                    MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+            reduceMemoryMB = globalConf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
+                    MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
+        }
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES) != 
null) {
+            mapVCores = globalConf.getInt(
+                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT);
+            reduceVCores = mapVCores;
+        } else {
+            mapVCores = globalConf.getInt(MRJobConfig.MAP_CPU_VCORES,
+                    MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+            reduceVCores = globalConf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+                    MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+        }
+        mapTaskResource = Resource.newInstance(mapMemoryMB, mapVCores);
+        reduceTaskResource = Resource.newInstance(reduceMemoryMB, 
reduceVCores);
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null) 
{
+            // If tez setting is not defined
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
+            MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv, 
true);
+        }
+
+        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null) 
{
+            mapTaskLaunchCmdOpts = 
globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+            reduceTaskLaunchCmdOpts = mapTaskLaunchCmdOpts;
+        } else {
+            // If tez setting is not defined, try MR setting
+            mapTaskLaunchCmdOpts = 
MRHelpers.getJavaOptsForMRMapper(globalConf);
+            reduceTaskLaunchCmdOpts = 
MRHelpers.getJavaOptsForMRReducer(globalConf);
         }
 
         try {
@@ -212,21 +290,13 @@ public class TezDagBuilder extends TezOp
                         InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                         InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
 
-        try {
-            serializedPigContext = ObjectSerializer.serialize(pc);
-            serializedUDFImportList = 
ObjectSerializer.serialize(PigContext.getPackageImportList());
-
-            udfContextSeparator = new TezUDFContextSeparator(plan,
-                    new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
-            udfContextSeparator.visit();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
+        serializedPigContext = ObjectSerializer.serialize(pc);
+        serializedUDFImportList = 
ObjectSerializer.serialize(PigContext.getPackageImportList());
     }
 
-    public String getSerializedTezPlan() throws IOException {
+    private String getSerializedTezPlan() throws IOException {
         if (serializedTezPlan == null) {
-            // Initialize lazy as auto parallelism might not be in play
+            // Initialize lazy instead of constructor as this might not be 
needed
             serializedTezPlan = ObjectSerializer.serialize(getPlan());
         }
         return serializedTezPlan;
@@ -368,7 +438,7 @@ public class TezDagBuilder extends TezOp
         InputDescriptor in = InputDescriptor.create(edge.inputClassName);
         OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
 
-        Configuration conf = 
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+        Configuration conf = new Configuration(pigContextConf);
 
         if (!combinePlan.isEmpty()) {
             udfContextSeparator.serializeUDFContextForEdge(conf, from, to, 
UDFType.USERFUNC);
@@ -437,8 +507,6 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
-        MRToTezHelper.processMRSettings(conf, globalConf);
-
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
@@ -485,7 +553,7 @@ public class TezDagBuilder extends TezOp
                 tezOp.getProcessorName());
 
         // Pass physical plans to vertex as user payload.
-        JobConf payloadConf = new 
JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
+        JobConf payloadConf = new JobConf(pigContextConf);
 
         // We do this so that dag.getCredentials(), job.getCredentials(),
         // job.getConfiguration().getCredentials() all reference the same 
Credentials object
@@ -502,7 +570,6 @@ public class TezDagBuilder extends TezOp
         setOutputFormat(job);
         payloadConf.set("udf.import.list", serializedUDFImportList);
         payloadConf.set("exectype", "TEZ");
-        MRToTezHelper.processMRSettings(payloadConf, globalConf);
 
         // Process stores
         LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -707,7 +774,7 @@ public class TezDagBuilder extends TezOp
                     }
                 }
                 if (containScatterGather && !containCustomPartitioner) {
-                    vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+                    vmPluginConf = (vmPluginConf == null) ? new 
Configuration(pigContextConf) : vmPluginConf;
                     // Use auto-parallelism feature of ShuffleVertexManager to 
dynamically
                     // reduce the parallelism of the vertex
                     if 
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
@@ -750,7 +817,7 @@ public class TezDagBuilder extends TezOp
                 // limit job starts when 1 source task finishes.
                 // If limit is part of a group by or join because their 
parallelism is 1,
                 // we should leave the configuration with the defaults.
-                vmPluginConf = (vmPluginConf == null) ? 
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+                vmPluginConf = (vmPluginConf == null) ? new 
Configuration(pigContextConf) : vmPluginConf;
                 
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 "0.00001");
                 
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
 "0.00001");
                 log.info("Set " + 
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001 
for limit vertex " + tezOp.getOperatorKey().toString());
@@ -761,41 +828,19 @@ public class TezDagBuilder extends TezOp
         if (tezOp.isUseGraceParallelism()) {
             parallel = -1;
         }
-        Resource resource;
-        if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null 
&&
-                
globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES)!=null) {
-            resource = 
Resource.newInstance(globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
-                    TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT),
-                    
globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
-                    TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
-        } else {
-            // If tez setting is not defined, try MR setting
-            resource = tezOp.isUseMRMapSettings() ? 
MRHelpers.getResourceForMRMapper(globalConf) : 
MRHelpers.getResourceForMRReducer(globalConf);
-        }
+        Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource : 
reduceTaskResource;
+
         Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(), 
procDesc, parallel, resource);
-        Map<String, String> taskEnv = new HashMap<String, String>();
-        MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv, 
tezOp.isUseMRMapSettings());
-        vertex.setTaskEnvironment(taskEnv);
 
-        // All these classes are @InterfaceAudience.Private in Hadoop. Switch 
to Tez methods in TEZ-1012
-        // set the timestamps, public/private visibility of the archives and 
files
-        ClientDistributedCacheManager
-                .determineTimestampsAndCacheVisibilities(globalConf);
-        // get DelegationToken for each cached file
-        ClientDistributedCacheManager.getDelegationTokens(globalConf,
-                job.getCredentials());
-        MRApps.setupDistributedCache(globalConf, localResources);
-        vertex.addTaskLocalFiles(localResources);
-
-        String javaOpts;
-        if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS)!=null) {
-            javaOpts = 
globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+        if (tezOp.isUseMRMapSettings()) {
+            vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts);
+            vertex.setTaskEnvironment(mapTaskEnv);
         } else {
-            // If tez setting is not defined, try MR setting
-            javaOpts = tezOp.isUseMRMapSettings() ? 
MRHelpers.getJavaOptsForMRMapper(globalConf)
-                    : MRHelpers.getJavaOptsForMRReducer(globalConf);
+            vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts);
+            vertex.setTaskEnvironment(reduceTaskEnv);
         }
-        vertex.setTaskLaunchCmdOpts(javaOpts);
+
+        MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(), 
globalConf);
 
         log.info("For vertex - " + tezOp.getOperatorKey().toString()
                 + ": parallelism=" + tezOp.getVertexParallelism()

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 Mon Jan 11 21:18:06 2016
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -48,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -60,6 +62,10 @@ public class MRToTezHelper {
     private static final String JOB_SPLIT_RESOURCE_NAME = 
MRJobConfig.JOB_SPLIT;
     private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME = 
MRJobConfig.JOB_SPLIT_METAINFO;
 
+    private static Map<String, String> mrAMParamToTezAMParamMap = new 
HashMap<String, String>();
+    private static Map<String, String> mrMapParamToTezVertexParamMap = new 
HashMap<String, String>();
+    private static Map<String, String> mrReduceParamToTezVertexParamMap = new 
HashMap<String, String>();
+
     private static List<String> mrSettingsToRetain = new ArrayList<String>();
 
     private static List<String> mrSettingsToRemove = new ArrayList<String>();
@@ -68,10 +74,38 @@ public class MRToTezHelper {
     }
 
     static {
+        populateMRToTezParamsMap();
         populateMRSettingsToRetain();
         populateMRSettingsToRemove();
     }
 
+    private static void populateMRToTezParamsMap() {
+
+        //AM settings
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_VMEM_MB, 
TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_CPU_VCORES, 
TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_MAX_ATTEMPTS, 
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS);
+        mrAMParamToTezAMParamMap.put(MRConfiguration.JOB_CREDENTIALS_BINARY, 
TezConfiguration.TEZ_CREDENTIALS_PATH);
+        mrAMParamToTezAMParamMap.put(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, 
TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION);
+
+        //Map settings
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_MAX_ATTEMPTS, 
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_SPECULATIVE, 
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
+        //TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 
0.8
+        mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", 
"tez.am.vertex.max-task-concurrency");
+        //TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 
0.8
+        mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
+
+        //Reduce settings
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_MAX_ATTEMPTS, 
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, 
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
+        
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", 
"tez.am.vertex.max-task-concurrency");
+        
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", 
"tez.am.vertex.max-task-concurrency");
+        mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
+    }
+
     private static void populateMRSettingsToRetain() {
 
         // FileInputFormat
@@ -93,23 +127,57 @@ public class MRToTezHelper {
 
     private static void populateMRSettingsToRemove() {
 
-        // TODO: Add all unwanted MR config once Tez UI starts showing config
-
         // FileInputFormat.listStatus() on a task can cause job failure when 
run from Oozie
         mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_SIZES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILES_SIZES);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+        mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_VISIBILITIES);
+        mrSettingsToRemove.add(MRJobConfig.CLASSPATH_FILES);
     }
 
-    private static void removeUnwantedMRSettings(Configuration tezConf) {
+    private static void removeUnwantedSettings(Configuration tezConf, boolean 
isAMConf) {
+
+        // It is good to clean up as much of the inapplicable settings as 
possible.
+        // Tez has configs set in multiple places: AM, DAG, Vertex, 
VertexManager
+        // Plugin, Tasks (Processor, Edge, every input and output, combiner)
+        // If conf size is bigger, it places heavy pressure on AM memory and 
is
+        // inefficient while sending over RPC to tasks
 
-        Iterator<Entry<String, String>> iter = tezConf.iterator();
+        for (String mrSetting : mrSettingsToRemove) {
+            tezConf.unset(mrSetting);
+        }
+
+        Iterator<Entry<String, String>> iter = new 
Configuration(tezConf).iterator();
         while (iter.hasNext()) {
-            Entry<String, String> next = iter.next();
-            for (String mrSetting : mrSettingsToRemove) {
-                if (next.getKey().equals(mrSetting)) {
-                    iter.remove();
-                    break;
+            String key = iter.next().getKey();
+            if (!isAMConf) {
+                // Keep the setting in AM conf to be able to connect back to 
the
+                // Oozie launcher job and look at the parameter values passed,
+                // but get rid of for others
+                if (key.startsWith("oozie.")) {
+                    tezConf.unset(key);
+                    continue;
                 }
             }
+            if (key.startsWith("dfs.datanode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("dfs.namenode")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("yarn.nodemanager")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.jobhistory")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.jobtracker")) {
+                tezConf.unset(key);
+            } else if (key.startsWith("mapreduce.tasktracker")) {
+                tezConf.unset(key);
+            }
         }
     }
 
@@ -118,20 +186,10 @@ public class MRToTezHelper {
 
         // Set Tez parameters based on MR parameters.
         TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-        Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
-                .getMRToDAGParamMap();
 
-        for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
-            if (dagAMConf.get(entry.getKey()) != null) {
-                dagAMConf.setIfUnset(entry.getValue(), 
dagAMConf.get(entry.getKey()));
-                dagAMConf.unset(entry.getKey());
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
-                            + " to Tez key: " + entry.getValue()
-                            + " with value " + 
dagAMConf.get(entry.getValue()));
-                }
-            }
-        }
+
+        convertMRToTezConf(dagAMConf, dagAMConf, 
DeprecatedKeys.getMRToDAGParamMap());
+        convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
         String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
         if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
@@ -151,77 +209,81 @@ public class MRToTezHelper {
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
-        int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
-                MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
-        int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
-                MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, ""
-                + amMemMB);
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
-                + amCores);
-
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
                 tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
                 tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, 
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
-        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
-                + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
-                        MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
-
-        if (tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY) != null) {
-            dagAMConf.setIfUnset(TezConfiguration.TEZ_CREDENTIALS_PATH,
-                    tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY));
-        }
-
-        if (tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN) != null) {
-            
dagAMConf.setIfUnset(TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION,
-                    tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN));
-        }
-
         // Hardcoding at AM level instead of setting per vertex till TEZ-2710 
is available
         
dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, 
"0.5");
 
-        removeUnwantedMRSettings(dagAMConf);
+        removeUnwantedSettings(dagAMConf, true);
 
         return dagAMConf;
     }
 
     /**
+     * Set config with Scope.Vertex in TezConfiguration on the vertex
+     *
+     * @param vertex Vertex on which config is to be set
+     * @param isMapVertex Whether map or reduce vertex. i.e root or 
intermediate/leaf vertex
+     * @param conf Config that contains the tez or equivalent mapreduce 
settings.
+     */
+    public static void setVertexConfig(Vertex vertex, boolean isMapVertex,
+            Configuration conf) {
+        Map<String, String> configMapping = isMapVertex ? 
mrMapParamToTezVertexParamMap
+                : mrReduceParamToTezVertexParamMap;
+        for (Entry<String, String> dep : configMapping.entrySet()) {
+
+            String value = conf.get(dep.getValue(), conf.get(dep.getKey()));
+            if (value != null) {
+                vertex.setConf(dep.getValue(), value);
+                LOG.debug("Setting " + dep.getValue() + " to " + value
+                        + " for the vertex " + vertex.getName());
+            }
+        }
+    }
+
+    /**
      * Process the mapreduce configuration settings and
      *    - copy as is the still required ones (like those used by 
FileInputFormat/FileOutputFormat)
      *    - convert and set equivalent tez runtime settings
      *    - handle compression related settings
      *
-     * @param conf Configuration on which the mapreduce settings will have to 
be transferred
+     * @param tezConf Configuration on which the mapreduce settings will have 
to be transferred
      * @param mrConf Configuration that contains mapreduce settings
      */
-    public static void processMRSettings(Configuration conf, Configuration 
mrConf) {
+    public static void processMRSettings(Configuration tezConf, Configuration 
mrConf) {
         for (String mrSetting : mrSettingsToRetain) {
             if (mrConf.get(mrSetting) != null) {
-                conf.set(mrSetting, mrConf.get(mrSetting));
+                tezConf.set(mrSetting, mrConf.get(mrSetting));
             }
         }
-        JobControlCompiler.configureCompression(conf);
-        convertMRToTezRuntimeConf(conf, mrConf);
-        removeUnwantedMRSettings(conf);
+        JobControlCompiler.configureCompression(tezConf);
+        convertMRToTezConf(tezConf, mrConf, 
DeprecatedKeys.getMRToTezRuntimeParamMap());
+        removeUnwantedSettings(tezConf, false);
     }
 
     /**
      * Convert MR settings to Tez settings and set on conf.
      *
-     * @param conf  Configuration on which MR equivalent Tez settings should 
be set
+     * @param tezConf  Configuration on which MR equivalent Tez settings 
should be set
      * @param mrConf Configuration that contains MR settings
+     * @param mrToTezConfigMapping  Mapping of MR config to equivalent Tez 
config
      */
-    private static void convertMRToTezRuntimeConf(Configuration conf, 
Configuration mrConf) {
-        for (Entry<String, String> dep : 
DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+    private static void convertMRToTezConf(Configuration tezConf, 
Configuration mrConf, Map<String, String> mrToTezConfigMapping) {
+        for (Entry<String, String> dep : mrToTezConfigMapping.entrySet()) {
             if (mrConf.get(dep.getKey()) != null) {
-                conf.unset(dep.getKey());
-                LOG.info("Setting " + dep.getValue() + " to "
-                        + mrConf.get(dep.getKey()) + " from MR setting "
-                        + dep.getKey());
-                conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey()));
+                if (tezConf.get(dep.getValue()) == null) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Setting " + dep.getValue() + " to "
+                                + mrConf.get(dep.getKey()) + " from MR setting 
"
+                                + dep.getKey());
+                    }
+                    tezConf.set(dep.getValue(), mrConf.get(dep.getKey()));
+                }
+                tezConf.unset(dep.getKey());
             }
         }
     }

Modified: pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java Mon Jan 11 
21:18:06 2016
@@ -20,6 +20,9 @@ package org.apache.pig.tez;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
@@ -68,7 +71,7 @@ public class TestGroupConstParallelTez e
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, 
null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new 
HashMap<String, LocalResource>());
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
             if (!v.getInputVertices().isEmpty()) {
@@ -88,7 +91,7 @@ public class TestGroupConstParallelTez e
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, 
null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new 
HashMap<String, LocalResource>());
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
             if (!v.getInputVertices().isEmpty()) {

Modified: pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java Mon Jan 11 
21:18:06 2016
@@ -20,7 +20,10 @@ package org.apache.pig.tez;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.util.HashMap;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
@@ -62,7 +65,7 @@ public class TestJobSubmissionTez extend
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, 
null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new 
HashMap<String, LocalResource>());
         try {
             dagBuilder.visit();
         } catch (VisitorException jce) {
@@ -81,7 +84,7 @@ public class TestJobSubmissionTez extend
         parallelismSetter.visit();
 
         DAG tezDag = getTezDAG(tezPlan, pc);
-        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, 
null);
+        TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new 
HashMap<String, LocalResource>());
         dagBuilder.visit();
         for (Vertex v : tezDag.getVertices()) {
             if (!v.getInputVertices().isEmpty()) {


Reply via email to