Repository: oozie Updated Branches: refs/heads/master c09561ff2 -> 6056afb7e
OOZIE-2911 Re-add test testWfActionKillChildJob and adapt it to OYA (gezapeti) Change-Id: I2dd9512590994a364d6c770ea3d4792e119701ca Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6056afb7 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6056afb7 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6056afb7 Branch: refs/heads/master Commit: 6056afb7e27f070a652bc9a75756c9de0783a36c Parents: c09561f Author: Gezapeti Cseh <[email protected]> Authored: Sun Jul 9 14:53:24 2017 +0200 Committer: Gezapeti Cseh <[email protected]> Committed: Sun Jul 9 14:54:55 2017 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 24 +++- .../action/hadoop/MapReduceActionExecutor.java | 4 +- .../command/wf/TestActionCheckXCommand.java | 9 +- .../wf/TestWorkflowActionKillXCommand.java | 17 ++- .../java/org/apache/oozie/test/XTestCase.java | 4 +- release-log.txt | 1 + .../hadoop/TestMapReduceActionExecutor.java | 121 +++++-------------- 7 files changed, 71 insertions(+), 109 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index fa83f1e..89bac95 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -1508,20 +1508,38 @@ public class JavaActionExecutor extends ActionExecutor { YarnClient yarnClient = null; try { Element actionXml = XmlUtils.parseXml(action.getConf()); - final Configuration jobConf = createBaseHadoopConf(context, actionXml); String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action); jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); yarnClient = createYarnClient(context, jobConf); if(action.getExternalId() != null) { - yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); + try { + LOG.info("Killing action {0}'s external application {1}", action.getId(), action.getExternalId()); + yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); + } catch (Exception e) { + LOG.warn("Could not kill {0}", action.getExternalId(), e); + } + } + String externalChildIDs = action.getExternalChildIDs(); + if(externalChildIDs != null) { + for(String childId : externalChildIDs.split(",")) { + try { + LOG.info("Killing action {0}'s external child application {1}", action.getId(), childId); + yarnClient.killApplication(ConverterUtils.toApplicationId(childId.trim())); + } catch (Exception e) { + LOG.warn("Could not kill external child of {0}, {1}", action.getExternalId(), + childId, e); + } + } } for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL, action.getStartTime().getTime())){ try { + LOG.info("Killing action {0}'s external child application {1} based on tags", + action.getId(), id.toString()); yarnClient.killApplication(id); } catch (Exception e) { - LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id); + LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id, e); } } http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index 4427688..338e508 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -33,6 +33,7 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; @@ -392,7 +393,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor { super.check(context, action); } else { context.setExternalStatus(RUNNING); - context.setExternalChildIDs(newId); + String externalAppId = TypeConverter.toYarn(JobID.forName(newId)).getAppId().toString(); + context.setExternalChildIDs(externalAppId); } } else { super.check(context, action); http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java index a5128a8..25d16cf 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java @@ -277,14 +277,7 @@ public class TestActionCheckXCommand extends XDataTestCase { assertEquals("LauncherId", launcherId, externalId); assertNotNull(childId); - final RunningJob mrJob = jobClient.getJob(JobID.forName(childId)); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(childId); new ActionCheckXCommand(action.getId()).call(); action = jpaService.execute(wfActionGetCmd); http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java index d68f656..cf77f18 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java @@ -44,8 +44,6 @@ import org.apache.oozie.service.UUIDService; import org.apache.oozie.test.XDataTestCase; import org.apache.oozie.workflow.WorkflowInstance; -import com.google.common.collect.Sets; - public class TestWorkflowActionKillXCommand extends XDataTestCase { private Services services; @@ -160,6 +158,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { return action; } + public void testWfActionKillChildJob() throws Exception { + String externalJobID = launchSleepJob(1000); + String childId = launchSleepJob(1000000); + + WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED); + WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1", + WorkflowAction.Status.KILLED, childId); + new ActionKillXCommand(action.getId()).call(); + + waitUntilYarnAppKilledAndAssertSuccess(childId); + } + private String launchSleepJob(int sleep) throws Exception { Configuration jobConf = Services.get().get(HadoopAccessorService.class) .createConfiguration(new URI(getNameNodeUri()).getAuthority()); @@ -173,8 +183,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis())); jobClient.submitJob(new JobConf(jobConf)); - Set<ApplicationId> apps = Sets.newHashSet(); - apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL); + Set<ApplicationId> apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL); assertEquals("Number of YARN apps", apps.size(), 1); sleepjob.close(); http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/core/src/test/java/org/apache/oozie/test/XTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java index 161927a..2816e5d 100644 --- a/core/src/test/java/org/apache/oozie/test/XTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java @@ -1263,13 +1263,13 @@ public abstract class XTestCase extends TestCase { protected void waitUntilYarnAppDoneAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); - assertEquals("YARN App state", YarnApplicationState.FINISHED, state); + assertEquals("YARN App state for app " + externalId, YarnApplicationState.FINISHED, state); } protected void waitUntilYarnAppKilledAndAssertSuccess(String externalId) throws HadoopAccessorException, IOException, YarnException { YarnApplicationState state = waitUntilYarnAppState(externalId, YARN_TERMINAL_STATES); - assertEquals("YARN App state", YarnApplicationState.KILLED, state); + assertEquals("YARN App state for app " + externalId, YarnApplicationState.KILLED, state); } protected class TestLogAppender extends AppenderSkeleton { http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index de6aff5..a35772e 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-2911 Re-add test testWfActionKillChildJob and adapt it to OYA (gezapeti) OOZIE-2918 Delete LauncherMapper and its test (asasvari via pbacsko) OOZIE-2733 change org.apache.hadoop.fs.permission.AccessControlException to org.apache.hadoop.security.AccessControlException (gezapeti) OOZIE-2884 consolidate hadoop versions in pomfiles (Artem Ervits via gezapeti) http://git-wip-us.apache.org/repos/asf/oozie/blob/6056afb7/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java index 3b38042..2c92f41 100644 --- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java +++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java @@ -51,8 +51,11 @@ import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.streaming.StreamJob; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutorException; @@ -403,17 +406,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + String externalChildIDs = context.getAction().getExternalChildIDs(); + waitUntilYarnAppDoneAndAssertSuccess(externalChildIDs); ae.check(context, context.getAction()); assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); @@ -431,7 +425,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { //External Child IDs used to be null, but after 4.0, become Non-Null in case of MR action. assertNotNull(context.getExternalChildIDs()); - return mrJob.getID().toString(); + return externalChildIDs; } private void _testSubmitError(String actionXml, String errorMessage) throws Exception { @@ -463,17 +457,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + String externalChildIDs = context.getAction().getExternalChildIDs(); + waitUntilYarnAppDoneAndAssertSuccess(externalChildIDs); ae.check(context, context.getAction()); assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); @@ -481,7 +466,12 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.end(context, context.getAction()); assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); - + Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + String user = conf.get("user.name"); + JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); + org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn( + ConverterUtils.toApplicationId(externalChildIDs)); + final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID)); assertTrue(MapperReducerCredentialsForTest.hasCredentials(mrJob)); } @@ -673,20 +663,10 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { MapReduceActionExecutor mae = new MapReduceActionExecutor(); mae.check(context, context.getAction()); // must be called so that externalChildIDs are read from HDFS - Configuration conf = mae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); mae.kill(context, context.getAction()); - waitFor(10_000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertEquals(JobStatus.State.KILLED, mrJob.getJobStatus().getState()); + waitUntilYarnAppKilledAndAssertSuccess(context.getAction().getExternalChildIDs()); } public void testMapReduceWithCredentials() throws Exception { @@ -767,16 +747,16 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>" + getNameNodeUri() + "</name-node>" + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>"; - String jobID = _testSubmit(MAP_REDUCE, actionXml); + String appID = _testSubmit(MAP_REDUCE, actionXml); boolean containsLib1Jar = false; - String lib1JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar"; + String lib1JarStr = "jobcache/" + appID + "/jars/lib/lib1.jar"; Pattern lib1JarPatYarn = Pattern.compile( - ".*appcache/application_" + jobID.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib1.jar.*"); + ".*appcache/" + appID + "/filecache/.*/uber.jar/lib/lib1.jar.*"); boolean containsLib2Jar = false; - String lib2JarStr = "jobcache/" + jobID + "/jars/lib/lib1.jar"; + String lib2JarStr = "jobcache/" + appID + "/jars/lib/lib1.jar"; Pattern lib2JarPatYarn = Pattern.compile( - ".*appcache/application_" + jobID.replaceFirst("job_", "") + "/filecache/.*/uber.jar/lib/lib2.jar.*"); + ".*appcache/" + appID + "/filecache/.*/uber.jar/lib/lib2.jar.*"); FileStatus[] fstats = getFileSystem().listStatus(outputDir); for (FileStatus fstat : fstats) { @@ -995,17 +975,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalChildIDs()); ae.check(context, context.getAction()); assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); @@ -1065,18 +1035,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalChildIDs()); ae.check(context, context.getAction()); assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); @@ -1132,18 +1091,7 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - String group = conf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalChildIDs()); ae.check(context, context.getAction()); assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); @@ -1214,19 +1162,8 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); - Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); - String user = conf.get("user.name"); - - JobClient jobClient = Services.get().get(HadoopAccessorService.class) - .createJobClient(user, conf); - final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalChildIDs())); - - waitFor(120 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return mrJob.isComplete(); - } - }); - assertTrue(mrJob.isSuccessful()); + String externalChildIDs = context.getAction().getExternalChildIDs(); + waitUntilYarnAppDoneAndAssertSuccess(externalChildIDs); ae.check(context, context.getAction()); assertEquals(JavaActionExecutor.SUCCEEDED, context.getAction().getExternalStatus()); @@ -1235,9 +1172,11 @@ public class TestMapReduceActionExecutor extends ActionExecutorTestCase { ae.end(context, context.getAction()); assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus()); + Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri()); + final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf); + ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs)); // Assert Mapred job name has been set - System.out.println("Mapred job name: " + mrJob.getJobName()); - assertTrue(mrJob.getJobName().equals(mapredJobName)); + assertEquals(mapredJobName, report.getName()); // Assert for stats info stored in the context. assertNull(context.getExecutionStats());
