Repository: oozie
Updated Branches:
  refs/heads/master d135b88ce -> 9cb4bd05a


http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 3e6ffc9..ee1a32a 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -26,15 +26,22 @@ import java.lang.reflect.Method;
 import java.security.Permission;
 import java.security.PrivilegedExceptionAction;
 import java.text.MessageFormat;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -44,6 +51,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 
 public class LauncherAM {
     public static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml";
@@ -80,10 +88,9 @@ public class LauncherAM {
     public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
     public static final String ACTION_CONF_XML = "action.xml";
     public static final String ACTION_DATA_FINAL_STATUS = "final.status";
+    public static final String OOZIE_SUBMITTER_USER = "oozie.submitter.user";
 
-    private final UserGroupInformation ugi;
-    private final AMRMCallBackHandler callbackHandler;
-    private final AMRMClientAsyncFactory amRmClientAsyncFactory;
+    private final AMRMClientAsyncFactory amrmClientAsyncFactory;
     private final HdfsOperations hdfsOperations;
     private final LocalFsOperations localFsOperations;
     private final PrepareActionsHandler prepareHandler;
@@ -91,24 +98,25 @@ public class LauncherAM {
     private final LauncherSecurityManager launcherSecurityManager;
     private final ContainerId containerId;
 
-    private Configuration launcherJobConf;
+    private final Configuration launcherConf;
+    private final AMRMCallBackHandler amrmCallBackHandler;
     private AMRMClientAsync<?> amRmClientAsync;
     private Path actionDir;
     private Map<String, String> actionData = new HashMap<String,String>();
 
-    public LauncherAM(UserGroupInformation ugi,
-            AMRMClientAsyncFactory amRmClientAsyncFactory,
-            AMRMCallBackHandler callbackHandler,
+    public LauncherAM(
+            AMRMClientAsyncFactory amrmClientAsyncFactory,
+            AMRMCallBackHandler amrmCallBackHandler,
             HdfsOperations hdfsOperations,
             LocalFsOperations localFsOperations,
             PrepareActionsHandler prepareHandler,
             LauncherAMCallbackNotifierFactory callbackNotifierFactory,
             LauncherSecurityManager launcherSecurityManager,
-            String containerId) {
-        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
-        this.amRmClientAsyncFactory = 
Preconditions.checkNotNull(amRmClientAsyncFactory,
-                "amRmClientAsyncFactory should not be null");
-        this.callbackHandler = Preconditions.checkNotNull(callbackHandler, 
"callbackHandler should not be null");
+            String containerId,
+            Configuration launcherConf) {
+        this.amrmClientAsyncFactory = 
Preconditions.checkNotNull(amrmClientAsyncFactory,
+                "amrmClientAsyncFactory should not be null");
+        this.amrmCallBackHandler = 
Preconditions.checkNotNull(amrmCallBackHandler, "amrmCallBackHandler should not 
be null");
         this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, 
"hdfsOperations should not be null");
         this.localFsOperations = Preconditions.checkNotNull(localFsOperations, 
"localFsOperations should not be null");
         this.prepareHandler = Preconditions.checkNotNull(prepareHandler, 
"prepareHandler should not be null");
@@ -117,70 +125,107 @@ public class LauncherAM {
         this.launcherSecurityManager = 
Preconditions.checkNotNull(launcherSecurityManager,
                 "launcherSecurityManager should not be null");
         this.containerId = 
ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId 
should not be null"));
+        this.launcherConf = Preconditions.checkNotNull(launcherConf, 
"launcherConf should not be null");
     }
 
     public static void main(String[] args) throws Exception {
-        UserGroupInformation ugi = null;
-        String submitterUser = System.getProperty("submitter.user", "").trim();
-        Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user 
is undefined");
-        System.out.println("Submitter user is: " + submitterUser);
-
-        // We don't need remote/proxy user if the current login user is the 
workflow submitter
-        // Otherwise we have to create a remote user
-        if 
(UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) {
-            System.out.println("Using login user for UGI");
-            ugi = UserGroupInformation.getLoginUser();
-        } else {
-            ugi = UserGroupInformation.createRemoteUser(submitterUser);
-            
ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials());
+        final LocalFsOperations localFsOperations = new LocalFsOperations();
+        final Configuration launcherConf = 
readLauncherConfiguration(localFsOperations);
+        UserGroupInformation.setConfiguration(launcherConf);
+        // MRAppMaster adds this call as well, but it's included only in 
Hadoop 2.9+
+        // SecurityUtil.setConfiguration(launcherConf);
+        UserGroupInformation ugi = getUserGroupInformation(launcherConf);
+        printTokens("Executing Oozie Launcher with tokens:", ugi.getTokens());
+        // Executing code inside a doAs with an ugi equipped with correct 
tokens.
+        ugi.doAs(new PrivilegedExceptionAction<Object>() {
+            @Override
+            public Object run() throws Exception {
+                  LauncherAM launcher = new LauncherAM(new 
AMRMClientAsyncFactory(),
+                        new AMRMCallBackHandler(),
+                        new HdfsOperations(new SequenceFileWriterFactory()),
+                        new LocalFsOperations(),
+                        new PrepareActionsHandler(),
+                        new LauncherAMCallbackNotifierFactory(),
+                        new LauncherSecurityManager(),
+                        
System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()),
+                        launcherConf);
+
+                    launcher.run();
+                    return null;
+            }
+        });
+
+    }
+
+    private static void printTokens(String message, Collection<Token<? extends 
TokenIdentifier>> tokens) {
+        System.out.println(message);
+        for(Token<?> token : tokens) {
+            System.out.println(token);
         }
+    }
+
+    private static UserGroupInformation getUserGroupInformation(Configuration 
launcherConf, Text... kindToFilter)
+            throws IOException {
+        final String submitterUser = launcherConf.get(OOZIE_SUBMITTER_USER);
+        Credentials credentials = 
UserGroupInformation.getCurrentUser().getCredentials();
+        filterTokensByKind(credentials, kindToFilter);
 
-        AMRMClientAsyncFactory amRmClientAsyncFactory = new 
AMRMClientAsyncFactory();
-        AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler();
-        HdfsOperations hdfsOperations = new HdfsOperations(new 
SequenceFileWriterFactory(), ugi);
-        LocalFsOperations localFSOperations = new LocalFsOperations();
-        PrepareActionsHandler prepareHandler = new PrepareActionsHandler();
-        LauncherAMCallbackNotifierFactory callbackNotifierFactory = new 
LauncherAMCallbackNotifierFactory();
-        LauncherSecurityManager launcherSecurityManager = new 
LauncherSecurityManager();
-
-        LauncherAM launcher = new LauncherAM(ugi,
-                amRmClientAsyncFactory,
-                callbackHandler,
-                hdfsOperations,
-                localFSOperations,
-                prepareHandler,
-                callbackNotifierFactory,
-                launcherSecurityManager,
-                System.getenv("CONTAINER_ID"));
-
-        launcher.run();
+        UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(submitterUser);
+        ugi.addCredentials(credentials);
+        return ugi;
+    }
+
+    private static void filterTokensByKind(Credentials credentials, Text[] 
kindToFilter) throws IOException {
+        Iterator<Token<? extends TokenIdentifier>> iterator = 
credentials.getAllTokens().iterator();
+
+        while (iterator.hasNext()) {
+            Token<?> token = iterator.next();
+            for (int i = 0; i < kindToFilter.length; i++) {
+                if (kindToFilter[i].equals(token.getKind())) {
+                    System.out.println("Removing token from the Ugi: " + 
kindToFilter[i]);
+                    iterator.remove();
+                }
+            }
+        }
+    }
+
+    static Configuration readLauncherConfiguration(LocalFsOperations 
localFsOperations) {
+        Configuration launcherConf = null;
+        try {
+            launcherConf = localFsOperations.readLauncherConf();
+            System.out.println("Launcher AM configuration loaded");
+        } catch (Exception ex) {
+            System.err.println("Could not load the Launcher AM configuration 
file");
+            ex.printStackTrace();
+            throw ex;
+        }
+        return launcherConf;
     }
 
     public void run() throws Exception {
         final ErrorHolder errorHolder = new ErrorHolder();
         OozieActionResult actionResult = OozieActionResult.FAILED;
-        boolean launcherExecutedProperly = false;
         boolean backgroundAction = false;
-
         try {
-            try {
-                launcherJobConf = localFsOperations.readLauncherConf();
-                System.out.println("Launcher AM configuration loaded");
-            } catch (Exception ex) {
-                errorHolder.setErrorMessage("Could not load the Launcher AM 
configuration file");
-                errorHolder.setErrorCause(ex);
-                throw ex;
-            }
-            actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH));
 
-            registerWithRM();
-            executePrepare(ugi, errorHolder);
-            final String[] mainArgs = getMainArguments(launcherJobConf);
-            printDebugInfo();
-            setupMainConfiguration();
-            launcherExecutedProperly = runActionMain(mainArgs, errorHolder, 
ugi);
+            actionDir = new Path(launcherConf.get(OOZIE_ACTION_DIR_PATH));
 
-            if (launcherExecutedProperly) {
+            registerWithRM(amrmCallBackHandler);
+            // Run user code without the AM_RM_TOKEN so users can't request 
containers
+            UserGroupInformation ugi = getUserGroupInformation(launcherConf, 
AMRMTokenIdentifier.KIND_NAME);
+            printTokens(  "Executing Action Main with tokens:", 
ugi.getTokens());
+            ugi.doAs(new PrivilegedExceptionAction<Object>() {
+                @Override
+                public Object run() throws Exception {
+                    executePrepare(errorHolder);
+                    printDebugInfo();
+                    setupMainConfiguration();
+                    runActionMain(errorHolder);
+                    return null;
+                }
+            });
+
+            if (!errorHolder.isPopulated()) {
                 handleActionData();
                 if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
                     System.out.println();
@@ -206,7 +251,6 @@ public class LauncherAM {
             System.err.println("Launcher AM execution failed");
             e.printStackTrace(System.out);
             e.printStackTrace(System.err);
-            launcherExecutedProperly = false;
             if (!errorHolder.isPopulated()) {
                 errorHolder.setErrorCause(e);
                 errorHolder.setErrorMessage(e.getMessage());
@@ -214,13 +258,13 @@ public class LauncherAM {
             throw e;
         } finally {
             try {
-                ErrorHolder callbackErrorHolder = callbackHandler.getError();
+                ErrorHolder callbackErrorHolder = 
amrmCallBackHandler.getError();
 
-                if (launcherExecutedProperly) {
+                if (!errorHolder.isPopulated()) {
                     actionResult = backgroundAction ? 
OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED;
                 }
 
-                if (!launcherExecutedProperly) {
+                if (errorHolder.isPopulated()) {
                     updateActionDataWithFailure(errorHolder, actionData);
                 } else if (callbackErrorHolder != null) {  // async error from 
the callback
                     actionResult = OozieActionResult.FAILED;
@@ -228,12 +272,12 @@ public class LauncherAM {
                 }
 
                 actionData.put(ACTION_DATA_FINAL_STATUS, 
actionResult.toString());
-                hdfsOperations.uploadActionDataToHDFS(launcherJobConf, 
actionDir, actionData);
+                hdfsOperations.uploadActionDataToHDFS(launcherConf, actionDir, 
actionData);
             } finally {
                 try {
                     unregisterWithRM(actionResult, 
errorHolder.getErrorMessage());
                 } finally {
-                    LauncherAMCallbackNotifier cn = 
callbackNotifierFactory.createCallbackNotifier(launcherJobConf);
+                    LauncherAMCallbackNotifier cn = 
callbackNotifierFactory.createCallbackNotifier(launcherConf);
                     cn.notifyURL(actionResult);
                 }
             }
@@ -251,8 +295,8 @@ public class LauncherAM {
         System.out.println();
         System.out.println("Oozie Launcher Application Master configuration");
         System.out.println("===============================================");
-        System.out.println("Workflow job id   : " + 
launcherJobConf.get(OOZIE_JOB_ID));
-        System.out.println("Workflow action id: " + 
launcherJobConf.get(OOZIE_ACTION_ID));
+        System.out.println("Workflow job id   : " + 
launcherConf.get(OOZIE_JOB_ID));
+        System.out.println("Workflow action id: " + 
launcherConf.get(OOZIE_ACTION_ID));
         System.out.println();
         System.out.println("Classpath         :");
         System.out.println("------------------------");
@@ -262,11 +306,11 @@ public class LauncherAM {
         }
         System.out.println("------------------------");
         System.out.println();
-        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+        String mainClass = launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
         System.out.println("Main class        : " + mainClass);
         System.out.println();
         System.out.println("Maximum output    : "
-                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 
* 1024));
+                + launcherConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 
1024));
         System.out.println();
 
         System.out.println();
@@ -289,10 +333,10 @@ public class LauncherAM {
         System.out.println();
     }
 
-    private void registerWithRM() throws IOException, YarnException {
+    private void registerWithRM(AMRMCallBackHandler amrmCallBackHandler) 
throws IOException, YarnException {
         // TODO: OYA: make heartbeat interval configurable & make interval 
higher to put less load on RM, but lower than timeout
-        amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000);
-        amRmClientAsync.init(new Configuration(launcherJobConf));
+        amRmClientAsync = amrmClientAsyncFactory.createAMRMClientAsync(60000, 
amrmCallBackHandler);
+        amRmClientAsync.init(new Configuration(launcherConf));
         amRmClientAsync.start();
 
         // hostname and tracking url are determined automatically
@@ -316,25 +360,19 @@ public class LauncherAM {
     }
 
     // Method to execute the prepare actions
-    private void executePrepare(UserGroupInformation ugi, ErrorHolder 
errorHolder) throws Exception {
+    private void executePrepare(ErrorHolder errorHolder) throws Exception {
         try {
             System.out.println("\nStarting the execution of prepare actions");
-            ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws Exception {
-                    String prepareXML = 
launcherJobConf.get(ACTION_PREPARE_XML);
-                    if (prepareXML != null) {
-                        if (prepareXML.length() != 0) {
-                            Configuration actionConf = new 
Configuration(launcherJobConf);
-                            actionConf.addResource(ACTION_CONF_XML);
-                            prepareHandler.prepareAction(prepareXML, 
actionConf);
-                        } else {
-                            System.out.println("There are no prepare actions 
to execute.");
-                        }
-                    }
-                    return null;
+            String prepareXML = launcherConf.get(ACTION_PREPARE_XML);
+            if (prepareXML != null) {
+                if (prepareXML.length() != 0) {
+                    Configuration actionConf = new Configuration(launcherConf);
+                    actionConf.addResource(ACTION_CONF_XML);
+                    prepareHandler.prepareAction(prepareXML, actionConf);
+                } else {
+                    System.out.println("There are no prepare actions to 
execute.");
                 }
-            });
+            }
             System.out.println("Completed the execution of prepare actions 
successfully");
         } catch (Exception ex) {
             errorHolder.setErrorMessage("Prepare execution in the Launcher AM 
has failed");
@@ -344,9 +382,9 @@ public class LauncherAM {
     }
 
     private void setupMainConfiguration() throws IOException {
-        System.setProperty(OOZIE_LAUNCHER_JOB_ID, 
launcherJobConf.get(OOZIE_JOB_ID));
-        System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
-        System.setProperty(OOZIE_ACTION_ID, 
launcherJobConf.get(OOZIE_ACTION_ID));
+        System.setProperty(OOZIE_LAUNCHER_JOB_ID, 
launcherConf.get(OOZIE_JOB_ID));
+        System.setProperty(OOZIE_JOB_ID, launcherConf.get(OOZIE_JOB_ID));
+        System.setProperty(OOZIE_ACTION_ID, launcherConf.get(OOZIE_ACTION_ID));
         System.setProperty(OOZIE_ACTION_CONF_XML, new 
File(ACTION_CONF_XML).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS,
                 new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath());
@@ -358,79 +396,70 @@ public class LauncherAM {
         System.setProperty("oozie.job.launch.time", 
String.valueOf(System.currentTimeMillis()));
     }
 
-    private boolean runActionMain(final String[] mainArgs, final ErrorHolder 
eHolder, UserGroupInformation ugi) throws Exception {
-        // using AtomicBoolean because we want to modify it inside run()
-        final AtomicBoolean actionMainExecutedProperly = new 
AtomicBoolean(false);
-
-        ugi.doAs(new PrivilegedExceptionAction<Void>() {
-            @Override
-            public Void run() throws Exception {
-                try {
-                    setRecoveryId();
-                    Class<?> klass = 
launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
-                    Preconditions.checkNotNull(klass, "Launcher class should 
not be null");
-                    System.out.println("Launcher class: " + klass.toString());
-                    Method mainMethod = klass.getMethod("main", 
String[].class);
-                    // Enable LauncherSecurityManager to catch System.exit 
calls
-                    launcherSecurityManager.enable();
-                    mainMethod.invoke(null, (Object) mainArgs);
-
-                    System.out.println();
-                    System.out.println("<<< Invocation of Main class completed 
<<<");
-                    System.out.println();
-                    actionMainExecutedProperly.set(true);
-                } catch (InvocationTargetException ex) {
-                    ex.printStackTrace(System.out);
-                    // Get what actually caused the exception
-                    Throwable cause = ex.getCause();
-                    // If we got a JavaMainException from JavaMain, then we 
need to unwrap it
-                    if (JavaMain.JavaMainException.class.isInstance(cause)) {
-                        cause = cause.getCause();
-                    }
-                    if (LauncherMainException.class.isInstance(cause)) {
-                        int errorCode = ((LauncherMainException) 
ex.getCause()).getErrorCode();
-                        String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
-                        eHolder.setErrorMessage("Main Class [" + mainClass + 
"], exit code [" +
-                                errorCode + "]");
-                        eHolder.setErrorCode(errorCode);
-                    } else if (SecurityException.class.isInstance(cause)) {
-                        if (launcherSecurityManager.getExitInvoked()) {
-                            final int exitCode = 
launcherSecurityManager.getExitCode();
-                            System.out.println("Intercepting System.exit(" + 
exitCode + ")");
-                            // if 0 main() method finished successfully
-                            // ignoring
-                            eHolder.setErrorCode(exitCode);
-                            if (exitCode != 0) {
-                                String mainClass = 
launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
-                                eHolder.setErrorMessage("Main Class [" + 
mainClass + "],"
-                                        + " exit code [" + 
eHolder.getErrorCode() + "]");
-                            } else {
-                                actionMainExecutedProperly.set(true);
-                            }
-                        } else {
-                            // just SecurityException, no exit was invoked
-                            eHolder.setErrorCode(0);
-                            eHolder.setErrorCause(cause);
-                            eHolder.setErrorMessage(cause.getMessage());
-                        }
+    private boolean runActionMain(final ErrorHolder eHolder) throws Exception {
+        boolean actionMainExecutedProperly = false;
+        try {
+            final String[] mainArgs = getMainArguments(launcherConf);
+            setRecoveryId();
+            Class<?> klass = 
launcherConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
+            Preconditions.checkNotNull(klass, "Launcher class should not be 
null");
+            System.out.println("Launcher class: " + klass.toString());
+            Method mainMethod = klass.getMethod("main", String[].class);
+            // Enable LauncherSecurityManager to catch System.exit calls
+            launcherSecurityManager.enable();
+            mainMethod.invoke(null, (Object) mainArgs);
+
+            System.out.println();
+            System.out.println("<<< Invocation of Main class completed <<<");
+            System.out.println();
+            actionMainExecutedProperly = true;
+        } catch (InvocationTargetException ex) {
+            ex.printStackTrace(System.out);
+            // Get what actually caused the exception
+            Throwable cause = ex.getCause();
+            // If we got a JavaMainException from JavaMain, then we need to 
unwrap it
+            if (JavaMain.JavaMainException.class.isInstance(cause)) {
+                cause = cause.getCause();
+            }
+            if (LauncherMainException.class.isInstance(cause)) {
+                int errorCode = ((LauncherMainException) 
ex.getCause()).getErrorCode();
+                String mainClass = 
launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                eHolder.setErrorMessage("Main Class [" + mainClass + "], exit 
code [" +
+                        errorCode + "]");
+                eHolder.setErrorCode(errorCode);
+            } else if (SecurityException.class.isInstance(cause)) {
+                if (launcherSecurityManager.getExitInvoked()) {
+                    final int exitCode = launcherSecurityManager.getExitCode();
+                    System.out.println("Intercepting System.exit(" + exitCode 
+ ")");
+                    // if 0 main() method finished successfully
+                    // ignoring
+                    if (exitCode != 0) {
+                        eHolder.setErrorCode(exitCode);
+                        String mainClass = 
launcherConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                        eHolder.setErrorMessage("Main Class [" + mainClass + 
"],"
+                                + " exit code [" + eHolder.getErrorCode() + 
"]");
                     } else {
-                        eHolder.setErrorMessage(cause.getMessage());
-                        eHolder.setErrorCause(cause);
+                        actionMainExecutedProperly = true;
                     }
-                } catch (Throwable t) {
-                    t.printStackTrace();
-                    eHolder.setErrorMessage(t.getMessage());
-                    eHolder.setErrorCause(t);
-                } finally {
-                    // Disable LauncherSecurityManager
-                    launcherSecurityManager.disable();
+                } else {
+                    // just SecurityException, no exit was invoked
+                    eHolder.setErrorCode(0);
+                    eHolder.setErrorCause(cause);
+                    eHolder.setErrorMessage(cause.getMessage());
                 }
-
-                return null;
+            } else {
+                eHolder.setErrorMessage(cause.getMessage());
+                eHolder.setErrorCause(cause);
             }
-        });
-
-        return actionMainExecutedProperly.get();
+        } catch (Throwable t) {
+            t.printStackTrace();
+            eHolder.setErrorMessage(t.getMessage());
+            eHolder.setErrorCause(t);
+        } finally {
+            // Disable LauncherSecurityManager
+            launcherSecurityManager.disable();
+        }
+        return actionMainExecutedProperly;
     }
 
     private void setRecoveryId() throws LauncherException {
@@ -438,14 +467,14 @@ public class LauncherAM {
             ApplicationId applicationId = 
containerId.getApplicationAttemptId().getApplicationId();
             String applicationIdStr = applicationId.toString();
 
-            String recoveryId = 
Preconditions.checkNotNull(launcherJobConf.get(OOZIE_ACTION_RECOVERY_ID),
+            String recoveryId = 
Preconditions.checkNotNull(launcherConf.get(OOZIE_ACTION_RECOVERY_ID),
                             "RecoveryID should not be null");
 
             Path path = new Path(actionDir, recoveryId);
-            if (!hdfsOperations.fileExists(path, launcherJobConf)) {
-                hdfsOperations.writeStringToFile(path, launcherJobConf, 
applicationIdStr);
+            if (!hdfsOperations.fileExists(path, launcherConf)) {
+                hdfsOperations.writeStringToFile(path, launcherConf, 
applicationIdStr);
             } else {
-                String id = hdfsOperations.readFileContents(path, 
launcherJobConf);
+                String id = hdfsOperations.readFileContents(path, 
launcherConf);
 
                 if (!applicationIdStr.equals(id)) {
                     throw new LauncherException(MessageFormat.format(
@@ -482,7 +511,7 @@ public class LauncherAM {
         int maxSize = maxSizeDefault;
 
         if (maxSizePropertyName != null) {
-            maxSize = launcherJobConf.getInt(maxSizePropertyName, 
maxSizeDefault);
+            maxSize = launcherConf.getInt(maxSizePropertyName, maxSizeDefault);
         }
 
         if (propValue != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
index bde7f1d..27607f6 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
@@ -66,7 +66,6 @@ public class ShellMain extends LauncherMain {
         setYarnTag(actionConf);
         setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
         setApplicationTags(actionConf, SPARK_YARN_TAGS);
-
         int exitCode = execute(actionConf);
         if (exitCode != 0) {
             // Shell command failed. therefore make the action failed

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
 
b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
index 68c0f4b..2812593 100644
--- 
a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
+++ 
b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
@@ -54,9 +54,6 @@ public class TestHdfsOperations {
     private SequenceFile.Writer writerMock;
 
     @Mock
-    private UserGroupInformation ugiMock;
-
-    @Mock
     private Configuration configurationMock;
 
     private Path path = new Path(".");
@@ -102,14 +99,6 @@ public class TestHdfsOperations {
 
     @SuppressWarnings("unchecked")
     private void configureMocksForHappyPath() throws Exception {
-        
given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new 
Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable 
{
-                PrivilegedExceptionAction<?> action = 
(PrivilegedExceptionAction<?>) invocation.getArguments()[0];
-                return action.run();
-            }
-        });
-
         
given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock),
                 any(Path.class), eq(Text.class), 
eq(Text.class))).willReturn(writerMock);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
 
b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
index 37af3dd..96d3e1d 100644
--- 
a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
+++ 
b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -65,6 +65,7 @@ import 
org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager;
 import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -94,9 +95,6 @@ public class TestLauncherAM {
     public ExpectedException thrown = ExpectedException.none();
 
     @Mock
-    private UserGroupInformation ugiMock;
-
-    @Mock
     private AMRMClientAsyncFactory amRMClientAsyncFactoryMock;
 
     @Mock
@@ -138,6 +136,7 @@ public class TestLauncherAM {
     public void setup() throws Exception {
         configureMocksForHappyPath();
         launcherJobConfig.set(LauncherAMUtils.OOZIE_ACTION_RECOVERY_ID, "1");
+        launcherJobConfig.set(LauncherAM.OOZIE_SUBMITTER_USER, 
System.getProperty("user.name"));
         instantiateLauncher();
     }
 
@@ -228,21 +227,10 @@ public class TestLauncherAM {
         assertFailedExecution();
     }
 
-    @Test
+    @Test(expected = RuntimeException.class)
     public void testLauncherJobConfCannotBeLoaded() throws Exception {
         given(localFsOperationsMock.readLauncherConf()).willThrow(new 
RuntimeException());
-        thrown.expect(RuntimeException.class);
-
-        try {
-            executeLauncher();
-        } finally {
-            failureDetails.expectedExceptionMessage(null)
-                .expectedErrorCode(EXIT_CODE_0)
-                .expectedErrorReason("Could not load the Launcher AM 
configuration file")
-                .withStackTrace();
-
-            assertFailedExecution();
-        }
+        LauncherAM.readLauncherConfiguration(localFsOperationsMock);
     }
 
     @Test
@@ -460,15 +448,14 @@ public class TestLauncherAM {
     }
 
     private void instantiateLauncher() {
-        launcherAM = new LauncherAM(ugiMock,
-                amRMClientAsyncFactoryMock,
+        launcherAM = new LauncherAM(amRMClientAsyncFactoryMock,
                 callbackHandlerMock,
                 hdfsOperationsMock,
                 localFsOperationsMock,
                 prepareHandlerMock,
                 launcherCallbackNotifierFactoryMock,
                 launcherSecurityManagerMock,
-                containerId);
+                containerId, launcherJobConfig);
     }
 
      @SuppressWarnings("unchecked")
@@ -480,14 +467,8 @@ public class TestLauncherAM {
 
         
given(localFsOperationsMock.readLauncherConf()).willReturn(launcherJobConfig);
         
given(localFsOperationsMock.fileExists(any(File.class))).willReturn(true);
-        
willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock).createAMRMClientAsync(anyInt());
-        
given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new 
Answer<Object>() {
-            @Override
-            public Object answer(InvocationOnMock invocation) throws Throwable 
{
-                PrivilegedExceptionAction<?> action = 
(PrivilegedExceptionAction<?>) invocation.getArguments()[0];
-                return action.run();
-            }
-        });
+        willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock)
+                .createAMRMClientAsync(anyInt(), 
any(AMRMCallBackHandler.class));
         
given(launcherCallbackNotifierFactoryMock.createCallbackNotifier(any(Configuration.class)))
             .willReturn(launcherCallbackNotifierMock);
     }
@@ -524,7 +505,6 @@ public class TestLauncherAM {
         verify(amRmAsyncClientMock).registerApplicationMaster(anyString(), 
anyInt(), anyString());
         
verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
 EMPTY_STRING, EMPTY_STRING);
         verify(amRmAsyncClientMock).stop();
-        verify(ugiMock, times(2)).doAs(any(PrivilegedExceptionAction.class)); 
// prepare & action main
         
verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), 
any(Path.class), any(Map.class));
         
verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
         verify(launcherCallbackNotifierMock).notifyURL(actionResult);

Reply via email to