OOZIE-2594 correctly implement MapReduceActionExecutor.kill() Change-Id: Ia091bd3943f4abf1b4e9c505a01fbb926fceac91
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/782837fc Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/782837fc Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/782837fc Branch: refs/heads/oya Commit: 782837fcef594ae73a46a620923fb69a8248d1de Parents: 61f3a9f Author: Peter Bacsko <[email protected]> Authored: Fri Nov 11 14:42:00 2016 +0100 Committer: Peter Bacsko <[email protected]> Committed: Fri Nov 11 17:36:56 2016 +0100 ---------------------------------------------------------------------- .../org/apache/oozie/action/ActionExecutor.java | 2 +- .../action/hadoop/MapReduceActionExecutor.java | 41 ++++++++++++++++++++ .../wf/TestWorkflowActionKillXCommand.java | 14 ------- .../org/apache/oozie/test/XDataTestCase.java | 2 +- 4 files changed, 43 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/main/java/org/apache/oozie/action/ActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java index 1d6456b..919509d 100644 --- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java @@ -596,7 +596,7 @@ public abstract class ActionExecutor { * @param action the action * @return the action yarn tag */ - public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) { + public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) { if (conf.get(OOZIE_ACTION_YARN_TAG) != null) { return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName(); } http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index 1b975ab..e97de7e 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -33,6 +33,14 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; @@ -393,6 +401,39 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } @Override + public void kill(Context context, WorkflowAction action) throws ActionExecutorException { + // Kill the LauncherAM which submits the MR job + super.kill(context, action); + + // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get + // the YARN ApplicationID based on the tag and kill it as well + + // TODO: this must be tested in TestMapReduceActionExecutor + try { + String tag = ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action); + GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); + gar.setScope(ApplicationsRequestScope.ALL); + gar.setApplicationTags(Collections.singleton(tag)); + Element actionXml = XmlUtils.parseXml(action.getConf()); + Configuration actionConf = loadHadoopDefaultResources(context, actionXml); + ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class); + GetApplicationsResponse apps = proxy.getApplications(gar); + List<ApplicationReport> appsList = apps.getApplicationList(); + + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(actionConf); + yarnClient.start(); + + for (ApplicationReport app : appsList) { + LOG.info("Killing MapReduce job {0}", app.getApplicationId().toString()); + yarnClient.killApplication(app.getApplicationId()); + } + } catch (Exception e) { + throw convertException(e); + } + } + + @Override void injectActionCallback(Context context, Configuration actionConf) { injectCallback(context, actionConf); } http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java index 71b46d1..ef75f14 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java @@ -115,20 +115,6 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { assertEquals(action.getExternalStatus(), "RUNNING"); } - // FIXME - fix JAE.kill() - public void testWfActionKillChildJob() throws Exception { - String externalJobID = launchSleepJob(1000); - String childId = launchSleepJob(1000000); - - WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED); - WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1", - WorkflowAction.Status.KILLED, childId); - - new ActionKillXCommand(action.getId()).call(); - - waitUntilYarnAppKilledAndAssertSuccess(childId); - } - protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName, WorkflowAction.Status status, String childID) throws Exception { WorkflowActionBean action = new WorkflowActionBean(); http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/test/java/org/apache/oozie/test/XDataTestCase.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java index ea778bd..2105e2f 100644 --- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java +++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java @@ -1452,7 +1452,7 @@ public abstract class XDataTestCase extends XHCatTestCase { action.setUserRetryMax(2); action.setUserRetryInterval(1); action.setErrorInfo("dummyErrorCode", "dummyErrorMessage"); - action.setExternalId("dummy external id"); + action.setExternalId("application_1234567890123_0001"); action.setExternalStatus("RUNNING"); return action;
