Added: 
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java?rev=1466926&view=auto
==============================================================================
--- 
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
 (added)
+++ 
oozie/trunk/sharelib/sqoop/src/test/java/org/apache/oozie/action/hadoop/TestSqoopActionExecutor.java
 Thu Apr 11 15:40:45 2013
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import org.jdom.Namespace;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.StringReader;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Integration-style tests for the Sqoop action executor. The tests create a
+ * local HSQLDB database, run a Sqoop import against it through the Oozie
+ * launcher job, and verify the imported records on HDFS.
+ */
+public class TestSqoopActionExecutor extends ActionExecutorTestCase {
+
+    // Sqoop <command> template: {0} = JDBC URI, {1} = HDFS target directory.
+    // "-m 1" forces a single mapper, so exactly one part file is produced.
+    private static final String SQOOP_COMMAND = "import --connect {0} --table 
TT --target-dir {1} -m 1";
+
+    // Action XML using the single <command> form: {0} = job tracker,
+    // {1} = name node, {2}/{3} = one configuration property name/value,
+    // {4} = the Sqoop command line.
+    private static final String SQOOP_ACTION_COMMAND_XML =
+            "<sqoop xmlns=\"uri:oozie:sqoop-action:0.1\">" +
+            "<job-tracker>{0}</job-tracker>" +
+            "<name-node>{1}</name-node>" +
+            "<configuration>" +
+            "<property>" +
+            "<name>{2}</name>" +
+            "<value>{3}</value>" +
+            "</property>" +
+            "</configuration>" +
+            "<command>{4}</command>" +
+            "</sqoop>";
+
+    // Action XML using the <arg> form for a free-form query import:
+    // {0} = job tracker, {1} = name node, {2} = JDBC URI, {3} = SQL query
+    // (must contain $CONDITIONS), {4} = HDFS target directory.
+    private static final String SQOOP_ACTION_ARGS_XML =
+            "<sqoop xmlns=\"uri:oozie:sqoop-action:0.1\">" +
+            "<job-tracker>{0}</job-tracker>" +
+            "<name-node>{1}</name-node>" +
+            "<configuration>" +
+            "<property>" +
+            "<name>oozie.sqoop.log.level</name>" +
+            "<value>INFO</value>" +
+            "</property>" +
+            "</configuration>" +
+            "<arg>import</arg>" +
+            "<arg>--connect</arg>" +
+            "<arg>{2}</arg>" +
+            "<arg>--username</arg>" +
+            "<arg>sa</arg>" +
+            "<arg>--password</arg>" +
+            "<arg></arg>" +
+            "<arg>--verbose</arg>" +
+            "<arg>--query</arg>" +
+            "<arg>{3}</arg>" +
+            "<arg>--target-dir</arg>" +
+            "<arg>{4}</arg>" +
+            "<arg>--split-by</arg>" +
+            "<arg>I</arg>" +
+            "</sqoop>";
+
+    protected void setSystemProps() throws Exception {
+        super.setSystemProps();
+        setSystemProperty("oozie.service.ActionService.executor.ext.classes", 
SqoopActionExecutor.class.getName());
+    }
+
+    public void testSetupMethods() throws Exception {
+        SqoopActionExecutor ae = new SqoopActionExecutor();
+        assertEquals("sqoop", ae.getType());
+    }
+
+    public void testLauncherJar() throws Exception {
+        SqoopActionExecutor ae = new SqoopActionExecutor();
+        Path jar = new Path(ae.getOozieRuntimeDir(), ae.getLauncherJarName());
+        assertTrue(new File(jar.toString()).exists());
+    }
+
+    private String getDbFile() {
+        return "db.hsqldb";
+    }
+
+    private String getDbPath() {
+        return new File(getTestCaseDir(), getDbFile()).getAbsolutePath();
+    }
+
+    private String getLocalJdbcUri() {
+        return "jdbc:hsqldb:file:" + getDbPath() + ";shutdown=true";
+    }
+
+    private String getActionJdbcUri() {
+        return "jdbc:hsqldb:file:" + getDbFile();
+    }
+
+    private String getSqoopOutputDir() {
+        return new Path(getFsTestCaseDir(), "output").toString();
+    }
+
+    private String getActionXml() {
+        String command = MessageFormat.format(SQOOP_COMMAND, 
getActionJdbcUri(), getSqoopOutputDir());
+        return MessageFormat.format(SQOOP_ACTION_COMMAND_XML, 
getJobTrackerUri(), getNameNodeUri(),
+                                    "dummy", "dummyValue", command);
+    }
+
+    private String getActionXmlFreeFromQuery() {
+        String query = "select TT.I, TT.S from TT where $CONDITIONS";
+        return MessageFormat.format(SQOOP_ACTION_ARGS_XML, getJobTrackerUri(), 
getNameNodeUri(),
+                                    getActionJdbcUri(), query, 
getSqoopOutputDir());
+    }
+
+    private void createDB() throws Exception {
+        Class.forName("org.hsqldb.jdbcDriver");
+        Connection conn = DriverManager.getConnection(getLocalJdbcUri(), "sa", 
"");
+        Statement st = conn.createStatement();
+        st.executeUpdate("CREATE TABLE TT (I INTEGER PRIMARY KEY, S 
VARCHAR(256))");
+        st.executeUpdate("INSERT INTO TT (I, S) VALUES (1, 'a')");
+        st.executeUpdate("INSERT INTO TT (I, S) VALUES (2, 'a')");
+        st.executeUpdate("INSERT INTO TT (I, S) VALUES (3, 'a')");
+        st.close();
+        conn.close();
+    }
+
+    /**
+     * End-to-end test of the <command>-form Sqoop action: runs the launcher,
+     * then verifies external status, Hadoop counters, the three imported
+     * rows in the single (-m 1) part file, and the captured child job ids.
+     */
+    public void testSqoopAction() throws Exception {
+        createDB();
+
+        Context context = createContext(getActionXml());
+        final RunningJob launcherJob = submitAction(context);
+        String launcherId = context.getAction().getExternalId();
+        // Wait up to two minutes for the launcher job to complete.
+        waitFor(120 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+
+        // The launcher must not have swapped its id for a child job id.
+        assertFalse(LauncherMapper.hasIdSwap(launcherJob));
+
+        SqoopActionExecutor ae = new SqoopActionExecutor();
+        ae.check(context, context.getAction());
+        assertTrue(launcherId.equals(context.getAction().getExternalId()));
+        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertNotNull(context.getAction().getData());
+        assertNotNull(context.getAction().getExternalChildIDs());
+        ae.end(context, context.getAction());
+        assertEquals(WorkflowAction.Status.OK, 
context.getAction().getStatus());
+
+        // Counters captured from the Sqoop-launched MR job must be present.
+        String hadoopCounters = 
context.getVar(MapReduceActionExecutor.HADOOP_COUNTERS);
+        assertNotNull(hadoopCounters);
+        assertFalse(hadoopCounters.isEmpty());
+
+        // -m 1 yields a single part file; createDB() inserted 3 rows, all
+        // containing 'a'.
+        FileSystem fs = getFileSystem();
+        BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(new Path(getSqoopOutputDir(), "part-m-00000"))));
+        int count = 0;
+        String line = br.readLine();
+        while (line != null) {
+            assertTrue(line.contains("a"));
+            count++;
+            line = br.readLine();
+        }
+        br.close();
+        assertEquals(3, count);
+
+        // Action output data must carry a non-empty HADOOP_JOBS property
+        // listing the job ids Sqoop started.
+        assertNotNull(context.getAction().getData());
+        Properties outputData = new Properties();
+        outputData.load(new StringReader(context.getAction().getData()));
+        assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS));
+        
assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() > 
0);
+    }
+
+    /**
+     * Same end-to-end verification as {@link #testSqoopAction()} but using
+     * the <arg>-form action XML with a free-form query and --split-by, so
+     * the output may span several part files; all part files are summed.
+     */
+    public void testSqoopActionFreeFormQuery() throws Exception {
+        createDB();
+
+        Context context = createContext(getActionXmlFreeFromQuery());
+        final RunningJob launcherJob = submitAction(context);
+        String launcherId = context.getAction().getExternalId();
+        // Wait up to two minutes for the launcher job to complete.
+        waitFor(120 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return launcherJob.isComplete();
+            }
+        });
+        assertTrue(launcherJob.isSuccessful());
+
+        // The launcher must not have swapped its id for a child job id.
+        assertFalse(LauncherMapper.hasIdSwap(launcherJob));
+
+        SqoopActionExecutor ae = new SqoopActionExecutor();
+        ae.check(context, context.getAction());
+        assertTrue(launcherId.equals(context.getAction().getExternalId()));
+        assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+        assertNotNull(context.getAction().getData());
+        assertNotNull(context.getAction().getExternalChildIDs());
+        ae.end(context, context.getAction());
+        assertEquals(WorkflowAction.Status.OK, 
context.getAction().getStatus());
+
+        // Counters captured from the Sqoop-launched MR job must be present.
+        String hadoopCounters = 
context.getVar(MapReduceActionExecutor.HADOOP_COUNTERS);
+        assertNotNull(hadoopCounters);
+        assertFalse(hadoopCounters.isEmpty());
+
+        // Sum the rows across all part-* files; expect the 3 inserted rows.
+        FileSystem fs = getFileSystem();
+        FileStatus[] parts = fs.listStatus(new Path(getSqoopOutputDir()), new 
PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return path.getName().startsWith("part-");
+            }
+        });
+        int count = 0;
+        for (FileStatus part : parts) {
+            BufferedReader br = new BufferedReader(new 
InputStreamReader(fs.open(part.getPath())));
+            String line = br.readLine();
+            while (line != null) {
+                assertTrue(line.contains("a"));
+                count++;
+                line = br.readLine();
+            }
+            br.close();
+        }
+        assertEquals(3, count);
+
+        // Action output data must carry a non-empty HADOOP_JOBS property.
+        assertNotNull(context.getAction().getData());
+        Properties outputData = new Properties();
+        outputData.load(new StringReader(context.getAction().getData()));
+        assertTrue(outputData.containsKey(LauncherMain.HADOOP_JOBS));
+        
assertTrue(outputData.getProperty(LauncherMain.HADOOP_JOBS).trim().length() > 
0);
+    }
+
+
+    /**
+     * Submits the action through the executor's launcher and returns a
+     * RunningJob handle for the launcher MR job so the test can poll it.
+     * Builds a JobConf from the action XML (job tracker, name node, inline
+     * configuration) to create the JobClient used for the lookup.
+     */
+    private RunningJob submitAction(Context context) throws Exception {
+        SqoopActionExecutor ae = new SqoopActionExecutor();
+
+        WorkflowAction action = context.getAction();
+
+        ae.prepareActionDir(getFileSystem(), context);
+        ae.submitLauncher(getFileSystem(), context, action);
+
+        String jobId = action.getExternalId();
+        String jobTracker = action.getTrackerUri();
+        String consoleUrl = action.getConsoleUrl();
+        assertNotNull(jobId);
+        assertNotNull(jobTracker);
+        assertNotNull(consoleUrl);
+        // Rebuild a client-side configuration from the action XML so the
+        // launcher job can be located via JobClient.
+        Element e = XmlUtils.parseXml(action.getConf());
+        Namespace ns = Namespace.getNamespace("uri:oozie:sqoop-action:0.1");
+        XConfiguration conf = new XConfiguration(
+                new 
StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
+        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
+        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
+        conf.set("user.name", context.getProtoActionConf().get("user.name"));
+        conf.set("mapreduce.framework.name", "yarn");
+        conf.set("group.name", getTestGroup());
+
+        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
+        XConfiguration.copy(conf, jobConf);
+        String user = jobConf.get("user.name");
+        // NOTE(review): 'group' is never read below — dead local kept as-is.
+        String group = jobConf.get("group.name");
+        JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
+        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
+        assertNotNull(runningJob);
+        return runningJob;
+    }
+
+    /**
+     * Creates a workflow Context for the given action XML: sets the Hadoop
+     * user, stages the sqoop sharelib in the distributed cache, copies the
+     * HSQLDB files to HDFS as app lib entries, and wires the action bean.
+     */
+    private Context createContext(String actionXml) throws Exception {
+        SqoopActionExecutor ae = new SqoopActionExecutor();
+
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+
+        FileSystem fs = getFileSystem();
+        SharelibUtils.addToDistributedCache("sqoop", fs, getFsTestCaseDir(), 
protoConf);
+
+        // The db files are exposed as "path#linkname" lib entries so the
+        // action's relative JDBC URI resolves in the task working directory.
+        protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, 
copyDbToHdfs());
+
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "sqoop-action");
+        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
+        action.setType(ae.getType());
+        action.setConf(actionXml);
+
+        return new Context(wf, action);
+    }
+
+    private String[] copyDbToHdfs() throws Exception {
+        List<String> files = new ArrayList<String>();
+        String[] exts = {".script", ".properties"};
+        for (String ext : exts) {
+            String file = getDbPath() + ext;
+            String name = getDbFile() + ext;
+            Path targetPath = new Path(getAppPath(), name);
+            FileSystem fs = getFileSystem();
+            InputStream is = new FileInputStream(file);
+            OutputStream os = fs.create(new Path(getAppPath(), targetPath));
+            IOUtils.copyStream(is, os);
+            files.add(targetPath.toString() + "#" + name);
+        }
+        return files.toArray(new String[files.size()]);
+    }
+}

