Repository: oozie Updated Branches: refs/heads/master 57152acd5 -> 4f63e9f66
OOZIE-2243 Kill Command does not kill the child job for java action (jaydeepvishwakarma) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4f63e9f6 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4f63e9f6 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4f63e9f6 Branch: refs/heads/master Commit: 4f63e9f6688c4210e0dd1049a1a1661d4d97f31f Parents: 57152ac Author: jvishwakarma <[email protected]> Authored: Wed Aug 31 16:17:44 2016 +0530 Committer: jvishwakarma <[email protected]> Committed: Wed Aug 31 16:17:44 2016 +0530 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 10 +++ .../action/hadoop/LauncherMapperHelper.java | 17 ++++- .../action/oozie/SubWorkflowActionExecutor.java | 1 - .../org/apache/hadoop/examples/SleepJob.java | 4 +- .../action/hadoop/TestJavaActionExecutor.java | 74 +++++++++++++++++++- .../action/hadoop/LauncherMainHadoopUtils.java | 5 ++ .../action/hadoop/LauncherMainHadoopUtils.java | 15 +++- .../action/hadoop/LauncherMainHadoopUtils.java | 16 ++++- release-log.txt | 1 + 9 files changed, 137 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index e546e77..0574584 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -61,6 +61,7 @@ import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import 
org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.coord.CoordActionStartXCommand; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; @@ -1589,6 +1590,15 @@ public class JavaActionExecutor extends ActionExecutor { try { Element actionXml = XmlUtils.parseXml(action.getConf()); JobConf jobConf = createBaseHadoopConf(context, actionXml); + WorkflowJob wfJob = context.getWorkflow(); + Configuration conf = null; + if ( wfJob.getConf() != null ) { + conf = new XConfiguration(new StringReader(wfJob.getConf())); + } + String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action); + jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag)); + jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime())); + LauncherMainHadoopUtils.killChildYarnJobs(jobConf); jobClient = createJobClient(context, jobConf); RunningJob runningJob = getRunningJob(context, action, jobClient); if (runningJob != null) { http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java index ed06707..9609fdc 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java @@ -55,6 +55,8 @@ import org.apache.oozie.util.PropertiesUtils; public class LauncherMapperHelper { + public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; + public static String getRecoveryId(Configuration 
launcherConf, Path actionDir, String recoveryId) throws HadoopAccessorException, IOException { String jobId = null; @@ -174,7 +176,7 @@ public class LauncherMapperHelper { actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag); } - private static String getTag(String launcherTag) throws NoSuchAlgorithmException { + public static String getTag(String launcherTag) throws NoSuchAlgorithmException { MessageDigest digest = MessageDigest.getInstance("MD5"); digest.update(launcherTag.getBytes(), 0, launcherTag.length()); String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); @@ -325,4 +327,17 @@ public class LauncherMapperHelper { } }); } + + public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) { + String tag; + if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { + tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName(); + } else if (parentId != null) { + tag = parentId + "@" + wfAction.getName(); + } else { + tag = wfAction.getId(); + } + return tag; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java index b6d2b12..d62cf68 100644 --- a/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/oozie/SubWorkflowActionExecutor.java @@ -35,7 +35,6 @@ import org.apache.oozie.client.OozieClientException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.command.CommandException; -import org.apache.oozie.command.wf.ActionStartXCommand; import org.apache.oozie.service.ConfigurationService; import 
org.apache.oozie.service.DagEngineService; import org.apache.oozie.service.Services; http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/test/java/org/apache/hadoop/examples/SleepJob.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/hadoop/examples/SleepJob.java b/core/src/test/java/org/apache/hadoop/examples/SleepJob.java index 8dec534..342f8b3 100644 --- a/core/src/test/java/org/apache/hadoop/examples/SleepJob.java +++ b/core/src/test/java/org/apache/hadoop/examples/SleepJob.java @@ -163,7 +163,9 @@ public class SleepJob extends Configured implements Tool, } public static void main(String[] args) throws Exception{ - int res = ToolRunner.run(new Configuration(), new SleepJob(), args); + Configuration conf = new Configuration(); + conf.addResource("core-site.xml"); + int res = ToolRunner.run(conf, new SleepJob(), args); System.exit(res); } http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 5f9e29a..75301db 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -27,6 +27,7 @@ import java.io.OutputStreamWriter; import java.io.StringReader; import java.io.Writer; import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collections; @@ -34,9 +35,10 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.Set; -import org.apache.commons.lang.StringUtils; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -46,6 +48,8 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.oozie.WorkflowActionBean; @@ -64,6 +68,7 @@ import org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.UUIDService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.WorkflowStoreService; +import org.apache.oozie.service.UserGroupInformationService; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; @@ -353,6 +358,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); jobConf.set("mapred.job.tracker", jobTracker); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf); final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); @@ -537,6 +543,72 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); } + public void testChildKill() throws Exception { + if (HadoopShims.isYARN()) { + final JobConf clusterConf = createJobConf(); + FileSystem fileSystem = FileSystem.get(clusterConf); + Path confFile = new Path("/tmp/cluster-conf.xml"); + OutputStream out = fileSystem.create(confFile); + clusterConf.writeXml(out); + out.close(); + String confFileName = 
fileSystem.makeQualified(confFile).toString() + "#core-site.xml"; + final String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class> " + SleepJob.class.getName() + " </main-class>" + + "<arg>-mt</arg>" + + "<arg>300000</arg>" + + "<archive>" + confFileName + "</archive>" + + "</java>"; + final Context context = createContext(actionXml, null); + final RunningJob runningJob = submitAction(context); + waitFor(60 * 1000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return runningJob.getJobStatus().getRunState() == 1; + } + }); + assertFalse(runningJob.isComplete()); + Thread.sleep(15000); + UserGroupInformationService ugiService = Services.get(). + get(UserGroupInformationService.class); + + UserGroupInformation ugi = ugiService.getProxyUser(getTestUser()); + ugi.doAs(new PrivilegedExceptionAction<Object>() { + @Override + public Void run() throws Exception { + JavaActionExecutor ae = new JavaActionExecutor(); + ae.kill(context, context.getAction()); + + WorkflowJob wfJob = context.getWorkflow(); + Configuration conf = null; + if (wfJob.getConf() != null) { + conf = new XConfiguration(new StringReader(wfJob.getConf())); + } + String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), context.getAction()); + Configuration jobConf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag)); + jobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, + context.getAction().getStartTime().getTime()); + Set<String> childSet = LauncherMainHadoopUtils.getChildJobs(jobConf); + assertEquals(1, childSet.size()); + + JobClient jobClient = new JobClient(clusterConf); + for (String jobId : childSet) { + RunningJob childJob = jobClient.getJob(jobId); + assertEquals(JobStatus.State.KILLED.getValue(), 
childJob.getJobStatus().getRunState()); + } + assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); + return null; + } + }); + + assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus()); + assertEquals("KILLED", context.getAction().getExternalStatus()); + assertFalse(runningJob.isSuccessful()); + } + } + public void testExceptionSubmitException() throws Exception { String actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java index dca7820..c0a2377 100644 --- a/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java +++ b/hadooplibs/hadoop-utils-1/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java @@ -19,6 +19,7 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; +import java.util.Set; public class LauncherMainHadoopUtils { @@ -32,4 +33,8 @@ public class LauncherMainHadoopUtils { public static void killChildYarnJobs(Configuration actionConf) { // no-op } + + public static Set<String> getChildJobs(Configuration actionConf) { + return null; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java 
b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java index ce8c14f..a0b7d62 100644 --- a/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java +++ b/hadooplibs/hadoop-utils-2/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java @@ -56,7 +56,12 @@ public class LauncherMainHadoopUtils { System.out.println("tag id : " + tag); long startTime = 0L; try { - startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); + if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) { + startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME)); + } + else { + startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); + } } catch(NumberFormatException nfe) { throw new RuntimeException("Could not find Oozie job launch time", nfe); } @@ -115,4 +120,12 @@ public class LauncherMainHadoopUtils { throw new RuntimeException("Exception occurred while killing child job(s)", ioe); } } + + public static Set<String> getChildJobs(Configuration actionConf) { + Set<String> jobList = new HashSet<String>(); + for(ApplicationId applicationId :getChildYarnJobs(actionConf)) { + jobList.add(applicationId.toString().replace("application", "job")); + } + return jobList; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java ---------------------------------------------------------------------- diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java index 94e01ea..5fda0ef 100644 --- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java +++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java @@ -53,7 +53,12
@@ public class LauncherMainHadoopUtils { System.out.println("tag id : " + tag); long startTime = 0L; try { - startTime = Long.parseLong((System.getProperty(OOZIE_JOB_LAUNCH_TIME))); + if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) { + startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME)); + } + else { + startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); + } } catch(NumberFormatException nfe) { throw new RuntimeException("Could not find Oozie job launch time", nfe); } @@ -112,4 +117,12 @@ public class LauncherMainHadoopUtils { throw new RuntimeException("Exception occurred while killing child job(s)", ioe); } } + + public static Set<String> getChildJobs(Configuration actionConf) { + Set<String> jobList = new HashSet<String>(); + for(ApplicationId applicationId :getChildYarnJobs(actionConf)) { + jobList.add(applicationId.toString().replace("application", "job")); + } + return jobList; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/4f63e9f6/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 37f3b71..100c3e7 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2243 Kill Command does not kill the child job for java action (jaydeepvishwakarma) OOZIE-2649 Can't override sub-workflow configuration property if defined in parent workflow XML (asasvari via rkanter) OOZIE-2656 OozieShareLibCLI uses op system username instead of Kerberos to upload jars (gezapeti via rkanter) OOZIE-1173 Refactor: use ParamChecker inXOozieClient (abhishekbafna via jaydeepvishwakarma)
