Repository: oozie
Updated Branches:
  refs/heads/master 0bb5e1369 -> 86e0af626


OOZIE-2329 Make handling yarn restarts configurable


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/86e0af62
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/86e0af62
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/86e0af62

Branch: refs/heads/master
Commit: 86e0af62658d11142d68d3b9c939ef7191f9934c
Parents: 0bb5e13
Author: Purshotam Shah <[email protected]>
Authored: Tue Aug 18 11:24:34 2015 -0700
Committer: Purshotam Shah <[email protected]>
Committed: Tue Aug 18 11:24:34 2015 -0700

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 10 +++-
 core/src/main/resources/oozie-default.xml       | 10 ++++
 .../action/hadoop/TestJavaActionExecutor.java   | 54 ++++++++++++++++++--
 .../action/hadoop/LauncherMainHadoopUtils.java  | 15 +++---
 .../action/hadoop/LauncherMainHadoopUtils.java  | 15 +++---
 release-log.txt                                 |  1 +
 .../oozie/action/hadoop/LauncherMapper.java     |  6 ++-
 7 files changed, 87 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 6e959df..bb2d3a6 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -25,6 +25,7 @@ import java.net.ConnectException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -96,6 +97,7 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
     public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
+    public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
     public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
     public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
     public static final String HADOOP_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@@ -876,7 +878,13 @@ public class JavaActionExecutor extends ActionExecutor {
             }
 
             // Properties for when a launcher job's AM gets restarted
