OOZIE-2591 Fix recovery handling Change-Id: I7501411b2bdcdc1962e5ac77082a71c96b205902
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/3b6daff5 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/3b6daff5 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/3b6daff5 Branch: refs/heads/oya Commit: 3b6daff59a58c5b5b30a123c4ca75d6c20b4e30d Parents: ba68347 Author: Peter Bacsko <[email protected]> Authored: Fri Nov 18 11:13:00 2016 +0100 Committer: Peter Bacsko <[email protected]> Committed: Mon Nov 21 14:28:36 2016 +0100 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 81 ++++++++----------- .../oozie/action/hadoop/LauncherMainTester.java | 1 + .../oozie/service/TestRecoveryService.java | 26 ++----- .../oozie/action/hadoop/HdfsOperations.java | 50 ++++++++++++ .../apache/oozie/action/hadoop/LauncherAM.java | 78 +++++++++++-------- .../oozie/action/hadoop/TestLauncherAM.java | 82 +++++++++++++++++++- 6 files changed, 211 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 284690b..2ec5266 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -49,6 +49,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AccessControlException; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; @@ -103,6 +104,7 @@ import org.jdom.JDOMException; import org.jdom.Namespace; import com.google.common.collect.ImmutableList; +import com.google.common.io.Closeables; public class JavaActionExecutor extends ActionExecutor { @@ -958,6 +960,7 @@ public class JavaActionExecutor extends ActionExecutor { public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException { JobClient jobClient = null; boolean exception = false; + YarnClient yarnClient = null; try { Path appPathRoot = new Path(context.getWorkflow().getAppPath()); @@ -1014,23 +1017,23 @@ public class JavaActionExecutor extends ActionExecutor { } JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); - boolean alreadyRunning = false; - String launcherId = null; - String consoleUrl = null; - // TODO: OYA: equivalent of this? (recovery, alreadyRunning) When does this happen? -// LOG.debug("Creating Job Client for action " + action.getId()); -// jobClient = createJobClient(context, launcherJobConf); -// launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context -// .getRecoveryId()); -// alreadyRunning = launcherId != null; - RunningJob runningJob; + String consoleUrl; + String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context + .getRecoveryId()); + boolean alreadyRunning = launcherId != null; // if user-retry is on, always submit new launcher boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); + yarnClient = createYarnClient(context, launcherJobConf); if (alreadyRunning && !isUserRetry) { - runningJob = jobClient.getJob(JobID.forName(launcherId)); - if (runningJob == null) { + try { + ApplicationId appId = ConverterUtils.toApplicationId(launcherId); + ApplicationReport report = yarnClient.getApplicationReport(appId); + consoleUrl = report.getTrackingUrl(); + } catch (RemoteException e) { + // caught when the application id does not exist + LOG.error("Got RemoteException from YARN", e); String jobTracker = launcherJobConf.get(HADOOP_YARN_RM); throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker); @@ -1070,32 +1073,18 @@ public class JavaActionExecutor extends ActionExecutor { LOG.info("No need to inject credentials."); } - YarnClient yarnClient = null; - try { - String user = context.getWorkflow().getUser(); - - // Create application - yarnClient = createYarnClient(context, launcherJobConf); - YarnClientApplication newApp = yarnClient.createApplication(); - ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); - - // Create launch context for app master - ApplicationSubmissionContext appContext = - createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf); - - // Submit the launcher AM - yarnClient.submitApplication(appContext); - - launcherId = appId.toString(); - LOG.debug("After submission get the launcherId [{0}]", launcherId); - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - consoleUrl = appReport.getTrackingUrl(); - } finally { - if (yarnClient != null) { - yarnClient.close(); - yarnClient = null; - } - } + String user = context.getWorkflow().getUser(); + + YarnClientApplication newApp = yarnClient.createApplication(); + ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); + ApplicationSubmissionContext appContext = + createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf); + yarnClient.submitApplication(appContext); + + launcherId = appId.toString(); + LOG.debug("After submission get the launcherId [{0}]", launcherId); + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + consoleUrl = appReport.getTrackingUrl(); } String jobTracker = launcherJobConf.get(HADOOP_YARN_RM); @@ -1106,6 +1095,10 @@ public class JavaActionExecutor extends ActionExecutor { throw convertException(ex); } finally { + if (yarnClient != null) { + Closeables.closeQuietly(yarnClient); + } + if (jobClient != null) { try { jobClient.close(); @@ -1126,26 +1119,16 @@ public class JavaActionExecutor extends ActionExecutor { Context context, Configuration actionConf) throws IOException, HadoopAccessorException, URISyntaxException { - // Create launch context for app master ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); - // set the application id appContext.setApplicationId(appId); - - // set the application name appContext.setApplicationName(launcherJobConf.getJobName()); appContext.setApplicationType("Oozie Launcher"); - - // Set the priority for the application master Priority pri = Records.newRecord(Priority.class); int priority = 0; // TODO: OYA: Add a constant or a config pri.setPriority(priority); appContext.setPriority(pri); - - // Set the queue to which this application is to be submitted in the RM appContext.setQueue(launcherJobConf.getQueueName()); - - // Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); // Set the resources to localize @@ -1193,7 +1176,7 @@ public class JavaActionExecutor extends ActionExecutor { vargs.add("-Dhadoop.root.logger=INFO,CLA"); vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); - vargs.add("org.apache.oozie.action.hadoop.LauncherAM"); // note: using string temporarily so we don't have to depend on sharelib-oozie + vargs.add(LauncherAM.class.getCanonicalName()); vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java index 4baed6e..c2aae4c 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java @@ -30,6 +30,7 @@ public class LauncherMainTester { if (args.length == 0) { System.out.println("Hello World!"); } + if (args.length == 1) { if (args[0].equals("throwable")) { throw new Throwable("throwing throwable"); http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java index 8fd0c2d..a3270e9 100644 --- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java +++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java @@ -21,10 +21,7 @@ package org.apache.oozie.service; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorActionBean; @@ -250,20 +247,10 @@ public class TestRecoveryService extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf())); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - String launcherId = action1.getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); - waitFor(240 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), conf); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); @@ -274,10 +261,8 @@ public class TestRecoveryService extends XDataTestCase { * @throws Exception */ public void testBundleRecoveryCoordCreate() throws Exception { - final BundleActionBean bundleAction; - final BundleJobBean bundle; - bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); - bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP); + final BundleJobBean bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); + addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP); final JPAService jpaService = Services.get().get(JPAService.class); sleep(3000); @@ -290,7 +275,7 @@ public class TestRecoveryService extends XDataTestCase { jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1")); try { if (mybundleAction.getCoordId() != null) { - CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId())); + jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId())); return true; } } catch (Exception e) { @@ -345,12 +330,11 @@ public class TestRecoveryService extends XDataTestCase { * @throws Exception */ public void testBundleRecoveryCoordExists() throws Exception { - final BundleActionBean bundleAction; final BundleJobBean bundle; final CoordinatorJob coord; bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false); coord = addRecordToCoordJobTable(Job.Status.PREP, false, false); - bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP); + addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP); final JPAService jpaService = Services.get().get(JPAService.class); sleep(3000); http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java index 593de00..6f354a8 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java @@ -17,12 +17,18 @@ */ package org.apache.oozie.action.hadoop; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Text; @@ -85,4 +91,48 @@ public class HdfsOperations { throw ioe; } } + + public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException { + return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws Exception { + FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf); + return fs.exists(path); + } + }); + } + + public void writeStringToFile(final Path path, final Configuration conf, final String contents) throws IOException, InterruptedException { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + FileSystem fs = FileSystem.get(path.toUri(), conf); + java.io.Writer writer = new OutputStreamWriter(fs.create(path)); + writer.write(contents); + writer.close(); + return null; + } + }); + } + + public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException { + return ugi.doAs(new PrivilegedExceptionAction<String>() { + @Override + public String run() throws Exception { + FileSystem fs = FileSystem.get(path.toUri(), conf); + InputStream is = fs.open(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + StringBuilder sb = new StringBuilder(); + + String contents; + while ((contents = reader.readLine()) != null) { + sb.append(contents); + } + + reader.close(); + + return sb.toString(); + } + }); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index 89357ad..881fa72 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -25,6 +25,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.security.Permission; import java.security.PrivilegedAction; +import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -34,7 +35,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -81,6 +83,7 @@ public class LauncherAM { private final PrepareActionsHandler prepareHandler; private final LauncherAMCallbackNotifierFactory callbackNotifierFactory; private final LauncherSecurityManager launcherSecurityManager; + private final ContainerId containerId; private Configuration launcherJobConf; private AMRMClientAsync<?> amRmClientAsync; @@ -94,7 +97,8 @@ public class LauncherAM { LocalFsOperations localFsOperations, PrepareActionsHandler prepareHandler, LauncherAMCallbackNotifierFactory callbackNotifierFactory, - LauncherSecurityManager launcherSecurityManager) { + LauncherSecurityManager launcherSecurityManager, + String containerId) { this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, "amRmClientAsyncFactory should not be null"); this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null"); @@ -103,6 +107,7 @@ public class LauncherAM { this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null"); this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory, "callbackNotifierFactory should not be null"); this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, "launcherSecurityManager should not be null"); + this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null")); } public static void main(String[] args) throws Exception { @@ -134,29 +139,16 @@ public class LauncherAM { localFSOperations, prepareHandler, callbackNotifierFactory, - launcherSecurityManager); + launcherSecurityManager, + System.getenv("CONTAINER_ID")); launcher.run(); } - // TODO: OYA: rethink all print messages and formatting public void run() throws Exception { final ErrorHolder errorHolder = new ErrorHolder(); OozieActionResult actionResult = OozieActionResult.FAILED; boolean launcerExecutedProperly = false; - - String jobUserName = System.getenv(ApplicationConstants.Environment.USER.name()); - - // DEBUG - will be removed - UserGroupInformation login = UserGroupInformation.getLoginUser(); - System.out.println("Login: " + login.getUserName()); - System.out.println("SecurityEnabled:" + UserGroupInformation.isSecurityEnabled()); - System.out.println("Login keytab based:" + UserGroupInformation.isLoginKeytabBased()); - System.out.println("Login from keytab: " + login.isFromKeytab()); - System.out.println("Login has kerberos credentials: " + login.hasKerberosCredentials()); - System.out.println("Login authMethod: " + login.getAuthenticationMethod()); - System.out.println("JobUserName:" + jobUserName); - boolean backgroundAction = false; try { @@ -288,15 +280,20 @@ public class LauncherAM { System.out.println("Java System Properties:"); System.out.println("------------------------"); System.getProperties().store(System.out, ""); - System.out.flush(); System.out.println("------------------------"); System.out.println(); + System.out.println("Environment variables"); + Map<String, String> env = System.getenv(); + System.out.println("------------------------"); + for (Map.Entry<String, String> entry : env.entrySet()) { + System.out.println(entry.getKey() + "=" + entry.getValue()); + } + System.out.println("------------------------"); System.out.println("================================================================="); System.out.println(); System.out.println(">>> Invoking Main class now >>>"); System.out.println(); - System.out.flush(); } private void registerWithRM() throws IOException, YarnException { @@ -317,7 +314,7 @@ public class LauncherAM { // tracking url is determined automatically amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, ""); } catch (Exception ex) { - System.err.println("Error un-registering AM client"); + System.out.println("Error un-registering AM client"); throw ex; } finally { amRmClientAsync.stop(); @@ -366,12 +363,7 @@ public class LauncherAM { System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath()); - // FIXME - make sure it's always set - if (launcherJobConf.get("oozie.job.launch.time") != null) { - System.setProperty("oozie.job.launch.time", launcherJobConf.get("oozie.job.launch.time")); - } else { - System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis())); - } + System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis())); } private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) { @@ -382,9 +374,9 @@ public class LauncherAM { @Override public Void run() { try { + setRecoveryId(); Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); System.out.println("Launcher class: " + klass.toString()); - System.out.flush(); Method mainMethod = klass.getMethod("main", String[].class); // Enable LauncherSecurityManager to catch System.exit calls launcherSecurityManager.set(); @@ -412,7 +404,6 @@ public class LauncherAM { if (launcherSecurityManager.getExitInvoked()) { final int exitCode = launcherSecurityManager.getExitCode(); System.out.println("Intercepting System.exit(" + exitCode + ")"); - System.err.println("Intercepting System.exit(" + exitCode + ")"); // if 0 main() method finished successfully // ignoring eHolder.setErrorCode(exitCode); @@ -438,8 +429,6 @@ public class LauncherAM { eHolder.setErrorMessage(t.getMessage()); eHolder.setErrorCause(t); } finally { - System.out.flush(); - System.err.flush(); // Disable LauncherSecurityManager launcherSecurityManager.unset(); } @@ -451,6 +440,31 @@ public class LauncherAM { return actionMainExecutedProperly.get(); } + private void setRecoveryId() throws LauncherException { + try { + ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); + String applicationIdStr = applicationId.toString(); + + String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(LauncherMapper.OOZIE_ACTION_RECOVERY_ID), + "RecoveryID should not be null"); + + Path path = new Path(actionDir, recoveryId); + if (!hdfsOperations.fileExists(path, launcherJobConf)) { + hdfsOperations.writeStringToFile(path, launcherJobConf, applicationIdStr); + } else { + String id = hdfsOperations.readFileContents(path, launcherJobConf); + + if (!applicationIdStr.equals(id)) { + throw new LauncherException(MessageFormat.format( + "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id, + applicationIdStr)); + } + } + } catch (Exception ex) { + throw new LauncherException("IO error",ex); + } + } + private void handleActionData() throws IOException { // external child IDs processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null, ACTION_DATA_EXTERNAL_CHILD_IDS, -1, ACTIONOUTPUTTYPE_EXT_CHILD_ID); @@ -516,14 +530,12 @@ public class LauncherAM { } } } catch (IOException ioe) { - System.err.println("A problem occured trying to fail the launcher"); + System.out.println("A problem occured trying to fail the launcher"); ioe.printStackTrace(); } finally { System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n"); - System.err.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n"); if (eHolder.getErrorCause() != null) { eHolder.getErrorCause().printStackTrace(System.out); - eHolder.getErrorCause().printStackTrace(System.err); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/3b6daff5/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java index 30441ea..052673d 100644 --- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java @@ -65,6 +65,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager; import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult; import org.junit.Before; @@ -72,7 +73,6 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; @@ -80,6 +80,7 @@ import org.mockito.stubbing.Answer; @RunWith(MockitoJUnitRunner.class) public class TestLauncherAM { + private static final String DEFAULT_CONTAINER_ID = "container_1479473450392_0001_01_000001"; private static final String ACTIONDATA_ERROR_PROPERTIES = "error.properties"; private static final String ACTIONDATA_FINAL_STATUS_PROPERTY = "final.status"; private static final String ERROR_CODE_PROPERTY = "error.code"; @@ -108,7 +109,7 @@ public class TestLauncherAM { private AMRMCallBackHandler callbackHandlerMock; @Mock - private HdfsOperations fsOperationsMock; + private HdfsOperations hdfsOperationsMock; @Mock private LocalFsOperations localFsOperationsMock; @@ -127,7 +128,10 @@ public class TestLauncherAM { private Configuration launcherJobConfig = new Configuration(); - @InjectMocks + private String containerId = DEFAULT_CONTAINER_ID; + + private String applicationId = ConverterUtils.toContainerId(containerId).getApplicationAttemptId().getApplicationId().toString(); + private LauncherAM launcherAM; private ExpectedFailureDetails failureDetails = new ExpectedFailureDetails(); @@ -135,6 +139,8 @@ public class TestLauncherAM { @Before public void setup() throws IOException { configureMocksForHappyPath(); + launcherJobConfig.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, "1"); + instantiateLauncher(); } @Test @@ -379,6 +385,73 @@ public class TestLauncherAM { } } + @Test + public void testRecoveryIdNotSet() throws Exception { + launcherJobConfig.unset(LauncherMapper.OOZIE_ACTION_RECOVERY_ID); + instantiateLauncher(); + + executeLauncher(); + + failureDetails.expectedExceptionMessage("IO error") + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("IO error, IO error") + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testRecoveryIdExistsAndRecoveryIsdMatch() throws Exception { + given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true); + given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(applicationId); + + executeLauncher(); + + verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig)); + } + + @Test + public void testRecoveryIdExistsAndRecoveryIdsDoNotMatch() throws Exception { + given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true); + given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn("not_matching_appid"); + + executeLauncher(); + + failureDetails.expectedExceptionMessage("IO error") + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("IO error, IO error") + .withStackTrace(); + + verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig)); + assertFailedExecution(); + } + + @Test + public void testReadingRecoveryIdFails() throws Exception { + willThrow(new IOException()).given(hdfsOperationsMock).writeStringToFile(any(Path.class), eq(launcherJobConfig), eq(applicationId)); + + executeLauncher(); + + failureDetails.expectedExceptionMessage("IO error") + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("IO error, IO error") + .withStackTrace(); + + assertFailedExecution(); + } + + private void instantiateLauncher() { + launcherAM = new LauncherAM(ugiMock, + amRMClientAsyncFactoryMock, + callbackHandlerMock, + hdfsOperationsMock, + localFsOperationsMock, + prepareHandlerMock, + launcherCallbackNotifierFactoryMock, + launcherSecurityManagerMock, + containerId); + } + @SuppressWarnings("unchecked") private void configureMocksForHappyPath() throws IOException { launcherJobConfig.set(LauncherAM.OOZIE_ACTION_DIR_PATH, "dummy"); @@ -426,9 +499,10 @@ public class TestLauncherAM { verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING); verify(amRmAsyncClientMock).stop(); verify(ugiMock, times(2)).doAs(any(PrivilegedAction.class)); // prepare & action main - verify(fsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class)); + verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class)); verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class)); verify(launcherCallbackNotifierMock).notifyURL(actionResult); + verify(hdfsOperationsMock).writeStringToFile(any(Path.class), any(Configuration.class), any(String.class)); Map<String, String> actionData = launcherAM.getActionData(); verifyFinalStatus(actionData, actionResult);
