Repository: oozie
Updated Branches:
  refs/heads/master 3badb2d0b -> 4bd777e5e


OOZIE-1728 When an ApplicationMaster restarts, it restarts the launcher job: 
DistCp followup (ryota)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4bd777e5
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4bd777e5
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4bd777e5

Branch: refs/heads/master
Commit: 4bd777e5e45ce097aa07fbc1385623139d102069
Parents: 3badb2d
Author: egashira <[email protected]>
Authored: Wed Oct 15 15:33:33 2014 -0700
Committer: egashira <[email protected]>
Committed: Wed Oct 15 15:33:33 2014 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |   6 +
 .../action/hadoop/DistcpActionExecutor.java     |  21 ++-
 .../action/hadoop/TestDistCpActionExecutor.java | 157 ++++++++++++++++++
 .../oozie/action/hadoop/TestDistcpMain.java     |  73 +++++++++
 release-log.txt                                 |   3 +-
 sharelib/distcp/pom.xml                         |  11 --
 .../apache/oozie/action/hadoop/DistcpMain.java  |  97 ++++++++++++
 .../action/hadoop/TestDistCpActionExecutor.java | 158 -------------------
 8 files changed, 352 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 7cd1f70..597775c 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -260,6 +260,12 @@
             <scope>compile</scope>
         </dependency>
 
+         <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-sharelib-distcp</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
index 86d21fb..4d2f7b2 100644
--- 
a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.action.hadoop;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -27,9 +28,9 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XLog;
 import org.jdom.Element;
 
