Repository: oozie
Updated Branches:
  refs/heads/master 9ac7f5356 -> f3b022bb1


OOZIE-2910 Re-add testChildKill and adapt it to OYA (pbacsko and gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f3b022bb
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f3b022bb
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f3b022bb

Branch: refs/heads/master
Commit: f3b022bb17c330b966b277be48515fe6909b839b
Parents: 9ac7f53
Author: Gezapeti Cseh <[email protected]>
Authored: Fri Jul 28 13:24:41 2017 +0200
Committer: Gezapeti Cseh <[email protected]>
Committed: Fri Jul 28 13:24:41 2017 +0200

----------------------------------------------------------------------
 .../action/hadoop/TestJavaActionExecutor.java   | 84 ++++++++++++++++++++
 release-log.txt                                 |  4 +-
 .../apache/oozie/action/hadoop/LauncherAM.java  |  4 +-
 .../oozie/action/hadoop/LauncherMain.java       | 65 ++++++++++++---
 4 files changed, 144 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 59a21c4..c51c64a 100644
--- 
a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ 
b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -27,15 +27,20 @@ import java.io.OutputStreamWriter;
 import java.io.StringReader;
 import java.io.Writer;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.text.SimpleDateFormat;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,8 +48,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
@@ -58,6 +69,7 @@ import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.UUIDService;
+import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
 import org.apache.oozie.util.IOUtils;
@@ -2251,4 +2263,76 @@ public class TestJavaActionExecutor extends 
ActionExecutorTestCase {
         String actPath = JavaActionExecutor.getTrimmedEncodedPath("/user/map 
dev/test-case/shell/script/shell 1.sh");
         assertEquals("/user/map%20dev/test-case/shell/script/shell%201.sh", 
actPath);
     }
+
+    /**
+     * Checks that killing a Java action also kills the YARN application the
+     * action started. A {@link SleepJob} is submitted from the action, looked
+     * up through the launcher's job tag, and after the action is killed both
+     * the child application and the launcher are expected to be KILLED.
+     */
+    public void testChildKill() throws Exception {
+        final JobConf clusterConf = createJobConf();
+        FileSystem fileSystem = FileSystem.get(clusterConf);
+        Path confFile = new Path("/tmp/cluster-conf.xml");
+        // close the stream even if serializing the configuration fails
+        try (OutputStream out = fileSystem.create(confFile)) {
+            clusterConf.writeXml(out);
+        }
+        String confFileName = fileSystem.makeQualified(confFile).toString()
+                + "#core-site.xml";
+        final String actionXml = "<java>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<main-class> " + SleepJob.class.getName() + " </main-class>" +
+                "<arg>-mt</arg>" +
+                "<arg>300000</arg>" +
+                "<archive>" + confFileName + "</archive>" +
+                "</java>";
+        final Context context = createContext(actionXml, null);
+        final String runningJob = submitAction(context);
+        YarnApplicationState state = waitUntilYarnAppState(runningJob,
+                EnumSet.of(YarnApplicationState.RUNNING));
+        assertEquals(YarnApplicationState.RUNNING, state);
+
+        WorkflowJob wfJob = context.getWorkflow();
+        Configuration conf = null;
+        if (wfJob.getConf() != null) {
+            conf = new XConfiguration(new StringReader(wfJob.getConf()));
+        }
+        String launcherTag = LauncherHelper.getActionYarnTag(conf,
+                wfJob.getParentId(), context.getAction());
+        JavaActionExecutor ae = new JavaActionExecutor();
+        final Configuration jobConf = ae.createBaseHadoopConf(context,
+                XmlUtils.parseXml(actionXml));
+        jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS,
+                LauncherHelper.getTag(launcherTag));
+        jobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME,
+                context.getAction().getStartTime().getTime());
+
+        // We have to use a proper UGI for retrieving the child apps, because
+        // the WF is submitted as a test user, not as the current login user
+        UserGroupInformationService ugiService =
+                Services.get().get(UserGroupInformationService.class);
+        final UserGroupInformation ugi =
+                ugiService.getProxyUser(getTestUser());
+        final Set<ApplicationId> childSet = new HashSet<>();
+
+        // wait until we have a child MR job
+        waitFor(60_000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+                    @Override
+                    public Boolean run() throws Exception {
+                        childSet.clear();
+                        childSet.addAll(
+                                LauncherMain.getChildYarnJobs(jobConf));
+                        return childSet.size() > 0;
+                    }
+                });
+            }
+        });
+        assertEquals(1, childSet.size());
+
+        // kill the action - based on the job tag, the SleepJob is expected
+        // to be killed too
+        ae.kill(context, context.getAction());
+
+        HadoopAccessorService hadoopAccessorService =
+                Services.get().get(HadoopAccessorService.class);
+        Configuration config =
+                hadoopAccessorService.createConfiguration(getJobTrackerUri());
+
+        // check that both the launcher & MR job were successfully killed;
+        // the client is closed afterwards so it does not outlive the test
+        try (YarnClient yarnClient =
+                hadoopAccessorService.createYarnClient(getTestUser(), config)) {
+            ApplicationId jobId = childSet.iterator().next();
+            assertEquals(YarnApplicationState.KILLED,
+                    yarnClient.getApplicationReport(jobId)
+                            .getYarnApplicationState());
+            assertTrue(
+                    ae.isCompleted(context.getAction().getExternalStatus()));
+            assertEquals(WorkflowAction.Status.DONE,
+                    context.getAction().getStatus());
+            assertEquals(JavaActionExecutor.KILLED,
+                    context.getAction().getExternalStatus());
+            assertEquals(FinalApplicationStatus.KILLED,
+                    yarnClient.getApplicationReport(
+                            ConverterUtils.toApplicationId(runningJob))
+                            .getFinalApplicationStatus());
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index c18f89e..1121ad3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,7 +1,9 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-2910 Re-add testChildKill and adapt it to OYA (pbacsko and gezapeti)
+OOZIE-2995 In preparation for Java 8, remove MaxPermSize=512m (Artem Ervits 
via asasvari)
 OOZIE-3004 Forked action retry info is not working (puru)
-OOZIE-2601 Ability to use local paths for the sharelib
+OOZIE-2601 Ability to use local paths for the sharelib (asasvari)
 OOZIE-2987 Coord action missing dependencies should show URI template with 
unresolved dependencies (puru)
 OOZIE-2004 Improve Oozie version info output (Artem Ervits via gezapeti)
 OOZIE-2854 Oozie should handle transient database problems (andras.piros via 
gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index bac17b2..6a98d6e 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -46,8 +46,8 @@ import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 
 public class LauncherAM {
-    private static final String OOZIE_ACTION_CONF_XML = 
"oozie.action.conf.xml";
-    private static final String OOZIE_LAUNCHER_JOB_ID = 
"oozie.launcher.job.id";
+    public static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml";
+    public static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id";
 
     public static final String JAVA_CLASS_PATH = "java.class.path";
     public static final String OOZIE_ACTION_ID = "oozie.action.id";

http://git-wip-us.apache.org/repos/asf/oozie/blob/f3b022bb/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 0236e1b..f1f52c6 100644
--- 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -23,10 +23,13 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.FileReader;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
 import java.io.StringWriter;
+import java.io.Writer;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
@@ -73,6 +76,8 @@ public abstract class LauncherMain {
 
     public static final String TEZ_APPLICATION_TAGS = "tez.application.tags";
     public static final String SPARK_YARN_TAGS = "spark.yarn.tags";
+    public static final String PROPAGATION_CONF_XML = "propagation-conf.xml";
+
     protected static String[] HADOOP_SITE_FILES = new String[]
             {"core-site.xml", "hdfs-site.xml", "mapred-site.xml", 
"yarn-site.xml"};
 
@@ -93,6 +98,7 @@ public abstract class LauncherMain {
     protected static void run(Class<? extends LauncherMain> klass, String[] 
args) throws Exception {
         LauncherMain main = klass.newInstance();
         main.setupLog4jProperties();
+        main.propagateToHadoopConf();
         main.run(args);
     }
 
@@ -194,7 +200,7 @@ public abstract class LauncherMain {
         Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>();
         String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
         if (tag == null) {
-            System.out.print("Could not find Yarn tags property " + 
CHILD_MAPREDUCE_JOB_TAGS);
+            System.out.print("Could not find YARN tags property " + 
CHILD_MAPREDUCE_JOB_TAGS);
             return childYarnJobs;
         }
         System.out.println("tag id : " + tag);
@@ -223,17 +229,24 @@ public abstract class LauncherMain {
             throw new RuntimeException("Exception occurred while finding child 
jobs", ioe);
         }
 
-        System.out.println("Child yarn jobs are found - " + 
StringUtils.join(childYarnJobs, ","));
+        if (childYarnJobs.isEmpty()) {
+            System.out.println("No child applications found");
+        } else {
+            System.out.println("Found child YARN applications: " + 
StringUtils.join(childYarnJobs, ","));
+        }
+
         return childYarnJobs;
     }
     public static Set<ApplicationId> getChildYarnJobs(Configuration 
actionConf, ApplicationsRequestScope scope) {
         System.out.println("Fetching child yarn jobs");
 
-        long startTime = 0L;
-        try {
-            startTime = 
Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
-        } catch(NumberFormatException nfe) {
-            throw new RuntimeException("Could not find Oozie job launch time", 
nfe);
+        long startTime = actionConf.getLong(OOZIE_JOB_LAUNCH_TIME, 0L);
+        if(startTime == 0) {
+            try {
+                startTime = 
Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME));
+            } catch (NumberFormatException nfe) {
+                throw new RuntimeException("Could not find Oozie job launch 
time", nfe);
+            }
         }
         return getChildYarnJobs(actionConf, scope, startTime);
     }
@@ -243,13 +256,13 @@ public abstract class LauncherMain {
             Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
             if (!childYarnJobs.isEmpty()) {
                 System.out.println();
-                System.out.println("Found [" + childYarnJobs.size() + "] 
Map-Reduce jobs from this launcher");
-                System.out.println("Killing existing jobs and starting over:");
+                System.out.println("Found [" + childYarnJobs.size() + "] YARN 
application(s) from this launcher");
+                System.out.println("Killing existing applications and starting 
over:");
                 YarnClient yarnClient = YarnClient.createYarnClient();
                 yarnClient.init(actionConf);
                 yarnClient.start();
                 for (ApplicationId app : childYarnJobs) {
-                    System.out.print("Killing job [" + app + "] ... ");
+                    System.out.print("Killing [" + app + "] ... ");
                     yarnClient.killApplication(app);
                     System.out.println("Done");
                 }
@@ -419,6 +432,38 @@ public abstract class LauncherMain {
             }
         }
     }
+
+    /**
+     * Pushes the important launcher settings into the Hadoop configuration
+     * used by the action.
+     * <p>
+     * The Oozie action id, job id and launcher job id are copied from the
+     * system properties (when set), and the child job tags found in the action
+     * configuration are copied to {@code MAPREDUCE_JOB_TAGS}. The result is
+     * written to the file named by {@code PROPAGATION_CONF_XML} and registered
+     * through {@link Configuration#addDefaultResource(String)}.
+     * <p>
+     * This is also useful when a MapReduce job is submitted from a Java
+     * action, because the MR job tags must be set. If they are not set, it is
+     * not possible to kill the MR job, because child jobs are looked up based
+     * on tags.
+     *
+     * @throws IOException if the action configuration cannot be loaded or the
+     *         propagation file cannot be written
+     */
+    public void propagateToHadoopConf() throws IOException {
+        Configuration propagationConf = new Configuration(false);
+
+        // copy only the launcher properties that are actually set
+        String[] propagatedKeys = {
+            LauncherAM.OOZIE_ACTION_ID,
+            LauncherAM.OOZIE_JOB_ID,
+            LauncherAM.OOZIE_LAUNCHER_JOB_ID
+        };
+        for (String key : propagatedKeys) {
+            String value = System.getProperty(key);
+            if (value != null) {
+                propagationConf.set(key, value);
+            }
+        }
+
+        // loading action conf prepared by Oozie
+        Configuration actionConf = LauncherMain.loadActionConf();
+        String childTags = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        if (childTags != null) {
+            propagationConf.set(MAPREDUCE_JOB_TAGS, childTags);
+        }
+
+        // Write through an OutputStream so Configuration picks the encoding
+        // itself; a FileWriter would encode with the platform default charset.
+        try (OutputStream out = new FileOutputStream(PROPAGATION_CONF_XML)) {
+            propagationConf.writeXml(out);
+        }
+
+        Configuration.dumpConfiguration(propagationConf,
+                new OutputStreamWriter(System.out));
+        Configuration.addDefaultResource(PROPAGATION_CONF_XML);
+    }
 }
 
 class LauncherMainException extends Exception {

Reply via email to