Modified: oozie/trunk/sharelib/streaming/pom.xml
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/pom.xml?rev=1466926&r1=1466925&r2=1466926&view=diff
==============================================================================
--- oozie/trunk/sharelib/streaming/pom.xml (original)
+++ oozie/trunk/sharelib/streaming/pom.xml Thu Apr 11 15:40:45 2013
@@ -49,6 +49,37 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-core</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-core</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-hadoop</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-hadoop-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-hcatalog</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

Added: 
oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java?rev=1466926&view=auto
==============================================================================
--- 
oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
 (added)
+++ 
oozie/trunk/sharelib/streaming/src/main/java/org/apache/oozie/action/hadoop/StreamingMain.java
 Thu Apr 11 15:40:45 2013
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+
+public class StreamingMain extends MapReduceMain {
+
+    public static void main(String[] args) throws Exception {
+        run(StreamingMain.class, args);
+    }
+
+    protected RunningJob submitJob(Configuration actionConf) throws Exception {
+        JobConf jobConf = new JobConf();
+
+        jobConf.set("mapred.mapper.class", 
"org.apache.hadoop.streaming.PipeMapper");
+        jobConf.set("mapred.reducer.class", 
"org.apache.hadoop.streaming.PipeReducer");
+        jobConf.set("mapred.map.runner.class", 
"org.apache.hadoop.streaming.PipeMapRunner");
+
+        jobConf.set("mapred.input.format.class", 
"org.apache.hadoop.mapred.TextInputFormat");
+        jobConf.set("mapred.output.format.class", 
"org.apache.hadoop.mapred.TextOutputFormat");
+        jobConf.set("mapred.output.value.class", "org.apache.hadoop.io.Text");
+        jobConf.set("mapred.output.key.class", "org.apache.hadoop.io.Text");
+
+        jobConf.set("mapred.create.symlink", "yes");
+        jobConf.set("mapred.used.genericoptionsparser", "true");
+
+        jobConf.set("stream.addenvironment", "");
+
+        String value = actionConf.get("oozie.streaming.mapper");
+        if (value != null) {
+            jobConf.set("stream.map.streamprocessor", value);
+        }
+        value = actionConf.get("oozie.streaming.reducer");
+        if (value != null) {
+            jobConf.set("stream.reduce.streamprocessor", value);
+        }
+        value = actionConf.get("oozie.streaming.record-reader");
+        if (value != null) {
+            jobConf.set("stream.recordreader.class", value);
+        }
+        String[] values = getStrings(actionConf, 
"oozie.streaming.record-reader-mapping");
+        for (String s : values) {
+            String[] kv = s.split("=");
+            jobConf.set("stream.recordreader." + kv[0], kv[1]);
+        }
+        values = getStrings(actionConf, "oozie.streaming.env");
+        value = jobConf.get("stream.addenvironment", "");
+        if (value.length() > 0) {
+            value = value + " ";
+        }
+        for (String s : values) {
+            value = value + s + " ";
+        }
+        jobConf.set("stream.addenvironment", value);
+
+        addActionConf(jobConf, actionConf);
+
+        // propagate delegation related props from launcher job to MR job
+        if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+            jobConf.set("mapreduce.job.credentials.binary", 
System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
+        }
+
+        JobClient jobClient = null;
+        RunningJob runJob = null;
+        boolean exception = false;
+        try {
+            jobClient = createJobClient(jobConf);
+            runJob = jobClient.submitJob(jobConf);
+        }
+        catch (Exception ex) {
+            exception = true;
+            throw ex;
+        }
+        finally {
+            try {
+                if (jobClient != null) {
+                    jobClient.close();
+                }
+            }
+            catch (Exception ex) {
+                if (exception) {
+                    System.out.println("JobClient Error: " + ex);
+                }
+                else {
+                    throw ex;
+                }
+            }
+        }
+        return runJob;
+    }
+}

