Repository: oozie Updated Branches: refs/heads/master 0bb5e1369 -> 86e0af626
OOZIE-2329 Make handling yarn restarts configurable Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/86e0af62 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/86e0af62 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/86e0af62 Branch: refs/heads/master Commit: 86e0af62658d11142d68d3b9c939ef7191f9934c Parents: 0bb5e13 Author: Purshotam Shah <[email protected]> Authored: Tue Aug 18 11:24:34 2015 -0700 Committer: Purshotam Shah <[email protected]> Committed: Tue Aug 18 11:24:34 2015 -0700 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 10 +++- core/src/main/resources/oozie-default.xml | 10 ++++ .../action/hadoop/TestJavaActionExecutor.java | 54 ++++++++++++++++++-- .../action/hadoop/LauncherMainHadoopUtils.java | 15 +++--- .../action/hadoop/LauncherMainHadoopUtils.java | 15 +++--- release-log.txt | 1 + .../oozie/action/hadoop/LauncherMapper.java | 6 ++- 7 files changed, 87 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 6e959df..bb2d3a6 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -25,6 +25,7 @@ import java.net.ConnectException; import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.text.MessageFormat; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -96,6 +97,7 @@ public class 
JavaActionExecutor extends ActionExecutor { public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled"; public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable"; + public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs"; public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts"; public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts"; @@ -876,7 +878,13 @@ public class JavaActionExecutor extends ActionExecutor { } // Properties for when a launcher job's AM gets restarted - LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag); + if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) { + LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag); + } + else { + LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties", + HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)); + } String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType()); if (actionShareLibProperty != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 0a7e250..9689ce0 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1695,6 +1695,16 @@ <!-- This is common to the subclasses of action executors for Java (e.g. 
map-reduce, pig, hive, java, etc) --> <property> + <name>oozie.action.launcher.am.restart.kill.childjobs</name> + <value>true</value> + <description> + Multiple instances of a launcher job can run after a non-work-preserving RM recovery on RM restart, or after AM + recovery due to AM crashes or loss of AM network connectivity. This can leave orphaned child jobs from older AM + attempts, leading to conflicting runs. When true, child jobs of previous attempts are killed using YARN application tags; set to false to disable this. + </description> + </property> + + <property> <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name> <value>true</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 21e85d1..079d7b8 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -23,7 +23,9 @@ import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.io.StringReader; +import java.io.Writer; import java.net.URI; import java.text.SimpleDateFormat; import java.util.Arrays; @@ -322,13 +324,11 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { return new Context(wf, action); } - protected RunningJob submitAction(Context context) throws Exception { - JavaActionExecutor ae = new JavaActionExecutor(); + protected RunningJob submitAction(Context context, JavaActionExecutor javaActionExecutor) throws Exception { WorkflowAction action = context.getAction(); - - ae.prepareActionDir(getFileSystem(), context); - 
ae.submitLauncher(getFileSystem(), context, action); + javaActionExecutor.prepareActionDir(getFileSystem(), context); + javaActionExecutor.submitLauncher(getFileSystem(), context, action); String jobId = action.getExternalId(); String jobTracker = action.getTrackerUri(); @@ -347,6 +347,10 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { return runningJob; } + protected RunningJob submitAction(Context context) throws Exception { + return submitAction(context, new JavaActionExecutor()); + } + public void testSimpestSleSubmitOK() throws Exception { String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + @@ -2486,4 +2490,44 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { }); assertTrue(runningJob.isSuccessful()); } + + public void testJobSubmissionWithoutYarnKill() throws Exception { + Path inputDir = new Path(getFsTestCaseDir(), "input"); + Path outputDir = new Path(getFsTestCaseDir(), "output"); + + Writer w = new OutputStreamWriter(getFileSystem().create(new Path(inputDir, "data.txt"))); + w.write("dummy\n"); + w.write("dummy\n"); + w.close(); + + w = new OutputStreamWriter(getFileSystem().create(new Path(inputDir, "id.pig"))); + w.write("A = load '$INPUT' using PigStorage(':');\n"); + w.write("store B into '$OUTPUT' USING PigStorage();\n"); + w.close(); + String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + + getNameNodeUri() + "</name-node>" + "<prepare>" + "<delete path='outputdir' />" + "</prepare>" + + "<configuration>" + "<property>" + "<name>mapred.compress.map.output</name>" + "<value>true</value>" + + "</property>" + "<property>" + "<name>mapred.job.queue.name</name>" + "<value>default</value>" + + "</property>" + "</configuration>" + "<script>" + inputDir.toString() + "/id.pig" + "</script>" + + "<param>INPUT=" + inputDir.toUri().getPath() + "</param>" + "<param>OUTPUT=" + + outputDir.toUri().getPath() + "/pig-output</param>" + 
"</pig>"; + + PigActionExecutor ae = new PigActionExecutor(); + WorkflowJobBean wfBean = addRecordToWfJobTable("test1", actionXml); + WorkflowActionBean action = (WorkflowActionBean) wfBean.getActions().get(0); + action.setType(ae.getType()); + action.setConf(actionXml); + Context context = new Context(wfBean, action); + + ConfigurationService.setBoolean(JavaActionExecutor.HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART, false); + + final RunningJob runningJob = submitAction(context, ae); + waitFor(60 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return runningJob.isComplete(); + } + }); + assertTrue(runningJob.isSuccessful()); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java index c4a0787..ce8c14f 100644 --- a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java +++ b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java @@ -47,6 +47,13 @@ public class LauncherMainHadoopUtils { private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) { System.out.println("Fetching child yarn jobs"); + Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); + String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); + if (tag == null) { + System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS); + return childYarnJobs; + } + System.out.println("tag id : " + tag); long startTime = 0L; try { startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); @@ -54,14 +61,6 @@ public class 
LauncherMainHadoopUtils { throw new RuntimeException("Could not find Oozie job launch time", nfe); } - Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); - if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) { - System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS); - return childYarnJobs; - } - - String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); - System.out.println("tag id : " + tag); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(ApplicationsRequestScope.OWN); gar.setApplicationTags(Collections.singleton(tag)); http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java index fb259e2..94e01ea 100644 --- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java +++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java @@ -44,6 +44,13 @@ public class LauncherMainHadoopUtils { private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) { System.out.println("Fetching child yarn jobs"); + Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); + String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); + if (tag == null) { + System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS); + return childYarnJobs; + } + System.out.println("tag id : " + tag); long startTime = 0L; try { startTime = Long.parseLong((System.getProperty(OOZIE_JOB_LAUNCH_TIME))); @@ -51,14 +58,6 @@ public class LauncherMainHadoopUtils { throw new RuntimeException("Could not find Oozie job launch time", 
nfe); } - Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); - if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) { - System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS); - return childYarnJobs; - } - - String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); - System.out.println("tag id : " + tag); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(ApplicationsRequestScope.OWN); gar.setApplicationTags(Collections.singleton(tag)); http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 729202a..556e88b 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2329 Make handling yarn restarts configurable (puru) OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru) OOZIE-2325 Shell action fails if user overrides oozie.launcher.mapreduce.map.env (kailongs via puru) OOZIE-2324 A syntax error in the kill node causes the workflow to get stuck and other problems (rkanter) http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java index 7c4d48d..c45073f 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java @@ -469,8 +469,10 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath()); 
System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath()); - System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, - getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME)); + if (getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME) != null) { + System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, + getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME)); + } String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS); if (actionConfigClass != null) {
