Repository: oozie
Updated Branches:
  refs/heads/master 57152acd5 -> 4f63e9f66


OOZIE-2243 Kill Command does not kill the child job for java action 
(jaydeepvishwakarma)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4f63e9f6
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4f63e9f6
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4f63e9f6

Branch: refs/heads/master
Commit: 4f63e9f6688c4210e0dd1049a1a1661d4d97f31f
Parents: 57152ac
Author: jvishwakarma <[email protected]>
Authored: Wed Aug 31 16:17:44 2016 +0530
Committer: jvishwakarma <[email protected]>
Committed: Wed Aug 31 16:17:44 2016 +0530

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 10 +++
 .../action/hadoop/LauncherMapperHelper.java     | 17 ++++-
 .../action/oozie/SubWorkflowActionExecutor.java |  1 -
 .../org/apache/hadoop/examples/SleepJob.java    |  4 +-
 .../action/hadoop/TestJavaActionExecutor.java   | 74 +++++++++++++++++++-
 .../action/hadoop/LauncherMainHadoopUtils.java  |  5 ++
 .../action/hadoop/LauncherMainHadoopUtils.java  | 15 +++-
 .../action/hadoop/LauncherMainHadoopUtils.java  | 16 ++++-
 release-log.txt                                 |  1 +
 9 files changed, 137 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index e546e77..0574584 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -61,6 +61,7 @@ import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -1589,6 +1590,15 @@ public class JavaActionExecutor extends ActionExecutor {
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
             JobConf jobConf = createBaseHadoopConf(context, actionXml);
+            WorkflowJob wfJob = context.getWorkflow();
+            Configuration conf = null;
+            if ( wfJob.getConf() != null ) {
+                conf = new XConfiguration(new StringReader(wfJob.getConf()));
+            }
+            String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, 
wfJob.getParentId(), action);
+            jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, 
LauncherMapperHelper.getTag(launcherTag));
+            jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, 
Long.toString(action.getStartTime().getTime()));
+            LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
             jobClient = createJobClient(context, jobConf);
             RunningJob runningJob = getRunningJob(context, action, jobClient);
             if (runningJob != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java 
b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index ed06707..9609fdc 100644
--- 
a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ 
b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -55,6 +55,8 @@ import org.apache.oozie.util.PropertiesUtils;
 
 public class LauncherMapperHelper {
 
+    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
+
     public static String getRecoveryId(Configuration launcherConf, Path 
actionDir, String recoveryId)
             throws HadoopAccessorException, IOException {
         String jobId = null;
@@ -174,7 +176,7 @@ public class LauncherMapperHelper {
         actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag);
     }
 
-    private static String getTag(String launcherTag) throws 
NoSuchAlgorithmException {
+    public static String getTag(String launcherTag) throws 
NoSuchAlgorithmException {
         MessageDigest digest = MessageDigest.getInstance("MD5");
         digest.update(launcherTag.getBytes(), 0, launcherTag.length());
         String md5 = "oozie-" + new BigInteger(1, 
digest.digest()).toString(16);
@@ -325,4 +327,17 @@ public class LauncherMapperHelper {
             }
         });
     }
+
+    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
+        // Tag precedence: configured oozie.action.yarn.tag, then the parent id, then the action's own id.
+        final String configuredTag = (conf == null) ? null : conf.get(OOZIE_ACTION_YARN_TAG);
+        if (configuredTag != null) {
+            return configuredTag + "@" + wfAction.getName();
+        }
+        if (parentId != null) {
+            return parentId + "@" + wfAction.getName();
+        }
+        return wfAction.getId();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
index b6d2b12..d62cf68 100644
--- 
a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
+++ 
b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java
@@ -35,7 +35,6 @@ import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.command.CommandException;
-import org.apache.oozie.command.wf.ActionStartXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.DagEngineService;
 import org.apache.oozie.service.Services;

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/test/java/org/apache/hadoop/examples/SleepJob.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/hadoop/examples/SleepJob.java 
b/core/src/test/java/org/apache/hadoop/examples/SleepJob.java
index 8dec534..342f8b3 100644
--- a/core/src/test/java/org/apache/hadoop/examples/SleepJob.java
+++ b/core/src/test/java/org/apache/hadoop/examples/SleepJob.java
@@ -163,7 +163,9 @@ public class SleepJob extends Configured implements Tool,
   }
 
   public static void main(String[] args) throws Exception{
-    int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
+    Configuration conf = new Configuration();
+    conf.addResource("core-site.xml");
+    int res = ToolRunner.run(conf, new SleepJob(), args);
     System.exit(res);
   }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 5f9e29a..75301db 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -27,6 +27,7 @@ import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.io.Writer;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,9 +35,10 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +48,8 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.oozie.WorkflowActionBean;
@@ -64,6 +68,7 @@ import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
+import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
@@ -353,6 +358,7 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         JobConf jobConf = 
Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
         jobConf.set("mapred.job.tracker", jobTracker);
 
+
         JobClient jobClient =
             
Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), 
jobConf);
         final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
@@ -537,6 +543,72 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         assertEquals(WorkflowAction.Status.ERROR, 
context.getAction().getStatus());
     }
 
