Repository: oozie Updated Branches: refs/heads/master 9acaf0431 -> b54417b79
OOZIE-2102. Streaming actions are broken cause of incorrect method signature. Contributed by Harsh J. Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/b54417b7 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/b54417b7 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/b54417b7 Branch: refs/heads/master Commit: b54417b79b12b20450ff4758f3b690d4200a83dd Parents: 9acaf04 Author: Harsh J <[email protected]> Authored: Sat Dec 27 15:31:03 2014 +0530 Committer: Harsh J <[email protected]> Committed: Tue Dec 30 17:23:09 2014 +0530 ---------------------------------------------------------------------- .../org/apache/oozie/action/hadoop/PipesMain.java | 2 -- .../apache/oozie/action/hadoop/StreamingMain.java | 17 +++++++---------- 2 files changed, 7 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/b54417b7/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java index 0d38040..bf91b43 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java @@ -65,8 +65,6 @@ public class PipesMain extends MapReduceMain { } } - addActionConf(jobConf, jobConf); - //propagate delegation related props from launcher job to MR job if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) { jobConf.set("mapreduce.job.credentials.binary", getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION")); http://git-wip-us.apache.org/repos/asf/oozie/blob/b54417b7/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java index b412490..c65c859 100644 --- a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java +++ b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java @@ -29,9 +29,8 @@ public class StreamingMain extends MapReduceMain { run(StreamingMain.class, args); } - protected RunningJob submitJob(Configuration actionConf) throws Exception { - JobConf jobConf = new JobConf(); - + @Override + protected RunningJob submitJob(JobConf jobConf) throws Exception { jobConf.set("mapred.mapper.class", "org.apache.hadoop.streaming.PipeMapper"); jobConf.set("mapred.reducer.class", "org.apache.hadoop.streaming.PipeReducer"); jobConf.set("mapred.map.runner.class", "org.apache.hadoop.streaming.PipeMapRunner"); @@ -46,24 +45,24 @@ public class StreamingMain extends MapReduceMain { jobConf.set("stream.addenvironment", ""); - String value = actionConf.get("oozie.streaming.mapper"); + String value = jobConf.get("oozie.streaming.mapper"); if (value != null) { jobConf.set("stream.map.streamprocessor", value); } - value = actionConf.get("oozie.streaming.reducer"); + value = jobConf.get("oozie.streaming.reducer"); if (value != null) { jobConf.set("stream.reduce.streamprocessor", value); } - value = actionConf.get("oozie.streaming.record-reader"); + value = jobConf.get("oozie.streaming.record-reader"); if (value != null) { jobConf.set("stream.recordreader.class", value); } - String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping"); + String[] values = getStrings(jobConf, "oozie.streaming.record-reader-mapping"); for (String s : values) { String[] kv = s.split("="); jobConf.set("stream.recordreader." + kv[0], kv[1]); } - values = getStrings(actionConf, "oozie.streaming.env"); + values = getStrings(jobConf, "oozie.streaming.env"); value = jobConf.get("stream.addenvironment", ""); if (value.length() > 0) { value = value + " "; @@ -73,8 +72,6 @@ public class StreamingMain extends MapReduceMain { } jobConf.set("stream.addenvironment", value); - addActionConf(jobConf, actionConf); - // propagate delegation related props from launcher job to MR job if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) { jobConf.set("mapreduce.job.credentials.binary", getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION"));
