Author: rohini
Date: Mon Jan 11 21:18:06 2016
New Revision: 1724131
URL: http://svn.apache.org/viewvc?rev=1724131&view=rev
Log:
PIG-4411: Support for vertex level configuration like speculative execution
(rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jan 11 21:18:06 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4411: Support for vertex level configuration like speculative execution
(rohini)
+
PIG-4775: Better default values for shuffle bytes per reducer (rohini)
PIG-4753: Pigmix should have option to delete outputs after completing the
tests (mitdesai via rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Mon Jan 11 21:18:06 2016
@@ -172,6 +172,7 @@ public class TezDagBuilder extends TezOp
private Map<String, LocalResource> localResources;
private PigContext pc;
private Configuration globalConf;
+ private Configuration pigContextConf;
private FileSystem fs;
private long intermediateTaskInputSize;
private Set<String> inputSplitInDiskVertices;
@@ -181,21 +182,98 @@ public class TezDagBuilder extends TezOp
private String serializedPigContext;
private String serializedUDFImportList;
+ // Map corresponds to root vertices, reduce to intermediate and leaf
vertices
+ private Resource mapTaskResource;
+ private Resource reduceTaskResource;
+ private Map<String, String> mapTaskEnv = new HashMap<String, String>();
+ private Map<String, String> reduceTaskEnv = new HashMap<String, String>();
+ private String mapTaskLaunchCmdOpts;
+ private String reduceTaskLaunchCmdOpts;
+
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.pc = pc;
- this.globalConf =
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
this.localResources = localResources;
this.dag = dag;
this.inputSplitInDiskVertices = new HashSet<String>();
try {
- // Add credentials from binary token file and get tokens for
namenodes
- // specified in mapreduce.job.hdfs-servers
- SecurityHelper.populateTokenCache(globalConf,
dag.getCredentials());
+ initialize(pc);
+
+ udfContextSeparator = new TezUDFContextSeparator(plan,
+ new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ udfContextSeparator.visit();
} catch (IOException e) {
- throw new RuntimeException("Error while fetching delegation
tokens", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private void initialize(PigContext pc) throws IOException {
+
+ this.globalConf =
ConfigurationUtil.toConfiguration(pc.getProperties(), true);
+
+ this.pigContextConf =
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+ MRToTezHelper.processMRSettings(pigContextConf, globalConf);
+
+ // Add credentials from binary token file and get tokens for namenodes
+ // specified in mapreduce.job.hdfs-servers
+ SecurityHelper.populateTokenCache(globalConf, dag.getCredentials());
+
+ // All these classes are @InterfaceAudience.Private in Hadoop. Switch
to Tez methods in TEZ-1012
+ // set the timestamps, public/private visibility of the archives and
files
+ ClientDistributedCacheManager
+ .determineTimestampsAndCacheVisibilities(globalConf);
+ // get DelegationToken for each cached file
+ ClientDistributedCacheManager.getDelegationTokens(globalConf,
+ dag.getCredentials());
+ MRApps.setupDistributedCache(globalConf, this.localResources);
+ dag.addTaskLocalFiles(this.localResources);
+
+ int mapMemoryMB;
+ int reduceMemoryMB;
+ int mapVCores;
+ int reduceVCores;
+ if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB) !=
null) {
+ mapMemoryMB = globalConf.getInt(
+ TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
+ TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT);
+ reduceMemoryMB = mapMemoryMB;
+ } else {
+ // If tez setting is not defined, try MR setting
+ mapMemoryMB = globalConf.getInt(MRJobConfig.MAP_MEMORY_MB,
+ MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+ reduceMemoryMB = globalConf.getInt(MRJobConfig.REDUCE_MEMORY_MB,
+ MRJobConfig.DEFAULT_REDUCE_MEMORY_MB);
+ }
+
+ if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES) !=
null) {
+ mapVCores = globalConf.getInt(
+ TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
+ TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT);
+ reduceVCores = mapVCores;
+ } else {
+ mapVCores = globalConf.getInt(MRJobConfig.MAP_CPU_VCORES,
+ MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ reduceVCores = globalConf.getInt(MRJobConfig.REDUCE_CPU_VCORES,
+ MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+ }
+ mapTaskResource = Resource.newInstance(mapMemoryMB, mapVCores);
+ reduceTaskResource = Resource.newInstance(reduceMemoryMB,
reduceVCores);
+
+ if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) == null)
{
+ // If tez setting is not defined
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, mapTaskEnv, true);
+ MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, reduceTaskEnv,
true);
+ }
+
+ if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS) != null)
{
+ mapTaskLaunchCmdOpts =
globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+ reduceTaskLaunchCmdOpts = mapTaskLaunchCmdOpts;
+ } else {
+ // If tez setting is not defined, try MR setting
+ mapTaskLaunchCmdOpts =
MRHelpers.getJavaOptsForMRMapper(globalConf);
+ reduceTaskLaunchCmdOpts =
MRHelpers.getJavaOptsForMRReducer(globalConf);
}
try {
@@ -212,21 +290,13 @@ public class TezDagBuilder extends TezOp
InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
- try {
- serializedPigContext = ObjectSerializer.serialize(pc);
- serializedUDFImportList =
ObjectSerializer.serialize(PigContext.getPackageImportList());
-
- udfContextSeparator = new TezUDFContextSeparator(plan,
- new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
- udfContextSeparator.visit();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ serializedPigContext = ObjectSerializer.serialize(pc);
+ serializedUDFImportList =
ObjectSerializer.serialize(PigContext.getPackageImportList());
}
- public String getSerializedTezPlan() throws IOException {
+ private String getSerializedTezPlan() throws IOException {
if (serializedTezPlan == null) {
- // Initialize lazy as auto parallelism might not be in play
+ // Initialize lazily instead of in the constructor as this might not be
needed
serializedTezPlan = ObjectSerializer.serialize(getPlan());
}
return serializedTezPlan;
@@ -368,7 +438,7 @@ public class TezDagBuilder extends TezOp
InputDescriptor in = InputDescriptor.create(edge.inputClassName);
OutputDescriptor out = OutputDescriptor.create(edge.outputClassName);
- Configuration conf =
ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+ Configuration conf = new Configuration(pigContextConf);
if (!combinePlan.isEmpty()) {
udfContextSeparator.serializeUDFContextForEdge(conf, from, to,
UDFType.USERFUNC);
@@ -437,8 +507,6 @@ public class TezDagBuilder extends TezOp
edge.partitionerClass.getName());
}
- MRToTezHelper.processMRSettings(conf, globalConf);
-
in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
@@ -485,7 +553,7 @@ public class TezDagBuilder extends TezOp
tezOp.getProcessorName());
// Pass physical plans to vertex as user payload.
- JobConf payloadConf = new
JobConf(ConfigurationUtil.toConfiguration(pc.getProperties(), false));
+ JobConf payloadConf = new JobConf(pigContextConf);
// We do this so that dag.getCredentials(), job.getCredentials(),
// job.getConfiguration().getCredentials() all reference the same
Credentials object
@@ -502,7 +570,6 @@ public class TezDagBuilder extends TezOp
setOutputFormat(job);
payloadConf.set("udf.import.list", serializedUDFImportList);
payloadConf.set("exectype", "TEZ");
- MRToTezHelper.processMRSettings(payloadConf, globalConf);
// Process stores
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
@@ -707,7 +774,7 @@ public class TezDagBuilder extends TezOp
}
}
if (containScatterGather && !containCustomPartitioner) {
- vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+ vmPluginConf = (vmPluginConf == null) ? new
Configuration(pigContextConf) : vmPluginConf;
// Use auto-parallelism feature of ShuffleVertexManager to
dynamically
// reduce the parallelism of the vertex
if
(payloadConf.getBoolean(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, true)
@@ -750,7 +817,7 @@ public class TezDagBuilder extends TezOp
// limit job starts when 1 source task finishes.
// If limit is part of a group by or join because their
parallelism is 1,
// we should leave the configuration with the defaults.
- vmPluginConf = (vmPluginConf == null) ?
ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
+ vmPluginConf = (vmPluginConf == null) ? new
Configuration(pigContextConf) : vmPluginConf;
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
"0.00001");
vmPluginConf.set(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
"0.00001");
log.info("Set " +
ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION + " to 0.00001
for limit vertex " + tezOp.getOperatorKey().toString());
@@ -761,41 +828,19 @@ public class TezDagBuilder extends TezOp
if (tezOp.isUseGraceParallelism()) {
parallel = -1;
}
- Resource resource;
- if (globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB)!=null
&&
-
globalConf.get(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES)!=null) {
- resource =
Resource.newInstance(globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB,
- TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB_DEFAULT),
-
globalConf.getInt(TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES,
- TezConfiguration.TEZ_TASK_RESOURCE_CPU_VCORES_DEFAULT));
- } else {
- // If tez setting is not defined, try MR setting
- resource = tezOp.isUseMRMapSettings() ?
MRHelpers.getResourceForMRMapper(globalConf) :
MRHelpers.getResourceForMRReducer(globalConf);
- }
+ Resource resource = tezOp.isUseMRMapSettings() ? mapTaskResource :
reduceTaskResource;
+
Vertex vertex = Vertex.create(tezOp.getOperatorKey().toString(),
procDesc, parallel, resource);
- Map<String, String> taskEnv = new HashMap<String, String>();
- MRHelpers.updateEnvBasedOnMRTaskEnv(globalConf, taskEnv,
tezOp.isUseMRMapSettings());
- vertex.setTaskEnvironment(taskEnv);
- // All these classes are @InterfaceAudience.Private in Hadoop. Switch
to Tez methods in TEZ-1012
- // set the timestamps, public/private visibility of the archives and
files
- ClientDistributedCacheManager
- .determineTimestampsAndCacheVisibilities(globalConf);
- // get DelegationToken for each cached file
- ClientDistributedCacheManager.getDelegationTokens(globalConf,
- job.getCredentials());
- MRApps.setupDistributedCache(globalConf, localResources);
- vertex.addTaskLocalFiles(localResources);
-
- String javaOpts;
- if (globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS)!=null) {
- javaOpts =
globalConf.get(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS);
+ if (tezOp.isUseMRMapSettings()) {
+ vertex.setTaskLaunchCmdOpts(mapTaskLaunchCmdOpts);
+ vertex.setTaskEnvironment(mapTaskEnv);
} else {
- // If tez setting is not defined, try MR setting
- javaOpts = tezOp.isUseMRMapSettings() ?
MRHelpers.getJavaOptsForMRMapper(globalConf)
- : MRHelpers.getJavaOptsForMRReducer(globalConf);
+ vertex.setTaskLaunchCmdOpts(reduceTaskLaunchCmdOpts);
+ vertex.setTaskEnvironment(reduceTaskEnv);
}
- vertex.setTaskLaunchCmdOpts(javaOpts);
+
+ MRToTezHelper.setVertexConfig(vertex, tezOp.isUseMRMapSettings(),
globalConf);
log.info("For vertex - " + tezOp.getOperatorKey().toString()
+ ": parallelism=" + tezOp.getVertexParallelism()
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Mon Jan 11 21:18:06 2016
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -48,6 +49,7 @@ import org.apache.pig.backend.hadoop.exe
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.classification.InterfaceAudience;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -60,6 +62,10 @@ public class MRToTezHelper {
private static final String JOB_SPLIT_RESOURCE_NAME =
MRJobConfig.JOB_SPLIT;
private static final String JOB_SPLIT_METAINFO_RESOURCE_NAME =
MRJobConfig.JOB_SPLIT_METAINFO;
+ private static Map<String, String> mrAMParamToTezAMParamMap = new
HashMap<String, String>();
+ private static Map<String, String> mrMapParamToTezVertexParamMap = new
HashMap<String, String>();
+ private static Map<String, String> mrReduceParamToTezVertexParamMap = new
HashMap<String, String>();
+
private static List<String> mrSettingsToRetain = new ArrayList<String>();
private static List<String> mrSettingsToRemove = new ArrayList<String>();
@@ -68,10 +74,38 @@ public class MRToTezHelper {
}
static {
+ populateMRToTezParamsMap();
populateMRSettingsToRetain();
populateMRSettingsToRemove();
}
+ private static void populateMRToTezParamsMap() {
+
+ //AM settings
+ mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_VMEM_MB,
TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB);
+ mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_CPU_VCORES,
TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES);
+ mrAMParamToTezAMParamMap.put(MRJobConfig.MR_AM_MAX_ATTEMPTS,
TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS);
+ mrAMParamToTezAMParamMap.put(MRConfiguration.JOB_CREDENTIALS_BINARY,
TezConfiguration.TEZ_CREDENTIALS_PATH);
+ mrAMParamToTezAMParamMap.put(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN,
TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION);
+
+ //Map settings
+ mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_MAX_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+ mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_SPECULATIVE,
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+ mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL,
TezConfiguration.TEZ_TASK_LOG_LEVEL);
+ //TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez
0.8
+ mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit",
"tez.am.vertex.max-task-concurrency");
+ //TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez
0.8
+ mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.am.progress.stuck.interval-ms");
+
+ //Reduce settings
+ mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_MAX_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS);
+ mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE,
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
+ mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL,
TezConfiguration.TEZ_TASK_LOG_LEVEL);
+
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit",
"tez.am.vertex.max-task-concurrency");
+
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit",
"tez.am.vertex.max-task-concurrency");
+ mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.am.progress.stuck.interval-ms");
+ }
+
private static void populateMRSettingsToRetain() {
// FileInputFormat
@@ -93,23 +127,57 @@ public class MRToTezHelper {
private static void populateMRSettingsToRemove() {
- // TODO: Add all unwanted MR config once Tez UI starts showing config
-
// FileInputFormat.listStatus() on a task can cause job failure when
run from Oozie
mrSettingsToRemove.add(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY);
+
+ mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_SIZES);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_TIMESTAMPS);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_ARCHIVES_VISIBILITIES);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_FILES);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_FILES_SIZES);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_TIMESTAMPS);
+ mrSettingsToRemove.add(MRJobConfig.CACHE_FILE_VISIBILITIES);
+ mrSettingsToRemove.add(MRJobConfig.CLASSPATH_FILES);
}
- private static void removeUnwantedMRSettings(Configuration tezConf) {
+ private static void removeUnwantedSettings(Configuration tezConf, boolean
isAMConf) {
+
+ // It is good to clean up as many of the inapplicable settings as
possible.
+ // Tez has configs set in multiple places: AM, DAG, Vertex,
VertexManager
+ // Plugin, Tasks (Processor, Edge, every input and output, combiner).
+ // If conf size is bigger, it places heavy pressure on AM memory and
is
+ // inefficient while sending over RPC to tasks
- Iterator<Entry<String, String>> iter = tezConf.iterator();
+ for (String mrSetting : mrSettingsToRemove) {
+ tezConf.unset(mrSetting);
+ }
+
+ Iterator<Entry<String, String>> iter = new
Configuration(tezConf).iterator();
while (iter.hasNext()) {
- Entry<String, String> next = iter.next();
- for (String mrSetting : mrSettingsToRemove) {
- if (next.getKey().equals(mrSetting)) {
- iter.remove();
- break;
+ String key = iter.next().getKey();
+ if (!isAMConf) {
+ // Keep the setting in AM conf to be able to connect back to
the
+ // Oozie launcher job and look at the parameter values passed,
+ // but get rid of for others
+ if (key.startsWith("oozie.")) {
+ tezConf.unset(key);
+ continue;
}
}
+ if (key.startsWith("dfs.datanode")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("dfs.namenode")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("yarn.nodemanager")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("mapreduce.jobhistory")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("mapreduce.jobtracker")) {
+ tezConf.unset(key);
+ } else if (key.startsWith("mapreduce.tasktracker")) {
+ tezConf.unset(key);
+ }
}
}
@@ -118,20 +186,10 @@ public class MRToTezHelper {
// Set Tez parameters based on MR parameters.
TezConfiguration dagAMConf = new TezConfiguration(tezConf);
- Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
- .getMRToDAGParamMap();
- for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
- if (dagAMConf.get(entry.getKey()) != null) {
- dagAMConf.setIfUnset(entry.getValue(),
dagAMConf.get(entry.getKey()));
- dagAMConf.unset(entry.getKey());
- if (LOG.isDebugEnabled()) {
- LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
- + " to Tez key: " + entry.getValue()
- + " with value " +
dagAMConf.get(entry.getValue()));
- }
- }
- }
+
+ convertMRToTezConf(dagAMConf, dagAMConf,
DeprecatedKeys.getMRToDAGParamMap());
+ convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
@@ -151,77 +209,81 @@ public class MRToTezHelper {
YarnConfiguration.DEFAULT_QUEUE_NAME);
dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
- int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
- MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
- int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
- MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
- dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, ""
- + amMemMB);
- dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES, ""
- + amCores);
-
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB,
MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB,
MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
- dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, ""
- + dagAMConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
- MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS));
-
- if (tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY) != null) {
- dagAMConf.setIfUnset(TezConfiguration.TEZ_CREDENTIALS_PATH,
- tezConf.get(MRConfiguration.JOB_CREDENTIALS_BINARY));
- }
-
- if (tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN) != null) {
-
dagAMConf.setIfUnset(TezConfiguration.TEZ_CANCEL_DELEGATION_TOKENS_ON_COMPLETION,
- tezConf.get(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN));
- }
-
// Hardcoding at AM level instead of setting per vertex till TEZ-2710
is available
dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION,
"0.5");
- removeUnwantedMRSettings(dagAMConf);
+ removeUnwantedSettings(dagAMConf, true);
return dagAMConf;
}
/**
+ * Set config with Scope.Vertex in TezConfiguration on the vertex
+ *
+ * @param vertex Vertex on which config is to be set
+ * @param isMapVertex Whether map or reduce vertex. i.e root or
intermediate/leaf vertex
+ * @param conf Config that contains the tez or equivalent mapreduce
settings.
+ */
+ public static void setVertexConfig(Vertex vertex, boolean isMapVertex,
+ Configuration conf) {
+ Map<String, String> configMapping = isMapVertex ?
mrMapParamToTezVertexParamMap
+ : mrReduceParamToTezVertexParamMap;
+ for (Entry<String, String> dep : configMapping.entrySet()) {
+
+ String value = conf.get(dep.getValue(), conf.get(dep.getKey()));
+ if (value != null) {
+ vertex.setConf(dep.getValue(), value);
+ LOG.debug("Setting " + dep.getValue() + " to " + value
+ + " for the vertex " + vertex.getName());
+ }
+ }
+ }
+
+ /**
* Process the mapreduce configuration settings and
* - copy as is the still required ones (like those used by
FileInputFormat/FileOutputFormat)
* - convert and set equivalent tez runtime settings
* - handle compression related settings
*
- * @param conf Configuration on which the mapreduce settings will have to
be transferred
+ * @param tezConf Configuration on which the mapreduce settings will have
to be transferred
* @param mrConf Configuration that contains mapreduce settings
*/
- public static void processMRSettings(Configuration conf, Configuration
mrConf) {
+ public static void processMRSettings(Configuration tezConf, Configuration
mrConf) {
for (String mrSetting : mrSettingsToRetain) {
if (mrConf.get(mrSetting) != null) {
- conf.set(mrSetting, mrConf.get(mrSetting));
+ tezConf.set(mrSetting, mrConf.get(mrSetting));
}
}
- JobControlCompiler.configureCompression(conf);
- convertMRToTezRuntimeConf(conf, mrConf);
- removeUnwantedMRSettings(conf);
+ JobControlCompiler.configureCompression(tezConf);
+ convertMRToTezConf(tezConf, mrConf,
DeprecatedKeys.getMRToTezRuntimeParamMap());
+ removeUnwantedSettings(tezConf, false);
}
/**
* Convert MR settings to Tez settings and set on conf.
*
- * @param conf Configuration on which MR equivalent Tez settings should
be set
+ * @param tezConf Configuration on which MR equivalent Tez settings
should be set
* @param mrConf Configuration that contains MR settings
+ * @param mrToTezConfigMapping Mapping of MR config to equivalent Tez
config
*/
- private static void convertMRToTezRuntimeConf(Configuration conf,
Configuration mrConf) {
- for (Entry<String, String> dep :
DeprecatedKeys.getMRToTezRuntimeParamMap().entrySet()) {
+ private static void convertMRToTezConf(Configuration tezConf,
Configuration mrConf, Map<String, String> mrToTezConfigMapping) {
+ for (Entry<String, String> dep : mrToTezConfigMapping.entrySet()) {
if (mrConf.get(dep.getKey()) != null) {
- conf.unset(dep.getKey());
- LOG.info("Setting " + dep.getValue() + " to "
- + mrConf.get(dep.getKey()) + " from MR setting "
- + dep.getKey());
- conf.setIfUnset(dep.getValue(), mrConf.get(dep.getKey()));
+ if (tezConf.get(dep.getValue()) == null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting " + dep.getValue() + " to "
+ + mrConf.get(dep.getKey()) + " from MR setting
"
+ + dep.getKey());
+ }
+ tezConf.set(dep.getValue(), mrConf.get(dep.getKey()));
+ }
+ tezConf.unset(dep.getKey());
}
}
}
Modified: pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java Mon Jan 11
21:18:06 2016
@@ -20,6 +20,9 @@ package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
+import java.util.HashMap;
+
+import org.apache.hadoop.yarn.api.records.LocalResource;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezCompiler;
@@ -68,7 +71,7 @@ public class TestGroupConstParallelTez e
parallelismSetter.visit();
DAG tezDag = getTezDAG(tezPlan, pc);
- TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag,
null);
+ TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new
HashMap<String, LocalResource>());
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
if (!v.getInputVertices().isEmpty()) {
@@ -88,7 +91,7 @@ public class TestGroupConstParallelTez e
parallelismSetter.visit();
DAG tezDag = getTezDAG(tezPlan, pc);
- TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag,
null);
+ TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new
HashMap<String, LocalResource>());
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
if (!v.getInputVertices().isEmpty()) {
Modified: pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java?rev=1724131&r1=1724130&r2=1724131&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java Mon Jan 11
21:18:06 2016
@@ -20,7 +20,10 @@ package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import java.util.HashMap;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.TezDagBuilder;
@@ -62,7 +65,7 @@ public class TestJobSubmissionTez extend
parallelismSetter.visit();
DAG tezDag = getTezDAG(tezPlan, pc);
- TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag,
null);
+ TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new
HashMap<String, LocalResource>());
try {
dagBuilder.visit();
} catch (VisitorException jce) {
@@ -81,7 +84,7 @@ public class TestJobSubmissionTez extend
parallelismSetter.visit();
DAG tezDag = getTezDAG(tezPlan, pc);
- TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag,
null);
+ TezDagBuilder dagBuilder = new TezDagBuilder(pc, tezPlan, tezDag, new
HashMap<String, LocalResource>());
dagBuilder.visit();
for (Vertex v : tezDag.getVertices()) {
if (!v.getInputVertices().isEmpty()) {