-
 public class DistcpActionExecutor extends JavaActionExecutor{
-    public static final String CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS = 
"org.apache.hadoop.tools.DistCp";
+    public static final String CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS = 
"org.apache.oozie.action.hadoop.DistcpMain";
+    private static final String DISTCP_MAIN_CLASS_NAME = 
"org.apache.hadoop.tools.DistCp";
     public static final String CLASS_NAMES = "oozie.actions.main.classnames";
     private static final XLog LOG = XLog.getLog(DistcpActionExecutor.class);
     public static final String DISTCP_TYPE = "distcp";
@@ -47,13 +48,20 @@ public class DistcpActionExecutor extends 
JavaActionExecutor{
         if(name != null){
             classNameDistcp = name;
         }
-        actionConf.set(JavaMain.JAVA_MAIN_CLASS, classNameDistcp);
+        actionConf.set(JavaMain.JAVA_MAIN_CLASS, DISTCP_MAIN_CLASS_NAME);
         return actionConf;
     }
 
     @Override
     public List<Class> getLauncherClasses() {
-       return super.getLauncherClasses();
+        List<Class> classes = new ArrayList<Class>();
+        try {
+            classes.add(Class.forName(CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS));
+        }
+        catch (ClassNotFoundException e) {
+            throw new RuntimeException("Class not found", e);
+        }
+        return classes;
     }
 
     /**
@@ -106,4 +114,9 @@ public class DistcpActionExecutor extends 
JavaActionExecutor{
         return "distcp";
     }
 
+    @Override
+    protected String getLauncherMain(Configuration launcherConf, Element 
actionXml) {
+        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, 
CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
new file mode 100644
index 0000000..d6ac554
--- /dev/null
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.WorkflowActionBean;
+import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+
+public class TestDistCpActionExecutor extends ActionExecutorTestCase{
+
+    @Override
+    protected void setSystemProps() throws Exception {
+        super.setSystemProps();
+        setSystemProperty("oozie.service.ActionService.executor.classes", 
DistcpActionExecutor.class.getName());
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testSetupMethods() throws Exception {
+        DistcpActionExecutor ae = new DistcpActionExecutor();
+        assertEquals(Arrays.asList(DistcpMain.class), ae.getLauncherClasses());
+    }
+
+    public void testDistCpFile() throws Exception {
+        Path inputPath = new Path(getFsTestCaseDir(), "input.txt");
+        final Path outputPath = new Path(getFsTestCaseDir(), "output.txt");
+        byte[] content = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes();
+
+        OutputStream os = getFileSystem().create(inputPath);
+        os.write(content);
+        os.close();
+
+        String actionXml = "<distcp>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<arg>" + inputPath + "</arg>"+
+                "<arg>" + outputPath + "</arg>" +
+                "</distcp>";
+        Context context = createContext(actionXml);
+        final RunningJob runningJob = submitAction(context);
+        waitFor(60 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return runningJob.isComplete();
+            }
+        });
+        assertTrue(runningJob.isSuccessful());
+
+        waitFor(60 * 1000, new Predicate() {
+            public boolean evaluate() throws Exception {
+                return getFileSystem().exists(outputPath);
+            }
+        });
+        assertTrue(getFileSystem().exists(outputPath));
+
+        byte[] readContent = new byte[content.length];
+        InputStream is = getFileSystem().open(outputPath);
+        int offset = 0;
+        while (offset < readContent.length)
+        {
+            int numRead = is.read(readContent, offset, readContent.length);
+            if(numRead == -1) {
+                break;
+            }
+            offset += numRead;
+        }
+        assertEquals(is.read(), -1);
+        is.close();
+        offset = 0;
+        while (offset < readContent.length)
+        {
+            assertEquals(readContent[offset], content[offset]);
+            offset++;
+        }
+    }
+
+
+    protected Context createContext(String actionXml) throws Exception {
+        DistcpActionExecutor ae = new DistcpActionExecutor();
+
+        Path appJarPath = new Path("lib/test.jar");
+        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), 
"test.jar", LauncherMainTester.class);
+        InputStream is = new FileInputStream(jarFile);
+        OutputStream os = getFileSystem().create(new Path(getAppPath(), 
"lib/test.jar"));
+        IOUtils.copyStream(is, os);
+
+        Path appSoPath = new Path("lib/test.so");
+        getFileSystem().create(new Path(getAppPath(), appSoPath)).close();
+
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+        protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, 
appJarPath.toString(), appSoPath.toString());
+
+
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
+        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
+        action.setType(ae.getType());
+        action.setConf(actionXml);
+
+        return new Context(wf, action);
+    }
+
+    protected RunningJob submitAction(Context context) throws Exception {
+        DistcpActionExecutor ae = new DistcpActionExecutor();
+
+        WorkflowAction action = context.getAction();
+
+        ae.prepareActionDir(getFileSystem(), context);
+        ae.submitLauncher(getFileSystem(), context, action);
+
+        String jobId = action.getExternalId();
+        String jobTracker = action.getTrackerUri();
+        String consoleUrl = action.getConsoleUrl();
+        assertNotNull(jobId);
+        assertNotNull(jobTracker);
+        assertNotNull(consoleUrl);
+
+        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
+        jobConf.set("mapred.job.tracker", jobTracker);
+
+        JobClient jobClient =
+            
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
+        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
+        assertNotNull(runningJob);
+        return runningJob;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java
new file mode 100644
index 0000000..84351f1
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistcpMain.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.util.XConfiguration;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+
+import org.apache.oozie.action.hadoop.DistcpMain;
+
+public class TestDistcpMain extends MainTestCase {
+
+    @Override
+    public Void call() throws Exception {
+
+        XConfiguration jobConf = new XConfiguration();
+        XConfiguration.copy(createJobConf(), jobConf);
+
+        FileSystem fs = getFileSystem();
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        fs.mkdirs(inputDir);
+        Writer writer = new OutputStreamWriter(fs.create(new Path(inputDir, 
"data.txt")));
+        writer.write("hello");
+        writer.close();
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        jobConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, 
"org.apache.hadoop.tools.DistCp");
+
+        jobConf.set("mapreduce.job.tags", "" + System.currentTimeMillis());
+        setSystemProperty("oozie.job.launch.time", "" + 
System.currentTimeMillis());
+
+        File actionXml = new File(getTestCaseDir(), "action.xml");
+        OutputStream os = new FileOutputStream(actionXml);
+        jobConf.writeXml(os);
+        os.close();
+
+        System.setProperty("oozie.action.conf.xml", 
actionXml.getAbsolutePath());
+
+        // Check normal execution
+        DistcpMain.main(new String[]{inputDir.toString(), 
outputDir.toString()});
+        assertTrue(getFileSystem().exists(outputDir));
+
+        // Check exception handling
+        try {
+            DistcpMain.main(new String[0]);
+        } catch(RuntimeException re) {
+            assertTrue(re.getMessage().indexOf("Returned value from distcp is 
non-zero") != -1);
+        }
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 353174c..6fb9a8f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,7 +1,7 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-1728 When an ApplicationMaster restarts, it restarts the launcher job: 
DistCp followup (ryota)
 OOZIE-2009 Requeue CoordActionInputCheck in case of permission error (ryota)
-OOZIE-2005 Coordinator rerun fails to initialize error code and message (ryota)
 OOZIE-1896 ZKUUIDService - Too many job submission fails (puru)
 OOZIE-2019 SLA miss processed on server2 not send email (puru)
 OOZIE-1391 Sub wf suspend doesn't update parent wf (jaydeepvishwakarma via 
shwethags)
@@ -35,6 +35,7 @@ OOZIE-1943 Bump up trunk to 4.2.0-SNAPSHOT (bzhang)
 
 -- Oozie 4.1.0 release (4.1 - unreleased)
 
+OOZIE-2005 Coordinator rerun fails to initialize error code and message (ryota)
 OOZIE-2026 fix synchronization in SLACalculatorMemory.addJobStatus to avoid 
duplicated SLA message (ryota)
 OOZIE-2017 On startup, StatusTransitService can transition Coordinators that 
were in PREPSUSPENDED to RUNNING (rkanter)
 OOZIE-1932 Services should load CallableQueueService after MemoryLocksService 
(mona)

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/sharelib/distcp/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/distcp/pom.xml b/sharelib/distcp/pom.xml
index 04e436d..b788ed0 100644
--- a/sharelib/distcp/pom.xml
+++ b/sharelib/distcp/pom.xml
@@ -45,17 +45,6 @@
         </dependency>
         <dependency>
             <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-core</artifactId>
-            <classifier>tests</classifier>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.oozie</groupId>
             <artifactId>oozie-hadoop</artifactId>
             <scope>provided</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java 
b/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java
new file mode 100644
index 0000000..67b445e
--- /dev/null
+++ 
b/sharelib/distcp/src/main/java/org/apache/oozie/action/hadoop/DistcpMain.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Tool;
+
+public class DistcpMain extends JavaMain {
+
+    private Constructor<?> construct;
+    private Object[] constArgs;
+
+    public static void main(String[] args) throws Exception {
+        run(DistcpMain.class, args);
+    }
+
+    @Override
+    protected void run(String[] args) throws Exception {
+
+        Configuration actionConf = loadActionConf();
+        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+        Class<?> klass = 
actionConf.getClass(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS,
+                org.apache.hadoop.tools.DistCp.class);
+        System.out.println("Main class        : " + klass.getName());
+        System.out.println("Arguments         :");
+        for (String arg : args) {
+            System.out.println("                    " + arg);
+        }
+
+        // propagate delegation related props from launcher job to MR job
+        if (getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION") != null) {
+            actionConf.set("mapreduce.job.credentials.binary", 
getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION"));
+        }
+
+        getConstructorAndArgs(klass, actionConf);
+        if (construct == null) {
+            throw new RuntimeException("Distcp constructor was not found, 
unable to instantiate");
+        }
+        if (constArgs == null) {
+            throw new RuntimeException("Arguments for distcp constructor is 
null, unable to instantiate");
+        }
+        try {
+            Tool distcp = (Tool) construct.newInstance(constArgs);
+            int i = distcp.run(args);
+            if (i != 0) {
+                throw new RuntimeException("Returned value from distcp is 
non-zero (" + i + ")");
+            }
+        }
+        catch (InvocationTargetException ex) {
+            throw new JavaMainException(ex.getCause());
+        }
+    }
+
+    protected void getConstructorAndArgs(Class<?> klass, Configuration 
actionConf) throws Exception {
+        Constructor<?>[] allConstructors = klass.getConstructors();
+        for (Constructor<?> cstruct : allConstructors) {
+            Class<?>[] pType = cstruct.getParameterTypes();
+            construct = cstruct;
+            if (pType.length == 1 && 
pType[0].equals(Class.forName("org.apache.hadoop.conf.Configuration"))) {
+                System.out.println("found Distcp v1 Constructor");
+                System.out.println("                    " + 
cstruct.toString());
+                constArgs = new Object[1];
+                constArgs[0] = actionConf;
+                break;
+            }
+            else if (pType.length == 2 && 
pType[0].equals(Class.forName("org.apache.hadoop.conf.Configuration"))) {
+                // 2nd argument is org.apache.hadoop.tools.DistCpOptions
+                System.out.println("found Distcp v2 Constructor");
+                System.out.println("                    " + 
cstruct.toString());
+                constArgs = new Object[2];
+                constArgs[0] = actionConf;
+                constArgs[1] = null;
+                break;
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/4bd777e5/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
 
b/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
deleted file mode 100644
index 7a098f3..0000000
--- 
a/sharelib/distcp/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.Arrays;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XConfiguration;
-
-public class TestDistCpActionExecutor extends ActionExecutorTestCase{
-
-    @Override
-    protected void setSystemProps() throws Exception {
-        super.setSystemProps();
-        setSystemProperty("oozie.service.ActionService.executor.classes", 
DistcpActionExecutor.class.getName());
-    }
-
-    @SuppressWarnings("unchecked")
-    public void testSetupMethods() throws Exception {
-        DistcpActionExecutor ae = new DistcpActionExecutor();
-        assertEquals(Arrays.asList(JavaMain.class), ae.getLauncherClasses());
-    }
-
-    public void testDistCpFile() throws Exception {
-        Path inputPath = new Path(getFsTestCaseDir(), "input.txt");
-        final Path outputPath = new Path(getFsTestCaseDir(), "output.txt");
-        byte[] content = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".getBytes();
-
-        OutputStream os = getFileSystem().create(inputPath);
-        os.write(content);
-        os.close();
-
-        String actionXml = "<distcp>" +
-                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                "<name-node>" + getNameNodeUri() + "</name-node>" +
-                "<arg>" + inputPath + "</arg>"+
-                "<arg>" + outputPath + "</arg>" +
-                "</distcp>";
-        Context context = createContext(actionXml);
-        final RunningJob runningJob = submitAction(context);
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return runningJob.isComplete();
-            }
-        });
-        assertTrue(runningJob.isSuccessful());
-
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return getFileSystem().exists(outputPath);
-            }
-        });
-        assertTrue(getFileSystem().exists(outputPath));
-
-        byte[] readContent = new byte[content.length];
-        InputStream is = getFileSystem().open(outputPath);
-        int offset = 0;
-        while (offset < readContent.length)
-        {
-            int numRead = is.read(readContent, offset, readContent.length);
-            if(numRead == -1) {
-                break;
-            }
-            offset += numRead;
-        }
-        assertEquals(is.read(), -1);
-        is.close();
-        offset = 0;
-        while (offset < readContent.length)
-        {
-            assertEquals(readContent[offset], content[offset]);
-            offset++;
-        }
-    }
-
-
-    protected Context createContext(String actionXml) throws Exception {
-        DistcpActionExecutor ae = new DistcpActionExecutor();
-
-        Path appJarPath = new Path("lib/test.jar");
-        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), 
"test.jar", LauncherMainTester.class);
-        InputStream is = new FileInputStream(jarFile);
-        OutputStream os = getFileSystem().create(new Path(getAppPath(), 
"lib/test.jar"));
-        IOUtils.copyStream(is, os);
-
-        Path appSoPath = new Path("lib/test.so");
-        getFileSystem().create(new Path(getAppPath(), appSoPath)).close();
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-        protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, 
appJarPath.toString(), appSoPath.toString());
-
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) 
wf.getActions().get(0);
-        action.setType(ae.getType());
-        action.setConf(actionXml);
-
-        return new Context(wf, action);
-    }
-
-
-    protected RunningJob submitAction(Context context) throws Exception {
-        DistcpActionExecutor ae = new DistcpActionExecutor();
-
-        WorkflowAction action = context.getAction();
-
-        ae.prepareActionDir(getFileSystem(), context);
-        ae.submitLauncher(getFileSystem(), context, action);
-
-        String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
-
-        JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("mapred.job.tracker", jobTracker);
-
-        JobClient jobClient =
-            
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
-    }
-}

Reply via email to