Repository: oozie Updated Branches: refs/heads/oya a37835fec -> fea512cf6
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java new file mode 100644 index 0000000..e056acc --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -0,0 +1,636 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.xml.sax.SAXException; + +import javax.xml.parsers.ParserConfigurationException; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.Reader; +import java.io.StringWriter; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.Permission; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.StringTokenizer; + +public class LauncherAM { + + static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; + + static final String ACTION_PREFIX = "oozie.action."; + public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data"; + static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg."; + static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = ACTION_PREFIX + CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count"; + static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; + + static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path"; + static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml"; + static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE + static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs"; + static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; + static final String ACTION_DATA_STATS = "stats.properties"; + static final String ACTION_DATA_NEW_ID = "newId"; + static final String ACTION_DATA_ERROR_PROPS = "error.properties"; + + // TODO: OYA: more unique file names? action.xml may be stuck for backwards compat though + public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml"; + public static final String ACTION_CONF_XML = "action.xml"; + public static final String ACTION_DATA_FINAL_STATUS = "final.status"; + + private static AMRMClientAsync<AMRMClient.ContainerRequest> amRmClientAsync = null; + private static Configuration launcherJobConf = null; + private static Path actionDir; + private static Map<String, String> actionData = new HashMap<String,String>(); + + private static void printDebugInfo(String[] mainArgs) throws IOException { + printContentsOfCurrentDir(); + + System.out.println(); + System.out.println("Oozie Launcher Application Master configuration"); + System.out.println("==============================================="); + System.out.println("Workflow job id : " + launcherJobConf.get("oozie.job.id")); + System.out.println("Workflow action id: " + launcherJobConf.get("oozie.action.id")); + System.out.println(); + System.out.println("Classpath :"); + System.out.println("------------------------"); + StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":"); + while (st.hasMoreTokens()) { + System.out.println(" " + st.nextToken()); + } + System.out.println("------------------------"); + System.out.println(); + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + System.out.println("Main class : " + mainClass); + System.out.println(); + System.out.println("Maximum output : " + + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); + System.out.println(); + System.out.println("Arguments :"); + for (String arg : mainArgs) { + System.out.println(" " + arg); + } + + System.out.println(); + System.out.println("Java System Properties:"); + System.out.println("------------------------"); + System.getProperties().store(System.out, ""); + System.out.flush(); + System.out.println("------------------------"); + System.out.println(); + + System.out.println("================================================================="); + System.out.println(); + System.out.println(">>> Invoking Main class now >>>"); + System.out.println(); + System.out.flush(); + } + + // TODO: OYA: delete me when making real Action Mains + public static class DummyMain { + public static void main(String[] args) throws Exception { + System.out.println("Hello World!"); + if (launcherJobConf.get("foo", "0").equals("1")) { + throw new IOException("foo 1"); + } else if (launcherJobConf.get("foo", "0").equals("2")) { + throw new JavaMainException(new IOException("foo 2")); + } else if (launcherJobConf.get("foo", "0").equals("3")) { + throw new LauncherMainException(3); + } else if (launcherJobConf.get("foo", "0").equals("4")) { + System.exit(0); + } else if (launcherJobConf.get("foo", "0").equals("5")) { + System.exit(1); + } + } + } + + // TODO: OYA: rethink all print messages and formatting + public static void main(String[] AMargs) throws Exception { + ErrorHolder eHolder = new ErrorHolder(); + FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; + try { + try { + launcherJobConf = readLauncherConf(); + System.out.println("Launcher AM configuration loaded"); + } catch (Exception ex) { + eHolder.setErrorMessage("Could not load the Launcher AM configuration file"); + eHolder.setErrorCause(ex); + throw ex; + } + + registerWithRM(); + + actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH)); + + try { + System.out.println("\nStarting the execution of prepare actions"); + executePrepare(); + System.out.println("Completed the execution of prepare actions successfully"); + } catch (Exception ex) { + eHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); + eHolder.setErrorCause(ex); + throw ex; + } + + String[] mainArgs = getMainArguments(launcherJobConf); + + // TODO: OYA: should we allow turning this off? + // TODO: OYA: what should default be? + if (launcherJobConf.getBoolean("oozie.launcher.print.debug.info", true)) { + printDebugInfo(mainArgs); + } + finalStatus = runActionMain(mainArgs, eHolder); + if (finalStatus != FinalApplicationStatus.SUCCEEDED) { + handleActionData(); + if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { + System.out.println(); + System.out.println("Oozie Launcher, capturing output data:"); + System.out.println("======================="); + System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS)); + System.out.println(); + System.out.println("======================="); + System.out.println(); + } + if (actionData.get(ACTION_DATA_NEW_ID) != null) { + System.out.println(); + System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie"); + System.out.println("======================="); + System.out.println(actionData.get(ACTION_DATA_NEW_ID)); + System.out.println("======================="); + System.out.println(); + } + } + } finally { + try { + // Store final status in case Launcher AM falls off the RM + actionData.put(ACTION_DATA_FINAL_STATUS, finalStatus.toString()); + if (finalStatus != FinalApplicationStatus.SUCCEEDED) { + failLauncher(eHolder); + } + uploadActionDataToHDFS(); + } finally { + try { + unregisterWithRM(finalStatus, eHolder.getErrorMessage()); + } finally { + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(launcherJobConf); + cn.notifyURL(finalStatus); + } + } + } + } + + private static void registerWithRM() throws IOException, YarnException { + AMRMClient<AMRMClient.ContainerRequest> amRmClient = AMRMClient.createAMRMClient(); + + AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler(); + // TODO: OYA: make heartbeat interval configurable + // TODO: OYA: make heartbeat interval higher to put less load on RM, but lower than timeout + amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, 60000, callBackHandler); + amRmClientAsync.init(launcherJobConf); + amRmClientAsync.start(); + + // hostname and tracking url are determined automatically + amRmClientAsync.registerApplicationMaster("", 0, ""); + } + + private static void unregisterWithRM(FinalApplicationStatus status, String message) throws YarnException, IOException { + if (amRmClientAsync != null) { + System.out.println("Stopping AM"); + try { + message = (message == null) ? "" : message; + // tracking url is determined automatically + amRmClientAsync.unregisterApplicationMaster(status, message, ""); + } catch (YarnException ex) { + System.err.println("Error un-registering AM client"); + throw ex; + } catch (IOException ex) { + System.err.println("Error un-registering AM client"); + throw ex; + } finally { + amRmClientAsync.stop(); + amRmClientAsync = null; + } + } + } + + // Method to execute the prepare actions + private static void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException { + String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML); + if (prepareXML != null) { + if (prepareXML.length() != 0) { + Configuration actionConf = new Configuration(launcherJobConf); + actionConf.addResource(ACTION_CONF_XML); + PrepareActionsDriver.doOperations(prepareXML, actionConf); + } else { + System.out.println("There are no prepare actions to execute."); + } + } + } + + private static FinalApplicationStatus runActionMain(String[] mainArgs, ErrorHolder eHolder) { + FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; + LauncherSecurityManager secMan = new LauncherSecurityManager(); + try { + Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); + Method mainMethod = klass.getMethod("main", String[].class); + // Enable LauncherSecurityManager to catch System.exit calls + secMan.set(); + // TODO: OYA: remove this line to actually run the Main class instead of this dummy + mainMethod = DummyMain.class.getMethod("main", String[].class); + mainMethod.invoke(null, (Object) mainArgs); + + System.out.println(); + System.out.println("<<< Invocation of Main class completed <<<"); + System.out.println(); + finalStatus = FinalApplicationStatus.SUCCEEDED; + } catch (InvocationTargetException ex) { + // Get what actually caused the exception + Throwable cause = ex.getCause(); + // If we got a JavaMainException from JavaMain, then we need to unwrap it + if (JavaMainException.class.isInstance(cause)) { + cause = cause.getCause(); + } + if (LauncherMainException.class.isInstance(cause)) { + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + + ((LauncherMainException) ex.getCause()).getErrorCode() + "]"); + } else if (SecurityException.class.isInstance(cause)) { + if (secMan.getExitInvoked()) { + System.out.println("Intercepting System.exit(" + secMan.getExitCode() + + ")"); + System.err.println("Intercepting System.exit(" + secMan.getExitCode() + + ")"); + // if 0 main() method finished successfully + // ignoring + eHolder.setErrorCode(secMan.getExitCode()); + if (eHolder.getErrorCode() != 0) { + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + eHolder.getErrorCode() + "]"); + } else { + finalStatus = FinalApplicationStatus.SUCCEEDED; + } + } + } else { + eHolder.setErrorMessage(cause.getMessage()); + eHolder.setErrorCause(cause); + } + } catch (Throwable t) { + eHolder.setErrorMessage(t.getMessage()); + eHolder.setErrorCause(t); + } finally { + // Disable LauncherSecurityManager + secMan.unset(); + } + return finalStatus; + } + + private static void handleActionData() throws IOException { + // external child IDs + String externalChildIdsProp = System.getProperty(ACTION_PREFIX + + ACTION_DATA_EXTERNAL_CHILD_IDS); + if (externalChildIdsProp != null) { + File externalChildIDs = new File(externalChildIdsProp); + if (externalChildIDs.exists()) { + actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1)); + } + } + + // external stats + String statsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_STATS); + if (statsProp != null) { + File actionStatsData = new File(statsProp); + if (actionStatsData.exists()) { + int statsMaxOutputData = launcherJobConf.getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, + Integer.MAX_VALUE); + actionData.put(ACTION_DATA_STATS, + getLocalFileContentStr(actionStatsData, "Stats", statsMaxOutputData)); + } + } + + // output data + String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS); + if (outputProp != null) { + File actionOutputData = new File(outputProp); + if (actionOutputData.exists()) { + int maxOutputData = launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024); + actionData.put(ACTION_DATA_OUTPUT_PROPS, + getLocalFileContentStr(actionOutputData, "Output", maxOutputData)); + } + } + + // id swap + String newIdProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID); + if (newIdProp != null) { + File newId = new File(newIdProp); + if (newId.exists()) { + actionData.put(ACTION_DATA_NEW_ID, getLocalFileContentStr(newId, "", -1)); + } + } + } + + public static String getLocalFileContentStr(File file, String type, int maxLen) throws IOException { + StringBuilder sb = new StringBuilder(); + Reader reader = null; + try { + reader = new BufferedReader(new FileReader(file)); + char[] buffer = new char[2048]; + int read; + int count = 0; + while ((read = reader.read(buffer)) > -1) { + count += read; + if (maxLen > -1 && count > maxLen) { + throw new IOException(type + " data exceeds its limit [" + maxLen + "]"); + } + sb.append(buffer, 0, read); + } + } finally { + if (reader != null) { + reader.close(); + } + } + return sb.toString(); + } + + private static void uploadActionDataToHDFS() throws IOException { + Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE); + FileSystem fs = FileSystem.get(finalPath.toUri(), launcherJobConf); + // upload into sequence file + System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri()); + + SequenceFile.Writer wr = null; + try { + wr = SequenceFile.createWriter(launcherJobConf, + SequenceFile.Writer.file(finalPath), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class)); + if (wr != null) { + Set<String> keys = actionData.keySet(); + for (String propsKey : keys) { + wr.append(new Text(propsKey), new Text(actionData.get(propsKey))); + } + } + else { + throw new IOException("SequenceFile.Writer is null for " + finalPath); + } + } + catch(IOException e) { + e.printStackTrace(); + throw e; + } + finally { + if (wr != null) { + wr.close(); + } + } + } + private static void failLauncher(int errorCode, String message, Throwable ex) { + ErrorHolder eHolder = new ErrorHolder(); + eHolder.setErrorCode(errorCode); + eHolder.setErrorMessage(message); + eHolder.setErrorCause(ex); + failLauncher(eHolder); + } + + private static void failLauncher(ErrorHolder eHolder) { + if (eHolder.getErrorCause() != null) { + eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage()); + } + Properties errorProps = new Properties(); + errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode())); + errorProps.setProperty("error.reason", eHolder.getErrorMessage()); + if (eHolder.getErrorCause() != null) { + if (eHolder.getErrorCause().getMessage() != null) { + errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage()); + } + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + eHolder.getErrorCause().printStackTrace(pw); + pw.close(); + errorProps.setProperty("exception.stacktrace", sw.toString()); + } + StringWriter sw = new StringWriter(); + try { + errorProps.store(sw, ""); + sw.close(); + actionData.put(ACTION_DATA_ERROR_PROPS, sw.toString()); + + // external child IDs + String externalChildIdsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS); + if (externalChildIdsProp != null) { + File externalChildIDs = new File(externalChildIdsProp); + if (externalChildIDs.exists()) { + actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1)); + } + } + } catch (IOException ioe) { + System.err.println("A problem occured trying to fail the launcher"); + ioe.printStackTrace(); + } finally { + System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n"); + System.err.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n"); + if (eHolder.getErrorCause() != null) { + eHolder.getErrorCause().printStackTrace(System.out); + eHolder.getErrorCause().printStackTrace(System.err); + } + } + } + + private static class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler { + @Override + public void onContainersCompleted(List<ContainerStatus> containerStatuses) { + //noop + } + + @Override + public void onContainersAllocated(List<Container> containers) { + //noop + } + + @Override + public void onShutdownRequest() { + failLauncher(0, "ResourceManager requested AM Shutdown", null); + // TODO: OYA: interrupt? + } + + @Override + public void onNodesUpdated(List<NodeReport> nodeReports) { + //noop + } + + @Override + public float getProgress() { + return 0.5f; //TODO: OYA: maybe some action types can report better progress? + } + + @Override + public void onError(final Throwable ex) { + failLauncher(0, ex.getMessage(), ex); + // TODO: OYA: interrupt? + } + } + + public static String[] getMainArguments(Configuration conf) { + String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)]; + for (int i = 0; i < args.length; i++) { + args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i); + } + return args; + } + + private static class LauncherSecurityManager extends SecurityManager { + private boolean exitInvoked; + private int exitCode; + private SecurityManager securityManager; + + public LauncherSecurityManager() { + exitInvoked = false; + exitCode = 0; + securityManager = System.getSecurityManager(); + } + + @Override + public void checkPermission(Permission perm, Object context) { + if (securityManager != null) { + // check everything with the original SecurityManager + securityManager.checkPermission(perm, context); + } + } + + @Override + public void checkPermission(Permission perm) { + if (securityManager != null) { + // check everything with the original SecurityManager + securityManager.checkPermission(perm); + } + } + + @Override + public void checkExit(int status) throws SecurityException { + exitInvoked = true; + exitCode = status; + throw new SecurityException("Intercepted System.exit(" + status + ")"); + } + + public boolean getExitInvoked() { + return exitInvoked; + } + + public int getExitCode() { + return exitCode; + } + + public void set() { + if (System.getSecurityManager() != this) { + System.setSecurityManager(this); + } + } + + public void unset() { + if (System.getSecurityManager() == this) { + System.setSecurityManager(securityManager); + } + } + } + + + /** + * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep) + */ + protected static void printContentsOfCurrentDir() { + File folder = new File("."); + System.out.println(); + System.out.println("Files in current dir:" + folder.getAbsolutePath()); + System.out.println("======================"); + + File[] listOfFiles = folder.listFiles(); + for (File fileName : listOfFiles) { + if (fileName.isFile()) { + System.out.println("File: " + fileName.getName()); + } else if (fileName.isDirectory()) { + System.out.println("Dir: " + fileName.getName()); + File subDir = new File(fileName.getName()); + File[] moreFiles = subDir.listFiles(); + for (File subFileName : moreFiles) { + if (subFileName.isFile()) { + System.out.println(" File: " + subFileName.getName()); + } else if (subFileName.isDirectory()) { + System.out.println(" Dir: " + subFileName.getName()); + } + } + } + } + } + + protected static Configuration readLauncherConf() { + File confFile = new File(LAUNCHER_JOB_CONF_XML); + Configuration conf = new Configuration(false); + conf.addResource(new Path(confFile.getAbsolutePath())); + return conf; + } + + protected static class ErrorHolder { + private int errorCode = 0; + private Throwable errorCause = null; + private String errorMessage = null; + + public int getErrorCode() { + return errorCode; + } + + public void setErrorCode(int errorCode) { + this.errorCode = errorCode; + } + + public Throwable getErrorCause() { + return errorCause; + } + + public void setErrorCause(Throwable errorCause) { + this.errorCause = errorCause; + } + + public String getErrorMessage() { + return errorMessage; + } + + public void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java new file mode 100644 index 0000000..dbef441 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.Proxy; +import java.net.URL; + +// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier +/** + * This call sends back an HTTP GET callback to the configured URL. It is meant for the {@link LauncherAM} to notify the + * Oozie Server that it has finished. + */ +public class LauncherAMCallbackNotifier { + private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback."; + public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts"; + public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval"; + static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000; + public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts"; + public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout"; + public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url"; + public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = OOZIE_LAUNCHER_CALLBACK + "proxy"; + public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = "$jobStatus"; + + protected String userUrl; + protected String proxyConf; + protected int numTries; //Number of tries to attempt notification + protected int waitInterval; //Time (ms) to wait between retrying notification + protected int timeout; // Timeout (ms) on the connection and notification + protected URL urlToNotify; //URL to notify read from the config + protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification + + /** + * Parse the URL that needs to be notified of the end of the job, along + * with the number of retries in case of failure, the amount of time to + * wait between retries and proxy settings + * @param conf the configuration + */ + public LauncherAMCallbackNotifier(Configuration conf) { + numTries = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1, + conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1)); + + waitInterval = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX), + OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX); + waitInterval = (waitInterval < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval; + + timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX); + + userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL); + + proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY); + + //Configure the proxy to use if its set. It should be set like + //proxyType@proxyHostname:port + if(proxyConf != null && !proxyConf.equals("") && + proxyConf.lastIndexOf(":") != -1) { + int typeIndex = proxyConf.indexOf("@"); + Proxy.Type proxyType = Proxy.Type.HTTP; + if(typeIndex != -1 && + proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) { + proxyType = Proxy.Type.SOCKS; + } + String hostname = proxyConf.substring(typeIndex + 1, + proxyConf.lastIndexOf(":")); + String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1); + try { + int port = Integer.parseInt(portConf); + proxyToUse = new Proxy(proxyType, + new InetSocketAddress(hostname, port)); + System.out.println("Callback notification using proxy type \"" + proxyType + + "\" hostname \"" + hostname + "\" and port \"" + port + "\""); + } catch(NumberFormatException nfe) { + System.err.println("Callback notification couldn't parse configured proxy's port " + + portConf + ". Not going to use a proxy"); + } + } + + } + + /** + * Notify the URL just once. Use best effort. + */ + protected boolean notifyURLOnce() { + boolean success = false; + HttpURLConnection conn = null; + try { + System.out.println("Callback notification trying " + urlToNotify); + conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse); + conn.setConnectTimeout(timeout); + conn.setReadTimeout(timeout); + conn.setAllowUserInteraction(false); + if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) { + System.err.println("Callback notification to " + urlToNotify +" failed with code: " + + conn.getResponseCode() + " and message \"" + conn.getResponseMessage() + +"\""); + } + else { + success = true; + System.out.println("Callback notification to " + urlToNotify + " succeeded"); + } + } catch(IOException ioe) { + System.err.println("Callback notification to " + urlToNotify + " failed"); + ioe.printStackTrace(); + } finally { + if (conn != null) { + conn.disconnect(); + } + } + return success; + } + + /** + * Notify a server of the completion of a submitted job. + * @param finalStatus The Application Status + * + * @throws InterruptedException + */ + public void notifyURL(FinalApplicationStatus finalStatus) throws InterruptedException { + // Do we need job-end notification? + if (userUrl == null) { + System.out.println("Callback notification URL not set, skipping."); + return; + } + + //Do string replacements for final status + if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) { + userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, finalStatus.toString()); + } + + // Create the URL, ensure sanity + try { + urlToNotify = new URL(userUrl); + } catch (MalformedURLException mue) { + System.err.println("Callback notification couldn't parse " + userUrl); + mue.printStackTrace(); + return; + } + + // Send notification + boolean success = false; + while (numTries-- > 0 && !success) { + System.out.println("Callback notification attempts left " + numTries); + success = notifyURLOnce(); + if (!success) { + Thread.sleep(waitInterval); + } + } + if (!success) { + System.err.println("Callback notification failed to notify : " + urlToNotify); + } else { + System.out.println("Callback notification succeeded"); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java index f2cba13..eeffe81 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java @@ -49,7 +49,11 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.xml.sax.SAXException; +import javax.xml.parsers.ParserConfigurationException; + +// TODO: OYA: Delete :) public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable { static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; @@ -480,7 +484,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R } // Method to execute the prepare actions - private void executePrepare() throws IOException, LauncherException { + private void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException { String prepareXML = getJobConf().get(ACTION_PREPARE_XML); if (prepareXML != null) { if (!prepareXML.equals("")) { http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java index 21ae456..4a51d48 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java @@ -46,35 +46,26 @@ public class PrepareActionsDriver { * @param prepareXML Prepare XML block in string format * @throws LauncherException */ - static void doOperations(String prepareXML, Configuration conf) throws LauncherException { - try { - Document doc = getDocumentFromXML(prepareXML); - doc.getDocumentElement().normalize(); + static void doOperations(String prepareXML, Configuration conf) + throws IOException, SAXException, ParserConfigurationException, LauncherException { + Document doc = getDocumentFromXML(prepareXML); + doc.getDocumentElement().normalize(); - // Get the list of child nodes, basically, each one corresponding to a separate action - NodeList nl = doc.getDocumentElement().getChildNodes(); - LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf); + // Get the list of child nodes, basically, each one corresponding to a separate action + NodeList nl = doc.getDocumentElement().getChildNodes(); + LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf); - for (int i = 0; i < nl.getLength(); ++i) { - Node n = nl.item(i); - String operation = n.getNodeName(); - if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) { - continue; - } - String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim(); - // use Path to avoid URIsyntax error caused by square bracket in glob - URI uri = new Path(pathStr).toUri(); - LauncherURIHandler handler = factory.getURIHandler(uri); - execute(operation, uri, handler, conf); + for (int i = 0; i < nl.getLength(); ++i) { + Node n = nl.item(i); + String operation = n.getNodeName(); + if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) { + continue; } - } catch (IOException ioe) { - throw new LauncherException(ioe.getMessage(), ioe); - } catch (SAXException saxe) { - throw new LauncherException(saxe.getMessage(), saxe); - } catch (ParserConfigurationException pce) { - throw new LauncherException(pce.getMessage(), pce); - } catch (IllegalArgumentException use) { - throw new LauncherException(use.getMessage(), use); + String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim(); + // use Path to avoid URIsyntax error caused by square bracket in glob + URI uri = new Path(pathStr).toUri(); + LauncherURIHandler handler = factory.getURIHandler(uri); + execute(operation, uri, handler, conf); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/pig/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml index 562c530..ea674a1 100644 --- a/sharelib/pig/pom.xml +++ b/sharelib/pig/pom.xml @@ -136,18 +136,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/spark/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml index 46c6375..6d52ab4 100644 --- a/sharelib/spark/pom.xml +++ b/sharelib/spark/pom.xml @@ -370,18 +370,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/sqoop/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/sqoop/pom.xml b/sharelib/sqoop/pom.xml index d875c93..aad13f9 100644 --- a/sharelib/sqoop/pom.xml +++ b/sharelib/sqoop/pom.xml @@ -239,18 +239,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/streaming/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/streaming/pom.xml b/sharelib/streaming/pom.xml index fd79518..783c669 100644 --- a/sharelib/streaming/pom.xml +++ b/sharelib/streaming/pom.xml @@ -107,39 +107,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-antrun-plugin</artifactId> - <version>1.6</version> - <executions> - <execution> - <configuration> - <target> - <!-- needed to include Main class in classpath for mini yarn cluster for unit tests --> - <echo file="${project.build.directory}/test-classes/mrapp-generated-classpath" - append="true" - message=":${project.build.directory}/classes"/> - </target> - </configuration> - <goals> - <goal>run</goal> - </goals> - <phase>generate-test-resources</phase> - </execution> </executions> </plugin> <plugin>
