Repository: oozie
Updated Branches:
  refs/heads/branch-4.3 a73977f2b -> 70fad8055
OOZIE-2742 Unable to kill applications based on tag (satishsaley via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/70fad805
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/70fad805
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/70fad805

Branch: refs/heads/branch-4.3
Commit: 70fad8055e21b89f2db2be5a19901d2607c6be41
Parents: a73977f
Author: Rohini Palaniswamy <[email protected]>
Authored: Wed Nov 23 09:17:19 2016 -0800
Committer: Rohini Palaniswamy <[email protected]>
Committed: Wed Nov 23 09:17:19 2016 -0800

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java |  15 ++-
 .../action/hadoop/TestJavaActionExecutor.java   | 117 +++++++++----------
 .../action/hadoop/LauncherMainHadoopUtils.java  |  16 +--
 release-log.txt                                 |   1 +
 4 files changed, 79 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/70fad805/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 65996d9..31d4817 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -25,6 +25,7 @@ import java.net.ConnectException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.security.PrivilegedExceptionAction;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,6 +53,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
@@ -69,6 +71,7 @@ import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.ELEvaluator;
@@ -1590,7 +1593,7 @@ public class JavaActionExecutor extends ActionExecutor {
         boolean exception = false;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
-            JobConf jobConf = createBaseHadoopConf(context, actionXml);
+            final JobConf jobConf = createBaseHadoopConf(context, actionXml);
             WorkflowJob wfJob = context.getWorkflow();
             Configuration conf = null;
             if ( wfJob.getConf() != null ) {
@@ -1599,7 +1602,15 @@ public class JavaActionExecutor extends ActionExecutor {
             String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action);
             jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
             jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
-            LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
+            UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
+                    .getProxyUser(context.getWorkflow().getUser());
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
+                    return null;
+                }
+            });
             jobClient = createJobClient(context, jobConf);
             RunningJob runningJob = getRunningJob(context, action, jobClient);
             if (runningJob != null) {
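A note on the hunk above: the kill-by-tag lookup was previously issued as the
Oozie server user, and (as the RM request appears to be scoped to the caller's
own applications; see the note before the LauncherMainHadoopUtils hunk below)
it matched none of the workflow user's child apps. The fix re-issues it under
a proxy-user UGI for the workflow user. A minimal self-contained sketch of
that pattern using the stock Hadoop createProxyUser/getLoginUser calls
(Oozie's UserGroupInformationService wraps the same idea; killAsUser and
workflowUser are hypothetical names):

    import java.io.IOException;
    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.security.UserGroupInformation;

    static void killAsUser(final String workflowUser, final JobConf jobConf)
            throws IOException, InterruptedException {
        // Impersonate the workflow user; requires the server principal to be
        // whitelisted in core-site.xml (hadoop.proxyuser.<server>.hosts/groups).
        UserGroupInformation ugi = UserGroupInformation.createProxyUser(
                workflowUser, UserGroupInformation.getLoginUser());
        ugi.doAs(new PrivilegedExceptionAction<Void>() {
            @Override
            public Void run() throws Exception {
                // Every Hadoop RPC made in here authenticates as workflowUser.
                LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
                return null;
            }
        });
    }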
http://git-wip-us.apache.org/repos/asf/oozie/blob/70fad805/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 75301db..8965cdf 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -544,69 +544,64 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
     }
 
     public void testChildKill() throws Exception {
-        if (HadoopShims.isYARN()) {
-            final JobConf clusterConf = createJobConf();
-            FileSystem fileSystem = FileSystem.get(clusterConf);
-            Path confFile = new Path("/tmp/cluster-conf.xml");
-            OutputStream out = fileSystem.create(confFile);
-            clusterConf.writeXml(out);
-            out.close();
-            String confFileName = fileSystem.makeQualified(confFile).toString() + "#core-site.xml";
-            final String actionXml = "<java>" +
-                    "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                    "<name-node>" + getNameNodeUri() + "</name-node>" +
-                    "<main-class> " + SleepJob.class.getName() + " </main-class>" +
-                    "<arg>-mt</arg>" +
-                    "<arg>300000</arg>" +
-                    "<archive>" + confFileName + "</archive>" +
-                    "</java>";
-            final Context context = createContext(actionXml, null);
-            final RunningJob runningJob = submitAction(context);
-            waitFor(60 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return runningJob.getJobStatus().getRunState() == 1;
-                }
-            });
-            assertFalse(runningJob.isComplete());
-            Thread.sleep(15000);
-            UserGroupInformationService ugiService = Services.get().
-                    get(UserGroupInformationService.class);
-
-            UserGroupInformation ugi = ugiService.getProxyUser(getTestUser());
-            ugi.doAs(new PrivilegedExceptionAction<Object>() {
-                @Override
-                public Void run() throws Exception {
-                    JavaActionExecutor ae = new JavaActionExecutor();
-                    ae.kill(context, context.getAction());
-
-                    WorkflowJob wfJob = context.getWorkflow();
-                    Configuration conf = null;
-                    if (wfJob.getConf() != null) {
-                        conf = new XConfiguration(new StringReader(wfJob.getConf()));
-                    }
-                    String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), context.getAction());
-                    Configuration jobConf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
-                    jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
-                    jobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
-                            context.getAction().getStartTime().getTime());
-                    Set<String> childSet = LauncherMainHadoopUtils.getChildJobs(jobConf);
-                    assertEquals(1, childSet.size());
-
-                    JobClient jobClient = new JobClient(clusterConf);
-                    for (String jobId : childSet) {
-                        RunningJob childJob = jobClient.getJob(jobId);
-                        assertEquals(JobStatus.State.KILLED.getValue(), childJob.getJobStatus().getRunState());
-                    }
-                    assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
-                    return null;
-                }
-            });
+        final JobConf clusterConf = createJobConf();
+        FileSystem fileSystem = FileSystem.get(clusterConf);
+        Path confFile = new Path("/tmp/cluster-conf.xml");
+        OutputStream out = fileSystem.create(confFile);
+        clusterConf.writeXml(out);
+        out.close();
+        String confFileName = fileSystem.makeQualified(confFile).toString() + "#core-site.xml";
+        final String actionXml = "<java>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<main-class> " + SleepJob.class.getName() + " </main-class>" +
+                "<arg>-mt</arg>" +
+                "<arg>300000</arg>" +
+                "<archive>" + confFileName + "</archive>" +
+                "</java>";
+        final Context context = createContext(actionXml, null);
+        final RunningJob runningJob = submitAction(context);
+        waitFor(60 * 1000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return runningJob.getJobStatus().getRunState() == 1;
+            }
+        });
+        assertFalse(runningJob.isComplete());
+        Thread.sleep(15000);
+        JavaActionExecutor ae = new JavaActionExecutor();
+        ae.kill(context, context.getAction());
 
-            assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus());
-            assertEquals("KILLED", context.getAction().getExternalStatus());
-            assertFalse(runningJob.isSuccessful());
+        WorkflowJob wfJob = context.getWorkflow();
+        Configuration conf = null;
+        if (wfJob.getConf() != null) {
+            conf = new XConfiguration(new StringReader(wfJob.getConf()));
         }
+        String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), context.getAction());
+        final Configuration jobConf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
+        jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
+        jobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, context.getAction().getStartTime().getTime());
+
+        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
+        UserGroupInformation ugi = ugiService.getProxyUser(getTestUser());
+        Set<String> childSet = ugi.doAs(new PrivilegedExceptionAction<Set<String>>() {
+            @Override
+            public Set<String> run() throws Exception {
+                Set<String> childSet = LauncherMainHadoopUtils.getChildJobs(jobConf);
+                return childSet;
+            }
+        });
+        assertEquals(1, childSet.size());
+
+        JobClient jobClient = new JobClient(clusterConf);
+        for (String jobId : childSet) {
+            RunningJob childJob = jobClient.getJob(jobId);
+            assertEquals(JobStatus.State.KILLED.getValue(), childJob.getJobStatus().getRunState());
+        }
+        assertTrue(ae.isCompleted(context.getAction().getExternalStatus()));
+        assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus());
+        assertEquals("KILLED", context.getAction().getExternalStatus());
+        assertFalse(runningJob.isSuccessful());
     }
 
     public void testExceptionSubmitException() throws Exception {
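The next file, LauncherMainHadoopUtils, is where killChildYarnJobs and
getChildJobs live; the hunk below only un-nests their launch-time handling.
For context, its imports (ApplicationClientProtocol, ApplicationsRequestScope)
suggest that kill-by-tag reduces to roughly the following RM exchange. This is
a hedged sketch, not the patched code; tag, launchTime and conf stand in for
values taken from the action configuration. If the request is scoped to the
caller's own applications, issuing it as the Oozie server user finds nothing,
which is what the doAs change above addresses:

    import java.util.Collections;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
    import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
    import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
    import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.ClientRMProxy;

    // Find apps carrying the launcher's tag that started after the Oozie
    // action's launch time, then force-kill each match.
    ApplicationClientProtocol rm =
            ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class);
    GetApplicationsRequest request = GetApplicationsRequest.newInstance();
    request.setScope(ApplicationsRequestScope.OWN);   // only the calling user's apps
    request.setApplicationTags(Collections.singleton(tag));
    request.setStartRange(launchTime, System.currentTimeMillis());
    for (ApplicationReport app : rm.getApplications(request).getApplicationList()) {
        rm.forceKillApplication(KillApplicationRequest.newInstance(app.getApplicationId()));
    }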
http://git-wip-us.apache.org/repos/asf/oozie/blob/70fad805/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
----------------------------------------------------------------------
diff --git a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
index 5fda0ef..0cf2e90 100644
--- a/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
+++ b/hadooplibs/hadoop-utils-3/src/main/java/org/apache/oozie/action/hadoop/LauncherMainHadoopUtils.java
@@ -23,6 +23,9 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
@@ -53,13 +56,12 @@ public class LauncherMainHadoopUtils {
         System.out.println("tag id : " + tag);
         long startTime = 0L;
         try {
-            try {
-                if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) {
-                    startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME));
-                }
-                else {
-                    startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
-                }
+            if(actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null) {
+                startTime = Long.parseLong(actionConf.get(OOZIE_JOB_LAUNCH_TIME));
+            }
+            else {
+                startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+            }
         } catch(NumberFormatException nfe) {
             throw new RuntimeException("Could not find Oozie job launch time", nfe);
         }


http://git-wip-us.apache.org/repos/asf/oozie/blob/70fad805/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f96531e..a803c28 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release
 
+OOZIE-2742 Unable to kill applications based on tag (satishsaley via rohini)
 OOZIE-2720 Test failure - TestCoordMaterializeTriggerService#testMaxMatThrottleNotPicked (gezapeti via rohini)
 OOZIE-2536 Hadoop's cleanup of local directory in uber mode causing failures (satishsaley via rohini)
 OOZIE-2723 JSON.org license is now CatX (rkanter, abhishekbafna via shwethags)
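One last note, on the LauncherMainHadoopUtils hunk above: it removes a
needlessly nested inner try; the surviving outer try keeps the single
NumberFormatException handler, so behavior is unchanged. The lookup it tidies
reduces to this pattern (a sketch; raw is a hypothetical local):

    // Prefer the action configuration, fall back to a launcher system
    // property; Long.parseLong(null) also raises NumberFormatException,
    // so a missing value is reported the same way as a malformed one.
    String raw = actionConf.get(OOZIE_JOB_LAUNCH_TIME) != null
            ? actionConf.get(OOZIE_JOB_LAUNCH_TIME)
            : System.getProperty(OOZIE_JOB_LAUNCH_TIME);
    long startTime;
    try {
        startTime = Long.parseLong(raw);
    } catch (NumberFormatException nfe) {
        throw new RuntimeException("Could not find Oozie job launch time", nfe);
    }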
