OOZIE-2594 Make MapReduce action work, small refactors, remove RunningJob from test cases, test fixes. Follow up: OOZIE-2686
Change-Id: I797963d65bc248c81c9e9d0b2a48a68dd2bab5cf Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ca7e56fd Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ca7e56fd Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ca7e56fd Branch: refs/heads/oya Commit: ca7e56fdccbca80ce2f9b87812c15305ca9d09d0 Parents: 2fddebb Author: Peter Bacsko <pbac...@cloudera.com> Authored: Mon Sep 12 11:29:14 2016 +0200 Committer: Peter Bacsko <pbac...@cloudera.com> Committed: Mon Sep 26 14:11:28 2016 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 117 ++++---- .../action/hadoop/MapReduceActionExecutor.java | 79 +++++- .../apache/oozie/servlet/CallbackServlet.java | 4 +- .../action/hadoop/TestDistCpActionExecutor.java | 29 +- .../action/hadoop/TestJavaActionExecutor.java | 12 +- .../hadoop/TestLauncherAMCallbackNotifier.java | 148 ++++++---- .../action/hadoop/TestMapReduceActionError.java | 173 ------------ .../oozie/action/hadoop/TestOozieJobInfo.java | 19 +- .../action/hadoop/TestShellActionExecutor.java | 46 +--- .../command/wf/TestActionCheckXCommand.java | 7 +- .../command/wf/TestActionStartXCommand.java | 26 +- .../java/org/apache/oozie/test/XFsTestCase.java | 27 ++ .../java/org/apache/oozie/test/XTestCase.java | 10 +- .../action/hadoop/TestHiveActionExecutor.java | 39 +-- .../action/hadoop/TestHive2ActionExecutor.java | 40 +-- .../apache/oozie/action/hadoop/LauncherAM.java | 274 +++++++++++-------- .../hadoop/LauncherAMCallbackNotifier.java | 15 +- .../oozie/action/hadoop/LauncherMain.java | 8 +- .../oozie/action/hadoop/MapReduceMain.java | 1 - .../org/apache/oozie/action/hadoop/PigMain.java | 2 +- .../action/hadoop/TestPigActionExecutor.java | 58 +--- .../apache/oozie/action/hadoop/TestPyspark.java | 19 +- .../action/hadoop/TestSparkActionExecutor.java | 21 +- .../action/hadoop/TestSqoopActionExecutor.java | 51 +--- .../hadoop/TestMapReduceActionExecutor.java | 267 +++++++++--------- 25 files changed, 645 insertions(+), 847 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 6a28406..8637f64 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -18,6 +18,28 @@ package org.apache.oozie.action.hadoop; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; @@ -31,6 +53,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TaskLog; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.token.Token; @@ -48,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.WorkflowActionBean; @@ -78,38 +102,21 @@ import org.jdom.Element; import org.jdom.JDOMException; import org.jdom.Namespace; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.StringReader; -import java.net.ConnectException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; +import com.google.common.collect.ImmutableList; public class JavaActionExecutor extends ActionExecutor { - protected static final String HADOOP_USER = "user.name"; + public static final String RUNNING = "RUNNING"; + public static final String SUCCEEDED = "SUCCEEDED"; + public static final String KILLED = "KILLED"; + public static final String FAILED = "FAILED"; + public static final String FAILED_KILLED = "FAILED/KILLED"; public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; public static final String HADOOP_NAME_NODE = "fs.default.name"; - private static final String HADOOP_JOB_NAME = "mapred.job.name"; public static final String OOZIE_COMMON_LIBDIR = "oozie"; - private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); - public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; + + public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size"; public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled"; @@ -120,24 +127,27 @@ public class JavaActionExecutor extends ActionExecutor { public static final String HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts"; public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env"; public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env"; + public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; + public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; + public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb"; public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts"; public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env"; - private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain"; public static final int YARN_MEMORY_MB_MIN = 512; + + private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain"; + private static final String HADOOP_JOB_NAME = "mapred.job.name"; + private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>(); + private static int maxActionOutputLen; private static int maxExternalStatsSize; private static int maxFSGlobMax; - private static final String SUCCEEDED = "SUCCEEDED"; - private static final String KILLED = "KILLED"; - private static final String FAILED = "FAILED"; - private static final String FAILED_KILLED = "FAILED/KILLED"; + + protected static final String HADOOP_USER = "user.name"; + protected XLog LOG = XLog.getLog(getClass()); private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])"); private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir="; - public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; - public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; - public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; static { DISALLOWED_PROPERTIES.add(HADOOP_USER); @@ -237,6 +247,13 @@ public class JavaActionExecutor extends ActionExecutor { conf.set(HADOOP_YARN_RM, jobTracker); conf.set(HADOOP_NAME_NODE, nameNode); conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true"); + + // FIXME - think about this! + Element e = actionXml.getChild("config-class", ns); + if (e != null) { + conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); + } + return conf; } @@ -308,6 +325,7 @@ public class JavaActionExecutor extends ActionExecutor { } } + // FIXME: is this needed? private HashMap<String, List<String>> populateEnvMap(String input) { HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>(); String[] envEntries = input.split(","); @@ -918,7 +936,7 @@ public class JavaActionExecutor extends ActionExecutor { } } - private void injectCallback(Context context, Configuration conf) { + protected void injectCallback(Context context, Configuration conf) { String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN); conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback); } @@ -1109,6 +1127,7 @@ public class JavaActionExecutor extends ActionExecutor { private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String user, Context context, Configuration actionConf) throws IOException, HadoopAccessorException, URISyntaxException { + // Create launch context for app master ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); @@ -1149,9 +1168,10 @@ public class JavaActionExecutor extends ActionExecutor { Map<String, String> env = new HashMap<String, String>(); // This adds the Hadoop jars to the classpath in the Launcher JVM ClasspathUtils.setupClasspath(env, launcherJobConf); - if (false) { // TODO: OYA: config to add MR jars? Probably also needed for MR Action - ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); - } + + // FIXME: move this to specific places where it's actually needed - keeping it here for now + ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); + amContainer.setEnvironment(env); // Set the command @@ -1160,18 +1180,28 @@ public class JavaActionExecutor extends ActionExecutor { + "/bin/java"); // TODO: OYA: remove attach debugger to AM; useful for debugging // vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"); - MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs); - vargs.add(LauncherAM.class.getName()); + + // FIXME: decide what to do with this method call - signature keeps changing + // MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs, null); + + vargs.add("-Dlog4j.configuration=container-log4j.properties"); + vargs.add("-Dlog4j.debug=true"); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR); + vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 1024 * 1024); + vargs.add("-Dhadoop.root.logger=INFO,CLA"); + vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG); + vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser()); + vargs.add("org.apache.oozie.action.hadoop.LauncherAM"); // note: using string temporarily so we don't have to depend on sharelib-oozie vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR); - List<String> vargsFinal = new ArrayList<String>(6); StringBuilder mergedCommand = new StringBuilder(); for (CharSequence str : vargs) { mergedCommand.append(str).append(" "); } - vargsFinal.add(mergedCommand.toString()); + + List<String> vargsFinal = ImmutableList.of(mergedCommand.toString()); LOG.debug("Command to launch container for ApplicationMaster is : " + mergedCommand); amContainer.setCommands(vargsFinal); @@ -1405,11 +1435,6 @@ public class JavaActionExecutor extends ActionExecutor { return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf); } - protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ - RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); - return runningJob; - } - /** * Useful for overriding in actions that do subsequent job runs * such as the MapReduce Action, where the launcher job is not the http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index 4553351..019c4d9 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -21,7 +21,9 @@ package org.apache.oozie.action.hadoop; import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -312,13 +314,80 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } @Override - protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ + protected void injectCallback(Context context, Configuration conf) { + // add callback for the MapReduce job + String callback = context.getCallbackUrl("$jobStatus"); + if (conf.get("job.end.notification.url") != null) { + LOG.warn("Overriding the action job end notification URI"); + } + conf.set("job.end.notification.url", callback); + + super.injectCallback(context, conf); + } - RunningJob runningJob; - String jobId = getActualExternalId(action); + @Override + public void check(Context context, WorkflowAction action) throws ActionExecutorException { + Map<String, String> actionData = Collections.emptyMap(); + JobConf jobConf = null; - runningJob = jobClient.getJob(JobID.forName(jobId)); + try { + FileSystem actionFs = context.getAppFileSystem(); + Element actionXml = XmlUtils.parseXml(action.getConf()); + jobConf = createBaseHadoopConf(context, actionXml); + Path actionDir = context.getActionDir(); + actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf); + } catch (Exception e) { + LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e); + throw convertException(e); + } - return runningJob; + final String newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID); + + // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check() + if (newId != null) { + boolean jobCompleted; + JobClient jobClient = null; + boolean exception = false; + + try { + jobClient = createJobClient(context, jobConf); + RunningJob runningJob = jobClient.getJob(JobID.forName(newId)); + + if (runningJob == null) { + context.setExternalStatus(FAILED); + throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", + "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, + action.getId()); + } + + jobCompleted = runningJob.isComplete(); + } catch (Exception e) { + LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e); + exception = true; + throw convertException(e); + } finally { + if (jobClient != null) { + try { + jobClient.close(); + } catch (Exception e) { + if (exception) { + LOG.error("JobClient error (not re-throwing due to a previous error): ", e); + } else { + throw convertException(e); + } + } + } + } + + // run original check() if the MR action is completed or there are errors - otherwise mark it as RUNNING + if (jobCompleted || (!jobCompleted && actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS))) { + super.check(context, action); + } else { + context.setExternalStatus(RUNNING); + context.setExternalChildIDs(newId); + } + } else { + super.check(context, action); + } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java b/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java index 6123021..86ba467 100644 --- a/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java +++ b/core/src/main/java/org/apache/oozie/servlet/CallbackServlet.java @@ -40,10 +40,12 @@ import org.apache.oozie.util.PropertiesUtils; import org.apache.oozie.util.XLog; public class CallbackServlet extends JsonRestServlet { + private static final long serialVersionUID = 6439106936153152786L; + private static final String INSTRUMENTATION_NAME = "callback"; private static final ResourceInfo RESOURCE_INFO = - new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.EMPTY_LIST); + new ResourceInfo("", Arrays.asList("POST", "GET"), Collections.<ParameterInfo>emptyList()); public final static String CONF_MAX_DATA_LEN = "oozie.servlet.CallbackServlet.max.data.len"; http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java index e8a140f..c1f0e6f 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestDistCpActionExecutor.java @@ -25,18 +25,10 @@ import java.io.OutputStream; import java.util.Arrays; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.ActionExecutorTestCase.Context; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.test.XTestCase.Predicate; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; @@ -64,13 +56,8 @@ public class TestDistCpActionExecutor extends ActionExecutorTestCase{ "<arg>" + outputPath + "</arg>" + "</distcp>"; Context context = createContext(actionXml); - final RunningJob runningJob = submitAction(context); - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return runningJob.isComplete(); - } - }); - assertTrue(runningJob.isSuccessful()); + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); waitFor(60 * 1000, new Predicate() { public boolean evaluate() throws Exception { @@ -139,7 +126,7 @@ public class TestDistCpActionExecutor extends ActionExecutorTestCase{ return new Context(wf, action); } - protected RunningJob submitAction(Context context) throws Exception { + protected String submitAction(Context context) throws Exception { DistcpActionExecutor ae = new DistcpActionExecutor(); WorkflowAction action = context.getAction(); @@ -154,14 +141,8 @@ public class TestDistCpActionExecutor extends ActionExecutorTestCase{ assertNotNull(jobTracker); assertNotNull(consoleUrl); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - jobConf.set("mapred.job.tracker", jobTracker); - - JobClient jobClient = - Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + ae.submitLauncher(getFileSystem(), context, context.getAction()); + return context.getAction().getExternalId(); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 123eba5..8adc606 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; @@ -36,7 +35,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Properties; -import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataOutputStream; @@ -44,14 +42,9 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; @@ -60,7 +53,6 @@ import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.Services; @@ -582,9 +574,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertEquals(launcherId, runningJob2); assertEquals(launcherId, context.getAction().getExternalId()); - waitUntilYarnAppCompletes(runningJob2); - //FIXME????? - waitUntilYarnAppState(launcherId, YarnApplicationState.FINISHED); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); ActionExecutor ae = new JavaActionExecutor(); ae.check(context, context.getAction()); assertEquals("SUCCEEDED", context.getAction().getExternalStatus()); http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java index d0b4d5b..9ba04da 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java @@ -33,6 +33,22 @@ import java.util.Map; // A lot of this adapted from org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and org.apache.hadoop.mapred.TestJobEndNotifier public class TestLauncherAMCallbackNotifier extends XTestCase { + private EmbeddedServletContainer container; + + @Override + public void setUp() throws Exception { + super.setUp(); + QueryServlet.lastQueryString = null; + } + + @Override + public void tearDown() throws Exception { + if (container != null) { + container.stop(); + } + + super.tearDown(); + } public void testConfiguration() throws Exception { Configuration conf = new Configuration(false); @@ -91,8 +107,9 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000"); LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); + long start = System.currentTimeMillis(); - cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED); + cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false); long end = System.currentTimeMillis(); Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); @@ -103,68 +120,93 @@ public class TestLauncherAMCallbackNotifier extends XTestCase { cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); start = System.currentTimeMillis(); - cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED); + cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false); end = System.currentTimeMillis(); Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce(); Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000); } public void testNotifyTimeout() throws Exception { - EmbeddedServletContainer container = null; - try { - container = new EmbeddedServletContainer("blah"); - Map<String, String> params = new HashMap<String, String>(); - params.put(HangServlet.SLEEP_TIME_MS, "1000000"); - container.addServletEndpoint("/hang/*", HangServlet.class, params); - container.start(); - - Configuration conf = new Configuration(false); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0"); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1"); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL("/hang/*")); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000"); - - LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); - long start = System.currentTimeMillis(); - cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED); - long end = System.currentTimeMillis(); - Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); - Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); - } finally { - if (container != null) { - container.stop(); - } - } + Map<String, String> params = new HashMap<String, String>(); + params.put(HangServlet.SLEEP_TIME_MS, "1000000"); + Configuration conf = setupEmbeddedContainer(HangServlet.class, "/hang/*", "/hang/*", params); + + LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf)); + long start = System.currentTimeMillis(); + cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED, false); + long end = System.currentTimeMillis(); + Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce(); + Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000); } public void testNotify() throws Exception { - EmbeddedServletContainer container = null; - try { - container = new EmbeddedServletContainer("blah"); - container.addServletEndpoint("/count/*", QueryServlet.class); - container.start(); - - Configuration conf = new Configuration(false); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0"); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1"); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL("/count/?status=$jobStatus")); - conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000"); - - LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); - QueryServlet.lastQueryString = null; - assertNull(QueryServlet.lastQueryString); - cn.notifyURL(FinalApplicationStatus.SUCCEEDED); - waitFor(5000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return "status=SUCCEEDED".equals(QueryServlet.lastQueryString); - } - }); - assertEquals("status=SUCCEEDED", QueryServlet.lastQueryString); - } finally { - if (container != null) { - container.stop(); + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(FinalApplicationStatus.SUCCEEDED, false); + waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString()); + } + + public void testNotifyBackgroundActionWhenSubmitSucceeds() throws Exception { + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(FinalApplicationStatus.SUCCEEDED, true); + waitForCallbackAndCheckResult("RUNNING"); + } + + public void testNotifyBackgroundActionWhenSubmitFailsWithKilled() throws Exception { + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(FinalApplicationStatus.KILLED, true); + waitForCallbackAndCheckResult(FinalApplicationStatus.KILLED.toString()); + } + + public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws Exception { + Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null); + + LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf); + + assertNull(QueryServlet.lastQueryString); + cn.notifyURL(FinalApplicationStatus.FAILED, true); + waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString()); + } + + private Configuration setupEmbeddedContainer(Class<?> servletClass, String servletEndPoint, String servletUrl, Map<String, String> params) throws Exception { + container = new EmbeddedServletContainer("test"); + if (servletEndPoint != null) { + if (params != null) { + container.addServletEndpoint(servletEndPoint, servletClass, params); + } else { + container.addServletEndpoint(servletEndPoint, servletClass); } } + container.start(); + + Configuration conf = new Configuration(false); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1"); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL(servletUrl)); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000"); + + return conf; + } + + private void waitForCallbackAndCheckResult(final String expectedResult) { + waitFor(5000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return ("status=" + expectedResult).equals(QueryServlet.lastQueryString); + } + }); + + assertEquals("status=" + expectedResult, QueryServlet.lastQueryString); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java deleted file mode 100644 index 4cda615..0000000 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.action.hadoop; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.JobID; -import org.apache.oozie.WorkflowActionBean; -import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.WorkflowAppService; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; -import org.apache.oozie.util.IOUtils; -import org.jdom.Element; - -import java.io.File; -import java.io.OutputStream; -import java.io.InputStream; -import java.io.FileInputStream; -import java.io.StringReader; -import java.io.Writer; -import java.io.OutputStreamWriter; - -public class TestMapReduceActionError extends ActionExecutorTestCase { - - @Override - protected void setSystemProps() throws Exception { - super.setSystemProps(); - setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName()); - } - - private Context createContext(String actionXml) throws Exception { - JavaActionExecutor ae = new JavaActionExecutor(); - - Path appJarPath = new Path("lib/test.jar"); - File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class); - InputStream is = new FileInputStream(jarFile); - OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar")); - IOUtils.copyStream(is, os); - - XConfiguration protoConf = new XConfiguration(); - protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser()); - - protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString()); - - WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action"); - WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0); - action.setType(ae.getType()); - action.setConf(actionXml); - - return new Context(wf, action); - } - - private RunningJob submitAction(Context context) throws Exception { - MapReduceActionExecutor ae = new MapReduceActionExecutor(); - - WorkflowAction action = context.getAction(); - - ae.prepareActionDir(getFileSystem(), context); - ae.submitLauncher(getFileSystem(), context, action); - - String jobId = action.getExternalId(); - String jobTracker = action.getTrackerUri(); - String consoleUrl = action.getConsoleUrl(); - assertNotNull(jobId); - assertNotNull(jobTracker); - assertNotNull(consoleUrl); - - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - conf.set("mapreduce.framework.name", "yarn"); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; - } - - private void _testSubmit(String actionXml) throws Exception { - - Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context); - String launcherId = context.getAction().getExternalId(); - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - - MapReduceActionExecutor ae = new MapReduceActionExecutor(); - ae.check(context, context.getAction()); - - JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalId())); - - waitFor(60 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - ae.check(context, context.getAction()); - - assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); - - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); - assertTrue(context.getAction().getErrorMessage().contains("already exists")); - } - - public void testMapReduce() throws Exception { - FileSystem fs = getFileSystem(); - - Path inputDir = new Path(getFsTestCaseDir(), "input"); - Path outputDir = new Path(getFsTestCaseDir(), "output1"); - - Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt"))); - w.write("dummy\n"); - w.write("dummy\n"); - Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt"))); - ow.write("dummy\n"); - ow.write("dummy\n"); - ow.close(); - - String actionXml = "<map-reduce>" + - "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + - "<name-node>" + getNameNodeUri() + "</name-node>" + - "<configuration>" + - "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() + - "</value></property>" + - "<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() + - "</value></property>" + - "<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" + - "<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" + - "</configuration>" + - "</map-reduce>"; - _testSubmit(actionXml); - } - -} http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java index 5bc7d00..9efacdd 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java @@ -33,19 +33,13 @@ import java.util.regex.Matcher; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.BundleActionBean; import org.apache.oozie.BundleJobBean; import org.apache.oozie.CoordinatorJobBean; import org.apache.oozie.ErrorCode; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; -import org.apache.oozie.action.hadoop.MapReduceActionExecutor; -import org.apache.oozie.action.hadoop.MapperReducerForTest; -import org.apache.oozie.action.hadoop.OozieJobInfo; import org.apache.oozie.client.Job; import org.apache.oozie.client.OozieClient; import org.apache.oozie.command.CommandException; @@ -53,19 +47,16 @@ import org.apache.oozie.command.bundle.BundleStartXCommand; import org.apache.oozie.command.bundle.BundleSubmitXCommand; import org.apache.oozie.command.wf.ActionXCommand; import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; -import org.apache.oozie.command.wf.JobXCommand; import org.apache.oozie.executor.jpa.BundleActionQueryExecutor; +import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor; import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor; -import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery; -import org.apache.oozie.service.HadoopAccessorService; import org.apache.oozie.service.JPAService; import org.apache.oozie.service.Services; import org.apache.oozie.service.UUIDService; -import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.UUIDService.ApplicationType; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.util.IOUtils; @@ -165,12 +156,9 @@ public class TestOozieJobInfo extends XDataTestCase { MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf())); String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - String launcherId = actionList.get(1).getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); - FileSystem fs = context.getAppFileSystem(); - Configuration jobXmlConf = new XConfiguration(fs.open(new Path(launcherJob.getJobFile()))); + FileSystem fs = getFileSystem(); + Configuration jobXmlConf = new XConfiguration(fs.open(getPathToWorkflowResource(user, wfbean, services, context, LauncherAM.LAUNCHER_JOB_CONF_XML))); String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY); // BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME; @@ -186,7 +174,6 @@ public class TestOozieJobInfo extends XDataTestCase { assertTrue(jobInfo.contains(",testing=test,")); assertTrue(jobInfo.contains(",coord.nominal.time=")); assertTrue(jobInfo.contains("launcher=true")); - } protected void setCoordConf(Configuration jobConf) throws IOException { http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java index 9468fad..3354b3a 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java @@ -229,14 +229,8 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { Context context = createContext(actionXml); // Submit the action - final RunningJob launcherJob = submitAction(context); - waitFor(180 * 1000, new Predicate() { // Wait for the external job to - // finish - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - + final String launcherId = submitAction(context); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); ShellActionExecutor ae = new ShellActionExecutor(); WorkflowAction action = context.getAction(); ae.check(context, action); @@ -258,19 +252,10 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { private WorkflowAction _testSubmit(String actionXml, boolean checkForSuccess, String capture_output) throws Exception { Context context = createContext(actionXml); - final RunningJob launcherJob = submitAction(context);// Submit the + final String launcherId = submitAction(context);// Submit the // action - String launcherId = context.getAction().getExternalId(); // Get LM id - waitFor(180 * 1000, new Predicate() { // Wait for the external job to - // finish - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - // Thread.sleep(2000); - assertTrue(launcherJob.isSuccessful()); - - sleep(2000);// Wait more to make sure no ID swap happens + waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), @@ -334,14 +319,13 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { * @return The RunningJob of the Launcher Mapper * @throws Exception */ - private RunningJob submitAction(Context context) throws Exception { + private String submitAction(Context context) throws Exception { ShellActionExecutor ae = new ShellActionExecutor(); WorkflowAction action = context.getAction(); ae.prepareActionDir(getFileSystem(), context); - ae.submitLauncher(getFileSystem(), context, action); // Submit the - // Launcher Mapper + ae.submitLauncher(getFileSystem(), context, action); // Submit the action String jobId = action.getExternalId(); String jobTracker = action.getTrackerUri(); @@ -351,20 +335,6 @@ public class TestShellActionExecutor extends ActionExecutorTestCase { assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = new XConfiguration(); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker")); - conf.set("fs.default.name", e.getChildTextTrim("name-node")); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java index 0e1d0fd..d1f458b 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java @@ -29,8 +29,6 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.yarn.api.records.YarnApplicationState; -import org.apache.oozie.ForTestingActionExecutor; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; @@ -46,7 +44,6 @@ import org.apache.oozie.executor.jpa.JPAExecutorException; import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor; import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor; -import org.apache.oozie.service.ActionCheckerService; import org.apache.oozie.service.ActionService; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorService; @@ -266,9 +263,7 @@ public class TestActionCheckXCommand extends XDataTestCase { String launcherId = action.getExternalId(); - waitUntilYarnAppCompletes(launcherId); - YarnApplicationState appState = getYarnApplicationState(launcherId); - assertEquals("YarnApplicationState", YarnApplicationState.FINISHED, appState); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), conf); http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java index b7489e9..28e4755 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java @@ -30,8 +30,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.hadoop.LauncherMapperHelper; @@ -163,19 +161,10 @@ public class TestActionStartXCommand extends XDataTestCase { ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false); MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); - String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); String launcherId = action.getExternalId(); - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), conf); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); @@ -240,21 +229,10 @@ public class TestActionStartXCommand extends XDataTestCase { MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor(); JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf())); String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); String launcherId = action.getExternalId(); - // retrieve launcher job - final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId)); - - // time out after 120 seconds unless launcher job succeeds - waitFor(240 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - // check if launcher job succeeds - assertTrue(launcherJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), conf); assertTrue(LauncherMapperHelper.hasIdSwap(actionData)); http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/test/XFsTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java index 1d399e4..20529e8 100644 --- a/core/src/test/java/org/apache/oozie/test/XFsTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XFsTestCase.java @@ -28,11 +28,16 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XLog; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; +import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext; import org.apache.oozie.service.HadoopAccessorException; import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; /** * Base JUnit <code>TestCase</code> subclass used by all Oozie testcases that need Hadoop FS access. <p/> As part of its @@ -175,4 +180,26 @@ public abstract class XFsTestCase extends XTestCase { return has.createJobClient(getTestUser(), conf); } + /** + * Returns a Path object to a filesystem resource which belongs to a specific workflow on HDFS + * Example: /user/test/oozie-abcd/0000003-160913132555310-oozie-abcd-W/hadoop--map-reduce/launcher.xml + * + * @param userName current username + * @param action workflow Action object + * @param services Oozie Services class + * @param context Executor context + * @param fileName the filename + * @return the Path object which represents a file on HDFS + * @throws Exception + */ + protected Path getPathToWorkflowResource(String userName, WorkflowJob job, Services services, ActionExecutorContext context, String fileName) throws Exception { + return new Path( + "/user" + + "/" + userName + + "/" + services.getSystemId() + + "/" + job.getId() + + "/" + context.getActionDir().getName(), + fileName + ); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 7d8c48f..1299fa3 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -1192,11 +1192,12 @@ public abstract class XTestCase extends TestCase { return services; } - protected void waitUntilYarnAppState(String externalId, final YarnApplicationState... acceptedStates) + protected YarnApplicationState waitUntilYarnAppState(String externalId, final YarnApplicationState... acceptedStates) throws HadoopAccessorException, IOException, YarnException { final ApplicationId appId = ConverterUtils.toApplicationId(externalId); final Set<YarnApplicationState> states = Sets.immutableEnumSet(Lists.newArrayList(acceptedStates)); final MutableBoolean endStateOK = new MutableBoolean(false); + final MutableObject<YarnApplicationState> finalState = new MutableObject<YarnApplicationState>(); JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri()); // This is needed here because we need a mutable final YarnClient @@ -1207,6 +1208,7 @@ public abstract class XTestCase extends TestCase { @Override public boolean evaluate() throws Exception { YarnApplicationState state = yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState(); + finalState.setValue(state); if (states.contains(state)) { endStateOK.setValue(true); @@ -1223,10 +1225,12 @@ public abstract class XTestCase extends TestCase { } assertTrue(endStateOK.isTrue()); + return finalState.getValue(); } - protected void waitUntilYarnAppCompletes(String externalId) throws HadoopAccessorException, IOException, YarnException { - waitUntilYarnAppState(externalId, YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED); + protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { + YarnApplicationState state = waitUntilYarnAppState(externalId, YarnApplicationState.FAILED, YarnApplicationState.KILLED, YarnApplicationState.FINISHED); + assertEquals("YARN App state", YarnApplicationState.FINISHED, state); } protected YarnApplicationState getYarnApplicationState(String externalId) throws HadoopAccessorException, IOException, YarnException { http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java index b966d4b..ec53ba9 100644 --- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java +++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java @@ -164,14 +164,8 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { dataWriter.close(); Context context = createContext(getActionScriptXml()); Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.2"); - final RunningJob launcherJob = submitAction(context, ns); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context, ns); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), @@ -198,14 +192,8 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { { Context context = createContext(getActionQueryXml(hiveScript)); Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.6"); - final RunningJob launcherJob = submitAction(context, ns); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context, ns); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), @@ -231,7 +219,7 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { } } - private RunningJob submitAction(Context context, Namespace ns) throws Exception { + private String submitAction(Context context, Namespace ns) throws Exception { HiveActionExecutor ae = new HiveActionExecutor(); WorkflowAction action = context.getAction(); @@ -245,22 +233,9 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); - conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + + return jobId; } private String copyJar(String targetFile, Class<?> anyContainedClass) http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java index 5963e42..5e71f12 100644 --- a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java +++ b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java @@ -205,16 +205,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { dataWriter.write(SAMPLE_DATA_TEXT); dataWriter.close(); Context context = createContext(getQueryActionXml(query)); - final RunningJob launcherJob = submitAction(context, + final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2")); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), @@ -244,16 +237,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { dataWriter.write(SAMPLE_DATA_TEXT); dataWriter.close(); Context context = createContext(getScriptActionXml()); - final RunningJob launcherJob = submitAction(context, + final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.1")); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), @@ -275,7 +261,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { } } - private RunningJob submitAction(Context context, Namespace ns) throws Exception { + private String submitAction(Context context, Namespace ns) throws Exception { Hive2ActionExecutor ae = new Hive2ActionExecutor(); WorkflowAction action = context.getAction(); @@ -289,21 +275,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); - conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } private Context createContext(String actionXml) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/ca7e56fd/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index a1998e2..43ce520 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -17,22 +17,6 @@ */ package org.apache.oozie.action.hadoop; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerStatus; -import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; -import org.apache.hadoop.yarn.api.records.NodeReport; -import org.apache.hadoop.yarn.client.api.AMRMClient; -import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; -import org.apache.hadoop.yarn.exceptions.YarnException; -import org.codehaus.jackson.map.Module.SetupContext; -import org.xml.sax.SAXException; - -import javax.xml.parsers.ParserConfigurationException; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@ -43,13 +27,29 @@ import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.security.Permission; -import java.util.Date; +import java.security.PrivilegedAction; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.google.common.base.Preconditions; public class LauncherAM { @@ -125,8 +125,14 @@ public class LauncherAM { // TODO: OYA: rethink all print messages and formatting public static void main(String[] AMargs) throws Exception { - ErrorHolder eHolder = new ErrorHolder(); + final ErrorHolder eHolder = new ErrorHolder(); FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; + String submitterUser = System.getProperty("submitter.user", "").trim(); + Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined"); + System.out.println("Submitter user is: " + submitterUser); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(submitterUser); + boolean backgroundAction = false; + try { try { launcherJobConf = readLauncherConf(); @@ -143,7 +149,7 @@ public class LauncherAM { try { System.out.println("\nStarting the execution of prepare actions"); - executePrepare(); + executePrepare(ugi); System.out.println("Completed the execution of prepare actions successfully"); } catch (Exception ex) { eHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); @@ -151,7 +157,7 @@ public class LauncherAM { throw ex; } - String[] mainArgs = getMainArguments(launcherJobConf); + final String[] mainArgs = getMainArguments(launcherJobConf); // TODO: OYA: should we allow turning this off? // TODO: OYA: what should default be? @@ -161,7 +167,8 @@ public class LauncherAM { setupMainConfiguration(); - finalStatus = runActionMain(mainArgs, eHolder); + finalStatus = runActionMain(mainArgs, eHolder, ugi); + if (finalStatus == FinalApplicationStatus.SUCCEEDED) { handleActionData(); if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { @@ -180,6 +187,7 @@ public class LauncherAM { System.out.println(actionData.get(ACTION_DATA_NEW_ID)); System.out.println("======================="); System.out.println(); + backgroundAction = true; } } } catch (Exception e) { @@ -193,13 +201,13 @@ public class LauncherAM { if (finalStatus != FinalApplicationStatus.SUCCEEDED) { failLauncher(eHolder); } - uploadActionDataToHDFS(); + uploadActionDataToHDFS(ugi); } finally { try { unregisterWithRM(finalStatus, eHolder.getErrorMessage()); } finally { LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(launcherJobConf); - cn.notifyURL(finalStatus); + cn.notifyURL(finalStatus, backgroundAction); } } } @@ -240,16 +248,31 @@ public class LauncherAM { } // Method to execute the prepare actions - private static void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException { - String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML); - if (prepareXML != null) { - if (prepareXML.length() != 0) { - Configuration actionConf = new Configuration(launcherJobConf); - actionConf.addResource(ACTION_CONF_XML); - PrepareActionsDriver.doOperations(prepareXML, actionConf); - } else { - System.out.println("There are no prepare actions to execute."); + private static void executePrepare(UserGroupInformation ugi) throws Exception { + Exception e = ugi.doAs(new PrivilegedAction<Exception>() { + @Override + public Exception run() { + try { + String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML); + if (prepareXML != null) { + if (prepareXML.length() != 0) { + Configuration actionConf = new Configuration(launcherJobConf); + actionConf.addResource(ACTION_CONF_XML); + PrepareActionsDriver.doOperations(prepareXML, actionConf); + } else { + System.out.println("There are no prepare actions to execute."); + } + } + } catch (Exception e) { + e.printStackTrace(); + return e; + } + return null; } + }); + + if (e != null) { + throw e; } } @@ -282,65 +305,74 @@ public class LauncherAM { // } } - private static FinalApplicationStatus runActionMain(String[] mainArgs, ErrorHolder eHolder) { - FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED; - LauncherSecurityManager secMan = new LauncherSecurityManager(); - try { - Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); - System.out.println("Launcher class: " + klass.toString()); - System.out.flush(); - Method mainMethod = klass.getMethod("main", String[].class); - // Enable LauncherSecurityManager to catch System.exit calls - secMan.set(); - mainMethod.invoke(null, (Object) mainArgs); - - System.out.println(); - System.out.println("<<< Invocation of Main class completed <<<"); - System.out.println(); - finalStatus = FinalApplicationStatus.SUCCEEDED; - } catch (InvocationTargetException ex) { - ex.printStackTrace(System.out); - // Get what actually caused the exception - Throwable cause = ex.getCause(); - // If we got a JavaMainException from JavaMain, then we need to unwrap it - if (JavaMainException.class.isInstance(cause)) { - cause = cause.getCause(); - } - if (LauncherMainException.class.isInstance(cause)) { - String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); - eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + - ((LauncherMainException) ex.getCause()).getErrorCode() + "]"); - } else if (SecurityException.class.isInstance(cause)) { - if (secMan.getExitInvoked()) { - System.out.println("Intercepting System.exit(" + secMan.getExitCode() - + ")"); - System.err.println("Intercepting System.exit(" + secMan.getExitCode() - + ")"); - // if 0 main() method finished successfully - // ignoring - eHolder.setErrorCode(secMan.getExitCode()); - if (eHolder.getErrorCode() != 0) { + private static FinalApplicationStatus runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) { + final AtomicReference<FinalApplicationStatus> finalStatus = new AtomicReference<FinalApplicationStatus>(FinalApplicationStatus.FAILED); + + ugi.doAs(new PrivilegedAction<Void>() { + @Override + public Void run() { + LauncherSecurityManager secMan = new LauncherSecurityManager(); + try { + Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class); + System.out.println("Launcher class: " + klass.toString()); + System.out.flush(); + Method mainMethod = klass.getMethod("main", String[].class); + // Enable LauncherSecurityManager to catch System.exit calls + secMan.set(); + mainMethod.invoke(null, (Object) mainArgs); + + System.out.println(); + System.out.println("<<< Invocation of Main class completed <<<"); + System.out.println(); + finalStatus.set(FinalApplicationStatus.SUCCEEDED); + } catch (InvocationTargetException ex) { + ex.printStackTrace(System.out); + // Get what actually caused the exception + Throwable cause = ex.getCause(); + // If we got a JavaMainException from JavaMain, then we need to unwrap it + if (JavaMainException.class.isInstance(cause)) { + cause = cause.getCause(); + } + if (LauncherMainException.class.isInstance(cause)) { String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); - eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + eHolder.getErrorCode() + "]"); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + + ((LauncherMainException) ex.getCause()).getErrorCode() + "]"); + } else if (SecurityException.class.isInstance(cause)) { + if (secMan.getExitInvoked()) { + System.out.println("Intercepting System.exit(" + secMan.getExitCode() + + ")"); + System.err.println("Intercepting System.exit(" + secMan.getExitCode() + + ")"); + // if 0 main() method finished successfully + // ignoring + eHolder.setErrorCode(secMan.getExitCode()); + if (eHolder.getErrorCode() != 0) { + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + eHolder.getErrorCode() + "]"); + } else { + finalStatus.set(FinalApplicationStatus.SUCCEEDED); + } + } } else { - finalStatus = FinalApplicationStatus.SUCCEEDED; + eHolder.setErrorMessage(cause.getMessage()); + eHolder.setErrorCause(cause); } + } catch (Throwable t) { + t.printStackTrace(); + eHolder.setErrorMessage(t.getMessage()); + eHolder.setErrorCause(t); + } finally { + System.out.flush(); + System.err.flush(); + // Disable LauncherSecurityManager + secMan.unset(); } - } else { - eHolder.setErrorMessage(cause.getMessage()); - eHolder.setErrorCause(cause); + + return null; } - } catch (Throwable t) { - t.printStackTrace(System.out); - eHolder.setErrorMessage(t.getMessage()); - eHolder.setErrorCause(t); - } finally { - System.out.flush(); - System.err.flush(); - // Disable LauncherSecurityManager - secMan.unset(); - } - return finalStatus; + }); + + return finalStatus.get(); } private static void handleActionData() throws IOException { @@ -410,40 +442,52 @@ public class LauncherAM { return sb.toString(); } - private static void uploadActionDataToHDFS() throws IOException { - Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE); - // unused ?? - FileSystem fs = FileSystem.get(finalPath.toUri(), launcherJobConf); - // upload into sequence file - System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " - + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri()); + private static void uploadActionDataToHDFS(UserGroupInformation ugi) throws IOException { + IOException ioe = ugi.doAs(new PrivilegedAction<IOException>() { + @Override + public IOException run() { + Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE); + // upload into sequence file + System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri()); - SequenceFile.Writer wr = null; - try { - wr = SequenceFile.createWriter(launcherJobConf, - SequenceFile.Writer.file(finalPath), - SequenceFile.Writer.keyClass(Text.class), - SequenceFile.Writer.valueClass(Text.class)); - if (wr != null) { - Set<String> keys = actionData.keySet(); - for (String propsKey : keys) { - wr.append(new Text(propsKey), new Text(actionData.get(propsKey))); + SequenceFile.Writer wr = null; + try { + wr = SequenceFile.createWriter(launcherJobConf, + SequenceFile.Writer.file(finalPath), + SequenceFile.Writer.keyClass(Text.class), + SequenceFile.Writer.valueClass(Text.class)); + if (wr != null) { + Set<String> keys = actionData.keySet(); + for (String propsKey : keys) { + wr.append(new Text(propsKey), new Text(actionData.get(propsKey))); + } + } else { + throw new IOException("SequenceFile.Writer is null for " + finalPath); + } + } catch (IOException e) { + e.printStackTrace(); + return e; + } finally { + if (wr != null) { + try { + wr.close(); + } catch (IOException e) { + e.printStackTrace(); + return e; + } + } } + + return null; } - else { - throw new IOException("SequenceFile.Writer is null for " + finalPath); - } - } - catch(IOException e) { - e.printStackTrace(); - throw e; - } - finally { - if (wr != null) { - wr.close(); - } + }); + + if (ioe != null) { + throw ioe; } } + private static void failLauncher(int errorCode, String message, Throwable ex) { ErrorHolder eHolder = new ErrorHolder(); eHolder.setErrorCode(errorCode);