Repository: oozie Updated Branches: refs/heads/master d135b88ce -> 9cb4bd05a
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index 3e6ffc9..ee1a32a 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -26,15 +26,22 @@ import java.lang.reflect.Method; import java.security.Permission; import java.security.PrivilegedExceptionAction; import java.text.MessageFormat; +import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Properties; import java.util.StringTokenizer; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; @@ -44,6 +51,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; import com.google.common.base.Preconditions; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; public class LauncherAM { public static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml"; @@ -80,10 +88,9 @@ public class LauncherAM { public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml"; public static final String ACTION_CONF_XML = "action.xml"; public static final String ACTION_DATA_FINAL_STATUS = "final.status"; + public static final String OOZIE_SUBMITTER_USER = "oozie.submitter.user"; - private final UserGroupInformation ugi; - private final AMRMCallBackHandler callbackHandler; - private final AMRMClientAsyncFactory amRmClientAsyncFactory; + private final AMRMClientAsyncFactory amrmClientAsyncFactory; private final HdfsOperations hdfsOperations; private final LocalFsOperations localFsOperations; private final PrepareActionsHandler prepareHandler; @@ -91,24 +98,25 @@ public class LauncherAM { private final LauncherSecurityManager launcherSecurityManager; private final ContainerId containerId; - private Configuration launcherJobConf; + private final Configuration launcherConf; + private final AMRMCallBackHandler amrmCallBackHandler; private AMRMClientAsync<?> amRmClientAsync; private Path actionDir; private Map<String, String> actionData = new HashMap<String,String>(); - public LauncherAM(UserGroupInformation ugi, - AMRMClientAsyncFactory amRmClientAsyncFactory, - AMRMCallBackHandler callbackHandler, + public LauncherAM( + AMRMClientAsyncFactory amrmClientAsyncFactory, + AMRMCallBackHandler amrmCallBackHandler, HdfsOperations hdfsOperations, LocalFsOperations localFsOperations, PrepareActionsHandler prepareHandler, LauncherAMCallbackNotifierFactory callbackNotifierFactory, LauncherSecurityManager launcherSecurityManager, - String containerId) { - this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); - this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, - "amRmClientAsyncFactory should not be null"); - this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null"); + String containerId, + Configuration launcherConf) { + this.amrmClientAsyncFactory = Preconditions.checkNotNull(amrmClientAsyncFactory, + "amrmClientAsyncFactory should not be null"); + this.amrmCallBackHandler = Preconditions.checkNotNull(amrmCallBackHandler, "amrmCallBackHandler should not be null"); this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, "hdfsOperations should not be null"); this.localFsOperations = Preconditions.checkNotNull(localFsOperations, "localFsOperations should not be null"); this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null"); @@ -117,70 +125,107 @@ public class LauncherAM { this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, "launcherSecurityManager should not be null"); this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null")); + this.launcherConf = Preconditions.checkNotNull(launcherConf, "launcherConf should not be null"); } public static void main(String[] args) throws Exception { - UserGroupInformation ugi = null; - String submitterUser = System.getProperty("submitter.user", "").trim(); - Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined"); - System.out.println("Submitter user is: " + submitterUser); - - // We don't need remote/proxy user if the current login user is the workflow submitter - // Otherwise we have to create a remote user - if (UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) { - System.out.println("Using login user for UGI"); - ugi = UserGroupInformation.getLoginUser(); - } else { - ugi = UserGroupInformation.createRemoteUser(submitterUser); - ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials()); + final LocalFsOperations localFsOperations = new LocalFsOperations(); + final Configuration launcherConf = readLauncherConfiguration(localFsOperations); + UserGroupInformation.setConfiguration(launcherConf); + // MRAppMaster adds this call as well, but it's included only in Hadoop 2.9+ + // SecurityUtil.setConfiguration(launcherConf); + UserGroupInformation ugi = getUserGroupInformation(launcherConf); + printTokens("Executing Oozie Launcher with tokens:", ugi.getTokens()); + // Executing code inside a doAs with an ugi equipped with correct tokens. + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + LauncherAM launcher = new LauncherAM(new AMRMClientAsyncFactory(), + new AMRMCallBackHandler(), + new HdfsOperations(new SequenceFileWriterFactory()), + new LocalFsOperations(), + new PrepareActionsHandler(), + new LauncherAMCallbackNotifierFactory(), + new LauncherSecurityManager(), + System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()), + launcherConf); + + launcher.run(); + return null; + } + }); + + } + + private static void printTokens(String message, Collection<Token<? extends TokenIdentifier>> tokens) { + System.out.println(message); + for(Token<?> token : tokens) { + System.out.println(token); } + } + + private static UserGroupInformation getUserGroupInformation(Configuration launcherConf, Text... kindToFilter) + throws IOException { + final String submitterUser = launcherConf.get(OOZIE_SUBMITTER_USER); + Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials(); + filterTokensByKind(credentials, kindToFilter); - AMRMClientAsyncFactory amRmClientAsyncFactory = new AMRMClientAsyncFactory(); - AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler(); - HdfsOperations hdfsOperations = new HdfsOperations(new SequenceFileWriterFactory(), ugi); - LocalFsOperations localFSOperations = new LocalFsOperations(); - PrepareActionsHandler prepareHandler = new PrepareActionsHandler(); - LauncherAMCallbackNotifierFactory callbackNotifierFactory = new LauncherAMCallbackNotifierFactory(); - LauncherSecurityManager launcherSecurityManager = new LauncherSecurityManager(); - - LauncherAM launcher = new LauncherAM(ugi, - amRmClientAsyncFactory, - callbackHandler, - hdfsOperations, - localFSOperations, - prepareHandler, - callbackNotifierFactory, - launcherSecurityManager, - System.getenv("CONTAINER_ID")); - - launcher.run(); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(submitterUser); + ugi.addCredentials(credentials); + return ugi; + } + + private static void filterTokensByKind(Credentials credentials, Text[] kindToFilter) throws IOException { + Iterator<Token<? extends TokenIdentifier>> iterator = credentials.getAllTokens().iterator(); + + while (iterator.hasNext()) { + Token<?> token = iterator.next(); + for (int i = 0; i < kindToFilter.length; i++) { + if (kindToFilter[i].equals(token.getKind())) { + System.out.println("Removing token from the Ugi: " + kindToFilter[i]); + iterator.remove(); + } + } + } + } + + static Configuration readLauncherConfiguration(LocalFsOperations localFsOperations) { + Configuration launcherConf = null; + try { + launcherConf = localFsOperations.readLauncherConf(); + System.out.println("Launcher AM configuration loaded"); + } catch (Exception ex) { + System.err.println("Could not load the Launcher AM configuration file"); + ex.printStackTrace(); + throw ex; + } + return launcherConf; } public void run() throws Exception { final ErrorHolder errorHolder = new ErrorHolder(); OozieActionResult actionResult = OozieActionResult.FAILED; - boolean launcherExecutedProperly = false; boolean backgroundAction = false; - try { - try { - launcherJobConf = localFsOperations.readLauncherConf(); - System.out.println("Launcher AM configuration loaded"); - } catch (Exception ex) { - errorHolder.setErrorMessage("Could not load the Launcher AM configuration file"); - errorHolder.setErrorCause(ex); - throw ex; - } - actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH)); - registerWithRM(); - executePrepare(ugi, errorHolder); - final String[] mainArgs = getMainArguments(launcherJobConf); - printDebugInfo(); - setupMainConfiguration(); - launcherExecutedProperly = runActionMain(mainArgs, errorHolder, ugi); + actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH)); - if (launcherExecutedProperly) { + registerWithRM(amrmCallBackHandler); + // Run user code without the AM_RM_TOKEN so users can't request containers + UserGroupInformation ugi = getUserGroupInformation(launcherConf, AMRMTokenIdentifier.KIND_NAME); + printTokens( "Executing Action Main with tokens:", ugi.getTokens()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Object run() throws Exception { + executePrepare(errorHolder); + printDebugInfo(); + setupMainConfiguration(); + runActionMain(errorHolder); + return null; + } + }); + + if (!errorHolder.isPopulated()) { handleActionData(); if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { System.out.println(); @@ -206,7 +251,6 @@ public class LauncherAM { System.err.println("Launcher AM execution failed"); e.printStackTrace(System.out); e.printStackTrace(System.err); - launcherExecutedProperly = false; if (!errorHolder.isPopulated()) { errorHolder.setErrorCause(e); errorHolder.setErrorMessage(e.getMessage()); @@ -214,13 +258,13 @@ public class LauncherAM { throw e; } finally { try { - ErrorHolder callbackErrorHolder = callbackHandler.getError(); + ErrorHolder callbackErrorHolder = amrmCallBackHandler.getError(); - if (launcherExecutedProperly) { + if (!errorHolder.isPopulated()) { actionResult = backgroundAction ? OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED; } - if (!launcherExecutedProperly) { + if (errorHolder.isPopulated()) { updateActionDataWithFailure(errorHolder, actionData); } else if (callbackErrorHolder != null) { // async error from the callback actionResult = OozieActionResult.FAILED; @@ -228,12 +272,12 @@ public class LauncherAM { } actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString()); - hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData); + hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, actionData); } finally { try { unregisterWithRM(actionResult, errorHolder.getErrorMessage()); } finally { - LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherJobConf); + LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherConf); cn.notifyURL(actionResult); } } @@ -251,8 +295,8 @@ public class LauncherAM { System.out.println(); System.out.println("Oozie Launcher Application Master configuration"); System.out.println("==============================================="); - System.out.println("Workflow job id : " + launcherJobConf.get(OOZIE_JOB_ID)); - System.out.println("Workflow action id: " + launcherJobConf.get(OOZIE_ACTION_ID)); + System.out.println("Workflow job id : " + launcherConf.get(OOZIE_JOB_ID)); + System.out.println("Workflow action id: " + launcherConf.get(OOZIE_ACTION_ID)); System.out.println(); System.out.println("Classpath :"); System.out.println("------------------------"); @@ -262,11 +306,11 @@ public class LauncherAM { } System.out.println("------------------------"); System.out.println(); - String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + String mainClass = launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); System.out.println("Main class : " + mainClass); System.out.println(); System.out.println("Maximum output : " - + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); + + launcherConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); System.out.println(); System.out.println(); @@ -289,10 +333,10 @@ public class LauncherAM { System.out.println(); } - private void registerWithRM() throws IOException, YarnException { + private void registerWithRM(AMRMCallBackHandler amrmCallBackHandler) throws IOException, YarnException { // TODO: OYA: make heartbeat interval configurable & make interval higher to put less load on RM, but lower than timeout - amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000); - amRmClientAsync.init(new Configuration(launcherJobConf)); + amRmClientAsync = amrmClientAsyncFactory.createAMRMClientAsync(60000, amrmCallBackHandler); + amRmClientAsync.init(new Configuration(launcherConf)); amRmClientAsync.start(); // hostname and tracking url are determined automatically @@ -316,25 +360,19 @@ public class LauncherAM { } // Method to execute the prepare actions - private void executePrepare(UserGroupInformation ugi, ErrorHolder errorHolder) throws Exception { + private void executePrepare(ErrorHolder errorHolder) throws Exception { try { System.out.println("\nStarting the execution of prepare actions"); - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML); - if (prepareXML != null) { - if (prepareXML.length() != 0) { - Configuration actionConf = new Configuration(launcherJobConf); - actionConf.addResource(ACTION_CONF_XML); - prepareHandler.prepareAction(prepareXML, actionConf); - } else { - System.out.println("There are no prepare actions to execute."); - } - } - return null; + String prepareXML = launcherConf.get(ACTION_PREPARE_XML); + if (prepareXML != null) { + if (prepareXML.length() != 0) { + Configuration actionConf = new Configuration(launcherConf); + actionConf.addResource(ACTION_CONF_XML); + prepareHandler.prepareAction(prepareXML, actionConf); + } else { + System.out.println("There are no prepare actions to execute."); } - }); + } System.out.println("Completed the execution of prepare actions successfully"); } catch (Exception ex) { errorHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); @@ -344,9 +382,9 @@ public class LauncherAM { } private void setupMainConfiguration() throws IOException { - System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); - System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); - System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID)); + System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_JOB_ID, launcherConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_ACTION_ID, launcherConf.get(OOZIE_ACTION_ID)); System.setProperty(OOZIE_ACTION_CONF_XML, new File(ACTION_CONF_XML).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath()); @@ -358,79 +396,70 @@ public class LauncherAM { System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis())); } - private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) throws Exception { - // using AtomicBoolean because we want to modify it inside run() - final AtomicBoolean actionMainExecutedProperly = new AtomicBoolean(false); - - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - try { - setRecoveryId(); - Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null); - Preconditions.checkNotNull(klass, "Launcher class should not be null"); - System.out.println("Launcher class: " + klass.toString()); - Method mainMethod = klass.getMethod("main", String[].class); - // Enable LauncherSecurityManager to catch System.exit calls - launcherSecurityManager.enable(); - mainMethod.invoke(null, (Object) mainArgs); - - System.out.println(); - System.out.println("<<< Invocation of Main class completed <<<"); - System.out.println(); - actionMainExecutedProperly.set(true); - } catch (InvocationTargetException ex) { - ex.printStackTrace(System.out); - // Get what actually caused the exception - Throwable cause = ex.getCause(); - // If we got a JavaMainException from JavaMain, then we need to unwrap it - if (JavaMain.JavaMainException.class.isInstance(cause)) { - cause = cause.getCause(); - } - if (LauncherMainException.class.isInstance(cause)) { - int errorCode = ((LauncherMainException) ex.getCause()).getErrorCode(); - String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); - eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + - errorCode + "]"); - eHolder.setErrorCode(errorCode); - } else if (SecurityException.class.isInstance(cause)) { - if (launcherSecurityManager.getExitInvoked()) { - final int exitCode = launcherSecurityManager.getExitCode(); - System.out.println("Intercepting System.exit(" + exitCode + ")"); - // if 0 main() method finished successfully - // ignoring - eHolder.setErrorCode(exitCode); - if (exitCode != 0) { - String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); - eHolder.setErrorMessage("Main Class [" + mainClass + "]," - + " exit code [" + eHolder.getErrorCode() + "]"); - } else { - actionMainExecutedProperly.set(true); - } - } else { - // just SecurityException, no exit was invoked - eHolder.setErrorCode(0); - eHolder.setErrorCause(cause); - eHolder.setErrorMessage(cause.getMessage()); - } + private boolean runActionMain(final ErrorHolder eHolder) throws Exception { + boolean actionMainExecutedProperly = false; + try { + final String[] mainArgs = getMainArguments(launcherConf); + setRecoveryId(); + Class<?> klass = launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null); + Preconditions.checkNotNull(klass, "Launcher class should not be null"); + System.out.println("Launcher class: " + klass.toString()); + Method mainMethod = klass.getMethod("main", String[].class); + // Enable LauncherSecurityManager to catch System.exit calls + launcherSecurityManager.enable(); + mainMethod.invoke(null, (Object) mainArgs); + + System.out.println(); + System.out.println("<<< Invocation of Main class completed <<<"); + System.out.println(); + actionMainExecutedProperly = true; + } catch (InvocationTargetException ex) { + ex.printStackTrace(System.out); + // Get what actually caused the exception + Throwable cause = ex.getCause(); + // If we got a JavaMainException from JavaMain, then we need to unwrap it + if (JavaMain.JavaMainException.class.isInstance(cause)) { + cause = cause.getCause(); + } + if (LauncherMainException.class.isInstance(cause)) { + int errorCode = ((LauncherMainException) ex.getCause()).getErrorCode(); + String mainClass = launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + + errorCode + "]"); + eHolder.setErrorCode(errorCode); + } else if (SecurityException.class.isInstance(cause)) { + if (launcherSecurityManager.getExitInvoked()) { + final int exitCode = launcherSecurityManager.getExitCode(); + System.out.println("Intercepting System.exit(" + exitCode + ")"); + // if 0 main() method finished successfully + // ignoring + if (exitCode != 0) { + eHolder.setErrorCode(exitCode); + String mainClass = launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "]," + + " exit code [" + eHolder.getErrorCode() + "]"); } else { - eHolder.setErrorMessage(cause.getMessage()); - eHolder.setErrorCause(cause); + actionMainExecutedProperly = true; } - } catch (Throwable t) { - t.printStackTrace(); - eHolder.setErrorMessage(t.getMessage()); - eHolder.setErrorCause(t); - } finally { - // Disable LauncherSecurityManager - launcherSecurityManager.disable(); + } else { + // just SecurityException, no exit was invoked + eHolder.setErrorCode(0); + eHolder.setErrorCause(cause); + eHolder.setErrorMessage(cause.getMessage()); } - - return null; + } else { + eHolder.setErrorMessage(cause.getMessage()); + eHolder.setErrorCause(cause); } - }); - - return actionMainExecutedProperly.get(); + } catch (Throwable t) { + t.printStackTrace(); + eHolder.setErrorMessage(t.getMessage()); + eHolder.setErrorCause(t); + } finally { + // Disable LauncherSecurityManager + launcherSecurityManager.disable(); + } + return actionMainExecutedProperly; } private void setRecoveryId() throws LauncherException { @@ -438,14 +467,14 @@ public class LauncherAM { ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); String applicationIdStr = applicationId.toString(); - String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(OOZIE_ACTION_RECOVERY_ID), + String recoveryId = Preconditions.checkNotNull(launcherConf.get(OOZIE_ACTION_RECOVERY_ID), "RecoveryID should not be null"); Path path = new Path(actionDir, recoveryId); - if (!hdfsOperations.fileExists(path, launcherJobConf)) { - hdfsOperations.writeStringToFile(path, launcherJobConf, applicationIdStr); + if (!hdfsOperations.fileExists(path, launcherConf)) { + hdfsOperations.writeStringToFile(path, launcherConf, applicationIdStr); } else { - String id = hdfsOperations.readFileContents(path, launcherJobConf); + String id = hdfsOperations.readFileContents(path, launcherConf); if (!applicationIdStr.equals(id)) { throw new LauncherException(MessageFormat.format( @@ -482,7 +511,7 @@ public class LauncherAM { int maxSize = maxSizeDefault; if (maxSizePropertyName != null) { - maxSize = launcherJobConf.getInt(maxSizePropertyName, maxSizeDefault); + maxSize = launcherConf.getInt(maxSizePropertyName, maxSizeDefault); } if (propValue != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java index bde7f1d..27607f6 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java @@ -66,7 +66,6 @@ public class ShellMain extends LauncherMain { setYarnTag(actionConf); setApplicationTags(actionConf, TEZ_APPLICATION_TAGS); setApplicationTags(actionConf, SPARK_YARN_TAGS); - int exitCode = execute(actionConf); if (exitCode != 0) { // Shell command failed. therefore make the action failed http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java index 68c0f4b..2812593 100644 --- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java @@ -54,9 +54,6 @@ public class TestHdfsOperations { private SequenceFile.Writer writerMock; @Mock - private UserGroupInformation ugiMock; - - @Mock private Configuration configurationMock; private Path path = new Path("."); @@ -102,14 +99,6 @@ public class TestHdfsOperations { @SuppressWarnings("unchecked") private void configureMocksForHappyPath() throws Exception { - given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0]; - return action.run(); - } - }); - given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock), any(Path.class), eq(Text.class), eq(Text.class))).willReturn(writerMock); } http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java index 37af3dd..96d3e1d 100644 --- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager; import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult; +import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -94,9 +95,6 @@ public class TestLauncherAM { public ExpectedException thrown = ExpectedException.none(); @Mock - private UserGroupInformation ugiMock; - - @Mock private AMRMClientAsyncFactory amRMClientAsyncFactoryMock; @Mock @@ -138,6 +136,7 @@ public class TestLauncherAM { public void setup() throws Exception { configureMocksForHappyPath(); launcherJobConfig.set(LauncherAMUtils.OOZIE_ACTION_RECOVERY_ID, "1"); + launcherJobConfig.set(LauncherAM.OOZIE_SUBMITTER_USER, System.getProperty("user.name")); instantiateLauncher(); } @@ -228,21 +227,10 @@ public class TestLauncherAM { assertFailedExecution(); } - @Test + @Test(expected = RuntimeException.class) public void testLauncherJobConfCannotBeLoaded() throws Exception { given(localFsOperationsMock.readLauncherConf()).willThrow(new RuntimeException()); - thrown.expect(RuntimeException.class); - - try { - executeLauncher(); - } finally { - failureDetails.expectedExceptionMessage(null) - .expectedErrorCode(EXIT_CODE_0) - .expectedErrorReason("Could not load the Launcher AM configuration file") - .withStackTrace(); - - assertFailedExecution(); - } + LauncherAM.readLauncherConfiguration(localFsOperationsMock); } @Test @@ -460,15 +448,14 @@ public class TestLauncherAM { } private void instantiateLauncher() { - launcherAM = new LauncherAM(ugiMock, - amRMClientAsyncFactoryMock, + launcherAM = new LauncherAM(amRMClientAsyncFactoryMock, callbackHandlerMock, hdfsOperationsMock, localFsOperationsMock, prepareHandlerMock, launcherCallbackNotifierFactoryMock, launcherSecurityManagerMock, - containerId); + containerId, launcherJobConfig); } @SuppressWarnings("unchecked") @@ -480,14 +467,8 @@ public class TestLauncherAM { given(localFsOperationsMock.readLauncherConf()).willReturn(launcherJobConfig); given(localFsOperationsMock.fileExists(any(File.class))).willReturn(true); - willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock).createAMRMClientAsync(anyInt()); - given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0]; - return action.run(); - } - }); + willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock) + .createAMRMClientAsync(anyInt(), any(AMRMCallBackHandler.class)); given(launcherCallbackNotifierFactoryMock.createCallbackNotifier(any(Configuration.class))) .willReturn(launcherCallbackNotifierMock); } @@ -524,7 +505,6 @@ public class TestLauncherAM { verify(amRmAsyncClientMock).registerApplicationMaster(anyString(), anyInt(), anyString()); verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING); verify(amRmAsyncClientMock).stop(); - verify(ugiMock, times(2)).doAs(any(PrivilegedExceptionAction.class)); // prepare & action main verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class)); verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class)); verify(launcherCallbackNotifierMock).notifyURL(actionResult);