Updated Branches: refs/heads/TEZ-1 45ace5414 -> 8cd1ce6e9
TEZ-50. In TaskAttempt get user and job name from API (bikas) Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/8cd1ce6e Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/8cd1ce6e Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/8cd1ce6e Branch: refs/heads/TEZ-1 Commit: 8cd1ce6e968ab0212e16edcdcbcf5bad2fcbae15 Parents: 45ace54 Author: bikassaha <[email protected]> Authored: Mon May 6 10:59:23 2013 -0700 Committer: bikassaha <[email protected]> Committed: Mon May 6 10:59:23 2013 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/tez/dag/api/DAG.java | 10 +++++++--- .../org/apache/tez/dag/api/DAGConfiguration.java | 14 ++++++++++++++ .../java/org/apache/tez/dag/app/DAGAppMaster.java | 2 +- .../org/apache/tez/dag/app/dag/impl/DAGImpl.java | 5 ++--- .../tez/dag/app/dag/impl/TaskAttemptImpl.java | 6 ++++-- .../java/org/apache/tez/mapreduce/YARNRunner.java | 6 +++++- 6 files changed, 33 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8cd1ce6e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java index f0392cc..a876abd 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAG.java @@ -23,7 +23,8 @@ import java.util.List; public class DAG { // FIXME rename to Topology List<Vertex> vertices; - List<Edge> edges; + List<Edge> edges; + String name = "TezDagApplication"; HashMap<String, String> config = new HashMap<String, String>(); @@ -65,18 +66,21 @@ public class DAG { // FIXME rename to Topology public void addConfiguration(String key, String value) { config.put(key, value); } + + public void setName(String name) { + this.name = name; + } public void verify() throws TezException { // FIXME better exception - //FIXME are task resources compulsory or will the DAG AM put in a default //for each vertex if not specified? - } // FIXME DAGConfiguration is not public API public DAGConfiguration serializeDag() { DAGConfiguration dagConf = new DAGConfiguration(); + dagConf.setName(name); dagConf.setVertices(vertices); dagConf.setEdgeProperties(edges); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8cd1ce6e/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java ---------------------------------------------------------------------- diff --git a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java index 0a40ead..6e57e4e 100644 --- a/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java +++ b/tez-dag-api/src/main/java/org/apache/tez/dag/api/DAGConfiguration.java @@ -351,6 +351,20 @@ public class DAGConfiguration extends Configuration { return env; } + + public final String TEZ_DAG_NAME = DAG + "name"; + @Private + public void setName(String name) { + setStrings(name, name); + } + + @Public + @Stable + public String getName() { + String[] name = getStrings(TEZ_DAG_NAME); + assert name != null && name.length == 1; + return name[0]; + } @Private public void setVertices(List<Vertex> vertices) { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8cd1ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java index e74294c..63aff0a 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java @@ -221,7 +221,7 @@ public class DAGAppMaster extends CompositeService { // Job name is the same as the app name util we support DAG of jobs // for an app later - appName = dagPlan.get(TezConfiguration.JOB_NAME, "<missing app name>"); + appName = dagPlan.getName(); dagId = new TezDAGID(appAttemptID.getApplicationId(), 1); http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8cd1ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java index 7215d30..cbec3b2 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/DAGImpl.java @@ -336,8 +336,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, DAGLocationHint dagLocationHint) { this.applicationAttemptId = applicationAttemptId; this.dagId = dagId; - this.dagName = conf.get(TezConfiguration.JOB_NAME, - TezConfiguration.JOB_NAME_DEFAULT); + this.dagName = dagPlan.getName(); this.conf = conf; this.dagPlan = dagPlan; // TODO Metrics @@ -663,7 +662,7 @@ public class DAGImpl implements org.apache.tez.dag.app.dag.DAG, public String getUserName() { return userName; } - + @Override public String getQueueName() { return queueName; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8cd1ce6e/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java ---------------------------------------------------------------------- diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java index 0ca41c7..f61c049 100644 --- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java +++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java @@ -63,6 +63,7 @@ import org.apache.tez.dag.api.records.TaskAttemptState; import org.apache.tez.dag.app.AppContext; import org.apache.tez.dag.app.TaskAttemptListener; import org.apache.tez.dag.app.TaskHeartbeatHandler; +import org.apache.tez.dag.app.dag.DAG; import org.apache.tez.dag.app.dag.Task; import org.apache.tez.dag.app.dag.TaskAttempt; import org.apache.tez.dag.app.dag.TaskAttemptStateInternal; @@ -88,7 +89,6 @@ import org.apache.tez.dag.app.dag.event.TaskEventType; import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptFetchFailure; import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded; import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest; -import org.apache.tez.dag.app.speculate.SpeculatorEvent; import org.apache.tez.dag.history.DAGHistoryEvent; import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent; import org.apache.tez.dag.history.events.TaskAttemptStartedEvent; @@ -298,9 +298,11 @@ public class TaskAttemptImpl implements TaskAttempt, TezTaskContext createRemoteTask() { Vertex vertex = getTask().getVertex(); + DAG dag = vertex.getDAG(); // TODO TEZ-50 user and jobname - return new TezEngineTaskContext(getID(), "user", "jobname", getTask() + return new TezEngineTaskContext(getID(), dag.getUserName(), + dag.getName(), getTask() .getVertex().getName(), mrxModuleClassName, vertex.getInputSpecList(), vertex.getOutputSpecList()); } http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/8cd1ce6e/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 691c7d1..b4c6319 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -699,7 +699,11 @@ public class YARNRunner implements ClientProtocol { for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) { if (mrConf.get(entry.getKey()) != null) { LOG.info("DEBUG: MR->DAG Setting new key: " + entry.getValue()); - dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey())); + if(entry.getValue().equals(TezConfiguration.JOB_NAME)) { + dag.setName(mrConf.get(entry.getKey())); + } else { + dag.addConfiguration(entry.getValue(), mrConf.get(entry.getKey())); + } } } }