Added: 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java?rev=1466926&view=auto
==============================================================================
--- 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java
 (added)
+++ 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutorStreaming.java
 Thu Apr 11 15:40:45 2013
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.streaming.StreamJob;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.ClassUtils;
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.FileInputStream;
+import java.io.Writer;
+import java.io.OutputStreamWriter;
+import org.apache.oozie.util.XConfiguration;
+
+/**
+ * Runs the full MapReduce action executor test suite (inherited) plus a
+ * streaming-specific test that submits a cat/wc streaming job through the
+ * map-reduce action using the hadoop-streaming jar as a <file> attachment.
+ */
+public class TestMapReduceActionExecutorStreaming extends 
TestMapReduceActionExecutor {
+
+    @Override
+    protected void setSystemProps() throws Exception {
+        super.setSystemProps();
+        // Uses executor.classes (not .ext.classes): replaces the executor
+        // list with the MapReduce executor for this suite.
+        setSystemProperty("oozie.service.ActionService.executor.classes", 
MapReduceActionExecutor.class.getName());
+    }
+
+    /**
+     * Submits a map-reduce action with a <streaming> section (mapper=cat,
+     * reducer=wc) and delegates result verification to _testSubmit().
+     */
+    public void testStreaming() throws Exception {
+        FileSystem fs = getFileSystem();
+        Path streamingJar = new Path(getFsTestCaseDir(), 
"jar/hadoop-streaming.jar");
+
+        // Copy the hadoop-streaming jar from the local classpath to HDFS.
+        // NOTE(review): streamingJar is already fully qualified; resolving it
+        // against getAppPath() relies on absolute-child URI resolution in
+        // Path(parent, child) — confirm intended target location.
+        InputStream is = new 
FileInputStream(ClassUtils.findContainingJar(StreamJob.class));
+        OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
+        IOUtils.copyStream(is, os);
+
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        // Two input lines for the streaming job to count.
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        String actionXml = "<map-reduce>" + "<job-tracker>" + 
getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>" + "      <streaming>" + "  
      <mapper>cat</mapper>"
+                + "        <reducer>wc</reducer>" + "      </streaming>"
+                + getStreamingConfig(inputDir.toString(), 
outputDir.toString()).toXmlString(false) + "<file>"
+                + streamingJar + "</file>" + "</map-reduce>";
+        _testSubmit("streaming", actionXml);
+    }
+
+    /** Builds the configuration block holding the job's input/output dirs. */
+    protected XConfiguration getStreamingConfig(String inputDir, String 
outputDir) {
+        XConfiguration conf = new XConfiguration();
+        conf.set("mapred.input.dir", inputDir);
+        conf.set("mapred.output.dir", outputDir);
+        return conf;
+    }
+}

