Added: oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java URL: http://svn.apache.org/viewvc/oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java?rev=1466926&view=auto ============================================================================== --- oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java (added) +++ oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java Thu Apr 11 15:40:45 2013 @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.JobClient; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.oozie.WorkflowActionBean; +import org.apache.oozie.WorkflowJobBean; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XmlUtils; +import org.jdom.Element; +import org.jdom.Namespace; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.StringReader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +public class TestSqoopActionExecutor extends ActionExecutorTestCase { + + private static final String SQOOP_COMMAND = "import --connect {0} --table TT --target-dir {1} -m 1"; + + private static final String SQOOP_ACTION_COMMAND_XML = + "<sqoop xmlns=\"uri:oozie:sqoop-action:0.1\">" + + "<job-tracker>{0}</job-tracker>" + + "<name-node>{1}</name-node>" + + "<configuration>" + + "<property>" + + "<name>{2}</name>" + + "<value>{3}</value>" + + "</property>" + + "</configuration>" + + "<command>{4}</command>" + + "</sqoop>"; + + private static final String SQOOP_ACTION_ARGS_XML = + "<sqoop xmlns=\"uri:oozie:sqoop-action:0.1\">" + + "<job-tracker>{0}</job-tracker>" + + "<name-node>{1}</name-node>" + + "<configuration>" + + 
"<property>" + + "<name>oozie.sqoop.log.level</name>" + + "<value>INFO</value>" + + "</property>" + + "</configuration>" + + "<arg>import</arg>" + + "<arg>--connect</arg>" + + "<arg>{2}</arg>" + + "<arg>--username</arg>" + + "<arg>sa</arg>" + + "<arg>--password</arg>" + + "<arg></arg>" + + "<arg>--verbose</arg>" + + "<arg>--query</arg>" + + "<arg>{3}</arg>" + + "<arg>--target-dir</arg>" + + "<arg>{4}</arg>" + + "<arg>--split-by</arg>" + + "<arg>I</arg>" + + "</sqoop>"; + + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.ext.classes", SqoopActionExecutor.class.getName()); + } + + public void testSetupMethods() throws Exception { + SqoopActionExecutor ae = new SqoopActionExecutor(); + assertEquals("sqoop", ae.getType()); + } + + public void testLauncherJar() throws Exception { + SqoopActionExecutor ae = new SqoopActionExecutor(); + Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName()); + assertTrue(new File(jar.toString()).exists()); + } + + private String getDbFile() { + return "db.hsqldb"; + } + + private String getDbPath() { + return new File(getTestCaseDir(), getDbFile()).getAbsolutePath(); + } + + private String getLocalJdbcUri() { + return "jdbc:hsqldb:file:" + getDbPath() + ";shutdown=true"; + } + + private String getActionJdbcUri() { + return "jdbc:hsqldb:file:" + getDbFile(); + } + + private String getSqoopOutputDir() { + return new Path(getFsTestCaseDir(), "output").toString(); + } + + private String getActionXml() { + String command = MessageFormat.format(SQOOP_COMMAND, getActionJdbcUri(), getSqoopOutputDir()); + return MessageFormat.format(SQOOP_ACTION_COMMAND_XML, getJobTrackerUri(), getNameNodeUri(), + "dummy", "dummyValue", command); + } + + private String getActionXmlFreeFromQuery() { + String query = "select TT.I, TT.S from TT where $CONDITIONS"; + return MessageFormat.format(SQOOP_ACTION_ARGS_XML, getJobTrackerUri(), getNameNodeUri(), + 
getActionJdbcUri(), query, getSqoopOutputDir()); + } + + private void createDB() throws Exception { + Class.forName("org.hsqldb.jdbcDriver"); + Connection conn = DriverManager.getConnection(getLocalJdbcUri(), "sa", ""); + Statement st = conn.createStatement(); + st.executeUpdate("CREATE TABLE TT (I INTEGER PRIMARY KEY, S VARCHAR(256))"); + st.executeUpdate("INSERT INTO TT (I, S) VALUES (1, 'a')"); + st.executeUpdate("INSERT INTO TT (I, S) VALUES (2, 'a')"); + st.executeUpdate("INSERT INTO TT (I, S) VALUES (3, 'a')"); + st.close(); + conn.close(); + } + + public void testSqoopAction() throws Exception { + createDB(); + + Context context = createContext(getActionXml()); + final RunningJob launcherJob = submitAction(context); + String launcherId = context.getAction().getExternalId(); + waitFor(120 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return launcherJob.isComplete(); + } + }); + assertTrue(launcherJob.isSuccessful()); + + assertFalse(LauncherMapper.hasIdSwap(launcherJob)); + + SqoopActionExecutor ae = new SqoopActionExecutor(); + ae.check(context, context.getAction()); + assertTrue(launcherId.equals(context.getAction().getExternalId())); + assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertNotNull(context.getAction().getData()); + assertNotNull(context.getAction().getExternalChildIDs()); + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + + String hadoopCounters = context.getVar(MapReduceActionExecutor.HADOOP_COUNTERS); + assertNotNull(hadoopCounters); + assertFalse(hadoopCounters.isEmpty()); + + FileSystem fs = getFileSystem(); + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(getSqoopOutputDir(), "part-m-00000")))); + int count = 0; + String line = br.readLine(); + while (line != null) { + assertTrue(line.contains("a")); + count++; + line = br.readLine(); + } + br.close(); + assertEquals(3, count); + + 
assertNotNull(context.getAction().getData()); + Properties outputData = new Properties(); + outputData.load(new StringReader(context.getAction().getData())); + assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS)); + assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() > 0); + } + + public void testSqoopActionFreeFormQuery() throws Exception { + createDB(); + + Context context = createContext(getActionXmlFreeFromQuery()); + final RunningJob launcherJob = submitAction(context); + String launcherId = context.getAction().getExternalId(); + waitFor(120 * 1000, new Predicate() { + public boolean evaluate() throws Exception { + return launcherJob.isComplete(); + } + }); + assertTrue(launcherJob.isSuccessful()); + + assertFalse(LauncherMapper.hasIdSwap(launcherJob)); + + SqoopActionExecutor ae = new SqoopActionExecutor(); + ae.check(context, context.getAction()); + assertTrue(launcherId.equals(context.getAction().getExternalId())); + assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); + assertNotNull(context.getAction().getData()); + assertNotNull(context.getAction().getExternalChildIDs()); + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + + String hadoopCounters = context.getVar(MapReduceActionExecutor.HADOOP_COUNTERS); + assertNotNull(hadoopCounters); + assertFalse(hadoopCounters.isEmpty()); + + FileSystem fs = getFileSystem(); + FileStatus[] parts = fs.listStatus(new Path(getSqoopOutputDir()), new PathFilter() { + @Override + public boolean accept(Path path) { + return path.getName().startsWith("part-"); + } + }); + int count = 0; + for (FileStatus part : parts) { + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(part.getPath()))); + String line = br.readLine(); + while (line != null) { + assertTrue(line.contains("a")); + count++; + line = br.readLine(); + } + br.close(); + } + assertEquals(3, count); + + 
assertNotNull(context.getAction().getData()); + Properties outputData = new Properties(); + outputData.load(new StringReader(context.getAction().getData())); + assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS)); + assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() > 0); + } + + + private RunningJob submitAction(Context context) throws Exception { + SqoopActionExecutor ae = new SqoopActionExecutor(); + + WorkflowAction action = context.getAction(); + + ae.prepareActionDir(getFileSystem(), context); + ae.submitLauncher(getFileSystem(), context, action); + + String jobId = action.getExternalId(); + String jobTracker = action.getTrackerUri(); + String consoleUrl = action.getConsoleUrl(); + assertNotNull(jobId); + assertNotNull(jobTracker); + assertNotNull(consoleUrl); + Element e = XmlUtils.parseXml(action.getConf()); + Namespace ns = Namespace.getNamespace("uri:oozie:sqoop-action:0.1"); + XConfiguration conf = new XConfiguration( + new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); + conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); + conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); + conf.set("user.name", context.getProtoActionConf().get("user.name")); + conf.set("mapreduce.framework.name", "yarn"); + conf.set("group.name", getTestGroup()); + + JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); + XConfiguration.copy(conf, jobConf); + String user = jobConf.get("user.name"); + String group = jobConf.get("group.name"); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); + final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); + assertNotNull(runningJob); + return runningJob; + } + + private Context createContext(String actionXml) throws Exception { + SqoopActionExecutor ae = new SqoopActionExecutor(); + + XConfiguration protoConf = new XConfiguration(); + 
protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); + + + FileSystem fs = getFileSystem(); + SharelibUtils.addToDistributedCache("sqoop", fs, getFsTestCaseDir(), protoConf); + + protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, copyDbToHdfs()); + + WorkflowJobBean wf = createBaseWorkflow(protoConf, "sqoop-action"); + WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + + return new Context(wf, action); + } + + private String[] copyDbToHdfs() throws Exception { + List<String> files = new ArrayList<String>(); + String[] exts = {".script", ".properties"}; + for (String ext : exts) { + String file = getDbPath() + ext; + String name = getDbFile() + ext; + Path targetPath = new Path(getAppPath(), name); + FileSystem fs = getFileSystem(); + InputStream is = new FileInputStream(file); + OutputStream os = fs.create(new Path(getAppPath(), targetPath)); + IOUtils.copyStream(is, os); + files.add(targetPath.toString() + "#" + name); + } + return files.toArray(new String[files.size()]); + } +}
Modified: oozie/trunk/sharelib/streaming/pom.xml URL: http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/pom.xml?rev=1466926&r1=1466925&r2=1466926&view=diff ============================================================================== --- oozie/trunk/sharelib/streaming/pom.xml (original) +++ oozie/trunk/sharelib/streaming/pom.xml Thu Apr 11 15:40:45 2013 @@ -49,6 +49,37 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-core</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-core</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hadoop</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hadoop-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-hcatalog</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> Added: oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java URL: http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java?rev=1466926&view=auto ============================================================================== --- oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java (added) +++ oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java Thu Apr 11 15:40:45 2013 @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;