+    public void testChildKill() throws Exception {
+        if (HadoopShims.isYARN()) {
+            final JobConf clusterConf = createJobConf();
+            FileSystem fileSystem = FileSystem.get(clusterConf);
+            Path confFile = new Path("/tmp/cluster-conf.xml");
+            OutputStream out = fileSystem.create(confFile);
+            clusterConf.writeXml(out);
+            out.close();
+            String confFileName = 
fileSystem.makeQualified(confFile).toString() + "#core-site.xml";
+            final String actionXml = "<java>" +
+                    "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                    "<name-node>" + getNameNodeUri() + "</name-node>" +
+                    "<main-class> " + SleepJob.class.getName() + " 
</main-class>" +
+                    "<arg>-mt</arg>" +
+                    "<arg>300000</arg>" +
+                    "<archive>" + confFileName + "</archive>" +
+                    "</java>";
+            final Context context = createContext(actionXml, null);
+            final RunningJob runningJob = submitAction(context);
+            waitFor(60 * 1000, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return runningJob.getJobStatus().getRunState() == 1;
+                }
+            });
+            assertFalse(runningJob.isComplete());
+            Thread.sleep(15000);
+            UserGroupInformationService ugiService = Services.get().
+                    get(UserGroupInformationService.class);
+
+            UserGroupInformation ugi = ugiService.getProxyUser(getTestUser());
+            ugi.doAs(new PrivilegedExceptionAction<Object>() {
+                @Override
+                public Void run() throws Exception {
+                    JavaActionExecutor ae = new JavaActionExecutor();
+                    ae.kill(context, context.getAction());
+
+                    WorkflowJob wfJob = context.getWorkflow();
+                    Configuration conf = null;
+                    if (wfJob.getConf() != null) {
+                        conf = new XConfiguration(new 
StringReader(wfJob.getConf()));
+                    }
+                    String launcherTag = 
LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), 
context.getAction());
+                    Configuration jobConf = ae.createBaseHadoopConf(context, 
XmlUtils.parseXml(actionXml));
+                    
jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, 
LauncherMapperHelper.getTag(launcherTag));
+                    
jobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
+                            context.getAction().getStartTime().getTime());
+                    Set<String> childSet = 
LauncherMainHadoopUtils.getChildJobs(jobConf);
+                    assertEquals(1, childSet.size());
+
+                    JobClient jobClient = new JobClient(clusterConf);
+                    for (String jobId : childSet) {
+                        RunningJob childJob = jobClient.getJob(jobId);
+                        assertEquals(JobStatus.State.KILLED.getValue(), 
childJob.getJobStatus().getRunState());
+                    }
+                    
assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
+                    return null;
+                }
+            });
+
+            assertEquals(WorkflowAction.Status.DONE, 
context.getAction().getStatus());
+            assertEquals("KILLED", context.getAction().getExternalStatus());
+            assertFalse(runningJob.isSuccessful());
+        }
+    }
+
         public void testExceptionSubmitException() throws Exception {
         String actionXml = "<java>" +
                 "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
 
b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index dca7820..c0a2377 100644
--- 
a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ 
b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -19,6 +19,7 @@
 package org.apache.oozie.action.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
+import java.util.Set;
 
 
 public class LauncherMainHadoopUtils {
@@ -32,4 +33,8 @@ public class LauncherMainHadoopUtils {
     public static void killChildYarnJobs(Configuration actionConf) {
         // no-op
     }
+
+    public static Set<String> getChildJobs(Configuration actionConf) {
+        return new java.util.HashSet<String>(); // this variant tracks no child jobs; empty, never null, so callers can iterate
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
 
b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index ce8c14f..a0b7d62 100644
--- 
a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ 
b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -56,7 +56,12 @@ public class LauncherMainHadoopUtils {
         System.out.println("tag id : " + tag);
         long startTime = 0L;
         try {
-            startTime = 
Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+            if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) {
+                startTime = 
Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME));
+            }
+            else {
+                startTime = 
Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+            }
         } catch(NumberFormatException nfe) {
             throw new RuntimeException("Could not find Oozie job launch time", 
nfe);
         }
@@ -115,4 +120,12 @@ public class LauncherMainHadoopUtils {
             throw new RuntimeException("Exception occurred while killing child 
job(s)", ioe);
         }
     }
+
+    public static Set<String> getChildJobs(Configuration actionConf) {
+        final Set<String> childJobIds = new HashSet<String>();
+        for (final ApplicationId childApp : getChildYarnJobs(actionConf)) {
+            childJobIds.add(childApp.toString().replace("application", "job")); // application id text -> job id text
+        }
+        return childJobIds;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git 
a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
 
b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 94e01ea..5fda0ef 100644
--- 
a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ 
b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -53,7 +53,13 @@ public class LauncherMainHadoopUtils {
         System.out.println("tag id : " + tag);
         long startTime = 0L;
         try {
-            startTime = 
Long.parseLong((System.getProperty(OOZIE_JOB_LAUNCH_TIME)));
+            // Prefer the launch time carried in actionConf; otherwise fall back to the system property.
+            if (actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) {
+                startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME));
+            }
+            else {
+                startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+            }
         } catch(NumberFormatException nfe) {
             throw new RuntimeException("Could not find Oozie job launch time", 
nfe);
         }
@@ -112,4 +118,12 @@ public class LauncherMainHadoopUtils {
             throw new RuntimeException("Exception occurred while killing child 
job(s)", ioe);
         }
     }
+
+    public static Set<String> getChildJobs(Configuration actionConf) {
+        final Set<String> childJobIds = new HashSet<String>();
+        for (final ApplicationId childApp : getChildYarnJobs(actionConf)) {
+            childJobIds.add(childApp.toString().replace("application", "job")); // application id text -> job id text
+        }
+        return childJobIds;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 37f3b71..100c3e7 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2243 Kill Command does not kill the child job for java action 
(jaydeepvishwakarma)
 OOZIE-2649 Can't override sub-workflow configuration property if defined in 
parent workflow XML (asasvari via rkanter)
 OOZIE-2656 OozieShareLibCLI uses op system username instead of Kerberos to 
upload jars (gezapeti via rkanter)
 OOZIE-1173 Refactor: use ParamChecker inXOozieClient (abhishekbafna via 
jaydeepvishwakarma)

Reply via email to