Repository: oozie Updated Branches: refs/heads/master 9ac7f5356 -> f3b022bb1
OOZIE-2910 Re-add testChildKill and adapt it to OYA (pbacsko and gezapeti) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f3b022bb Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f3b022bb Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f3b022bb Branch: refs/heads/master Commit: f3b022bb17c330b966b277be48515fe6909b839b Parents: 9ac7f53 Author: Gezapeti Cseh <[email protected]> Authored: Fri Jul 28 13:24:41 2017 +0200 Committer: Gezapeti Cseh <[email protected]> Committed: Fri Jul 28 13:24:41 2017 +0200 ---------------------------------------------------------------------- .../action/hadoop/TestJavaActionExecutor.java | 84 ++++++++++++++++++++ release-log.txt | 4 +- .../apache/oozie/action/hadoop/LauncherAM.java | 4 +- .../oozie/action/hadoop/LauncherMain.java | 65 ++++++++++++--- 4 files changed, 144 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 59a21c4..c51c64a 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -27,15 +27,20 @@ import java.io.OutputStreamWriter; import java.io.StringReader; import java.io.Writer; import java.net.URI; +import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; import java.util.Arrays; import java.util.Collections; import java.util.Date; +import java.util.EnumSet; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Properties; +import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.examples.SleepJob; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -43,8 +48,14 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; @@ -58,6 +69,7 @@ import org.apache.oozie.service.LiteWorkflowStoreService; import org.apache.oozie.service.Services; import org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.UUIDService; +import org.apache.oozie.service.UserGroupInformationService; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.service.WorkflowStoreService; import org.apache.oozie.util.IOUtils; @@ -2251,4 +2263,76 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { String actPath = JavaActionExecutor.getTrimmedEncodedPath("/user/map dev/test-case/shell/script/shell 1.sh"); assertEquals("/user/map%20dev/test-case/shell/script/shell%201.sh", actPath); } + + public void testChildKill() throws Exception { + final JobConf clusterConf = createJobConf(); + FileSystem fileSystem = FileSystem.get(clusterConf); + Path confFile = new Path("/tmp/cluster-conf.xml"); + OutputStream out = fileSystem.create(confFile); + clusterConf.writeXml(out); + out.close(); + String confFileName = fileSystem.makeQualified(confFile).toString() + "#core-site.xml"; + final String actionXml = "<java>" + + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + + "<name-node>" + getNameNodeUri() + "</name-node>" + + "<main-class> " + SleepJob.class.getName() + " </main-class>" + + "<arg>-mt</arg>" + + "<arg>300000</arg>" + + "<archive>" + confFileName + "</archive>" + + "</java>"; + final Context context = createContext(actionXml, null); + final String runningJob = submitAction(context); + YarnApplicationState state = waitUntilYarnAppState(runningJob, EnumSet.of(YarnApplicationState.RUNNING)); + assertEquals(YarnApplicationState.RUNNING, state); + + WorkflowJob wfJob = context.getWorkflow(); + Configuration conf = null; + if (wfJob.getConf() != null) { + conf = new XConfiguration(new StringReader(wfJob.getConf())); + } + String launcherTag = LauncherHelper.getActionYarnTag(conf, wfJob.getParentId(), context.getAction()); + JavaActionExecutor ae = new JavaActionExecutor(); + final Configuration jobConf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml)); + jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag)); + jobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, context.getAction().getStartTime().getTime()); + + // We have to use a proper UGI for retrieving the child apps, because the WF is + // submitted as a test user, not as the current login user + UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); + final UserGroupInformation ugi = ugiService.getProxyUser(getTestUser()); + final Set<ApplicationId> childSet = new HashSet<>(); + + // wait until we have a child MR job + waitFor(60_000, new Predicate() { + @Override + public boolean evaluate() throws Exception { + return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws Exception { + childSet.clear(); + childSet.addAll(LauncherMain.getChildYarnJobs(jobConf)); + return childSet.size() > 0; + } + }); + } + }); + assertEquals(1, childSet.size()); + + // kill the action - based on the job tag, the SleepJob is expected to be killed too + ae.kill(context, context.getAction()); + + HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class); + Configuration config = hadoopAccessorService.createConfiguration(getJobTrackerUri()); + YarnClient yarnClient = hadoopAccessorService.createYarnClient(getTestUser(), config); + + // check that both the launcher & MR job were successfully killed + ApplicationId jobId = childSet.iterator().next(); + assertEquals(YarnApplicationState.KILLED, yarnClient.getApplicationReport(jobId).getYarnApplicationState()); + assertTrue(ae.isCompleted(context.getAction().getExternalStatus())); + assertEquals(WorkflowAction.Status.DONE, context.getAction().getStatus()); + assertEquals(JavaActionExecutor.KILLED, context.getAction().getExternalStatus()); + assertEquals(FinalApplicationStatus.KILLED, + yarnClient.getApplicationReport(ConverterUtils.toApplicationId(runningJob)).getFinalApplicationStatus()); + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index c18f89e..1121ad3 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,7 +1,9 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-2910 Re-add testChildKill and adapt it to OYA (pbacsko and gezapeti) +OOZIE-2995 In preparation for Java 8, remove MaxPermSize=512m (Artem Ervits via asasvari) OOZIE-3004 Forked action retry info is not working (puru) -OOZIE-2601 Ability to use local paths for the sharelib +OOZIE-2601 Ability to use local paths for the sharelib (asasvari) OOZIE-2987 Coord action missing dependencies should show URI template with unresolved dependencies (puru) OOZIE-2004 Improve Oozie version info output (Artem Ervits via gezapeti) OOZIE-2854 Oozie should handle transient database problems (andras.piros via gezapeti) http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java index bac17b2..6a98d6e 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -46,8 +46,8 @@ import com.google.common.base.Objects; import com.google.common.base.Preconditions; public class LauncherAM { - private static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml"; - private static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id"; + public static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml"; + public static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id"; public static final String JAVA_CLASS_PATH = "java.class.path"; public static final String OOZIE_ACTION_ID = "oozie.action.id"; http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index 0236e1b..f1f52c6 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -23,10 +23,13 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.FileReader; +import java.io.FileWriter; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.OutputStreamWriter; import java.io.StringWriter; +import java.io.Writer; import java.net.URL; import java.util.Arrays; import java.util.Collection; @@ -73,6 +76,8 @@ public abstract class LauncherMain { public static final String TEZ_APPLICATION_TAGS = "tez.application.tags"; public static final String SPARK_YARN_TAGS = "spark.yarn.tags"; + public static final String PROPAGATION_CONF_XML = "propagation-conf.xml"; + protected static String[] HADOOP_SITE_FILES = new String[] {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", "yarn-site.xml"}; @@ -93,6 +98,7 @@ public abstract class LauncherMain { protected static void run(Class<? extends LauncherMain> klass, String[] args) throws Exception { LauncherMain main = klass.newInstance(); main.setupLog4jProperties(); + main.propagateToHadoopConf(); main.run(args); } @@ -194,7 +200,7 @@ public abstract class LauncherMain { Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); if (tag == null) { - System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS); + System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS); return childYarnJobs; } System.out.println("tag id : " + tag); @@ -223,17 +229,24 @@ public abstract class LauncherMain { throw new RuntimeException("Exception occurred while finding child jobs", ioe); } - System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ",")); + if (childYarnJobs.isEmpty()) { + System.out.println("No child applications found"); + } else { + System.out.println("Found child YARN applications: " + StringUtils.join(childYarnJobs, ",")); + } + return childYarnJobs; } public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) { System.out.println("Fetching child yarn jobs"); - long startTime = 0L; - try { - startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); - } catch(NumberFormatException nfe) { - throw new RuntimeException("Could not find Oozie job launch time", nfe); + long startTime = actionConf.getLong(OOZIE_JOB_LAUNCH_TIME, 0L); + if(startTime == 0) { + try { + startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); + } catch (NumberFormatException nfe) { + throw new RuntimeException("Could not find Oozie job launch time", nfe); + } } return getChildYarnJobs(actionConf, scope, startTime); } @@ -243,13 +256,13 @@ public abstract class LauncherMain { Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf); if (!childYarnJobs.isEmpty()) { System.out.println(); - System.out.println("Found [" + childYarnJobs.size() + "] Map-Reduce jobs from this launcher"); - System.out.println("Killing existing jobs and starting over:"); + System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher"); + System.out.println("Killing existing applications and starting over:"); YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(actionConf); yarnClient.start(); for (ApplicationId app : childYarnJobs) { - System.out.print("Killing job [" + app + "] ... "); + System.out.print("Killing [" + app + "] ... "); yarnClient.killApplication(app); System.out.println("Done"); } @@ -419,6 +432,38 @@ public abstract class LauncherMain { } } } + + /* + * Pushing all important conf to hadoop conf for the action. This is also useful in a situation when a MapReduce job is + * submitted from a Java action, because the MR job tags must be set. If it's not set, then it's not possible to kill the + * MR job because child jobs are looked up based on tags. + */ + public void propagateToHadoopConf() throws IOException { + Configuration propagationConf = new Configuration(false); + if (System.getProperty(LauncherAM.OOZIE_ACTION_ID) != null) { + propagationConf.set(LauncherAM.OOZIE_ACTION_ID, System.getProperty(LauncherAM.OOZIE_ACTION_ID)); + } + if (System.getProperty(LauncherAM.OOZIE_JOB_ID) != null) { + propagationConf.set(LauncherAM.OOZIE_JOB_ID, System.getProperty(LauncherAM.OOZIE_JOB_ID)); + } + if(System.getProperty(LauncherAM.OOZIE_LAUNCHER_JOB_ID) != null) { + propagationConf.set(LauncherAM.OOZIE_LAUNCHER_JOB_ID, System.getProperty(LauncherAM.OOZIE_LAUNCHER_JOB_ID)); + } + + // loading action conf prepared by Oozie + Configuration actionConf = LauncherMain.loadActionConf(); + + if (actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) != null) { + propagationConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(CHILD_MAPREDUCE_JOB_TAGS)); + } + + try (Writer writer = new FileWriter(PROPAGATION_CONF_XML)) { + propagationConf.writeXml(writer); + } + + Configuration.dumpConfiguration(propagationConf, new OutputStreamWriter(System.out)); + Configuration.addDefaultResource(PROPAGATION_CONF_XML); + } } class LauncherMainException extends Exception {