/**
 * Launcher main class that submits a Hadoop Streaming job. It translates the
 * {@code oozie.streaming.*} properties of the action configuration into the
 * {@code stream.*} properties set on the submitted {@link JobConf}.
 */
public class StreamingMain extends MapReduceMain {

    public static void main(String[] args) throws Exception {
        run(StreamingMain.class, args);
    }

    /**
     * Builds the streaming {@link JobConf} from {@code actionConf} and submits it.
     *
     * @param actionConf action configuration carrying the {@code oozie.streaming.*} settings
     * @return the submitted job
     * @throws IllegalArgumentException if an {@code oozie.streaming.record-reader-mapping}
     *         entry does not contain a {@code '='} separator
     * @throws Exception if the job client cannot be created, the submission fails, or closing
     *         the client fails after a successful submission
     */
    protected RunningJob submitJob(Configuration actionConf) throws Exception {
        JobConf jobConf = new JobConf();

        jobConf.set("mapred.mapper.class", "org.apache.hadoop.streaming.PipeMapper");
        jobConf.set("mapred.reducer.class", "org.apache.hadoop.streaming.PipeReducer");
        jobConf.set("mapred.map.runner.class", "org.apache.hadoop.streaming.PipeMapRunner");

        jobConf.set("mapred.input.format.class", "org.apache.hadoop.mapred.TextInputFormat");
        jobConf.set("mapred.output.format.class", "org.apache.hadoop.mapred.TextOutputFormat");
        jobConf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");
        jobConf.set("mapred.output.key.class", "org.apache.hadoop.io.Text");

        jobConf.set("mapred.create.symlink", "yes");
        jobConf.set("mapred.used.genericoptionsparser", "true");

        jobConf.set("stream.addenvironment", "");

        String value = actionConf.get("oozie.streaming.mapper");
        if (value != null) {
            jobConf.set("stream.map.streamprocessor", value);
        }
        value = actionConf.get("oozie.streaming.reducer");
        if (value != null) {
            jobConf.set("stream.reduce.streamprocessor", value);
        }
        value = actionConf.get("oozie.streaming.record-reader");
        if (value != null) {
            jobConf.set("stream.recordreader.class", value);
        }

        String[] values = getStrings(actionConf, "oozie.streaming.record-reader-mapping");
        for (String s : values) {
            // Split on the FIRST '=' only: the value itself may contain '=', and an empty
            // value ("key=") is legal. split("=") would truncate the former and fail with an
            // ArrayIndexOutOfBoundsException on the latter.
            int separator = s.indexOf('=');
            if (separator < 0) {
                throw new IllegalArgumentException(
                        "Invalid record-reader-mapping [" + s + "], expected <name>=<value>");
            }
            jobConf.set("stream.recordreader." + s.substring(0, separator), s.substring(separator + 1));
        }

        values = getStrings(actionConf, "oozie.streaming.env");
        value = jobConf.get("stream.addenvironment", "");
        StringBuilder environment = new StringBuilder(value);
        if (value.length() > 0) {
            environment.append(" ");
        }
        // Every entry is followed by a single space, including the last one.
        for (String s : values) {
            environment.append(s).append(" ");
        }
        jobConf.set("stream.addenvironment", environment.toString());

        addActionConf(jobConf, actionConf);

        // propagate delegation related props from launcher job to MR job
        String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION");
        if (tokenFile != null) {
            jobConf.set("mapreduce.job.credentials.binary", tokenFile);
        }

        JobClient jobClient = null;
        RunningJob runJob = null;
        boolean exception = false;
        try {
            jobClient = createJobClient(jobConf);
            runJob = jobClient.submitJob(jobConf);
        }
        catch (Exception ex) {
            exception = true;
            throw ex;
        }
        finally {
            try {
                if (jobClient != null) {
                    jobClient.close();
                }
            }
            catch (Exception ex) {
                // A close() failure must not mask the submission failure that is already
                // propagating; it is only rethrown when the submission itself succeeded.
                if (exception) {
                    System.out.println("JobClient Error: " + ex);
                }
                else {
                    throw ex;
                }
            }
        }
        return runJob;
    }
}
Added: oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java URL: http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java?rev=1466926&view=auto ============================================================================== --- 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java (added) +++ oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java Thu Apr 11 15:40:45 2013 @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.streaming.StreamJob; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.ClassUtils; +import java.io.OutputStream; +import java.io.InputStream; +import java.io.FileInputStream; +import java.io.Writer; +import java.io.OutputStreamWriter; +import org.apache.oozie.util.XConfiguration; + +public class TestMapReduceActionExecutorStreaming extends TestMapReduceActionExecutor { + + @Override + protected void setSystemProps() throws Exception { + super.setSystemProps(); + setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName()); + } + + public void testStreaming() throws Exception { + FileSystem fs = getFileSystem(); + Path streamingJar = new Path(getFsTestCaseDir(), "jar/hadoop-streaming.jar"); + + InputStream is = new FileInputStream(ClassUtils.findContainingJar(StreamJob.class)); + OutputStream os = fs.create(new Path(getAppPath(), streamingJar)); + IOUtils.copyStream(is, os); + + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + + Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); + + String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + + getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>" + + " <reducer>wc</reducer>" + " </streaming>" + + getStreamingConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "<file>" + + streamingJar + "</file>" + "</map-reduce>"; + _testSubmit("streaming", actionXml); + } + + protected XConfiguration getStreamingConfig(String inputDir, String outputDir) { + XConfiguration conf = new XConfiguration(); + conf.set("mapred.input.dir", inputDir); + conf.set("mapred.output.dir", 
outputDir); + return conf; + } +} Added: oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java URL: http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java?rev=1466926&view=auto ============================================================================== --- oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java (added) +++ oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java Thu Apr 11 15:40:45 2013 @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.util.XConfiguration; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.Properties; + +public class TestStreamingMain extends MainTestCase { + + public Void call() throws Exception { + FileSystem fs = getFileSystem(); + + Path inputDir = new Path(getFsTestCaseDir(), "input"); + fs.mkdirs(inputDir); + Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); + writer.write("hello"); + writer.close(); + + Path outputDir = new Path(getFsTestCaseDir(), "output"); + + XConfiguration jobConf = new XConfiguration(); + XConfiguration.copy(createJobConf(), jobConf); + + jobConf.setInt("mapred.map.tasks", 1); + jobConf.setInt("mapred.map.max.attempts", 1); + jobConf.setInt("mapred.reduce.max.attempts", 1); + + jobConf.set("user.name", getTestUser()); + jobConf.set("hadoop.job.ugi", getTestUser() + "," + getTestGroup()); + + SharelibUtils.addToDistributedCache("streaming", fs, getFsTestCaseDir(), jobConf); + + MapReduceActionExecutor.setStreaming(jobConf, "cat", "wc", null, null, null); + + jobConf.set("mapred.input.dir", inputDir.toString()); + jobConf.set("mapred.output.dir", outputDir.toString()); + + File actionXml = new File(getTestCaseDir(), "action.xml"); + OutputStream os = new FileOutputStream(actionXml); + jobConf.writeXml(os); + os.close(); + + File newIdProperties = new File(getTestCaseDir(), "newId.properties"); + + System.setProperty("oozie.action.conf.xml", actionXml.getAbsolutePath()); + System.setProperty("oozie.action.newId.properties", newIdProperties.getAbsolutePath()); + + StreamingMain.main(null); + + assertTrue(newIdProperties.exists()); + + InputStream is = new FileInputStream(newIdProperties); + Properties 
props = new Properties(); + props.load(is); + is.close(); + + assertTrue(props.containsKey("id")); + return null; + } + +} Modified: oozie/trunk/src/main/assemblies/partial-sharelib.xml URL: http://svn.apache.org/viewvc/oozie/trunk/src/main/assemblies/partial-sharelib.xml?rev=1466926&r1=1466925&r2=1466926&view=diff ============================================================================== --- oozie/trunk/src/main/assemblies/partial-sharelib.xml (original) +++ oozie/trunk/src/main/assemblies/partial-sharelib.xml Thu Apr 11 15:40:45 2013 @@ -24,7 +24,7 @@ <dependencySets> <dependencySet> - <useProjectArtifact>false</useProjectArtifact> + <useProjectArtifact>true</useProjectArtifact> <outputDirectory>/share/lib/${sharelib.action.postfix}</outputDirectory> <unpack>false</unpack> <useTransitiveDependencies>true</useTransitiveDependencies>
