Repository: oozie
Updated Branches:
  refs/heads/oya aa1dd9613 -> 4a9542caf


OOZIE-2698 Refactor LauncherAM to make it more testable

Change-Id: I63acfd9a8ec8d4077221deff2e954dda4f3a8281


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9151f4eb
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9151f4eb
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9151f4eb

Branch: refs/heads/oya
Commit: 9151f4eb189da00022465f2c87f3b7db3ae2f024
Parents: aa1dd96
Author: Peter Bacsko <[email protected]>
Authored: Mon Oct 17 17:08:16 2016 +0200
Committer: Peter Bacsko <[email protected]>
Committed: Mon Oct 17 17:08:16 2016 +0200

----------------------------------------------------------------------
 .../oozie/action/hadoop/TestLauncherAM.java     |  46 --
 .../hadoop/TestLauncherAMCallbackNotifier.java  |  25 +-
 .../action/hadoop/AMRMCallBackHandler.java      |  72 +++
 .../action/hadoop/AMRMClientAsyncFactory.java   |  32 +
 .../apache/oozie/action/hadoop/ErrorHolder.java |  56 ++
 .../oozie/action/hadoop/HdfsOperations.java     |  88 +++
 .../apache/oozie/action/hadoop/LauncherAM.java  | 602 ++++++++-----------
 .../hadoop/LauncherAMCallbackNotifier.java      |  15 +-
 .../LauncherAMCallbackNotifierFactory.java      |  27 +
 .../oozie/action/hadoop/LocalFsOperations.java  | 100 +++
 .../action/hadoop/PrepareActionsDriver.java     |  38 +-
 .../action/hadoop/PrepareActionsHandler.java    | 100 +++
 .../hadoop/SequenceFileWriterFactory.java       |  34 ++
 13 files changed, 764 insertions(+), 471 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