-            LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag);
+            if (ConfigurationService.getBoolean(HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART)) {
+                LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, launcherTag);
+            }
+            else {
+                LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
+                        HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART));
+            }
 
             String actionShareLibProperty = actionConf.get(ACTION_SHARELIB_FOR + getType());
             if (actionShareLibProperty != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 0a7e250..9689ce0 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1695,6 +1695,16 @@
     <!-- This is common to the subclasses of action executors for Java (e.g. map-reduce, pig, hive, java, etc) -->
 
     <property>
+        <name>oozie.action.launcher.am.restart.kill.childjobs</name>
+        <value>true</value>
+        <description>
+            Multiple instances of launcher jobs can happen due to RM non-work preserving recovery on RM restart, AM recovery
+            due to crashes or AM network connectivity loss. This could also lead to orphaned child jobs of the old AM attempts
+            leading to conflicting runs. This kills child jobs of previous attempts using YARN application tags.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
         <value>true</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 21e85d1..079d7b8 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -23,7 +23,9 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.io.StringReader;
+import java.io.Writer;
 import java.net.URI;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
@@ -322,13 +324,11 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
-        JavaActionExecutor ae = new JavaActionExecutor();
+    protected RunningJob submitAction(Context context, JavaActionExecutor javaActionExecutor) throws Exception {
 
         WorkflowAction action = context.getAction();
-
-        ae.prepareActionDir(getFileSystem(), context);
-        ae.submitLauncher(getFileSystem(), context, action);
+        javaActionExecutor.prepareActionDir(getFileSystem(), context);
+        javaActionExecutor.submitLauncher(getFileSystem(), context, action);
 
         String jobId = action.getExternalId();
         String jobTracker = action.getTrackerUri();
@@ -347,6 +347,10 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         return runningJob;
     }
 
+    protected RunningJob submitAction(Context context) throws Exception {
+        return submitAction(context, new JavaActionExecutor());
+    }
+
     public void testSimpestSleSubmitOK() throws Exception {
         String actionXml = "<java>" +
                 "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -2486,4 +2490,44 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         });
         assertTrue(runningJob.isSuccessful());
     }
+
+    public void testJobSubmissionWithoutYarnKill() throws Exception {
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+
+        Writer w = new OutputStreamWriter(getFileSystem().create(new Path(inputDir, "data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+
+        w = new OutputStreamWriter(getFileSystem().create(new Path(inputDir, "id.pig")));
+        w.write("A = load '$INPUT' using PigStorage(':');\n");
+        w.write("store B into '$OUTPUT' USING PigStorage();\n");
+        w.close();
+        String actionXml = "<pig>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>" + "<prepare>" + "<delete path='outputdir' />" + "</prepare>"
+                + "<configuration>" + "<property>" + "<name>mapred.compress.map.output</name>" + "<value>true</value>"
+                + "</property>" + "<property>" + "<name>mapred.job.queue.name</name>" + "<value>default</value>"
+                + "</property>" + "</configuration>" + "<script>" + inputDir.toString() + "/id.pig" + "</script>"
+                + "<param>INPUT=" + inputDir.toUri().getPath() + "</param>" + "<param>OUTPUT="
+                + outputDir.toUri().getPath() + "/pig-output</param>" + "</pig>";
+
+        PigActionExecutor ae = new PigActionExecutor();
+        WorkflowJobBean wfBean = addRecordToWfJobTable("test1", actionXml);
+        WorkflowActionBean action = (WorkflowActionBean) wfBean.getActions().get(0);
+        action.setType(ae.getType());
+        action.setConf(actionXml);
+        Context context = new Context(wfBean, action);
+
+        ConfigurationService.setBoolean(JavaActionExecutor.HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART, false);
+
+        final RunningJob runningJob = submitAction(context, ae);
+        waitFor(60 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return runningJob.isComplete();
+            }
+        });
+        assertTrue(runningJob.isSuccessful());
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index c4a0787..ce8c14f 100644
--- a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -47,6 +47,13 @@ public class LauncherMainHadoopUtils {
 
     private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
         System.out.println("Fetching child yarn jobs");
+        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        if (tag == null) {
+            System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+            return childYarnJobs;
+        }
+        System.out.println("tag id : " + tag);
         long startTime = 0L;
         try {
             startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
@@ -54,14 +61,6 @@ public class LauncherMainHadoopUtils {
             throw new RuntimeException("Could not find Oozie job launch time", nfe);
         }
 
-        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
-        if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) {
-            System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
-            return childYarnJobs;
-        }
-
-        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
-        System.out.println("tag id : " + tag);
         GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
         gar.setScope(ApplicationsRequestScope.OWN);
         gar.setApplicationTags(Collections.singleton(tag));

http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index fb259e2..94e01ea 100644
--- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -44,6 +44,13 @@ public class LauncherMainHadoopUtils {
 
     private static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
         System.out.println("Fetching child yarn jobs");
+        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
+        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        if (tag == null) {
+            System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+            return childYarnJobs;
+        }
+        System.out.println("tag id : " + tag);
         long startTime = 0L;
         try {
             startTime = Long.parseLong((System.getProperty(OOZIE_JOB_LAUNCH_TIME)));
@@ -51,14 +58,6 @@ public class LauncherMainHadoopUtils {
             throw new RuntimeException("Could not find Oozie job launch time", nfe);
         }
 
-        Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
-        if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) == null) {
-            System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
-            return childYarnJobs;
-        }
-
-        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
-        System.out.println("tag id : " + tag);
         GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
         gar.setScope(ApplicationsRequestScope.OWN);
         gar.setApplicationTags(Collections.singleton(tag));

http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 729202a..556e88b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2329 Make handling yarn restarts configurable (puru)
 OOZIE-2228 Statustransit service doesn't pick bundle with suspend status (puru)
 OOZIE-2325 Shell action fails if user overrides oozie.launcher.mapreduce.map.env (kailongs via puru)
 OOZIE-2324 A syntax error in the kill node causes the workflow to get stuck and other problems (rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/86e0af62/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index 7c4d48d..c45073f 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -469,8 +469,10 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
-        System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
-                getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME));
+        if (getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME) != null) {
+            System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
+                    getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME));
+        }
 
         String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
         if (actionConfigClass != null) {

Reply via email to