Repository: oozie Updated Branches: refs/heads/master 9735dd38c -> 6323a8e43
OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6323a8e4 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6323a8e4 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6323a8e4 Branch: refs/heads/master Commit: 6323a8e43e4d70bcad69059a5e162f48cd0ed43c Parents: 9735dd3 Author: Andras Piros <[email protected]> Authored: Tue Sep 4 14:35:09 2018 +0200 Committer: Andras Piros <[email protected]> Committed: Tue Sep 4 14:35:09 2018 +0200 ---------------------------------------------------------------------- .../oozie/action/hadoop/JavaActionExecutor.java | 93 +++++++++++++------- release-log.txt | 1 + .../oozie/action/hadoop/LauncherMain.java | 48 +++++++--- .../oozie/action/hadoop/TestLauncherMain.java | 40 +++++++-- 4 files changed, 135 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 8f0f244..05fac39 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.client.api.YarnClientApplication; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.util.Apps; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; @@ -1780,36 +1781,11 @@ public class JavaActionExecutor extends ActionExecutor { String launcherTag = getActionYarnTag(context, action); jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); yarnClient = createYarnClient(context, jobConf); - if(action.getExternalId() != null) { - try { - LOG.info("Killing action {0}'s external application {1}", action.getId(), action.getExternalId()); - yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); - } catch (Exception e) { - LOG.warn("Could not kill {0}", action.getExternalId(), e); - } - } - String externalChildIDs = action.getExternalChildIDs(); - if(externalChildIDs != null) { - for(String childId : externalChildIDs.split(",")) { - try { - LOG.info("Killing action {0}'s external child application {1}", action.getId(), childId); - yarnClient.killApplication(ConverterUtils.toApplicationId(childId.trim())); - } catch (Exception e) { - LOG.warn("Could not kill external child of {0}, {1}", action.getExternalId(), - childId, e); - } - } - } - for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL, - action.getStartTime().getTime())){ - try { - LOG.info("Killing action {0}'s external child application {1} based on tags", - action.getId(), id.toString()); - yarnClient.killApplication(id); - } catch (Exception e) { - LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id, e); - } - } + + String appExternalId = action.getExternalId(); + killExternalApp(action, yarnClient, appExternalId); + killExternalChildApp(action, yarnClient, appExternalId); + killExternalChildAppByTags(action, yarnClient, jobConf, appExternalId); context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null); @@ -1828,6 +1804,63 @@ public class JavaActionExecutor extends ActionExecutor { } } + private boolean finalAppStatusUndefined(ApplicationReport appReport) { + FinalApplicationStatus status = appReport.getFinalApplicationStatus(); + return !FinalApplicationStatus.SUCCEEDED.equals(status) && + !FinalApplicationStatus.FAILED.equals(status) && + !FinalApplicationStatus.KILLED.equals(status); + } + + void killExternalApp(WorkflowAction action, YarnClient yarnClient, String appExternalId) + throws YarnException, IOException { + if (appExternalId != null) { + ApplicationId appId = ConverterUtils.toApplicationId(appExternalId); + if (finalAppStatusUndefined(yarnClient.getApplicationReport(appId))) { + try { + LOG.info("Killing action {0}''s external application {1}", action.getId(), appExternalId); + yarnClient.killApplication(appId); + } catch (Exception e) { + LOG.warn("Could not kill {0}", appExternalId, e); + } + } + } + } + + void killExternalChildApp(WorkflowAction action, YarnClient yarnClient, String appExternalId) + throws YarnException, IOException { + String externalChildIDs = action.getExternalChildIDs(); + if (externalChildIDs != null) { + for (String childId : externalChildIDs.split(",")) { + ApplicationId appChildId = ConverterUtils.toApplicationId(childId.trim()); + if (finalAppStatusUndefined(yarnClient.getApplicationReport(appChildId))) { + try { + LOG.info("Killing action {0}''s external child application {1}", action.getId(), childId); + yarnClient.killApplication(appChildId); + } catch (Exception e) { + LOG.warn("Could not kill external child of {0}, {1}", + appExternalId, childId, e); + } + } + } + } + } + + void killExternalChildAppByTags(WorkflowAction action, YarnClient yarnClient, Configuration jobConf, String appExternalId) + throws YarnException, IOException { + for (ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL, + action.getStartTime().getTime())) { + if (finalAppStatusUndefined(yarnClient.getApplicationReport(id))) { + try { + LOG.info("Killing action {0}''s external child application {1} based on tags", + action.getId(), id.toString()); + yarnClient.killApplication(id); + } catch (Exception e) { + LOG.warn("Could not kill child of {0}, {1}", appExternalId, id, e); + } + } + } + } + private String getActionYarnTag(Context context, WorkflowAction action) { return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action); } http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index ce945b4..caacb1f 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.1.0 release (trunk - unreleased) +OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros) OOZIE-3155 [ui] Job DAG is not refreshed when a job is finished (asalamon74 via andras.piros) OOZIE-3334 Don't use org.apache.hadoop.hbase.security.User in HDFSCredentials (gezapeti) OOZIE-3210 [build] Revision information is empty (asalamon74 via andras.piros) http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index e6e182c..c9e2a91 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -31,7 +31,9 @@ import java.io.OutputStreamWriter; import java.io.StringWriter; import java.io.Writer; import java.net.URL; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; @@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.exceptions.YarnException; @@ -202,6 +205,7 @@ public abstract class LauncherMain { System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS); return childYarnJobs; } + System.out.println("tag id : " + tag); GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); gar.setScope(scope); @@ -254,24 +258,44 @@ public abstract class LauncherMain { try { Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf); if (!childYarnJobs.isEmpty()) { - System.out.println(); - System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher"); - System.out.println("Killing existing applications and starting over:"); - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(actionConf); - yarnClient.start(); - for (ApplicationId app : childYarnJobs) { - System.out.print("Killing [" + app + "] ... "); - yarnClient.killApplication(app); - System.out.println("Done"); - } - System.out.println(); + checkAndKillChildYarnJobs(YarnClient.createYarnClient(), actionConf, childYarnJobs); } } catch (IOException | YarnException ye) { throw new RuntimeException("Exception occurred while killing child job(s)", ye); } } + @VisibleForTesting + protected static Collection<ApplicationId> checkAndKillChildYarnJobs(YarnClient yarnClient, + Configuration actionConf, + Collection<ApplicationId> childYarnJobs) + throws YarnException, IOException { + + System.out.println(); + System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher"); + System.out.println("Killing existing applications and starting over:"); + yarnClient.init(actionConf); + yarnClient.start(); + Collection<ApplicationId> killedapps = new ArrayList<>(); + for (ApplicationId app : childYarnJobs) { + if (finalAppStatusUndefined(yarnClient.getApplicationReport(app))) { + System.out.print("Killing [" + app + "] ... "); + yarnClient.killApplication(app); + System.out.println("Done"); + killedapps.add(app); + } + } + System.out.println(); + return killedapps; + } + + private static boolean finalAppStatusUndefined(ApplicationReport appReport) { + FinalApplicationStatus status = appReport.getFinalApplicationStatus(); + return !FinalApplicationStatus.SUCCEEDED.equals(status) && + !FinalApplicationStatus.FAILED.equals(status) && + !FinalApplicationStatus.KILLED.equals(status); + } + protected abstract void run(String[] args) throws Exception; /** http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java index 5fb7cf5..b613978 100644 --- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java @@ -18,10 +18,11 @@ package org.apache.oozie.action.hadoop; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.conf.Configuration; import java.io.ByteArrayOutputStream; import java.io.File; @@ -29,10 +30,16 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.PrintStream; import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; import java.util.Properties; -import org.apache.hadoop.conf.Configuration; import org.junit.rules.TemporaryFolder; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -110,4 +117,27 @@ public class TestLauncherMain { String contents = new String(Files.readAllBytes(f.toPath())); assertTrue(contents.contains("foo=bar")); } + + @Test + public void testKillChildYarnJobs() throws Exception { + YarnClient yc = Mockito.mock(YarnClient.class); + ApplicationReport ar = Mockito.mock(ApplicationReport.class); + Mockito.when(yc.getApplicationReport(Mockito.any(ApplicationId.class))).thenReturn(ar); + + Mockito.when(ar.getFinalApplicationStatus()) + .thenReturn(FinalApplicationStatus.UNDEFINED) + .thenReturn(FinalApplicationStatus.FAILED) + .thenReturn(FinalApplicationStatus.KILLED); + + ApplicationId appz[] = { + ApplicationId.newInstance(System.currentTimeMillis(), 1), + ApplicationId.newInstance(System.currentTimeMillis(), 2), + ApplicationId.newInstance(System.currentTimeMillis(), 3) + }; + + Collection<ApplicationId> result = LauncherMain.checkAndKillChildYarnJobs(yc, null, Arrays.asList(appz)); + + assertEquals(1, result.size()); + assertEquals(appz[0].getId(), result.iterator().next().getId()); + } }
