TEZ-92. Pass m/r java-opts to DAG plan
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1b0fc357 Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1b0fc357 Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1b0fc357 Branch: refs/heads/master Commit: 1b0fc35734a07defe05d87b27db5188eb989901a Parents: 9fb2170 Author: Mike Liddell <[email protected]> Authored: Tue May 21 13:22:27 2013 -0700 Committer: Mike Liddell <[email protected]> Committed: Tue May 21 13:23:57 2013 -0700 ---------------------------------------------------------------------- .../java/org/apache/tez/mapreduce/YARNRunner.java | 51 ++++++++++++++- 1 files changed, 49 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1b0fc357/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index fcb2ca1..7582a5e 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -509,7 +509,7 @@ public class YARNRunner implements ClientProtocol { } } - private Vertex configureReduceStage(FileSystem fs, JobID jobId, + private Vertex configureIntermediateReduceStage(FileSystem fs, JobID jobId, Configuration jobConf, String jobSubmitDir, Credentials ts, Map<String, LocalResource> jobLocalResources, int iReduceIndex) throws IOException { @@ -538,6 +538,8 @@ public class YARNRunner implements ClientProtocol { vertex.setTaskLocationsHint(null); vertex.setTaskResource(reduceResource); + vertex.setJavaOpts(getReduceJavaOpts(conf)); + return vertex; } @@ -551,9 +553,15 @@ public class YARNRunner implements ClientProtocol { Vertex[] vertices = new Vertex[numIntermediateStages]; for (int i = 0; i < numIntermediateStages; i++) { - vertices[i] = configureReduceStage(fs, jobId, jobConf, jobSubmitDir, ts, + vertices[i] = configureIntermediateReduceStage(fs, jobId, jobConf, jobSubmitDir, ts, jobLocalResources, i); dag.addVertex(vertices[i]); + + LOG.info("XXXX Adding intermediate vertex to DAG" + + ", vertexName=" + vertices[i].getVertexName() + + ", processor=" + vertices[i].getProcessorName() + + ", parrellism=" + vertices[i].getParallelism() + + ", javaOpts=" + vertices[i].getJavaOpts()); } return vertices; } @@ -604,6 +612,8 @@ public class YARNRunner implements ClientProtocol { mapVertex.setTaskLocationsHint(inputSplitLocations); mapVertex.setTaskResource(mapResource); + mapVertex.setJavaOpts(getMapJavaOpts(jobConf)); + LOG.info("XXXX Adding map vertex to DAG" + ", vertexName=" + mapVertex.getVertexName() + ", processor=" + mapVertex.getProcessorName() @@ -645,6 +655,8 @@ public class YARNRunner implements ClientProtocol { reduceVertex.setTaskLocationsHint(null); reduceVertex.setTaskResource(reduceResource); + reduceVertex.setJavaOpts(getReduceJavaOpts(jobConf)); + LOG.info("XXXX Adding reduce vertex to DAG" + ", vertexName=" + reduceVertex.getVertexName() + ", processor=" + reduceVertex.getProcessorName() @@ -1114,4 +1126,39 @@ public class YARNRunner implements ClientProtocol { envConf + " config settings."); } } + + private String getMapJavaOpts(Configuration jobConf) { + // follows pattern from YARN MapReduceChildJVM.java + String adminOpts = ""; + adminOpts = jobConf.get( + MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, + MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); + + String userOpts = ""; + userOpts = + jobConf.get( + MRJobConfig.MAP_JAVA_OPTS, // same as JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + jobConf.get( + JobConf.MAPRED_TASK_JAVA_OPTS, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); + + return adminOpts + " " + userOpts; + } + + private String getReduceJavaOpts(Configuration jobConf) { + // follows pattern from YARN MapReduceChildJVM.java + String adminOpts = ""; + adminOpts = jobConf.get( + MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, + MRJobConfig.DEFAULT_MAPRED_ADMIN_JAVA_OPTS); + + String userOpts = ""; + userOpts = + jobConf.get( + MRJobConfig.REDUCE_JAVA_OPTS, // same as JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS + jobConf.get( + JobConf.MAPRED_TASK_JAVA_OPTS, + JobConf.DEFAULT_MAPRED_TASK_JAVA_OPTS)); + return adminOpts + " " + userOpts; + } }
