Repository: oozie
Updated Branches:
  refs/heads/oya a37835fec -> fea512cf6


http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
new file mode 100644
index 0000000..e056acc
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+public class LauncherAM {
+
+    static final String CONF_OOZIE_ACTION_MAIN_CLASS = 
"oozie.launcher.action.main.class";
+
+    static final String ACTION_PREFIX = "oozie.action.";
+    public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = 
ACTION_PREFIX + "max.output.data";
+    static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + 
"main.arg.";
+    static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = ACTION_PREFIX + 
CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
+    static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = 
"oozie.external.stats.max.size";
+
+    static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path";
+    static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml";
+    static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // 
COMBO FILE
+    static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";
+    static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
+    static final String ACTION_DATA_STATS = "stats.properties";
+    static final String ACTION_DATA_NEW_ID = "newId";
+    static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+
+    // TODO: OYA: more unique file names?  action.xml may be stuck for 
backwards compat though
+    public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
+    public static final String ACTION_CONF_XML = "action.xml";
+    public static final String ACTION_DATA_FINAL_STATUS = "final.status";
+
+    private static AMRMClientAsync<AMRMClient.ContainerRequest> 
amRmClientAsync = null;
+    private static Configuration launcherJobConf = null;
+    private static Path actionDir;
+    private static Map<String, String> actionData = new 
HashMap<String,String>();
+
+    private static void printDebugInfo(String[] mainArgs) throws IOException {
+        printContentsOfCurrentDir();
+
+        System.out.println();
+        System.out.println("Oozie Launcher Application Master configuration");
+        System.out.println("===============================================");
+        System.out.println("Workflow job id   : " + 
launcherJobConf.get("oozie.job.id"));
+        System.out.println("Workflow action id: " + 
launcherJobConf.get("oozie.action.id"));
+        System.out.println();
+        System.out.println("Classpath         :");
+        System.out.println("------------------------");
+        StringTokenizer st = new 
StringTokenizer(System.getProperty("java.class.path"), ":");
+        while (st.hasMoreTokens()) {
+            System.out.println("  " + st.nextToken());
+        }
+        System.out.println("------------------------");
+        System.out.println();
+        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+        System.out.println("Main class        : " + mainClass);
+        System.out.println();
+        System.out.println("Maximum output    : "
+                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 
* 1024));
+        System.out.println();
+        System.out.println("Arguments         :");
+        for (String arg : mainArgs) {
+            System.out.println("                    " + arg);
+        }
+
+        System.out.println();
+        System.out.println("Java System Properties:");
+        System.out.println("------------------------");
+        System.getProperties().store(System.out, "");
+        System.out.flush();
+        System.out.println("------------------------");
+        System.out.println();
+
+        
System.out.println("=================================================================");
+        System.out.println();
+        System.out.println(">>> Invoking Main class now >>>");
+        System.out.println();
+        System.out.flush();
+    }
+
+    // TODO: OYA: delete me when making real Action Mains
+    public static class DummyMain {
+        public static void main(String[] args) throws Exception {
+            System.out.println("Hello World!");
+            if (launcherJobConf.get("foo", "0").equals("1")) {
+                throw new IOException("foo 1");
+            } else if (launcherJobConf.get("foo", "0").equals("2")) {
+                throw new JavaMainException(new IOException("foo 2"));
+            } else if (launcherJobConf.get("foo", "0").equals("3")) {
+                throw new LauncherMainException(3);
+            } else if (launcherJobConf.get("foo", "0").equals("4")) {
+                System.exit(0);
+            } else if (launcherJobConf.get("foo", "0").equals("5")) {
+                System.exit(1);
+            }
+        }
+    }
+
+    // TODO: OYA: rethink all print messages and formatting
+    public static void main(String[] AMargs) throws Exception {
+        ErrorHolder eHolder = new ErrorHolder();
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+        try {
+            try {
+                launcherJobConf = readLauncherConf();
+                System.out.println("Launcher AM configuration loaded");
+            } catch (Exception ex) {
+                eHolder.setErrorMessage("Could not load the Launcher AM 
configuration file");
+                eHolder.setErrorCause(ex);
+                throw ex;
+            }
+
+            registerWithRM();
+
+            actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH));
+
+            try {
+                System.out.println("\nStarting the execution of prepare 
actions");
+                executePrepare();
+                System.out.println("Completed the execution of prepare actions 
successfully");
+            } catch (Exception ex) {
+                eHolder.setErrorMessage("Prepare execution in the Launcher AM 
has failed");
+                eHolder.setErrorCause(ex);
+                throw ex;
+            }
+
+            String[] mainArgs = getMainArguments(launcherJobConf);
+
+            // TODO: OYA: should we allow turning this off?
+            // TODO: OYA: what should default be?
+            if (launcherJobConf.getBoolean("oozie.launcher.print.debug.info", 
true)) {
+                printDebugInfo(mainArgs);
+            }
+            finalStatus = runActionMain(mainArgs, eHolder);
+            if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
+                handleActionData();
+                if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
+                    System.out.println();
+                    System.out.println("Oozie Launcher, capturing output 
data:");
+                    System.out.println("=======================");
+                    
System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
+                    System.out.println();
+                    System.out.println("=======================");
+                    System.out.println();
+                }
+                if (actionData.get(ACTION_DATA_NEW_ID) != null) {
+                    System.out.println();
+                    System.out.println("Oozie Launcher, propagating new Hadoop 
job id to Oozie");
+                    System.out.println("=======================");
+                    System.out.println(actionData.get(ACTION_DATA_NEW_ID));
+                    System.out.println("=======================");
+                    System.out.println();
+                }
+            }
+        } finally {
+            try {
+                // Store final status in case Launcher AM falls off the RM
+                actionData.put(ACTION_DATA_FINAL_STATUS, 
finalStatus.toString());
+                if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
+                    failLauncher(eHolder);
+                }
+                uploadActionDataToHDFS();
+            } finally {
+                try {
+                    unregisterWithRM(finalStatus, eHolder.getErrorMessage());
+                } finally {
+                    LauncherAMCallbackNotifier cn = new 
LauncherAMCallbackNotifier(launcherJobConf);
+                    cn.notifyURL(finalStatus);
+                }
+            }
+        }
+    }
+
+    private static void registerWithRM() throws IOException, YarnException {
+        AMRMClient<AMRMClient.ContainerRequest> amRmClient = 
AMRMClient.createAMRMClient();
+
+        AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
+        // TODO: OYA: make heartbeat interval configurable
+        // TODO: OYA: make heartbeat interval higher to put less load on RM, 
but lower than timeout
+        amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, 
60000, callBackHandler);
+        amRmClientAsync.init(launcherJobConf);
+        amRmClientAsync.start();
+
+        // hostname and tracking url are determined automatically
+        amRmClientAsync.registerApplicationMaster("", 0, "");
+    }
+
+    private static void unregisterWithRM(FinalApplicationStatus status, String 
message) throws YarnException, IOException {
+        if (amRmClientAsync != null) {
+            System.out.println("Stopping AM");
+            try {
+                message = (message == null) ? "" : message;
+                // tracking url is determined automatically
+                amRmClientAsync.unregisterApplicationMaster(status, message, 
"");
+            } catch (YarnException ex) {
+                System.err.println("Error un-registering AM client");
+                throw ex;
+            } catch (IOException ex) {
+                System.err.println("Error un-registering AM client");
+                throw ex;
+            } finally {
+                amRmClientAsync.stop();
+                amRmClientAsync = null;
+            }
+        }
+    }
+
+    // Method to execute the prepare actions
+    private static void executePrepare() throws IOException, 
LauncherException, ParserConfigurationException, SAXException {
+        String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML);
+        if (prepareXML != null) {
+            if (prepareXML.length() != 0) {
+                Configuration actionConf = new Configuration(launcherJobConf);
+                actionConf.addResource(ACTION_CONF_XML);
+                PrepareActionsDriver.doOperations(prepareXML, actionConf);
+            } else {
+                System.out.println("There are no prepare actions to execute.");
+            }
+        }
+    }
+
+    private static FinalApplicationStatus runActionMain(String[] mainArgs, 
ErrorHolder eHolder) {
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+        LauncherSecurityManager secMan = new LauncherSecurityManager();
+        try {
+            Class<?> klass = 
launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
+            Method mainMethod = klass.getMethod("main", String[].class);
+            // Enable LauncherSecurityManager to catch System.exit calls
+            secMan.set();
+            // TODO: OYA: remove this line to actually run the Main class 
instead of this dummy
+            mainMethod = DummyMain.class.getMethod("main", String[].class);
+            mainMethod.invoke(null, (Object) mainArgs);
+
+            System.out.println();
+            System.out.println("<<< Invocation of Main class completed <<<");
+            System.out.println();
+            finalStatus = FinalApplicationStatus.SUCCEEDED;
+        } catch (InvocationTargetException ex) {
+            // Get what actually caused the exception
+            Throwable cause = ex.getCause();
+            // If we got a JavaMainException from JavaMain, then we need to 
unwrap it
+            if (JavaMainException.class.isInstance(cause)) {
+                cause = cause.getCause();
+            }
+            if (LauncherMainException.class.isInstance(cause)) {
+                String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                eHolder.setErrorMessage("Main Class [" + mainClass + "], exit 
code [" +
+                        ((LauncherMainException) ex.getCause()).getErrorCode() 
+ "]");
+            } else if (SecurityException.class.isInstance(cause)) {
+                if (secMan.getExitInvoked()) {
+                    System.out.println("Intercepting System.exit(" + 
secMan.getExitCode()
+                            + ")");
+                    System.err.println("Intercepting System.exit(" + 
secMan.getExitCode()
+                            + ")");
+                    // if 0 main() method finished successfully
+                    // ignoring
+                    eHolder.setErrorCode(secMan.getExitCode());
+                    if (eHolder.getErrorCode() != 0) {
+                        String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                        eHolder.setErrorMessage("Main Class [" + mainClass + 
"], exit code [" + eHolder.getErrorCode() + "]");
+                    } else {
+                        finalStatus = FinalApplicationStatus.SUCCEEDED;
+                    }
+                }
+            } else {
+                eHolder.setErrorMessage(cause.getMessage());
+                eHolder.setErrorCause(cause);
+            }
+        } catch (Throwable t) {
+            eHolder.setErrorMessage(t.getMessage());
+            eHolder.setErrorCause(t);
+        } finally {
+            // Disable LauncherSecurityManager
+            secMan.unset();
+        }
+        return finalStatus;
+    }
+
+    private static void handleActionData() throws IOException {
+        // external child IDs
+        String externalChildIdsProp = System.getProperty(ACTION_PREFIX
+                + ACTION_DATA_EXTERNAL_CHILD_IDS);
+        if (externalChildIdsProp != null) {
+            File externalChildIDs = new File(externalChildIdsProp);
+            if (externalChildIDs.exists()) {
+                actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, 
getLocalFileContentStr(externalChildIDs, "", -1));
+            }
+        }
+
+        // external stats
+        String statsProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_STATS);
+        if (statsProp != null) {
+            File actionStatsData = new File(statsProp);
+            if (actionStatsData.exists()) {
+                int statsMaxOutputData = 
launcherJobConf.getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+                        Integer.MAX_VALUE);
+                actionData.put(ACTION_DATA_STATS,
+                        getLocalFileContentStr(actionStatsData, "Stats", 
statsMaxOutputData));
+            }
+        }
+
+        // output data
+        String outputProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_OUTPUT_PROPS);
+        if (outputProp != null) {
+            File actionOutputData = new File(outputProp);
+            if (actionOutputData.exists()) {
+                int maxOutputData = 
launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
+                actionData.put(ACTION_DATA_OUTPUT_PROPS,
+                        getLocalFileContentStr(actionOutputData, "Output", 
maxOutputData));
+            }
+        }
+
+        // id swap
+        String newIdProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_NEW_ID);
+        if (newIdProp != null) {
+            File newId = new File(newIdProp);
+            if (newId.exists()) {
+                actionData.put(ACTION_DATA_NEW_ID, 
getLocalFileContentStr(newId, "", -1));
+            }
+        }
+    }
+
+    public static String getLocalFileContentStr(File file, String type, int 
maxLen) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        Reader reader = null;
+        try {
+            reader = new BufferedReader(new FileReader(file));
+            char[] buffer = new char[2048];
+            int read;
+            int count = 0;
+            while ((read = reader.read(buffer)) > -1) {
+                count += read;
+                if (maxLen > -1 && count > maxLen) {
+                    throw new IOException(type + " data exceeds its limit [" + 
maxLen + "]");
+                }
+                sb.append(buffer, 0, read);
+            }
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+        }
+        return sb.toString();
+    }
+
+    private static void uploadActionDataToHDFS() throws IOException {
+        Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE);
+        FileSystem fs = FileSystem.get(finalPath.toUri(), launcherJobConf);
+        // upload into sequence file
+        System.out.println("Oozie Launcher, uploading action data to HDFS 
sequence file: "
+                + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri());
+
+        SequenceFile.Writer wr = null;
+        try {
+            wr = SequenceFile.createWriter(launcherJobConf,
+                    SequenceFile.Writer.file(finalPath),
+                    SequenceFile.Writer.keyClass(Text.class),
+                    SequenceFile.Writer.valueClass(Text.class));
+            if (wr != null) {
+                Set<String> keys = actionData.keySet();
+                for (String propsKey : keys) {
+                    wr.append(new Text(propsKey), new 
Text(actionData.get(propsKey)));
+                }
+            }
+            else {
+                throw new IOException("SequenceFile.Writer is null for " + 
finalPath);
+            }
+        }
+        catch(IOException e) {
+            e.printStackTrace();
+            throw e;
+        }
+        finally {
+            if (wr != null) {
+                wr.close();
+            }
+        }
+    }
+    private static void failLauncher(int errorCode, String message, Throwable 
ex) {
+        ErrorHolder eHolder = new ErrorHolder();
+        eHolder.setErrorCode(errorCode);
+        eHolder.setErrorMessage(message);
+        eHolder.setErrorCause(ex);
+        failLauncher(eHolder);
+    }
+
+    private static void failLauncher(ErrorHolder eHolder) {
+        if (eHolder.getErrorCause() != null) {
+            eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + 
eHolder.getErrorCause().getMessage());
+        }
+        Properties errorProps = new Properties();
+        errorProps.setProperty("error.code", 
Integer.toString(eHolder.getErrorCode()));
+        errorProps.setProperty("error.reason", eHolder.getErrorMessage());
+        if (eHolder.getErrorCause() != null) {
+            if (eHolder.getErrorCause().getMessage() != null) {
+                errorProps.setProperty("exception.message", 
eHolder.getErrorCause().getMessage());
+            }
+            StringWriter sw = new StringWriter();
+            PrintWriter pw = new PrintWriter(sw);
+            eHolder.getErrorCause().printStackTrace(pw);
+            pw.close();
+            errorProps.setProperty("exception.stacktrace", sw.toString());
+        }
+        StringWriter sw = new StringWriter();
+        try {
+            errorProps.store(sw, "");
+            sw.close();
+            actionData.put(ACTION_DATA_ERROR_PROPS, sw.toString());
+
+            // external child IDs
+            String externalChildIdsProp = System.getProperty(ACTION_PREFIX + 
ACTION_DATA_EXTERNAL_CHILD_IDS);
+            if (externalChildIdsProp != null) {
+                File externalChildIDs = new File(externalChildIdsProp);
+                if (externalChildIDs.exists()) {
+                    actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, 
getLocalFileContentStr(externalChildIDs, "", -1));
+                }
+            }
+        } catch (IOException ioe) {
+            System.err.println("A problem occured trying to fail the 
launcher");
+            ioe.printStackTrace();
+        } finally {
+            System.out.print("Failing Oozie Launcher, " + 
eHolder.getErrorMessage() + "\n");
+            System.err.print("Failing Oozie Launcher, " + 
eHolder.getErrorMessage() + "\n");
+            if (eHolder.getErrorCause() != null) {
+                eHolder.getErrorCause().printStackTrace(System.out);
+                eHolder.getErrorCause().printStackTrace(System.err);
+            }
+        }
+    }
+
+    private static class AMRMCallBackHandler implements 
AMRMClientAsync.CallbackHandler {
+        @Override
+        public void onContainersCompleted(List<ContainerStatus> 
containerStatuses) {
+            //noop
+        }
+
+        @Override
+        public void onContainersAllocated(List<Container> containers) {
+            //noop
+        }
+
+        @Override
+        public void onShutdownRequest() {
+            failLauncher(0, "ResourceManager requested AM Shutdown", null);
+            // TODO: OYA: interrupt?
+        }
+
+        @Override
+        public void onNodesUpdated(List<NodeReport> nodeReports) {
+            //noop
+        }
+
+        @Override
+        public float getProgress() {
+            return 0.5f;    //TODO: OYA: maybe some action types can report 
better progress?
+        }
+
+        @Override
+        public void onError(final Throwable ex) {
+            failLauncher(0, ex.getMessage(), ex);
+            // TODO: OYA: interrupt?
+        }
+    }
+
+    public static String[] getMainArguments(Configuration conf) {
+        String[] args = new 
String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
+        for (int i = 0; i < args.length; i++) {
+            args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
+        }
+        return args;
+    }
+
+    private static class LauncherSecurityManager extends SecurityManager {
+        private boolean exitInvoked;
+        private int exitCode;
+        private SecurityManager securityManager;
+
+        public LauncherSecurityManager() {
+            exitInvoked = false;
+            exitCode = 0;
+            securityManager = System.getSecurityManager();
+        }
+
+        @Override
+        public void checkPermission(Permission perm, Object context) {
+            if (securityManager != null) {
+                // check everything with the original SecurityManager
+                securityManager.checkPermission(perm, context);
+            }
+        }
+
+        @Override
+        public void checkPermission(Permission perm) {
+            if (securityManager != null) {
+                // check everything with the original SecurityManager
+                securityManager.checkPermission(perm);
+            }
+        }
+
+        @Override
+        public void checkExit(int status) throws SecurityException {
+            exitInvoked = true;
+            exitCode = status;
+            throw new SecurityException("Intercepted System.exit(" + status + 
")");
+        }
+
+        public boolean getExitInvoked() {
+            return exitInvoked;
+        }
+
+        public int getExitCode() {
+            return exitCode;
+        }
+
+        public void set() {
+            if (System.getSecurityManager() != this) {
+                System.setSecurityManager(this);
+            }
+        }
+
+        public void unset() {
+            if (System.getSecurityManager() == this) {
+                System.setSecurityManager(securityManager);
+            }
+        }
+    }
+
+
+    /**
+     * Print files and directories in current directory. Will list files in 
the sub-directory (only 1 level deep)
+     */
+    protected static void printContentsOfCurrentDir() {
+        File folder = new File(".");
+        System.out.println();
+        System.out.println("Files in current dir:" + folder.getAbsolutePath());
+        System.out.println("======================");
+
+        File[] listOfFiles = folder.listFiles();
+        for (File fileName : listOfFiles) {
+            if (fileName.isFile()) {
+                System.out.println("File: " + fileName.getName());
+            } else if (fileName.isDirectory()) {
+                System.out.println("Dir: " + fileName.getName());
+                File subDir = new File(fileName.getName());
+                File[] moreFiles = subDir.listFiles();
+                for (File subFileName : moreFiles) {
+                    if (subFileName.isFile()) {
+                        System.out.println("  File: " + subFileName.getName());
+                    } else if (subFileName.isDirectory()) {
+                        System.out.println("  Dir: " + subFileName.getName());
+                    }
+                }
+            }
+        }
+    }
+
+    protected static Configuration readLauncherConf() {
+        File confFile = new File(LAUNCHER_JOB_CONF_XML);
+        Configuration conf = new Configuration(false);
+        conf.addResource(new Path(confFile.getAbsolutePath()));
+        return conf;
+    }
+
+    protected static class ErrorHolder {
+        private int errorCode = 0;
+        private Throwable errorCause = null;
+        private String errorMessage = null;
+
+        public int getErrorCode() {
+            return errorCode;
+        }
+
+        public void setErrorCode(int errorCode) {
+            this.errorCode = errorCode;
+        }
+
+        public Throwable getErrorCause() {
+            return errorCause;
+        }
+
+        public void setErrorCause(Throwable errorCause) {
+            this.errorCause = errorCause;
+        }
+
+        public String getErrorMessage() {
+            return errorMessage;
+        }
+
+        public void setErrorMessage(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..dbef441
--- /dev/null
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
+/**
+ * This call sends back an HTTP GET callback to the configured URL.  It is 
meant for the {@link LauncherAM} to notify the
+ * Oozie Server that it has finished.
+ */
+public class LauncherAMCallbackNotifier {
+    private static final String OOZIE_LAUNCHER_CALLBACK = 
"oozie.launcher.callback.";
+    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = 
OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
+    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = 
OOZIE_LAUNCHER_CALLBACK + "retry.interval";
+    static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
+    public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = 
OOZIE_LAUNCHER_CALLBACK + "max.attempts";
+    public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = 
OOZIE_LAUNCHER_CALLBACK + "timeout";
+    public static final String OOZIE_LAUNCHER_CALLBACK_URL = 
OOZIE_LAUNCHER_CALLBACK + "url";
+    public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = 
OOZIE_LAUNCHER_CALLBACK + "proxy";
+    public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = 
"$jobStatus";
+
+    protected String userUrl;
+    protected String proxyConf;
+    protected int numTries; //Number of tries to attempt notification
+    protected int waitInterval; //Time (ms) to wait between retrying 
notification
+    protected int timeout; // Timeout (ms) on the connection and notification
+    protected URL urlToNotify; //URL to notify read from the config
+    protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for 
notification
+
+    /**
+     * Parse the URL that needs to be notified of the end of the job, along
+     * with the number of retries in case of failure, the amount of time to
+     * wait between retries and proxy settings
+     * @param conf the configuration
+     */
+    public LauncherAMCallbackNotifier(Configuration conf) {
+        numTries = 
Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1,
+                conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1));
+
+        waitInterval = 
Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, 
OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX),
+                OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+        waitInterval = (waitInterval < 0) ? 
OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval;
+
+        timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, 
OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+
+        userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL);
+
+        proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY);
+
+        //Configure the proxy to use if its set. It should be set like
+        //proxyType@proxyHostname:port
+        if(proxyConf != null && !proxyConf.equals("") &&
+                proxyConf.lastIndexOf(":") != -1) {
+            int typeIndex = proxyConf.indexOf("@");
+            Proxy.Type proxyType = Proxy.Type.HTTP;
+            if(typeIndex != -1 &&
+                    proxyConf.substring(0, 
typeIndex).compareToIgnoreCase("socks") == 0) {
+                proxyType = Proxy.Type.SOCKS;
+            }
+            String hostname = proxyConf.substring(typeIndex + 1,
+                    proxyConf.lastIndexOf(":"));
+            String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 
1);
+            try {
+                int port = Integer.parseInt(portConf);
+                proxyToUse = new Proxy(proxyType,
+                        new InetSocketAddress(hostname, port));
+                System.out.println("Callback notification using proxy type \"" 
+ proxyType +
+                        "\" hostname \"" + hostname + "\" and port \"" + port 
+ "\"");
+            } catch(NumberFormatException nfe) {
+                System.err.println("Callback notification couldn't parse 
configured proxy's port "
+                        + portConf + ". Not going to use a proxy");
+            }
+        }
+
+    }
+
+    /**
+     * Notify the URL just once. Use best effort.
+     */
+    protected boolean notifyURLOnce() {
+        boolean success = false;
+        HttpURLConnection conn = null;
+        try {
+            System.out.println("Callback notification trying " + urlToNotify);
+            conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+            conn.setConnectTimeout(timeout);
+            conn.setReadTimeout(timeout);
+            conn.setAllowUserInteraction(false);
+            if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                System.err.println("Callback notification to " + urlToNotify 
+" failed with code: "
+                        + conn.getResponseCode() + " and message \"" + 
conn.getResponseMessage()
+                        +"\"");
+            }
+            else {
+                success = true;
+                System.out.println("Callback notification to " + urlToNotify + 
" succeeded");
+            }
+        } catch(IOException ioe) {
+            System.err.println("Callback notification to " + urlToNotify + " 
failed");
+            ioe.printStackTrace();
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+        return success;
+    }
+
+    /**
+     * Notify a server of the completion of a submitted job.
+     * @param finalStatus The Application Status
+     *
+     * @throws InterruptedException
+     */
+    public void notifyURL(FinalApplicationStatus finalStatus) throws 
InterruptedException {
+        // Do we need job-end notification?
+        if (userUrl == null) {
+            System.out.println("Callback notification URL not set, skipping.");
+            return;
+        }
+
+        //Do string replacements for final status
+        if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
+            userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, 
finalStatus.toString());
+        }
+
+        // Create the URL, ensure sanity
+        try {
+            urlToNotify = new URL(userUrl);
+        } catch (MalformedURLException mue) {
+            System.err.println("Callback notification couldn't parse " + 
userUrl);
+            mue.printStackTrace();
+            return;
+        }
+
+        // Send notification
+        boolean success = false;
+        while (numTries-- > 0 && !success) {
+            System.out.println("Callback notification attempts left " + 
numTries);
+            success = notifyURLOnce();
+            if (!success) {
+                Thread.sleep(waitInterval);
+            }
+        }
+        if (!success) {
+            System.err.println("Callback notification failed to notify : " + 
urlToNotify);
+        } else {
+            System.out.println("Callback notification succeeded");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index f2cba13..eeffe81 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -49,7 +49,11 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.xml.sax.SAXException;
 
+import javax.xml.parsers.ParserConfigurationException;
+
+// TODO: OYA: Delete :)
 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, 
Runnable {
 
     static final String CONF_OOZIE_ACTION_MAIN_CLASS = 
"oozie.launcher.action.main.class";
@@ -480,7 +484,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements 
Mapper<K1, V1, K2, V2>, R
     }
 
     // Method to execute the prepare actions
-    private void executePrepare() throws IOException, LauncherException {
+    private void executePrepare() throws IOException, LauncherException, 
ParserConfigurationException, SAXException {
         String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
         if (prepareXML != null) {
              if (!prepareXML.equals("")) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
index 21ae456..4a51d48 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
@@ -46,35 +46,26 @@ public class PrepareActionsDriver {
      * @param prepareXML Prepare XML block in string format
      * @throws LauncherException
      */
-    static void doOperations(String prepareXML, Configuration conf) throws 
LauncherException {
-        try {
-            Document doc = getDocumentFromXML(prepareXML);
-            doc.getDocumentElement().normalize();
+    static void doOperations(String prepareXML, Configuration conf)
+            throws IOException, SAXException, ParserConfigurationException, 
LauncherException {
+        Document doc = getDocumentFromXML(prepareXML);
+        doc.getDocumentElement().normalize();
 
-            // Get the list of child nodes, basically, each one corresponding 
to a separate action
-            NodeList nl = doc.getDocumentElement().getChildNodes();
-            LauncherURIHandlerFactory factory = new 
LauncherURIHandlerFactory(conf);
+        // Get the list of child nodes, basically, each one corresponding to a 
separate action
+        NodeList nl = doc.getDocumentElement().getChildNodes();
+        LauncherURIHandlerFactory factory = new 
LauncherURIHandlerFactory(conf);
 
-            for (int i = 0; i < nl.getLength(); ++i) {
-                Node n = nl.item(i);
-                String operation = n.getNodeName();
-                if (n.getAttributes() == null || 
n.getAttributes().getNamedItem("path") == null) {
-                    continue;
-                }
-                String pathStr = 
n.getAttributes().getNamedItem("path").getNodeValue().trim();
-                // use Path to avoid URIsyntax error caused by square bracket 
in glob
-                URI uri = new Path(pathStr).toUri();
-                LauncherURIHandler handler = factory.getURIHandler(uri);
-                execute(operation, uri, handler, conf);
+        for (int i = 0; i < nl.getLength(); ++i) {
+            Node n = nl.item(i);
+            String operation = n.getNodeName();
+            if (n.getAttributes() == null || 
n.getAttributes().getNamedItem("path") == null) {
+                continue;
             }
-        } catch (IOException ioe) {
-            throw new LauncherException(ioe.getMessage(), ioe);
-        } catch (SAXException saxe) {
-            throw new LauncherException(saxe.getMessage(), saxe);
-        } catch (ParserConfigurationException pce) {
-            throw new LauncherException(pce.getMessage(), pce);
-        } catch (IllegalArgumentException use) {
-            throw new LauncherException(use.getMessage(), use);
+            String pathStr = 
n.getAttributes().getNamedItem("path").getNodeValue().trim();
+            // use Path to avoid URIsyntax error caused by square bracket in 
glob
+            URI uri = new Path(pathStr).toUri();
+            LauncherURIHandler handler = factory.getURIHandler(uri);
+            execute(operation, uri, handler, conf);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/pig/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml
index 562c530..ea674a1 100644
--- a/sharelib/pig/pom.xml
+++ b/sharelib/pig/pom.xml
@@ -136,18 +136,6 @@
                             
<outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to 
generate the required classpath
-                            that is required in the env of the launch 
container in the mini mr/yarn cluster -->
-                            
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 46c6375..6d52ab4 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -370,18 +370,6 @@
                             
<outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to 
generate the required classpath
-                            that is required in the env of the launch 
container in the mini mr/yarn cluster -->
-                            
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/sqoop/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/pom.xml b/sharelib/sqoop/pom.xml
index d875c93..aad13f9 100644
--- a/sharelib/sqoop/pom.xml
+++ b/sharelib/sqoop/pom.xml
@@ -239,18 +239,6 @@
                             
<outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to 
generate the required classpath
-                            that is required in the env of the launch 
container in the mini mr/yarn cluster -->
-                            
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/streaming/pom.xml b/sharelib/streaming/pom.xml
index fd79518..783c669 100644
--- a/sharelib/streaming/pom.xml
+++ b/sharelib/streaming/pom.xml
@@ -107,39 +107,6 @@
                             
<outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to 
generate the required classpath
-                            that is required in the env of the launch 
container in the mini mr/yarn cluster -->
-                            
<outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <version>1.6</version>
-                <executions>
-                    <execution>
-                        <configuration>
-                            <target>
-                                <!-- needed to include Main class in classpath 
for mini yarn cluster for unit tests -->
-                                <echo 
file="${project.build.directory}/test-classes/mrapp-generated-classpath"
-                                      append="true"
-                                      
message=":${project.build.directory}/classes"/>
-                            </target>
-                        </configuration>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                        <phase>generate-test-resources</phase>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

Reply via email to