Repository: oozie
Updated Branches:
  refs/heads/master 532dce684 -> 746be5c29
OOZIE-2158 Overrides in action conf in streaming action do not work (rohini) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/746be5c2 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/746be5c2 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/746be5c2 Branch: refs/heads/master Commit: 746be5c2917310ab7202767a1a6bb2c79ad26dd6 Parents: 532dce6 Author: Rohini Palaniswamy <[email protected]> Authored: Mon Mar 2 15:03:01 2015 -0800 Committer: Rohini Palaniswamy <[email protected]> Committed: Mon Mar 2 15:03:01 2015 -0800 ---------------------------------------------------------------------- release-log.txt | 1 + .../apache/oozie/action/hadoop/PipesMain.java | 20 ++++--- .../oozie/action/hadoop/StreamingMain.java | 21 ++++--- .../hadoop/TestMapReduceActionExecutor.java | 60 ++++++++++++++++++-- 4 files changed, 82 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/746be5c2/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ca6dfc2..0851d1c 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.2.0 release (trunk - unreleased) +OOZIE-2158 Overrides in action conf in streaming action do not work (rohini) OOZIE-2042 Max substitution for config variables should be configurable (seoeun25 via puru) OOZIE-1913 Devise a way to turn off SLA alerts for bundle/coordinator flexibly (puru) OOZIE-2071 Add a Spark example (pavan kumar via rkanter) http://git-wip-us.apache.org/repos/asf/oozie/blob/746be5c2/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java index bf91b43..5b5e9db 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PipesMain.java @@ -32,32 +32,32 @@ public class PipesMain extends MapReduceMain { } @Override - protected RunningJob submitJob(JobConf jobConf) throws Exception { - String value = jobConf.get("oozie.pipes.map"); + protected void addActionConf(JobConf jobConf, Configuration actionConf) { + String value = actionConf.get("oozie.pipes.map"); if (value != null) { jobConf.setBoolean("hadoop.pipes.java.mapper", true); jobConf.set("mapred.mapper.class", value); } - value = jobConf.get("oozie.pipes.reduce"); + value = actionConf.get("oozie.pipes.reduce"); if (value != null) { jobConf.setBoolean("hadoop.pipes.java.reducer", true); jobConf.set("mapred.reducer.class", value); } - value = jobConf.get("oozie.pipes.inputformat"); + value = actionConf.get("oozie.pipes.inputformat"); if (value != null) { jobConf.setBoolean("hadoop.pipes.java.recordreader", true); jobConf.set("mapred.input.format.class", value); } - value = jobConf.get("oozie.pipes.partitioner"); + value = actionConf.get("oozie.pipes.partitioner"); if (value != null) { jobConf.set("mapred.partitioner.class", value); } - value = jobConf.get("oozie.pipes.writer"); + value = actionConf.get("oozie.pipes.writer"); if (value != null) { jobConf.setBoolean("hadoop.pipes.java.recordwriter", true); jobConf.set("mapred.output.format.class", value); } - value = jobConf.get("oozie.pipes.program"); + value = actionConf.get("oozie.pipes.program"); if (value != null) { jobConf.set("hadoop.pipes.executable", value); if (value.contains("#")) { @@ -65,6 +65,12 @@ public class PipesMain extends MapReduceMain { } } + super.addActionConf(jobConf, actionConf); + } + + @Override + protected RunningJob submitJob(JobConf jobConf) throws Exception { + //propagate delegation related props from 
launcher job to MR job if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) { jobConf.set("mapreduce.job.credentials.binary", getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION")); http://git-wip-us.apache.org/repos/asf/oozie/blob/746be5c2/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java index c65c859..991bf7e 100644 --- a/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java +++ b/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java @@ -29,8 +29,9 @@ public class StreamingMain extends MapReduceMain { run(StreamingMain.class, args); } + @Override - protected RunningJob submitJob(JobConf jobConf) throws Exception { + protected void addActionConf(JobConf jobConf, Configuration actionConf) { jobConf.set("mapred.mapper.class", "org.apache.hadoop.streaming.PipeMapper"); jobConf.set("mapred.reducer.class", "org.apache.hadoop.streaming.PipeReducer"); jobConf.set("mapred.map.runner.class", "org.apache.hadoop.streaming.PipeMapRunner"); @@ -43,26 +44,24 @@ public class StreamingMain extends MapReduceMain { jobConf.set("mapred.create.symlink", "yes"); jobConf.set("mapred.used.genericoptionsparser", "true"); - jobConf.set("stream.addenvironment", ""); - - String value = jobConf.get("oozie.streaming.mapper"); + String value = actionConf.get("oozie.streaming.mapper"); if (value != null) { jobConf.set("stream.map.streamprocessor", value); } - value = jobConf.get("oozie.streaming.reducer"); + value = actionConf.get("oozie.streaming.reducer"); if (value != null) { jobConf.set("stream.reduce.streamprocessor", value); } - value = jobConf.get("oozie.streaming.record-reader"); + value = actionConf.get("oozie.streaming.record-reader"); if 
(value != null) { jobConf.set("stream.recordreader.class", value); } - String[] values = getStrings(jobConf, "oozie.streaming.record-reader-mapping"); + String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping"); for (String s : values) { String[] kv = s.split("="); jobConf.set("stream.recordreader." + kv[0], kv[1]); } - values = getStrings(jobConf, "oozie.streaming.env"); + values = getStrings(actionConf, "oozie.streaming.env"); value = jobConf.get("stream.addenvironment", ""); if (value.length() > 0) { value = value + " "; @@ -72,6 +71,12 @@ public class StreamingMain extends MapReduceMain { } jobConf.set("stream.addenvironment", value); + super.addActionConf(jobConf, actionConf); + } + + @Override + protected RunningJob submitJob(JobConf jobConf) throws Exception { + // propagate delegation related props from launcher job to MR job if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) { jobConf.set("mapreduce.job.credentials.binary", getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION")); http://git-wip-us.apache.org/repos/asf/oozie/blob/746be5c2/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index 3204c00..d4095da 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -20,8 +20,10 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; +import 
org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.RunningJob; @@ -45,7 +47,9 @@ import org.apache.oozie.util.ClassUtils; import org.jdom.Element; import java.io.File; +import java.io.FileNotFoundException; import java.io.FileWriter; +import java.io.IOException; import java.io.OutputStream; import java.io.InputStream; import java.io.FileInputStream; @@ -55,6 +59,7 @@ import java.io.OutputStreamWriter; import java.io.StringReader; import java.net.URI; import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Scanner; @@ -63,6 +68,8 @@ import java.util.regex.Pattern; import java.util.zip.ZipEntry; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.util.PropertiesUtils; @@ -799,7 +806,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { return conf; } - public void testStreaming() throws Exception { + private void runStreamingWordCountJob(Path inputDir, Path outputDir, XConfiguration streamingConf) throws Exception { FileSystem fs = getFileSystem(); Path streamingJar = new Path(getFsTestCaseDir(), "jar/hadoop-streaming.jar"); @@ -807,9 +814,6 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { OutputStream os = fs.create(new Path(getAppPath(), streamingJar)); IOUtils.copyStream(is, os); - Path inputDir = new Path(getFsTestCaseDir(), "input"); - Path outputDir = new Path(getFsTestCaseDir(), "output"); - Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); w.write("dummy\n"); w.write("dummy\n"); @@ -818,11 +822,57 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + 
getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>" + " <reducer>wc</reducer>" + " </streaming>" - + getStreamingConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>" + + streamingConf.toXmlString(false) + "<file>" + streamingJar + "</file>" + "</map-reduce>"; _testSubmit("streaming", actionXml); } + public void testStreaming() throws Exception { + FileSystem fs = getFileSystem(); + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + final XConfiguration streamingConf = getStreamingConfig(inputDir.toString(), outputDir.toString()); + + runStreamingWordCountJob(inputDir, outputDir, streamingConf); + + final FSDataInputStream dis = fs.open(getOutputFile(outputDir, fs)); + final List<String> lines = org.apache.commons.io.IOUtils.readLines(dis); + dis.close(); + assertEquals(1, lines.size()); + // Not sure why it is 14 instead of 12. \n twice ?? + assertEquals("2 2 14", lines.get(0).trim()); + } + + public void testStreamingConfOverride() throws Exception { + FileSystem fs = getFileSystem(); + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + final XConfiguration streamingConf = getStreamingConfig(inputDir.toString(), outputDir.toString()); + streamingConf.set("mapred.output.format.class", "org.apache.hadoop.mapred.SequenceFileOutputFormat"); + + runStreamingWordCountJob(inputDir, outputDir, streamingConf); + + SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, getOutputFile(outputDir, fs), getFileSystem().getConf()); + Text key = new Text(), value = new Text(); + if (seqFile.next(key, value)) { + assertEquals("2 2 14", key.toString().trim()); + assertEquals("", value.toString()); + } + assertFalse(seqFile.next(key, value)); + seqFile.close(); + } + + private Path getOutputFile(Path outputDir, FileSystem fs) throws FileNotFoundException, IOException { + final FileStatus[] files = 
fs.listStatus(outputDir, new PathFilter() { + + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part"); + } + }); + return files[0].getPath(); //part-[m/r]-00000 + } + protected XConfiguration getPipesConfig(String inputDir, String outputDir) { XConfiguration conf = new XConfiguration(); conf.setBoolean("hadoop.pipes.java.recordreader", true);
