Updated Branches: refs/heads/TEZ-1 8cd1ce6e9 -> e3c8f2081
TEZ-50. Addendum patch with bug fix and consolidation of user name (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/e3c8f208 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/e3c8f208 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/e3c8f208 Branch: refs/heads/TEZ-1 Commit: e3c8f2081f3212c4b71e1c282c5655fff959b23a Parents: 8cd1ce6 Author: bikassaha <[email protected]> Authored: Mon May 6 15:14:58 2013 -0700 Committer: bikassaha <[email protected]> Committed: Mon May 6 15:14:58 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/tez/dag/api/DAG.java | 4 ++-- .../org/apache/tez/dag/api/DAGConfiguration.java | 9 ++++----- .../org/apache/tez/dag/api/TezConfiguration.java | 5 ----- .../java/org/apache/tez/dag/app/DAGAppMaster.java | 6 ++---- .../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 13 +++++-------- .../tez/dag/app/rm/TaskSchedulerEventHandler.java | 2 +- .../dag/app/rm/container/AMContainerHelpers.java | 8 +++----- .../tez/mapreduce/hadoop/DeprecatedKeys.java | 7 ------- .../java/org/apache/tez/mapreduce/YARNRunner.java | 11 ++++++----- 9 files changed, 23 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java index a876abd..185f9a4 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -24,7 +24,7 @@ import java.util.List; public class DAG { // FIXME rename to Topology List<Vertex> vertices; List<Edge> edges; - String name = "TezDagApplication"; + String name; HashMap<String, String> config = new HashMap<String, String>(); @@ -70,7 +70,7 @@ public class DAG { // FIXME rename to Topology public void setName(String name) { this.name = name; } - + public void verify() throws TezException { // FIXME better exception //FIXME are task resources compulsory or will the DAG AM put in a default //for each vertex if not specified? http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java index 6e57e4e..93ed118 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java @@ -355,17 +355,16 @@ public class DAGConfiguration extends Configuration { public final String TEZ_DAG_NAME = DAG + "name"; @Private public void setName(String name) { - setStrings(name, name); + set(TEZ_DAG_NAME, name); } @Public @Stable public String getName() { - String[] name = getStrings(TEZ_DAG_NAME); - assert name != null && name.length == 1; - return name[0]; + String name = get(TEZ_DAG_NAME); + return name; } - + @Private public void setVertices(List<Vertex> vertices) { setVertices(TEZ_DAG_VERTICES, vertices); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java index 0c450b2..be03cd8 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java @@ -39,11 +39,6 @@ public class TezConfiguration extends Configuration { public static final String TEZ_PREFIX = "tez."; public static final String DAG_AM_PREFIX = TEZ_PREFIX + "dag.am."; - public static final String JOB_NAME = TEZ_PREFIX + "job.name"; - public static final String JOB_NAME_DEFAULT = "TezJob"; - - public static final String USER_NAME = TEZ_PREFIX + "user.name"; - public static final String DAG_AM_STAGING_DIR = TEZ_PREFIX + "staging-dir"; public static final String DAG_AM_STAGING_DIR_DEFAULT = "/tmp/hadoop-yarn/staging"; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index 63aff0a..37587e4 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -532,7 +532,7 @@ public class DAGAppMaster extends CompositeService { // TODO Metrics //metrics, //committer, newApiCommitter, - currentUser.getUserName(), appSubmitTime, + currentUser.getShortUserName(), appSubmitTime, //amInfos, taskHeartbeatHandler, context, dagLocationHint); ((RunningAppContext) context).setDAG(newDag); @@ -777,7 +777,7 @@ public class DAGAppMaster extends CompositeService { @Override public String getUser() { - return this.conf.get(TezConfiguration.USER_NAME); + return dag.getUserName(); } @Override @@ -1003,8 +1003,6 @@ public class DAGAppMaster extends CompositeService { // SIGTERM I have a chance to write out the job history. I'll be closing // the objects myself. conf.setBoolean("fs.automatic.close", false); - - conf.set(TezConfiguration.USER_NAME, jobUserName); Map<String, String> config = dagPlan.getConfig(); for(Entry<String, String> entry : config.entrySet()) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index cbec3b2..2575a43 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -103,7 +103,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, private final TezDAGID dagId; private final Clock clock; private final ApplicationACLsManager aclsManager; - private final String username; // TODO Recovery //private final List<AMInfo> amInfos; @@ -327,7 +326,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, Credentials fsTokenCredentials, Clock clock, // TODO Metrics //MRAppMetrics metrics, - String userName, + String appUserName, long appSubmitTime, // TODO Recovery //List<AMInfo> amInfos, @@ -336,16 +335,17 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DAGLocationHint dagLocationHint) { this.applicationAttemptId = applicationAttemptId; this.dagId = dagId; - this.dagName = dagPlan.getName(); - this.conf = conf; this.dagPlan = dagPlan; + this.conf = conf; + this.dagName = (dagPlan.getName() != null) ? dagPlan.getName() : + "<missing app name>"; + this.userName = appUserName; // TODO Metrics //this.metrics = metrics; this.clock = clock; // TODO Recovery //this.amInfos = amInfos; this.appContext = appContext; - this.userName = userName; this.queueName = conf.get(MRJobConfig.QUEUE_NAME, "default"); this.appSubmitTime = appSubmitTime; @@ -360,9 +360,6 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, this.jobTokenSecretManager = jobTokenSecretManager; this.aclsManager = new ApplicationACLsManager(conf); - this.username = System.getProperty("user.name"); - // TODO Construct ApplicationACLs - // this.appACLs; this.dagLocationHint = dagLocationHint; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java index d8780fe..31005b6 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java @@ -463,7 +463,7 @@ public class TaskSchedulerEventHandler extends AbstractService taskAttempt.getID().getTaskID().getVertexID(), event.getJobToken(), // TODO getConf from AMSchedulerEventTALaunchRequest - event.getCredentials(), false, event.getConf(), + event.getCredentials(), false, event.getConf(), taskAttempt.getLocalResources(), taskAttempt.getEnvironment(), taskAttempt.getJavaOpts())); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java index 2f5f498..c02ed7a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerHelpers.java @@ -28,7 +28,6 @@ import java.util.Map.Entry; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -47,7 +46,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.tez.dag.api.DAGConfiguration; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; @@ -90,7 +88,7 @@ public class AMContainerHelpers { private static ContainerLaunchContext createCommonContainerLaunchContext( Map<ApplicationAccessType, String> applicationACLs, TezConfiguration conf, Token<JobTokenIdentifier> jobToken, - TezVertexID vertexId, Credentials credentials) { + TezVertexID vertexId, Credentials credentials, AppContext appContext) { // Application resources Map<String, LocalResource> localResources = @@ -139,7 +137,7 @@ public class AMContainerHelpers { // The null fields are per-container and will be constructed for each // container separately. ContainerLaunchContext container = BuilderUtils.newContainerLaunchContext( - conf.get(TezConfiguration.USER_NAME), localResources, + appContext.getDAG().getUserName(), localResources, environment, null, serviceData, taskCredentialsBuffer, applicationACLs); return container; @@ -159,7 +157,7 @@ public class AMContainerHelpers { synchronized (commonContainerSpecLock) { if (commonContainerSpec == null) { commonContainerSpec = createCommonContainerLaunchContext( - acls, conf, jobToken, vertexId, credentials); + acls, conf, jobToken, vertexId, credentials, appContext); } } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java ---------------------------------------------------------------------- diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java index 3add31c..01d4fe0 100644 --- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java +++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/DeprecatedKeys.java @@ -110,17 +110,10 @@ public class DeprecatedKeys { mrParamToDAGParamMap.put(MRJobConfig.APPLICATION_TOKENS_FILE, TezConfiguration.APPLICATION_TOKENS_FILE); - mrParamToDAGParamMap.put(MRJobConfig.JOB_NAME, TezConfiguration.JOB_NAME); - -// mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_SPECULATOR, -// TezConfiguration.DAG_AM_SPECULATOR_CLASS); - // TODO Default value handling. mrParamToDAGParamMap.put(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT, TezConfiguration.DAG_AM_TASK_LISTENER_THREAD_COUNT); - mrParamToDAGParamMap.put(MRJobConfig.USER_NAME, TezConfiguration.USER_NAME); - mrParamToDAGParamMap.put(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, TezConfiguration.DAG_MAX_TASK_FAILURES_PER_NODE); mrParamToDAGParamMap.put(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/e3c8f208/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index b4c6319..35aa0bb 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -699,13 +699,14 @@ public class YARNRunner implements ClientProtocol { for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) { if (mrConf.get(entry.getKey()) != null) { LOG.info("DEBUG: MR->DAG Setting new key: " + entry.getValue()); - if(entry.getValue().equals(TezConfiguration.JOB_NAME)) { - dag.setName(mrConf.get(entry.getKey())); - } else { - dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey())); - } + dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey())); } } + + String jobName = mrConf.get(MRJobConfig.JOB_NAME); + if(jobName != null) { + dag.setName(jobName); + } } private ApplicationSubmissionContext createApplicationSubmissionContext(