deleted file mode 100644
index ed29299..0000000
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.test.XFsTestCase;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XConfiguration;
-
-import java.io.File;
-import java.io.FileWriter;
-import java.io.Writer;
-import java.net.URI;
-import java.util.Map;
-
-public class TestLauncherAM extends XFsTestCase {
-
-
-    // TODO: OYA: write tests later
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
index 9ba04da..fcf99e3 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
@@ -21,6 +21,7 @@ package org.apache.oozie.action.hadoop;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.oozie.QueryServlet;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
 import org.apache.oozie.command.wf.HangServlet;
 import org.apache.oozie.test.EmbeddedServletContainer;
 import org.apache.oozie.test.XTestCase;
@@ -109,7 +110,7 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
         LauncherAMCallbackNotifier cnSpy = Mockito.spy(new 
LauncherAMCallbackNotifier(conf));
 
         long start = System.currentTimeMillis();
-        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
+        cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
         long end = System.currentTimeMillis();
         Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
         Assert.assertTrue("Should have taken more than 5 seconds but it only 
took " + (end - start), end - start >= 5000);
@@ -120,7 +121,7 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
 
         cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
         start = System.currentTimeMillis();
-        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
+        cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
         end = System.currentTimeMillis();
         Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce();
         Assert.assertTrue("Should have taken more than 9 seconds but it only 
took " + (end - start), end - start >= 9000);
@@ -133,7 +134,7 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
 
         LauncherAMCallbackNotifier cnSpy = Mockito.spy(new 
LauncherAMCallbackNotifier(conf));
         long start = System.currentTimeMillis();
-        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
+        cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
         long end = System.currentTimeMillis();
         Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
         Assert.assertTrue("Should have taken more than 5 seconds but it only 
took " + (end - start), end - start >= 5000);
@@ -145,7 +146,7 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
         LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
 
         assertNull(QueryServlet.lastQueryString);
-        cn.notifyURL(FinalApplicationStatus.SUCCEEDED, false);
+        cn.notifyURL(OozieActionResult.SUCCEEDED);
         
waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString());
     }
 
@@ -155,18 +156,8 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
         LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
 
         assertNull(QueryServlet.lastQueryString);
-        cn.notifyURL(FinalApplicationStatus.SUCCEEDED, true);
-        waitForCallbackAndCheckResult("RUNNING");
-    }
-
-    public void testNotifyBackgroundActionWhenSubmitFailsWithKilled() throws 
Exception {
-        Configuration conf = setupEmbeddedContainer(QueryServlet.class, 
"/count/*", "/count/?status=$jobStatus", null);
-
-        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
-
-        assertNull(QueryServlet.lastQueryString);
-        cn.notifyURL(FinalApplicationStatus.KILLED, true);
-        
waitForCallbackAndCheckResult(FinalApplicationStatus.KILLED.toString());
+        cn.notifyURL(OozieActionResult.RUNNING);
+        waitForCallbackAndCheckResult(OozieActionResult.RUNNING.toString());
     }
 
     public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws 
Exception {
@@ -175,7 +166,7 @@ public class TestLauncherAMCallbackNotifier extends 
XTestCase {
         LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
 
         assertNull(QueryServlet.lastQueryString);
-        cn.notifyURL(FinalApplicationStatus.FAILED, true);
+        cn.notifyURL(OozieActionResult.FAILED);
         
waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString());
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
new file mode 100644
index 0000000..63213e6
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+// Note: methods which modify/read the state of errorHolder are synchronized 
to avoid data races when LauncherAM invokes getError()
+public class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler {
+    private ErrorHolder errorHolder;
+
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> containerStatuses) 
{
+        //noop
+    }
+
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+        //noop
+    }
+
+    @Override
+    public synchronized void onShutdownRequest() {
+        System.out.println("Resource manager requested AM Shutdown");
+        errorHolder = new ErrorHolder();
+        errorHolder.setErrorCode(0);
+        errorHolder.setErrorMessage("ResourceManager requested AM Shutdown");
+    }
+
+    @Override
+    public void onNodesUpdated(List<NodeReport> nodeReports) {
+        //noop
+    }
+
+    @Override
+    public float getProgress() {
+        return 0.5f;    //TODO: OYA: maybe some action types can report better 
progress?
+    }
+
+    @Override
+    public synchronized void onError(final Throwable ex) {
+        System.out.println("Received asynchronous error");
+        ex.printStackTrace();
+        errorHolder = new ErrorHolder();
+        errorHolder.setErrorCode(0);
+        errorHolder.setErrorMessage(ex.getMessage());
+        errorHolder.setErrorCause(ex);
+    }
+
+    public synchronized ErrorHolder getError() {
+        return errorHolder;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
new file mode 100644
index 0000000..b4cbb4b
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+public class AMRMClientAsyncFactory {
+
+    public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) {
+        AMRMClient<?> amRmClient = AMRMClient.createAMRMClient();
+        AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
+        AMRMClientAsync<?> amRmClientAsync = 
AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler);
+
+        return amRmClientAsync;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
new file mode 100644
index 0000000..6a755db
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+public class ErrorHolder {
+    private int errorCode = 0;
+    private Throwable errorCause = null;
+    private String errorMessage = null;
+    private boolean populated = false;
+
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    public void setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+        this.populated = true;
+    }
+
+    public Throwable getErrorCause() {
+        return errorCause;
+    }
+
+    public void setErrorCause(Throwable errorCause) {
+        this.errorCause = errorCause;
+        this.populated = true;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+        this.populated = true;
+    }
+
+    public boolean isPopulated() {
+        return populated;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
new file mode 100644
index 0000000..593de00
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
+
+public class HdfsOperations {
+    private final SequenceFileWriterFactory seqFileWriterFactory;
+    private final UserGroupInformation ugi;
+
+    public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, 
UserGroupInformation ugi) {
+        this.seqFileWriterFactory = 
Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should 
not be null");
+        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
+    }
+
+    /**
+     * Creates a Sequence file which contains the output from an action and 
uploads it to HDFS.
+     */
+    public void uploadActionDataToHDFS(final Configuration launcherJobConf, 
final Path actionDir, final Map<String, String> actionData) throws IOException {
+        IOException ioe = ugi.doAs(new PrivilegedAction<IOException>() {
+            @Override
+            public IOException run() {
+                Path finalPath = new Path(actionDir, 
LauncherAM.ACTION_DATA_SEQUENCE_FILE);
+                // upload into sequence file
+                System.out.println("Oozie Launcher, uploading action data to 
HDFS sequence file: "
+                        + new Path(actionDir, 
LauncherAM.ACTION_DATA_SEQUENCE_FILE).toUri());
+
+                SequenceFile.Writer wr = null;
+                try {
+                    wr = 
seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, 
Text.class, Text.class);
+
+                    if (wr != null) {
+                        Set<String> keys = actionData.keySet();
+                        for (String propsKey : keys) {
+                            wr.append(new Text(propsKey), new 
Text(actionData.get(propsKey)));
+                        }
+                    } else {
+                        throw new IOException("SequenceFile.Writer is null for 
" + finalPath);
+                    }
+                } catch (IOException e) {
+                    e.printStackTrace();
+                    return e;
+                } finally {
+                    if (wr != null) {
+                        try {
+                            wr.close();
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                            return e;
+                        }
+                    }
+                }
+
+                return null;
+            }
+        });
+
+        if (ioe != null) {
+            throw ioe;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 85d78c6..89357ad 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -17,120 +17,133 @@
  */
 package org.apache.oozie.action.hadoop;
 
-import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.Reader;
 import java.io.StringWriter;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.security.Permission;
 import java.security.PrivilegedAction;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeReport;
-import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 public class LauncherAM {
-
-    static final String CONF_OOZIE_ACTION_MAIN_CLASS = 
"oozie.launcher.action.main.class";
-
-    static final String ACTION_PREFIX = "oozie.action.";
+    private static final String OOZIE_ACTION_CONF_XML = 
"oozie.action.conf.xml";
+    private static final String OOZIE_LAUNCHER_JOB_ID = 
"oozie.launcher.job.id";
+    public static final String ACTIONOUTPUTTYPE_ID_SWAP = "IdSwap";
+    public static final String ACTIONOUTPUTTYPE_OUTPUT = "Output";
+    public static final String ACTIONOUTPUTTYPE_STATS = "Stats";
+    public static final String ACTIONOUTPUTTYPE_EXT_CHILD_ID = "ExtChildID";
+
+    public static final String JAVA_CLASS_PATH = "java.class.path";
+    public static final String OOZIE_ACTION_ID = "oozie.action.id";
+    public static final String OOZIE_JOB_ID = "oozie.job.id";
+    public static final String ACTION_PREFIX = "oozie.action.";
     public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = 
ACTION_PREFIX + "max.output.data";
-    static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + 
"main.arg.";
-    static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = 
CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
-    static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = 
"oozie.external.stats.max.size";
-
-    static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path";
-    static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml";
-    static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // 
COMBO FILE
-    static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";
-    static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
-    static final String ACTION_DATA_STATS = "stats.properties";
-    static final String ACTION_DATA_NEW_ID = "newId";
-    static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+    public static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = 
ACTION_PREFIX + "main.arg.";
+    public static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = 
CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
+    public static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = 
"oozie.external.stats.max.size";
+    public static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + 
"dir.path";
+    public static final String ACTION_PREPARE_XML = ACTION_PREFIX + 
"prepare.xml";
+    public static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; 
// COMBO FILE
+    public static final String ACTION_DATA_EXTERNAL_CHILD_IDS = 
"externalChildIDs";
+    public static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
+    public static final String ACTION_DATA_STATS = "stats.properties";
+    public static final String ACTION_DATA_NEW_ID = "newId";
+    public static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+    public static final String CONF_OOZIE_ACTION_MAIN_CLASS = 
"oozie.launcher.action.main.class";
 
     // TODO: OYA: more unique file names?  action.xml may be stuck for 
backwards compat though
     public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
     public static final String ACTION_CONF_XML = "action.xml";
     public static final String ACTION_DATA_FINAL_STATUS = "final.status";
 
-    private static AMRMClientAsync<AMRMClient.ContainerRequest> 
amRmClientAsync = null;
-    private static Configuration launcherJobConf = null;
-    private static Path actionDir;
-    private static Map<String, String> actionData = new 
HashMap<String,String>();
+    private final UserGroupInformation ugi;
+    private final AMRMCallBackHandler callbackHandler;
+    private final AMRMClientAsyncFactory amRmClientAsyncFactory;
+    private final HdfsOperations hdfsOperations;
+    private final LocalFsOperations localFsOperations;
+    private final PrepareActionsHandler prepareHandler;
+    private final LauncherAMCallbackNotifierFactory callbackNotifierFactory;
+    private final LauncherSecurityManager launcherSecurityManager;
+
+    private Configuration launcherJobConf;
+    private AMRMClientAsync<?> amRmClientAsync;
+    private Path actionDir;
+    private Map<String, String> actionData = new HashMap<String,String>();
+
+    public LauncherAM(UserGroupInformation ugi,
+            AMRMClientAsyncFactory amRmClientAsyncFactory,
+            AMRMCallBackHandler callbackHandler,
+            HdfsOperations hdfsOperations,
+            LocalFsOperations localFsOperations,
+            PrepareActionsHandler prepareHandler,
+            LauncherAMCallbackNotifierFactory callbackNotifierFactory,
+            LauncherSecurityManager launcherSecurityManager) {
+        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
+        this.amRmClientAsyncFactory = 
Preconditions.checkNotNull(amRmClientAsyncFactory, "amRmClientAsyncFactory 
should not be null");
+        this.callbackHandler = Preconditions.checkNotNull(callbackHandler, 
"callbackHandler should not be null");
+        this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, 
"hdfsOperations should not be null");
+        this.localFsOperations = Preconditions.checkNotNull(localFsOperations, 
"localFsOperations should not be null");
+        this.prepareHandler = Preconditions.checkNotNull(prepareHandler, 
"prepareHandler should not be null");
+        this.callbackNotifierFactory = 
Preconditions.checkNotNull(callbackNotifierFactory, "callbackNotifierFactory 
should not be null");
+        this.launcherSecurityManager = 
Preconditions.checkNotNull(launcherSecurityManager, "launcherSecurityManager 
should not be null");
+    }
 
-    private static void printDebugInfo(String[] mainArgs) throws IOException {
-        printContentsOfCurrentDir();
+    public static void main(String[] args) throws Exception {
+        UserGroupInformation ugi = null;
+        String submitterUser = System.getProperty("submitter.user", "").trim();
+        Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user 
is undefined");
+        System.out.println("Submitter user is: " + submitterUser);
 
-        System.out.println();
-        System.out.println("Oozie Launcher Application Master configuration");
-        System.out.println("===============================================");
-        System.out.println("Workflow job id   : " + 
launcherJobConf.get("oozie.job.id"));
-        System.out.println("Workflow action id: " + 
launcherJobConf.get("oozie.action.id"));
-        System.out.println();
-        System.out.println("Classpath         :");
-        System.out.println("------------------------");
-        StringTokenizer st = new 
StringTokenizer(System.getProperty("java.class.path"), ":");
-        while (st.hasMoreTokens()) {
-            System.out.println("  " + st.nextToken());
-        }
-        System.out.println("------------------------");
-        System.out.println();
-        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
-        System.out.println("Main class        : " + mainClass);
-        System.out.println();
-        System.out.println("Maximum output    : "
-                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 
* 1024));
-        System.out.println();
-        System.out.println("Arguments         :");
-        for (String arg : mainArgs) {
-            System.out.println("                    " + arg);
+        if 
(UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) {
+            System.out.println("Using login user for UGI");
+            ugi = UserGroupInformation.getLoginUser();
+        } else {
+            ugi = UserGroupInformation.createRemoteUser(submitterUser);
+            
ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials());
         }
 
-        System.out.println();
-        System.out.println("Java System Properties:");
-        System.out.println("------------------------");
-        System.getProperties().store(System.out, "");
-        System.out.flush();
-        System.out.println("------------------------");
-        System.out.println();
-
-        
System.out.println("=================================================================");
-        System.out.println();
-        System.out.println(">>> Invoking Main class now >>>");
-        System.out.println();
-        System.out.flush();
+        AMRMClientAsyncFactory amRmClientAsyncFactory = new 
AMRMClientAsyncFactory();
+        AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler();
+        HdfsOperations hdfsOperations = new HdfsOperations(new 
SequenceFileWriterFactory(), ugi);
+        LocalFsOperations localFSOperations = new LocalFsOperations();
+        PrepareActionsHandler prepareHandler = new PrepareActionsHandler();
+        LauncherAMCallbackNotifierFactory callbackNotifierFactory = new 
LauncherAMCallbackNotifierFactory();
+        LauncherSecurityManager launcherSecurityManager = new 
LauncherSecurityManager();
+
+        LauncherAM launcher = new LauncherAM(ugi,
+                amRmClientAsyncFactory,
+                callbackHandler,
+                hdfsOperations,
+                localFSOperations,
+                prepareHandler,
+                callbackNotifierFactory,
+                launcherSecurityManager);
+
+        launcher.run();
     }
 
     // TODO: OYA: rethink all print messages and formatting
-    public static void main(String[] AMargs) throws Exception {
-        final ErrorHolder eHolder = new ErrorHolder();
-        FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
-        String submitterUser = System.getProperty("submitter.user", "").trim();
-        Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user 
is undefined");
-        System.out.println("Submitter user is: " + submitterUser);
+    public void run() throws Exception {
+        final ErrorHolder errorHolder = new ErrorHolder();
+        OozieActionResult actionResult = OozieActionResult.FAILED;
+        boolean launcerExecutedProperly = false;
 
         String jobUserName = 
System.getenv(ApplicationConstants.Environment.USER.name());
 
@@ -144,25 +157,15 @@ public class LauncherAM {
         System.out.println("Login authMethod: " + 
login.getAuthenticationMethod());
         System.out.println("JobUserName:" + jobUserName);
 
-        UserGroupInformation ugi = null;
-
-        if 
(UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) {
-            System.out.println("Using login user for UGI");
-            ugi = UserGroupInformation.getLoginUser();
-        } else {
-            ugi = UserGroupInformation.createRemoteUser(submitterUser);
-            
ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials());
-        }
-
         boolean backgroundAction = false;
 
         try {
             try {
-                launcherJobConf = readLauncherConf();
+                launcherJobConf = localFsOperations.readLauncherConf();
                 System.out.println("Launcher AM configuration loaded");
             } catch (Exception ex) {
-                eHolder.setErrorMessage("Could not load the Launcher AM 
configuration file");
-                eHolder.setErrorCause(ex);
+                errorHolder.setErrorMessage("Could not load the Launcher AM 
configuration file");
+                errorHolder.setErrorCause(ex);
                 throw ex;
             }
 
@@ -175,8 +178,8 @@ public class LauncherAM {
                 executePrepare(ugi);
                 System.out.println("Completed the execution of prepare actions 
successfully");
             } catch (Exception ex) {
-                eHolder.setErrorMessage("Prepare execution in the Launcher AM 
has failed");
-                eHolder.setErrorCause(ex);
+                errorHolder.setErrorMessage("Prepare execution in the Launcher 
AM has failed");
+                errorHolder.setErrorCause(ex);
                 throw ex;
             }
 
@@ -185,14 +188,14 @@ public class LauncherAM {
             // TODO: OYA: should we allow turning this off?
             // TODO: OYA: what should default be?
             if (launcherJobConf.getBoolean("oozie.launcher.print.debug.info", 
true)) {
-                printDebugInfo(mainArgs);
+                printDebugInfo();
             }
 
             setupMainConfiguration();
 
-            finalStatus = runActionMain(mainArgs, eHolder, ugi);
+            launcerExecutedProperly = runActionMain(mainArgs, errorHolder, 
ugi);
 
-            if (finalStatus == FinalApplicationStatus.SUCCEEDED) {
+            if (launcerExecutedProperly) {
                 handleActionData();
                 if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
                     System.out.println();
@@ -218,36 +221,87 @@ public class LauncherAM {
             System.err.println("Launcher AM execution failed");
             e.printStackTrace(System.out);
             e.printStackTrace(System.err);
-            finalStatus = FinalApplicationStatus.FAILED;
-            eHolder.setErrorCause(e);
-            eHolder.setErrorMessage(e.getMessage());
+            launcerExecutedProperly = false;
+            if (!errorHolder.isPopulated()) {
+                errorHolder.setErrorCause(e);
+                errorHolder.setErrorMessage(e.getMessage());
+            }
             throw e;
         } finally {
             try {
-                // Store final status in case Launcher AM falls off the RM
-                actionData.put(ACTION_DATA_FINAL_STATUS, 
finalStatus.toString());
-                if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
-                    failLauncher(eHolder);
+                ErrorHolder callbackErrorHolder = callbackHandler.getError();
+
+                if (launcerExecutedProperly) {
+                    actionResult = backgroundAction ? 
OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED;
                 }
-                uploadActionDataToHDFS(ugi);
+
+                if (!launcerExecutedProperly) {
+                    updateActionDataWithFailure(errorHolder, actionData);
+                } else if (callbackErrorHolder != null) {  // async error from 
the callback
+                    actionResult = OozieActionResult.FAILED;
+                    updateActionDataWithFailure(callbackErrorHolder, 
actionData);
+                }
+
+                actionData.put(ACTION_DATA_FINAL_STATUS, 
actionResult.toString());
+                hdfsOperations.uploadActionDataToHDFS(launcherJobConf, 
actionDir, actionData);
             } finally {
                 try {
-                    unregisterWithRM(finalStatus, eHolder.getErrorMessage());
+                    unregisterWithRM(actionResult, 
errorHolder.getErrorMessage());
                 } finally {
-                    LauncherAMCallbackNotifier cn = new 
LauncherAMCallbackNotifier(launcherJobConf);
-                    cn.notifyURL(finalStatus, backgroundAction);
+                    LauncherAMCallbackNotifier cn = 
callbackNotifierFactory.createCallbackNotifier(launcherJobConf);
+                    cn.notifyURL(actionResult);
                 }
             }
         }
     }
 
-    private static void registerWithRM() throws IOException, YarnException {
-        AMRMClient<AMRMClient.ContainerRequest> amRmClient = 
AMRMClient.createAMRMClient();
+    @VisibleForTesting
+    Map<String, String> getActionData() {
+        return actionData;
+    }
+
+    private void printDebugInfo() throws IOException {
+        localFsOperations.printContentsOfDir(new File("."));
+
+        System.out.println();
+        System.out.println("Oozie Launcher Application Master configuration");
+        System.out.println("===============================================");
+        System.out.println("Workflow job id   : " + 
launcherJobConf.get(OOZIE_JOB_ID));
+        System.out.println("Workflow action id: " + 
launcherJobConf.get(OOZIE_ACTION_ID));
+        System.out.println();
+        System.out.println("Classpath         :");
+        System.out.println("------------------------");
+        StringTokenizer st = new 
StringTokenizer(System.getProperty(JAVA_CLASS_PATH), ":");
+        while (st.hasMoreTokens()) {
+            System.out.println("  " + st.nextToken());
+        }
+        System.out.println("------------------------");
+        System.out.println();
+        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+        System.out.println("Main class        : " + mainClass);
+        System.out.println();
+        System.out.println("Maximum output    : "
+                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 
* 1024));
+        System.out.println();
+
+        System.out.println();
+        System.out.println("Java System Properties:");
+        System.out.println("------------------------");
+        System.getProperties().store(System.out, "");
+        System.out.flush();
+        System.out.println("------------------------");
+        System.out.println();
+
+        
System.out.println("=================================================================");
+        System.out.println();
+        System.out.println(">>> Invoking Main class now >>>");
+        System.out.println();
+        System.out.flush();
+    }
 
-        AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
-        // TODO: OYA: make heartbeat interval configurable
-        // TODO: OYA: make heartbeat interval higher to put less load on RM, 
but lower than timeout
-        amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, 
60000, callBackHandler);
+    private void registerWithRM() throws IOException, YarnException {
+        // TODO: OYA: make heartbeat interval configurable & make interval 
higher to put less load on RM, but lower than timeout
+        amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000);
         amRmClientAsync.init(new Configuration(launcherJobConf));
         amRmClientAsync.start();
 
@@ -255,28 +309,24 @@ public class LauncherAM {
         amRmClientAsync.registerApplicationMaster("", 0, "");
     }
 
-    private static void unregisterWithRM(FinalApplicationStatus status, String 
message) throws YarnException, IOException {
+    private void unregisterWithRM(OozieActionResult actionResult, String 
message) throws YarnException, IOException {
         if (amRmClientAsync != null) {
             System.out.println("Stopping AM");
             try {
                 message = (message == null) ? "" : message;
                 // tracking url is determined automatically
-                amRmClientAsync.unregisterApplicationMaster(status, message, 
"");
-            } catch (YarnException ex) {
-                System.err.println("Error un-registering AM client");
-                throw ex;
-            } catch (IOException ex) {
+                
amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), 
message, "");
+            } catch (Exception ex) {
                 System.err.println("Error un-registering AM client");
                 throw ex;
             } finally {
                 amRmClientAsync.stop();
-                amRmClientAsync = null;
             }
         }
     }
 
     // Method to execute the prepare actions
-    private static void executePrepare(UserGroupInformation ugi) throws 
Exception {
+    private void executePrepare(UserGroupInformation ugi) throws Exception {
         Exception e = ugi.doAs(new PrivilegedAction<Exception>() {
             @Override
             public Exception run() {
@@ -286,7 +336,7 @@ public class LauncherAM {
                         if (prepareXML.length() != 0) {
                             Configuration actionConf = new 
Configuration(launcherJobConf);
                             actionConf.addResource(ACTION_CONF_XML);
-                            PrepareActionsDriver.doOperations(prepareXML, 
actionConf);
+                            prepareHandler.prepareAction(prepareXML, 
actionConf);
                         } else {
                             System.out.println("There are no prepare actions 
to execute.");
                         }
@@ -304,17 +354,13 @@ public class LauncherAM {
         }
     }
 
-    // FIXME - figure out what is actually needed here
-    private static void setupMainConfiguration() throws IOException {
-//        Path pathNew = new Path(new Path(actionDir, ACTION_CONF_XML), new 
Path(new File(ACTION_CONF_XML).getAbsolutePath()));
-//        FileSystem fs = FileSystem.get(pathNew.toUri(), getJobConf());
-//        fs.copyToLocalFile(new Path(actionDir, ACTION_CONF_XML), new 
Path(new File(ACTION_CONF_XML).getAbsolutePath()));
-
-        System.setProperty("oozie.launcher.job.id", 
launcherJobConf.get("oozie.job.id"));
-//        System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
-//        System.setProperty(OOZIE_ACTION_ID, 
launcherJobConf.get(OOZIE_ACTION_ID));
-        System.setProperty("oozie.action.conf.xml", new 
File(ACTION_CONF_XML).getAbsolutePath());
-        System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, new 
File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath());
+    private void setupMainConfiguration() throws IOException {
+        System.setProperty(OOZIE_LAUNCHER_JOB_ID, 
launcherJobConf.get(OOZIE_JOB_ID));
+        System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
+        System.setProperty(OOZIE_ACTION_ID, 
launcherJobConf.get(OOZIE_ACTION_ID));
+        System.setProperty(OOZIE_ACTION_CONF_XML, new 
File(ACTION_CONF_XML).getAbsolutePath());
+        System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS,
+                new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_STATS, new 
File(ACTION_DATA_STATS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new 
File(ACTION_DATA_NEW_ID).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new 
File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
@@ -326,33 +372,28 @@ public class LauncherAM {
         } else {
             System.setProperty("oozie.job.launch.time", 
String.valueOf(System.currentTimeMillis()));
         }
-
-//        String actionConfigClass = 
getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
-//        if (actionConfigClass != null) {
-//            System.setProperty(OOZIE_ACTION_CONFIG_CLASS, actionConfigClass);
-//        }
     }
 
-    private static FinalApplicationStatus runActionMain(final String[] 
mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) {
-        final AtomicReference<FinalApplicationStatus> finalStatus = new 
AtomicReference<FinalApplicationStatus>(FinalApplicationStatus.FAILED);
+    private boolean runActionMain(final String[] mainArgs, final ErrorHolder 
eHolder, UserGroupInformation ugi) {
+        // using AtomicBoolean because we want to modify it inside run()
+        final AtomicBoolean actionMainExecutedProperly = new 
AtomicBoolean(false);
 
         ugi.doAs(new PrivilegedAction<Void>() {
             @Override
             public Void run() {
-                LauncherSecurityManager secMan = new LauncherSecurityManager();
                 try {
                     Class<?> klass = 
launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
                     System.out.println("Launcher class: " + klass.toString());
                     System.out.flush();
                     Method mainMethod = klass.getMethod("main", 
String[].class);
                     // Enable LauncherSecurityManager to catch System.exit 
calls
-                    secMan.set();
+                    launcherSecurityManager.set();
                     mainMethod.invoke(null, (Object) mainArgs);
 
                     System.out.println();
                     System.out.println("<<< Invocation of Main class completed 
<<<");
                     System.out.println();
-                    finalStatus.set(FinalApplicationStatus.SUCCEEDED);
+                    actionMainExecutedProperly.set(true);
                 } catch (InvocationTargetException ex) {
                     ex.printStackTrace(System.out);
                     // Get what actually caused the exception
@@ -362,24 +403,31 @@ public class LauncherAM {
                         cause = cause.getCause();
                     }
                     if (LauncherMainException.class.isInstance(cause)) {
+                        int errorCode = ((LauncherMainException) 
ex.getCause()).getErrorCode();
                         String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
                         eHolder.setErrorMessage("Main Class [" + mainClass + 
"], exit code [" +
-                                ((LauncherMainException) 
ex.getCause()).getErrorCode() + "]");
+                                errorCode + "]");
+                        eHolder.setErrorCode(errorCode);
                     } else if (SecurityException.class.isInstance(cause)) {
-                        if (secMan.getExitInvoked()) {
-                            System.out.println("Intercepting System.exit(" + 
secMan.getExitCode()
-                                    + ")");
-                            System.err.println("Intercepting System.exit(" + 
secMan.getExitCode()
-                                    + ")");
+                        if (launcherSecurityManager.getExitInvoked()) {
+                            final int exitCode = 
launcherSecurityManager.getExitCode();
+                            System.out.println("Intercepting System.exit(" + 
exitCode + ")");
+                            System.err.println("Intercepting System.exit(" + 
exitCode + ")");
                             // if 0 main() method finished successfully
                             // ignoring
-                            eHolder.setErrorCode(secMan.getExitCode());
-                            if (eHolder.getErrorCode() != 0) {
+                            eHolder.setErrorCode(exitCode);
+                            if (exitCode != 0) {
                                 String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
-                                eHolder.setErrorMessage("Main Class [" + 
mainClass + "], exit code [" + eHolder.getErrorCode() + "]");
+                                eHolder.setErrorMessage("Main Class [" + 
mainClass + "],"
+                                        + " exit code [" + 
eHolder.getErrorCode() + "]");
                             } else {
-                                
finalStatus.set(FinalApplicationStatus.SUCCEEDED);
+                                actionMainExecutedProperly.set(true);
                             }
+                        } else {
+                            // just SecurityException, no exit was invoked
+                            eHolder.setErrorCode(0);
+                            eHolder.setErrorCause(cause);
+                            eHolder.setErrorMessage(cause.getMessage());
                         }
                     } else {
                         eHolder.setErrorMessage(cause.getMessage());
@@ -393,144 +441,55 @@ public class LauncherAM {
                     System.out.flush();
                     System.err.flush();
                     // Disable LauncherSecurityManager
-                    secMan.unset();
+                    launcherSecurityManager.unset();
                 }
 
                 return null;
             }
         });
 
-        return finalStatus.get();
+        return actionMainExecutedProperly.get();
     }
 
-    private static void handleActionData() throws IOException {
+    private void handleActionData() throws IOException {
         // external child IDs
-        String externalChildIdsProp = System.getProperty(ACTION_PREFIX
-                + ACTION_DATA_EXTERNAL_CHILD_IDS);
-        if (externalChildIdsProp != null) {
-            File externalChildIDs = new File(externalChildIdsProp);
-            if (externalChildIDs.exists()) {
-                actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, 
getLocalFileContentStr(externalChildIDs, "", -1));
-            }
-        }
+        processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, 
null, ACTION_DATA_EXTERNAL_CHILD_IDS, -1, ACTIONOUTPUTTYPE_EXT_CHILD_ID);
 
         // external stats
-        String statsProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_STATS);
-        if (statsProp != null) {
-            File actionStatsData = new File(statsProp);
-            if (actionStatsData.exists()) {
-                int statsMaxOutputData = 
launcherJobConf.getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
-                        Integer.MAX_VALUE);
-                actionData.put(ACTION_DATA_STATS,
-                        getLocalFileContentStr(actionStatsData, "Stats", 
statsMaxOutputData));
-            }
-        }
+        processActionData(ACTION_PREFIX + ACTION_DATA_STATS, 
CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, ACTION_DATA_STATS, Integer.MAX_VALUE, 
ACTIONOUTPUTTYPE_STATS);
 
         // output data
-        String outputProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_OUTPUT_PROPS);
-        if (outputProp != null) {
-            File actionOutputData = new File(outputProp);
-            if (actionOutputData.exists()) {
-                int maxOutputData = 
launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
-                actionData.put(ACTION_DATA_OUTPUT_PROPS,
-                        getLocalFileContentStr(actionOutputData, "Output", 
maxOutputData));
-            }
-        }
+        processActionData(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, 
CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, ACTION_DATA_OUTPUT_PROPS, 2048, 
ACTIONOUTPUTTYPE_OUTPUT);
 
         // id swap
-        String newIdProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_NEW_ID);
-        if (newIdProp != null) {
-            File newId = new File(newIdProp);
-            if (newId.exists()) {
-                actionData.put(ACTION_DATA_NEW_ID, 
getLocalFileContentStr(newId, "", -1));
-            }
-        }
+        processActionData(ACTION_PREFIX + ACTION_DATA_NEW_ID, null, 
ACTION_DATA_NEW_ID, -1, ACTIONOUTPUTTYPE_ID_SWAP);
     }
 
-    public static String getLocalFileContentStr(File file, String type, int 
maxLen) throws IOException {
-        StringBuilder sb = new StringBuilder();
-        Reader reader = null;
-        try {
-            reader = new BufferedReader(new FileReader(file));
-            char[] buffer = new char[2048];
-            int read;
-            int count = 0;
-            while ((read = reader.read(buffer)) > -1) {
-                count += read;
-                if (maxLen > -1 && count > maxLen) {
-                    throw new IOException(type + " data exceeds its limit [" + 
maxLen + "]");
-                }
-                sb.append(buffer, 0, read);
-            }
-        } finally {
-            if (reader != null) {
-                reader.close();
-            }
-        }
-        return sb.toString();
-    }
-
-    private static void uploadActionDataToHDFS(UserGroupInformation ugi) 
throws IOException {
-        IOException ioe = ugi.doAs(new PrivilegedAction<IOException>() {
-            @Override
-            public IOException run() {
-                Path finalPath = new Path(actionDir, 
ACTION_DATA_SEQUENCE_FILE);
-                // upload into sequence file
-                System.out.println("Oozie Launcher, uploading action data to 
HDFS sequence file: "
-                        + new Path(actionDir, 
ACTION_DATA_SEQUENCE_FILE).toUri());
+    private void processActionData(String propertyName, String 
maxSizePropertyName, String actionDataPropertyName, int maxSizeDefault, String 
type) throws IOException {
+        String propValue = System.getProperty(propertyName);
+        int maxSize = maxSizeDefault;
 
-                SequenceFile.Writer wr = null;
-                try {
-                    wr = SequenceFile.createWriter(launcherJobConf,
-                            SequenceFile.Writer.file(finalPath),
-                            SequenceFile.Writer.keyClass(Text.class),
-                            SequenceFile.Writer.valueClass(Text.class));
-                    if (wr != null) {
-                        Set<String> keys = actionData.keySet();
-                        for (String propsKey : keys) {
-                            wr.append(new Text(propsKey), new 
Text(actionData.get(propsKey)));
-                        }
-                    } else {
-                        throw new IOException("SequenceFile.Writer is null for 
" + finalPath);
-                    }
-                } catch (IOException e) {
-                    e.printStackTrace();
-                    return e;
-                } finally {
-                    if (wr != null) {
-                        try {
-                            wr.close();
-                        } catch (IOException e) {
-                            e.printStackTrace();
-                            return e;
-                        }
-                    }
-                }
+        if (maxSizePropertyName != null) {
+            maxSize = launcherJobConf.getInt(maxSizePropertyName, 
maxSizeDefault);
+        }
 
-                return null;
+        if (propValue != null) {
+            File actionDataFile = new File(propValue);
+            if (localFsOperations.fileExists(actionDataFile)) {
+                actionData.put(actionDataPropertyName, 
localFsOperations.getLocalFileContentAsString(actionDataFile, type, maxSize));
             }
-        });
-
-        if (ioe != null) {
-            throw ioe;
         }
     }
 
-    private static void failLauncher(int errorCode, String message, Throwable 
ex) {
-        ErrorHolder eHolder = new ErrorHolder();
-        eHolder.setErrorCode(errorCode);
-        eHolder.setErrorMessage(message);
-        eHolder.setErrorCause(ex);
-        failLauncher(eHolder);
-    }
-
-    private static void failLauncher(ErrorHolder eHolder) {
-        if (eHolder.getErrorCause() != null) {
+    private void updateActionDataWithFailure(ErrorHolder eHolder, Map<String, 
String> actionData) {
+        if (eHolder.getErrorCause() != null && 
eHolder.getErrorCause().getMessage() != null) {
             eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + 
eHolder.getErrorCause().getMessage());
         }
+
         Properties errorProps = new Properties();
         errorProps.setProperty("error.code", 
Integer.toString(eHolder.getErrorCode()));
-        errorProps.setProperty("error.reason", eHolder.getErrorMessage());
+        String errorMessage = eHolder.getErrorMessage() == null ? "<empty>" : 
eHolder.getErrorMessage();
+        errorProps.setProperty("error.reason", errorMessage);
         if (eHolder.getErrorCause() != null) {
             if (eHolder.getErrorCause().getMessage() != null) {
                 errorProps.setProperty("exception.message", 
eHolder.getErrorCause().getMessage());
@@ -541,18 +500,19 @@ public class LauncherAM {
             pw.close();
             errorProps.setProperty("exception.stacktrace", sw.toString());
         }
+
         StringWriter sw = new StringWriter();
         try {
             errorProps.store(sw, "");
             sw.close();
-            actionData.put(ACTION_DATA_ERROR_PROPS, sw.toString());
+            actionData.put(LauncherAM.ACTION_DATA_ERROR_PROPS, sw.toString());
 
             // external child IDs
-            String externalChildIdsProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_EXTERNAL_CHILD_IDS);
+            String externalChildIdsProp = 
System.getProperty(LauncherAM.ACTION_PREFIX + 
LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
             if (externalChildIdsProp != null) {
                 File externalChildIDs = new File(externalChildIdsProp);
-                if (externalChildIDs.exists()) {
-                    actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, 
getLocalFileContentStr(externalChildIDs, "", -1));
+                if (localFsOperations.fileExists(externalChildIDs)) {
+                    actionData.put(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS, 
localFsOperations.getLocalFileContentAsString(externalChildIDs, 
ACTIONOUTPUTTYPE_EXT_CHILD_ID, -1));
                 }
             }
         } catch (IOException ioe) {
@@ -568,50 +528,17 @@ public class LauncherAM {
         }
     }
 
-    private static class AMRMCallBackHandler implements 
AMRMClientAsync.CallbackHandler {
-        @Override
-        public void onContainersCompleted(List<ContainerStatus> 
containerStatuses) {
-            //noop
-        }
-
-        @Override
-        public void onContainersAllocated(List<Container> containers) {
-            //noop
-        }
-
-        @Override
-        public void onShutdownRequest() {
-            failLauncher(0, "ResourceManager requested AM Shutdown", null);
-            // TODO: OYA: interrupt?
-        }
-
-        @Override
-        public void onNodesUpdated(List<NodeReport> nodeReports) {
-            //noop
-        }
-
-        @Override
-        public float getProgress() {
-            return 0.5f;    //TODO: OYA: maybe some action types can report 
better progress?
-        }
-
-        @Override
-        public void onError(final Throwable ex) {
-            failLauncher(0, ex.getMessage(), ex);
-            // TODO: OYA: interrupt?
-        }
-    }
-
-    public static String[] getMainArguments(Configuration conf) {
+    private String[] getMainArguments(Configuration conf) {
         String[] args = new 
String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
 
         for (int i = 0; i < args.length; i++) {
             args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
         }
+
         return args;
     }
 
-    private static class LauncherSecurityManager extends SecurityManager {
+    public static class LauncherSecurityManager extends SecurityManager {
         private boolean exitInvoked;
         private int exitCode;
         private SecurityManager securityManager;
@@ -666,69 +593,20 @@ public class LauncherAM {
         }
     }
 
+    public enum OozieActionResult {
+        SUCCEEDED(FinalApplicationStatus.SUCCEEDED),
+        FAILED(FinalApplicationStatus.FAILED),
+        RUNNING(FinalApplicationStatus.SUCCEEDED);
 
-    /**
-     * Print files and directories in current directory. Will list files in 
the sub-directory (only 1 level deep)
-     */
-    protected static void printContentsOfCurrentDir() {
-        File folder = new File(".");
-        System.out.println();
-        System.out.println("Files in current dir:" + folder.getAbsolutePath());
-        System.out.println("======================");
-
-        File[] listOfFiles = folder.listFiles();
-        for (File fileName : listOfFiles) {
-            if (fileName.isFile()) {
-                System.out.println("File: " + fileName.getName());
-            } else if (fileName.isDirectory()) {
-                System.out.println("Dir: " + fileName.getName());
-                File subDir = new File(fileName.getName());
-                File[] moreFiles = subDir.listFiles();
-                for (File subFileName : moreFiles) {
-                    if (subFileName.isFile()) {
-                        System.out.println("  File: " + subFileName.getName());
-                    } else if (subFileName.isDirectory()) {
-                        System.out.println("  Dir: " + subFileName.getName());
-                    }
-                }
-            }
-        }
-    }
-
-    protected static Configuration readLauncherConf() {
-        File confFile = new File(LAUNCHER_JOB_CONF_XML);
-        Configuration conf = new Configuration(false);
-        conf.addResource(new Path(confFile.getAbsolutePath()));
-        return conf;
-    }
-
-    protected static class ErrorHolder {
-        private int errorCode = 0;
-        private Throwable errorCause = null;
-        private String errorMessage = null;
-
-        public int getErrorCode() {
-            return errorCode;
-        }
-
-        public void setErrorCode(int errorCode) {
-            this.errorCode = errorCode;
-        }
-
-        public Throwable getErrorCause() {
-            return errorCause;
-        }
-
-        public void setErrorCause(Throwable errorCause) {
-            this.errorCause = errorCause;
-        }
+        // YARN-equivalent status
+        private FinalApplicationStatus yarnStatus;
 
-        public String getErrorMessage() {
-            return errorMessage;
+        OozieActionResult(FinalApplicationStatus yarnStatus) {
+            this.yarnStatus = yarnStatus;
         }
 
-        public void setErrorMessage(String errorMessage) {
-            this.errorMessage = errorMessage;
+        public FinalApplicationStatus getYarnStatus() {
+            return yarnStatus;
         }
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
index 23648b8..2972658 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -18,7 +18,7 @@
 package org.apache.oozie.action.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
 
 import java.io.IOException;
 import java.net.HttpURLConnection;
@@ -26,7 +26,6 @@ import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.net.Proxy;
 import java.net.URL;
-import java.util.EnumSet;
 
 // Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
 /**
@@ -36,7 +35,6 @@ import java.util.EnumSet;
 public class LauncherAMCallbackNotifier {
     private static final String OOZIE_LAUNCHER_CALLBACK = 
"oozie.launcher.callback.";
     private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
-    private static final EnumSet<FinalApplicationStatus> 
FAILED_APPLICATION_STATES = EnumSet.of(FinalApplicationStatus.KILLED, 
FinalApplicationStatus.FAILED);
 
     public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = 
OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
     public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = 
OOZIE_LAUNCHER_CALLBACK + "retry.interval";
@@ -136,11 +134,11 @@ public class LauncherAMCallbackNotifier {
 
     /**
      * Notify a server of the completion of a submitted job.
-     * @param finalStatus The Application Status
+     * @param actionResult The Action Result (failed/succeeded/running)
      *
      * @throws InterruptedException
      */
-    public void notifyURL(FinalApplicationStatus finalStatus, boolean 
backgroundAction) throws InterruptedException {
+    public void notifyURL(OozieActionResult actionResult) throws 
InterruptedException {
         // Do we need job-end notification?
         if (userUrl == null) {
             System.out.println("Callback notification URL not set, skipping.");
@@ -149,12 +147,7 @@ public class LauncherAMCallbackNotifier {
 
         //Do string replacements for final status
         if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
-            // only send back "RUNNING" if the submission was successful
-            if (backgroundAction && 
!FAILED_APPLICATION_STATES.contains(finalStatus)) {
-                userUrl = 
userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, "RUNNING");
-            } else {
-                userUrl = 
userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, 
finalStatus.toString());
-            }
+            userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, 
actionResult.toString());
         }
 
         // Create the URL, ensure sanity

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
new file mode 100644
index 0000000..688424b
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class LauncherAMCallbackNotifierFactory {
+
+    public LauncherAMCallbackNotifier createCallbackNotifier(Configuration 
conf) {
+        return new LauncherAMCallbackNotifier(conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
new file mode 100644
index 0000000..011ce93
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+public class LocalFsOperations {
+    private static final int WALK_DEPTH = 2;
+
+    /**
+     * Reads the launcher configuration "launcher.xml"
+     * @return Configuration object
+     */
+    public Configuration readLauncherConf() {
+        File confFile = new File(LauncherAM.LAUNCHER_JOB_CONF_XML);
+        Configuration conf = new Configuration(false);
+        conf.addResource(new 
org.apache.hadoop.fs.Path(confFile.getAbsolutePath()));
+        return conf;
+    }
+
+    /**
+     * Print files and directories in current directory. Will list files in 
the sub-directory (only 2 level deep)
+     * @throws IOException
+     */
+    public void printContentsOfDir(File folder) throws IOException {
+        System.out.println();
+        System.out.println("Files in current dir:" + folder.getAbsolutePath());
+        System.out.println("======================");
+
+        final Path root = folder.toPath();
+        Files.walkFileTree(root, EnumSet.of(FileVisitOption.FOLLOW_LINKS), 
WALK_DEPTH, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes 
attrs) throws IOException {
+                if (attrs.isRegularFile()) {
+                    System.out.println("  File: " + root.relativize(file));
+                } else if (attrs.isDirectory()) {
+                    System.out.println("  Dir: " +  root.relativize(file) + 
"/");
+                }
+
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    /**
+     * Returns the contents of a file as string.
+     *
+     * @param file the File object which represents the file to be read
+     * @param type Type of the file
+     * @param maxLen Maximum allowed length
+     * @return The file contents as string
+     * @throws IOException if the file is bigger than maxLen or there is any 
I/O error
+     * @throws FileNotFoundException if the file does not exist
+     */
+    public String getLocalFileContentAsString(File file, String type, int 
maxLen) throws IOException {
+        if (file.exists()) {
+            if (maxLen > -1 && file.length() > maxLen) {
+                throw new IOException(type + " data exceeds its limit [" + 
maxLen + "]");
+            }
+
+            return com.google.common.io.Files.toString(file, 
StandardCharsets.UTF_8);
+        } else {
+            throw new FileNotFoundException("File not found: " + 
file.toPath().toAbsolutePath());
+        }
+    }
+
+    /**
+     * Checks if a given File exists or not. This method helps writing unit 
tests.
+     */
+    public boolean fileExists(File file) {
+        return file.exists();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
index 4a51d48..cb5b1ac 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
@@ -38,7 +38,9 @@ import javax.xml.parsers.ParserConfigurationException;
  * Utility class to perform operations on the prepare block of Workflow
  *
  */
+@Deprecated
 public class PrepareActionsDriver {
+    private static final PrepareActionsHandler prepareHandler = new 
PrepareActionsHandler();
 
     /**
      * Method to parse the prepare XML and execute the corresponding prepare 
actions
@@ -48,41 +50,7 @@ public class PrepareActionsDriver {
      */
     static void doOperations(String prepareXML, Configuration conf)
             throws IOException, SAXException, ParserConfigurationException, 
LauncherException {
-        Document doc = getDocumentFromXML(prepareXML);
-        doc.getDocumentElement().normalize();
-
-        // Get the list of child nodes, basically, each one corresponding to a 
separate action
-        NodeList nl = doc.getDocumentElement().getChildNodes();
-        LauncherURIHandlerFactory factory = new 
LauncherURIHandlerFactory(conf);
-
-        for (int i = 0; i < nl.getLength(); ++i) {
-            Node n = nl.item(i);
-            String operation = n.getNodeName();
-            if (n.getAttributes() == null || 
n.getAttributes().getNamedItem("path") == null) {
-                continue;
-            }
-            String pathStr = 
n.getAttributes().getNamedItem("path").getNodeValue().trim();
-            // use Path to avoid URIsyntax error caused by square bracket in 
glob
-            URI uri = new Path(pathStr).toUri();
-            LauncherURIHandler handler = factory.getURIHandler(uri);
-            execute(operation, uri, handler, conf);
-        }
-    }
-
-    /**
-     * Method to execute the prepare actions based on the command
-     *
-     * @param n Child node of the prepare XML
-     * @throws LauncherException
-     */
-    private static void execute(String operation, URI uri, LauncherURIHandler 
handler, Configuration conf)
-            throws LauncherException {
-        if (operation.equals("delete")) {
-            handler.delete(uri, conf);
-        }
-        else if (operation.equals("mkdir")) {
-            handler.create(uri, conf);
-        }
+        prepareHandler.prepareAction(prepareXML, conf);
     }
 
     // Method to return the document from the prepare XML block

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
new file mode 100644
index 0000000..b5377b1
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+public class PrepareActionsHandler {
+
+    /**
+     * Method to parse the prepare XML and execute the corresponding prepare 
actions
+     *
+     * @param prepareXML Prepare XML block in string format
+     * @throws LauncherException
+     */
+    public void prepareAction(String prepareXML, Configuration conf)
+            throws IOException, SAXException, ParserConfigurationException, 
LauncherException {
+        Document doc = getDocumentFromXML(prepareXML);
+        doc.getDocumentElement().normalize();
+
+        // Get the list of child nodes, basically, each one corresponding to a 
separate action
+        NodeList nl = doc.getDocumentElement().getChildNodes();
+        LauncherURIHandlerFactory factory = new 
LauncherURIHandlerFactory(conf);
+
+        for (int i = 0; i < nl.getLength(); ++i) {
+            Node n = nl.item(i);
+            String operation = n.getNodeName();
+            if (n.getAttributes() == null || 
n.getAttributes().getNamedItem("path") == null) {
+                continue;
+            }
+            String pathStr = 
n.getAttributes().getNamedItem("path").getNodeValue().trim();
+            // use Path to avoid URIsyntax error caused by square bracket in 
glob
+            URI uri = new Path(pathStr).toUri();
+            LauncherURIHandler handler = factory.getURIHandler(uri);
+            execute(operation, uri, handler, conf);
+        }
+    }
+
+    private void execute(String operation, URI uri, LauncherURIHandler 
handler, Configuration conf)
+            throws LauncherException {
+
+        switch (operation) {
+            case "delete":
+                handler.delete(uri, conf);
+                break;
+
+            case "mkdir":
+                handler.create(uri, conf);
+                break;
+
+            default:
+                System.out.println("Warning: unknown prepare operation " + 
operation + " -- skipping");
+            }
+    }
+
+    // Method to return the document from the prepare XML block
+    static Document getDocumentFromXML(String prepareXML) throws 
ParserConfigurationException, SAXException,
+            IOException {
+        DocumentBuilderFactory docBuilderFactory = 
DocumentBuilderFactory.newInstance();
+        docBuilderFactory.setNamespaceAware(true);
+        // support for includes in the xml file
+        docBuilderFactory.setXIncludeAware(true);
+        // ignore all comments inside the xml file
+        docBuilderFactory.setIgnoringComments(true);
+        docBuilderFactory.setExpandEntityReferences(false);
+        
docBuilderFactory.setFeature("http://apache.org/xml/features/disallow-doctype-decl";,
 true);
+        DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
+        InputStream is = new 
ByteArrayInputStream(prepareXML.getBytes("UTF-8"));
+        return docBuilder.parse(is);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/9151f4eb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
new file mode 100644
index 0000000..c73c13f
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileWriterFactory {
+
+    public SequenceFile.Writer createSequenceFileWriter(Configuration 
launcherJobConf, Path finalPath, Class<?> keyClass, Class<?> valueClass) throws 
IOException {
+        return SequenceFile.createWriter(launcherJobConf,
+                SequenceFile.Writer.file(finalPath),
+                SequenceFile.Writer.keyClass(keyClass),
+                SequenceFile.Writer.valueClass(valueClass));
+    }
+}

Reply via email to