TEZ-152. Add a way for MR tasks to figure out which stage they belong to. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/7d401d2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/7d401d2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/7d401d2c

Branch: refs/heads/master
Commit: 7d401d2cd0af2c68ff2a060849b71e1b0796c27d
Parents: ef37823
Author: Siddharth Seth <[email protected]>
Authored: Tue May 21 18:51:32 2013 -0700
Committer: Siddharth Seth <[email protected]>
Committed: Tue May 21 18:51:32 2013 -0700

----------------------------------------------------------------------
 .../apache/tez/mapreduce/hadoop/MRJobConfig.java   |  2 +
 .../mapreduce/hadoop/MultiStageMRConfigUtil.java   | 17 +++++++++++++++
 .../apache/tez/mapreduce/task/MRRuntimeTask.java   |  1 +
 3 files changed, 20 insertions(+), 0 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7d401d2c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 7e650b1..a42597e 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -649,6 +649,8 @@ public interface MRJobConfig {
 
   public static final String MRR_VERTEX_PREFIX = "mrr.vertex.";
 
+  public static final String VERTEX_NAME = "mapreduce.task.vertex.name";
+
   // Stage specific properties
   // Format of each property is mapred.ireducer.stage.<stage-num>.<suffix>
   // where suffix is one of MRR_INTERMEDIATE_STAGE_* fields defined below.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7d401d2c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
index b4b34e0..fd3dd78 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MultiStageMRConfigUtil.java
@@ -96,14 +96,31 @@ public class MultiStageMRConfigUtil {
   public static String getInitialMapVertexName() {
     return INITIAL_MAP_VERTEX_NAME;
   }
+
+  public boolean isInitialMapVertex(String vertexName) {
+    return vertexName.equals(INITIAL_MAP_VERTEX_NAME);
+  }
 
   public static String getFinalReduceVertexName() {
     return FINAL_REDUCE_VERTEX_NAME;
   }
 
+  public boolean isFinalReduceVertex(String vertexName) {
+    return vertexName.equals(FINAL_REDUCE_VERTEX_NAME);
+  }
+
   public static String getIntermediateStageVertexName(int stageNum) {
     return INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + stageNum;
   }
+
+  public static int getIntermediateStageNum(String vertexName) {
+    if (vertexName.matches(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX + "\\d+")) {
+      return Integer.parseInt(vertexName
+          .substring(INTERMEDIATE_TASK_VERTEX_NAME_PREFIX.length()));
+    } else {
+      return -1;
+    }
+  }
 
   // Returns config settings specific to named vertex
   public static Configuration getBasicConfForVertex(Configuration baseConf,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/7d401d2c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
index e5dfd66..34aaa1b 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MRRuntimeTask.java
@@ -97,6 +97,7 @@ public class MRRuntimeTask extends RuntimeTask {
     // submit configuration parameters to the AM and effectively tasks via RPC.
 
     final JobConf job = new JobConf(taskConf);
+    job.set(MRJobConfig.VERTEX_NAME, taskContext.getVertexName());
 
     MRTask mrTask = (MRTask) getProcessor();
     this.mrTask = mrTask;