Added: 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java
URL: 
http://svn.apache.org/viewvc/oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java?rev=1466926&view=auto
==============================================================================
--- 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java
 (added)
+++ 
oozie/trunk/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestStreamingMain.java
 Thu Apr 11 15:40:45 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.util.XConfiguration;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Properties;
+
+/**
+ * Exercises StreamingMain end-to-end in-process: stages input and the
+ * streaming sharelib, configures a cat/wc streaming job via system
+ * properties, invokes StreamingMain.main(), and asserts that the new-id
+ * properties file was produced with an "id" entry.
+ */
+public class TestStreamingMain extends MainTestCase {
+
+    public Void call() throws Exception {
+        FileSystem fs = getFileSystem();
+
+        // Single input line for the streaming job.
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        fs.mkdirs(inputDir);
+        Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        writer.write("hello");
+        writer.close();
+
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        // Start from the test cluster's job configuration.
+        XConfiguration jobConf = new XConfiguration();
+        XConfiguration.copy(createJobConf(), jobConf);
+
+        // Keep the job small and fail fast on task errors.
+        jobConf.setInt("mapred.map.tasks", 1);
+        jobConf.setInt("mapred.map.max.attempts", 1);
+        jobConf.setInt("mapred.reduce.max.attempts", 1);
+
+        jobConf.set("user.name", getTestUser());
+        jobConf.set("hadoop.job.ugi", getTestUser() + "," + getTestGroup());
+
+        SharelibUtils.addToDistributedCache("streaming", fs, 
getFsTestCaseDir(), jobConf);
+
+        // mapper=cat, reducer=wc; no record reader, mappings or env.
+        MapReduceActionExecutor.setStreaming(jobConf, "cat", "wc", null, null, 
null);
+
+        jobConf.set("mapred.input.dir", inputDir.toString());
+        jobConf.set("mapred.output.dir", outputDir.toString());
+
+        // StreamingMain reads its action configuration from the file named
+        // by the oozie.action.conf.xml system property.
+        File actionXml = new File(getTestCaseDir(), "action.xml");
+        OutputStream os = new FileOutputStream(actionXml);
+        jobConf.writeXml(os);
+        os.close();
+
+        File newIdProperties = new File(getTestCaseDir(), "newId.properties");
+
+        System.setProperty("oozie.action.conf.xml", 
actionXml.getAbsolutePath());
+        System.setProperty("oozie.action.newId.properties", 
newIdProperties.getAbsolutePath());
+
+        StreamingMain.main(null);
+
+        // The launched job's id must have been recorded.
+        assertTrue(newIdProperties.exists());
+
+        InputStream is = new FileInputStream(newIdProperties);
+        Properties props = new Properties();
+        props.load(is);
+        is.close();
+
+        assertTrue(props.containsKey("id"));
+        return null;
+    }
+
+}

Modified: oozie/trunk/src/main/assemblies/partial-sharelib.xml
URL: 
http://svn.apache.org/viewvc/oozie/trunk/src/main/assemblies/partial-sharelib.xml?rev=1466926&r1=1466925&r2=1466926&view=diff
==============================================================================
--- oozie/trunk/src/main/assemblies/partial-sharelib.xml (original)
+++ oozie/trunk/src/main/assemblies/partial-sharelib.xml Thu Apr 11 15:40:45 
2013
@@ -24,7 +24,7 @@
 
     <dependencySets>
         <dependencySet>
-            <useProjectArtifact>false</useProjectArtifact>
+            <useProjectArtifact>true</useProjectArtifact>
             
<outputDirectory>/share/lib/${sharelib.action.postfix}</outputDirectory>
             <unpack>false</unpack>
             <useTransitiveDependencies>true</useTransitiveDependencies>


Reply via email to