http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/hcatalog/pom.xml b/sharelib/hcatalog/pom.xml index 6eb88ef..fea277f 100644 --- a/sharelib/hcatalog/pom.xml +++ b/sharelib/hcatalog/pom.xml @@ -297,18 +297,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin>
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/hive/pom.xml b/sharelib/hive/pom.xml index 1331219..7268fa9 100644 --- a/sharelib/hive/pom.xml +++ b/sharelib/hive/pom.xml @@ -122,18 +122,10 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-minicluster</artifactId> </dependency> - <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -150,8 +142,8 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-utils</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> <scope>provided</scope> </dependency> <dependency> @@ -184,18 +176,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java ---------------------------------------------------------------------- diff --git a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java index 6a600fa..1f88c85 100644 --- 
a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java +++ b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java @@ -233,8 +233,11 @@ public class HiveMain extends LauncherMain { File localDir = new File("dummy").getAbsoluteFile().getParentFile(); System.out.println("Current (local) dir = " + localDir.getAbsolutePath()); System.out.println("------------------------"); - for (String file : localDir.list()) { - System.out.println(" " + file); + String[] files = localDir.list(); + if (files != null) { + for (String file : files) { + System.out.println(" " + file); + } } System.out.println("------------------------"); System.out.println(); @@ -264,7 +267,7 @@ public class HiveMain extends LauncherMain { } // Pass any parameters to Hive via arguments - String[] params = MapReduceMain.getStrings(hiveConf, HiveActionExecutor.HIVE_PARAMS); + String[] params = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_PARAMS); if (params.length > 0) { System.out.println("Parameters:"); System.out.println("------------------------"); @@ -284,7 +287,7 @@ public class HiveMain extends LauncherMain { System.out.println(); } - String[] hiveArgs = MapReduceMain.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS); + String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS); for (String hiveArg : hiveArgs) { if (DISALLOWED_HIVE_OPTIONS.contains(hiveArg)) { throw new RuntimeException("Error: Hive argument " + hiveArg + " is not supported"); @@ -298,7 +301,7 @@ public class HiveMain extends LauncherMain { } System.out.println(); - LauncherMainHadoopUtils.killChildYarnJobs(hiveConf); + LauncherMain.killChildYarnJobs(hiveConf); System.out.println("================================================================="); System.out.println(); @@ -309,13 +312,6 @@ public class HiveMain extends LauncherMain { try { runHive(arguments.toArray(new String[arguments.size()])); } - catch (SecurityException ex) { - if 
(LauncherSecurityManager.getExitInvoked()) { - if (LauncherSecurityManager.getExitCode() != 0) { - throw ex; - } - } - } finally { System.out.println("\n<<< Invocation of Hive command completed <<<\n"); writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java index 12e1e91..71ee641 100644 --- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java +++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java @@ -22,7 +22,6 @@ import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.io.OutputStreamWriter; -import java.io.StringReader; import java.io.Writer; import java.text.MessageFormat; import java.util.Arrays; @@ -32,22 +31,14 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.ClassUtils; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; 
-import org.jdom.Element; import org.jdom.Namespace; public class TestHiveActionExecutor extends ActionExecutorTestCase { @@ -163,19 +154,13 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { dataWriter.close(); Context context = createContext(getActionScriptXml()); Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.2"); - final RunningJob launcherJob = submitAction(context, ns); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context, ns); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); HiveActionExecutor ae = new HiveActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); @@ -192,19 +177,13 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { { Context context = createContext(getActionQueryXml(hiveScript)); Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.6"); - final RunningJob launcherJob = submitAction(context, ns); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + final String launcherId = submitAction(context, ns); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); 
conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); HiveActionExecutor ae = new HiveActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); @@ -220,7 +199,7 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { } } - private RunningJob submitAction(Context context, Namespace ns) throws Exception { + private String submitAction(Context context, Namespace ns) throws Exception { HiveActionExecutor ae = new HiveActionExecutor(); WorkflowAction action = context.getAction(); @@ -234,22 +213,9 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); - conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - String group = jobConf.get("group.name"); - JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + + return jobId; } private String copyJar(String targetFile, 
Class<?> anyContainedClass) http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java ---------------------------------------------------------------------- diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java index 2ba0da7..bbd6246 100644 --- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java +++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java @@ -110,10 +110,10 @@ public class TestHiveMain extends MainTestCase { SharelibUtils.addToDistributedCache("hive", fs, getFsTestCaseDir(), jobConf); jobConf.set(HiveActionExecutor.HIVE_SCRIPT, script.toString()); - MapReduceMain.setStrings(jobConf, HiveActionExecutor.HIVE_PARAMS, new String[]{ + ActionUtils.setStrings(jobConf, HiveActionExecutor.HIVE_PARAMS, new String[]{ "IN=" + inputDir.toUri().getPath(), "OUT=" + outputDir.toUri().getPath()}); - MapReduceMain.setStrings(jobConf, HiveActionExecutor.HIVE_ARGS, + ActionUtils.setStrings(jobConf, HiveActionExecutor.HIVE_ARGS, new String[]{ "-v" }); File actionXml = new File(getTestCaseDir(), "action.xml"); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/hive2/pom.xml b/sharelib/hive2/pom.xml index e81bfbe..be51cd1 100644 --- a/sharelib/hive2/pom.xml +++ b/sharelib/hive2/pom.xml @@ -171,11 +171,7 @@ <classifier>tests</classifier> <scope>test</scope> </dependency> - <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-utils</artifactId> - <scope>provided</scope> - </dependency> + <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> @@ -221,18 +217,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - 
<id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java ---------------------------------------------------------------------- diff --git a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java index a3a07bd..e626dbb 100644 --- a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java +++ b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java @@ -152,8 +152,11 @@ public class Hive2Main extends LauncherMain { File localDir = new File("dummy").getAbsoluteFile().getParentFile(); System.out.println("Current (local) dir = " + localDir.getAbsolutePath()); System.out.println("------------------------"); - for (String file : localDir.list()) { - System.out.println(" " + file); + String[] files = localDir.list(); + if (files != null) { + for (String file : files) { + System.out.println(" " + file); + } } System.out.println("------------------------"); System.out.println(); @@ -183,7 +186,7 @@ public class Hive2Main extends LauncherMain { } // Pass any parameters to Beeline via arguments - String[] params = MapReduceMain.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS); + String[] params = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS); if (params.length > 0) { System.out.println("Parameters:"); System.out.println("------------------------"); @@ -208,7 +211,7 @@ 
public class Hive2Main extends LauncherMain { arguments.add("-a"); arguments.add("delegationToken"); - String[] beelineArgs = MapReduceMain.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS); + String[] beelineArgs = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS); for (String beelineArg : beelineArgs) { if (DISALLOWED_BEELINE_OPTIONS.contains(beelineArg)) { throw new RuntimeException("Error: Beeline argument " + beelineArg + " is not supported"); @@ -233,7 +236,7 @@ public class Hive2Main extends LauncherMain { } System.out.println(); - LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + LauncherMain.killChildYarnJobs(actionConf); System.out.println("================================================================="); System.out.println(); @@ -244,13 +247,6 @@ public class Hive2Main extends LauncherMain { try { runBeeline(arguments.toArray(new String[arguments.size()]), logFile); } - catch (SecurityException ex) { - if (LauncherSecurityManager.getExitInvoked()) { - if (LauncherSecurityManager.getExitCode() != 0) { - throw ex; - } - } - } finally { System.out.println("\n<<< Invocation of Beeline command completed <<<\n"); writeExternalChildIDs(logFile, HIVE2_JOB_IDS_PATTERNS, "Beeline"); @@ -269,6 +265,7 @@ public class Hive2Main extends LauncherMain { BeeLine beeLine = new BeeLine(); beeLine.setErrorStream(new PrintStream(new TeeOutputStream(System.err, new FileOutputStream(logFile)))); int status = beeLine.begin(args, null); + beeLine.close(); if (status != 0) { System.exit(status); } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java ---------------------------------------------------------------------- diff --git a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java index 4818bb6..2127eb0 100644 --- 
a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java +++ b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java @@ -19,7 +19,6 @@ package org.apache.oozie.action.hadoop; import java.io.OutputStreamWriter; -import java.io.StringReader; import java.io.Writer; import java.text.MessageFormat; import java.util.ArrayList; @@ -29,15 +28,9 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; import org.apache.oozie.service.WorkflowAppService; import org.apache.oozie.util.XConfiguration; import org.apache.oozie.util.XmlUtils; @@ -69,10 +62,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { setSystemProperty("oozie.service.ActionService.executor.classes", Hive2ActionExecutor.class.getName()); } - @SuppressWarnings("unchecked") public void testSetupMethodsForScript() throws Exception { Hive2ActionExecutor ae = new Hive2ActionExecutor(); - List<Class> classes = new ArrayList<Class>(); + List<Class<?>> classes = new ArrayList<>(); classes.add(Hive2Main.class); assertEquals(classes, ae.getLauncherClasses()); @@ -110,10 +102,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { assertEquals("--dee", conf.get("oozie.hive2.args.1")); } - @SuppressWarnings("unchecked") public void testSetupMethodsForQuery() throws Exception { Hive2ActionExecutor ae = new Hive2ActionExecutor(); - List<Class> classes = new ArrayList<Class>(); + List<Class<?>> classes = new ArrayList<>(); 
classes.add(Hive2Main.class); assertEquals(classes, ae.getLauncherClasses()); @@ -192,7 +183,6 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { "<query>" + query + "</query>" + "</hive2>"; } - @SuppressWarnings("deprecation") public void testHive2Action() throws Exception { setupHiveServer2(); Path inputDir = new Path(getFsTestCaseDir(), INPUT_DIRNAME); @@ -205,21 +195,14 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { dataWriter.write(SAMPLE_DATA_TEXT); dataWriter.close(); Context context = createContext(getQueryActionXml(query)); - final RunningJob launcherJob = submitAction(context, + final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2")); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); Hive2ActionExecutor ae = new Hive2ActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); @@ -241,21 +224,14 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { dataWriter.write(SAMPLE_DATA_TEXT); dataWriter.close(); Context context = createContext(getScriptActionXml()); - final RunningJob launcherJob = submitAction(context, + final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.1")); - String 
launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); + waitUntilYarnAppDoneAndAssertSuccess(launcherId); Configuration conf = new XConfiguration(); conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); + assertFalse(LauncherHelper.hasIdSwap(actionData)); Hive2ActionExecutor ae = new Hive2ActionExecutor(); ae.check(context, context.getAction()); assertTrue(launcherId.equals(context.getAction().getExternalId())); @@ -267,35 +243,33 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { assertTrue(fs.exists(outputDir)); assertTrue(fs.isDirectory(outputDir)); } - // Negative testcase with incorrect hive-query. 
- { - String query = getHive2BadScript(inputDir.toString(), outputDir.toString()); - Writer dataWriter = new OutputStreamWriter(fs.create(new Path(inputDir, DATA_FILENAME))); - dataWriter.write(SAMPLE_DATA_TEXT); - dataWriter.close(); - Context context = createContext(getQueryActionXml(query)); - final RunningJob launcherJob = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2")); - String launcherId = context.getAction().getExternalId(); - waitFor(200 * 1000, new Predicate() { - @Override - public boolean evaluate() throws Exception { - return launcherJob.isComplete(); - } - }); - assertTrue(launcherJob.isSuccessful()); - Configuration conf = new XConfiguration(); - conf.set("user.name", getTestUser()); - Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(), - conf); - assertFalse(LauncherMapperHelper.hasIdSwap(actionData)); - Hive2ActionExecutor ae = new Hive2ActionExecutor(); - ae.check(context, context.getAction()); - assertTrue(launcherId.equals(context.getAction().getExternalId())); - assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); - ae.end(context, context.getAction()); - assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); - assertNull(context.getExternalChildIDs()); - } + } + + public void testHive2ActionFails() throws Exception { + setupHiveServer2(); + Path inputDir = new Path(getFsTestCaseDir(), INPUT_DIRNAME); + Path outputDir = new Path(getFsTestCaseDir(), OUTPUT_DIRNAME); + FileSystem fs = getFileSystem(); + + String query = getHive2BadScript(inputDir.toString(), outputDir.toString()); + Writer dataWriter = new OutputStreamWriter(fs.create(new Path(inputDir, DATA_FILENAME))); + dataWriter.write(SAMPLE_DATA_TEXT); + dataWriter.close(); + Context context = createContext(getQueryActionXml(query)); + final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2")); + 
waitUntilYarnAppDoneAndAssertSuccess(launcherId); + Configuration conf = new XConfiguration(); + conf.set("user.name", getTestUser()); + Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(), + conf); + assertFalse(LauncherHelper.hasIdSwap(actionData)); + Hive2ActionExecutor ae = new Hive2ActionExecutor(); + ae.check(context, context.getAction()); + assertTrue(launcherId.equals(context.getAction().getExternalId())); + assertEquals("FAILED/KILLED", context.getAction().getExternalStatus()); + ae.end(context, context.getAction()); + assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus()); + assertNull(context.getExternalChildIDs()); } private String getHive2BadScript(String inputPath, String outputPath) { @@ -311,7 +285,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { return buffer.toString(); } - private RunningJob submitAction(Context context, Namespace ns) throws Exception { + private String submitAction(Context context, Namespace ns) throws Exception { Hive2ActionExecutor ae = new Hive2ActionExecutor(); WorkflowAction action = context.getAction(); @@ -325,21 +299,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase { assertNotNull(jobId); assertNotNull(jobTracker); assertNotNull(consoleUrl); - Element e = XmlUtils.parseXml(action.getConf()); - XConfiguration conf = - new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString())); - conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns)); - conf.set("fs.default.name", e.getChildTextTrim("name-node", ns)); - conf.set("user.name", context.getProtoActionConf().get("user.name")); - conf.set("group.name", getTestGroup()); - - JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker); - XConfiguration.copy(conf, jobConf); - String user = jobConf.get("user.name"); - JobClient jobClient = 
Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); - final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId)); - assertNotNull(runningJob); - return runningJob; + return jobId; } private Context createContext(String actionXml) throws Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml index f3ea071..12f5cdd 100644 --- a/sharelib/oozie/pom.xml +++ b/sharelib/oozie/pom.xml @@ -61,17 +61,17 @@ <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.mockito</groupId> <artifactId>mockito-all</artifactId> <scope>test</scope> </dependency> - - <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-utils</artifactId> - <scope>compile</scope> - </dependency> </dependencies> <build> @@ -97,18 +97,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java new file mode 100644 index 0000000..e6c9d04 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeReport; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; + +// Note: methods which modify/read the state of errorHolder are synchronized to avoid data races when LauncherAM invokes getError() +public class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler { + private ErrorHolder errorHolder; + + @Override + public void onContainersCompleted(List<ContainerStatus> containerStatuses) { + //noop + } + + @Override + public void onContainersAllocated(List<Container> containers) { + //noop + } + + @Override + public synchronized void onShutdownRequest() { + System.out.println("Resource manager requested AM Shutdown"); + errorHolder = new ErrorHolder(); + errorHolder.setErrorCode(0); + errorHolder.setErrorMessage("ResourceManager requested AM Shutdown"); + } + + @Override + public void onNodesUpdated(List<NodeReport> nodeReports) { + //noop + } + + @Override + public float getProgress() { + return 0.5f; + } + + @Override + public synchronized void onError(final Throwable ex) { + System.out.println("Received asynchronous error"); + ex.printStackTrace(); + errorHolder = new ErrorHolder(); + errorHolder.setErrorCode(0); + errorHolder.setErrorMessage(ex.getMessage()); + errorHolder.setErrorCause(ex); + } + + public synchronized ErrorHolder getError() { + return errorHolder; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java new file 
mode 100644 index 0000000..b4cbb4b --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.yarn.client.api.AMRMClient; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; + +public class AMRMClientAsyncFactory { + + public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) { + AMRMClient<?> amRmClient = AMRMClient.createAMRMClient(); + AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler(); + AMRMClientAsync<?> amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler); + + return amRmClientAsync; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java new file mode 100644 index 0000000..3002ad5 --- /dev/null +++ 
b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; + +public final class ActionUtils { + + private ActionUtils() { + // no instances + } + + public static void setStrings(Configuration conf, String key, String[] values) { + if (values != null) { + conf.setInt(key + ".size", values.length); + for (int i = 0; i < values.length; i++) { + conf.set(key + "." + i, values[i]); + } + } + } + + public static String[] getStrings(Configuration conf, String key) { + String[] values = new String[conf.getInt(key + ".size", 0)]; + for (int i = 0; i < values.length; i++) { + values[i] = conf.get(key + "." 
+ i); + if (values[i] == null) { + values[i] = ""; + } + } + return values; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java new file mode 100644 index 0000000..6a755db --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
/**
 * Mutable container for the details of a failure: an error code, a message and the causing throwable.
 * <p>
 * Calling any of the setters marks the holder as populated, regardless of the value passed.
 */
public class ErrorHolder {
    private boolean populated = false;
    private int errorCode = 0;
    private String errorMessage = null;
    private Throwable errorCause = null;

    /** @return {@code true} once any setter has been called */
    public boolean isPopulated() {
        return populated;
    }

    public int getErrorCode() {
        return errorCode;
    }

    public void setErrorCode(int errorCode) {
        markPopulated();
        this.errorCode = errorCode;
    }

    public String getErrorMessage() {
        return errorMessage;
    }

    public void setErrorMessage(String errorMessage) {
        markPopulated();
        this.errorMessage = errorMessage;
    }

    public Throwable getErrorCause() {
        return errorCause;
    }

    public void setErrorCause(Throwable errorCause) {
        markPopulated();
        this.errorCause = errorCause;
    }

    /** Remembers that at least one piece of error information has been supplied. */
    private void markPopulated() {
        populated = true;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; + +import com.google.common.base.Preconditions; + +public class HdfsOperations { + private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; + private final SequenceFileWriterFactory seqFileWriterFactory; + private final UserGroupInformation ugi; + + public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) { + this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null"); + this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); + } + + /** + * Creates a Sequence file which contains the output from an action and uploads it to HDFS. 
+ */ + public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir, + final Map<String, String> actionData) throws IOException, InterruptedException { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE); + // upload into sequence file + System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri()); + + try (SequenceFile.Writer wr = + seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) { + + if (wr != null) { + for (Entry<String, String> entry : actionData.entrySet()) { + wr.append(new Text(entry.getKey()), new Text(entry.getValue())); + } + } else { + throw new IOException("SequenceFile.Writer is null for " + finalPath); + } + } + + return null; + } + }); + } + + public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException { + return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + @Override + public Boolean run() throws Exception { + FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf); + return fs.exists(path); + } + }); + } + + public void writeStringToFile(final Path path, final Configuration conf, final String contents) + throws IOException, InterruptedException { + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try (FileSystem fs = FileSystem.get(path.toUri(), conf); + java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) { + writer.write(contents); + } + + return null; + } + }); + } + + public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException { + return ugi.doAs(new PrivilegedExceptionAction<String>() { + @Override + public String run() throws Exception { + StringBuilder sb = new StringBuilder(); + + 
try (FileSystem fs = FileSystem.get(path.toUri(), conf); + InputStream is = fs.open(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) { + + String contents; + while ((contents = reader.readLine()) != null) { + sb.append(contents); + } + } + + return sb.toString(); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java index 30d68e2..c3e3d3f 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java @@ -44,11 +44,11 @@ public class JavaMain extends LauncherMain { setApplicationTags(actionConf, TEZ_APPLICATION_TAGS); setApplicationTags(actionConf, SPARK_YARN_TAGS); - LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + LauncherMain.killChildYarnJobs(actionConf); Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class); - System.out.println("Main class : " + klass.getName()); - LauncherMapper.printArgs("Arguments :", args); + System.out.println("Java action main class : " + klass.getName()); + printArgs("Java action arguments :", args); System.out.println(); Method mainMethod = klass.getMethod("main", String[].class); try { @@ -60,4 +60,13 @@ public class JavaMain extends LauncherMain { } + /** + * Used by JavaMain to wrap a Throwable when an Exception occurs + */ + @SuppressWarnings("serial") + static class JavaMainException extends Exception { + public JavaMainException(Throwable t) { + super(t); + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java 
---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java new file mode 100644 index 0000000..4f252d1 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java @@ -0,0 +1,614 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.Permission; +import java.security.PrivilegedExceptionAction; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.StringTokenizer; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Objects; +import com.google.common.base.Preconditions; + +public class LauncherAM { + private static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml"; + private static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id"; + + public static final String JAVA_CLASS_PATH = "java.class.path"; + public static final String OOZIE_ACTION_ID = "oozie.action.id"; + public static final String OOZIE_JOB_ID = "oozie.job.id"; + public static final String ACTION_PREFIX = "oozie.action."; + static final String OOZIE_ACTION_RECOVERY_ID = ACTION_PREFIX + "recovery.id"; + public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data"; + public static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg."; + public static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count"; + public static final String 
CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size"; + public static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path"; + public static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml"; + public static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE + public static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs"; + public static final String ACTION_DATA_OUTPUT_PROPS = "output.properties"; + public static final String ACTION_DATA_STATS = "stats.properties"; + public static final String ACTION_DATA_NEW_ID = "newId"; + public static final String ACTION_DATA_ERROR_PROPS = "error.properties"; + public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; + + // TODO: OYA: more unique file names? action.xml may be stuck for backwards compat though + public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml"; + public static final String ACTION_CONF_XML = "action.xml"; + public static final String ACTION_DATA_FINAL_STATUS = "final.status"; + + private final UserGroupInformation ugi; + private final AMRMCallBackHandler callbackHandler; + private final AMRMClientAsyncFactory amRmClientAsyncFactory; + private final HdfsOperations hdfsOperations; + private final LocalFsOperations localFsOperations; + private final PrepareActionsHandler prepareHandler; + private final LauncherAMCallbackNotifierFactory callbackNotifierFactory; + private final LauncherSecurityManager launcherSecurityManager; + private final ContainerId containerId; + + private Configuration launcherJobConf; + private AMRMClientAsync<?> amRmClientAsync; + private Path actionDir; + private Map<String, String> actionData = new HashMap<String,String>(); + + public LauncherAM(UserGroupInformation ugi, + AMRMClientAsyncFactory amRmClientAsyncFactory, + AMRMCallBackHandler callbackHandler, + HdfsOperations hdfsOperations, + LocalFsOperations localFsOperations, + PrepareActionsHandler 
prepareHandler, + LauncherAMCallbackNotifierFactory callbackNotifierFactory, + LauncherSecurityManager launcherSecurityManager, + String containerId) { + this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); + this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory, + "amRmClientAsyncFactory should not be null"); + this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null"); + this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, "hdfsOperations should not be null"); + this.localFsOperations = Preconditions.checkNotNull(localFsOperations, "localFsOperations should not be null"); + this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null"); + this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory, + "callbackNotifierFactory should not be null"); + this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager, + "launcherSecurityManager should not be null"); + this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null")); + } + + public static void main(String[] args) throws Exception { + UserGroupInformation ugi = null; + String submitterUser = System.getProperty("submitter.user", "").trim(); + Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined"); + System.out.println("Submitter user is: " + submitterUser); + + // We don't need remote/proxy user if the current login user is the workflow submitter + // Otherwise we have to create a remote user + if (UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) { + System.out.println("Using login user for UGI"); + ugi = UserGroupInformation.getLoginUser(); + } else { + ugi = UserGroupInformation.createRemoteUser(submitterUser); + ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials()); + } + + AMRMClientAsyncFactory 
amRmClientAsyncFactory = new AMRMClientAsyncFactory(); + AMRMCallBackHandler callbackHandler = new AMRMCallBackHandler(); + HdfsOperations hdfsOperations = new HdfsOperations(new SequenceFileWriterFactory(), ugi); + LocalFsOperations localFSOperations = new LocalFsOperations(); + PrepareActionsHandler prepareHandler = new PrepareActionsHandler(); + LauncherAMCallbackNotifierFactory callbackNotifierFactory = new LauncherAMCallbackNotifierFactory(); + LauncherSecurityManager launcherSecurityManager = new LauncherSecurityManager(); + + LauncherAM launcher = new LauncherAM(ugi, + amRmClientAsyncFactory, + callbackHandler, + hdfsOperations, + localFSOperations, + prepareHandler, + callbackNotifierFactory, + launcherSecurityManager, + System.getenv("CONTAINER_ID")); + + launcher.run(); + } + + public void run() throws Exception { + final ErrorHolder errorHolder = new ErrorHolder(); + OozieActionResult actionResult = OozieActionResult.FAILED; + boolean launcherExecutedProperly = false; + boolean backgroundAction = false; + + try { + try { + launcherJobConf = localFsOperations.readLauncherConf(); + System.out.println("Launcher AM configuration loaded"); + } catch (Exception ex) { + errorHolder.setErrorMessage("Could not load the Launcher AM configuration file"); + errorHolder.setErrorCause(ex); + throw ex; + } + actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH)); + + registerWithRM(); + executePrepare(ugi, errorHolder); + final String[] mainArgs = getMainArguments(launcherJobConf); + printDebugInfo(); + setupMainConfiguration(); + launcherExecutedProperly = runActionMain(mainArgs, errorHolder, ugi); + + if (launcherExecutedProperly) { + handleActionData(); + if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) { + System.out.println(); + System.out.println("Oozie Launcher, capturing output data:"); + System.out.println("======================="); + System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS)); + System.out.println(); + 
System.out.println("======================="); + System.out.println(); + } + if (actionData.get(ACTION_DATA_NEW_ID) != null) { + System.out.println(); + System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie"); + System.out.println("======================="); + System.out.println(actionData.get(ACTION_DATA_NEW_ID)); + System.out.println("======================="); + System.out.println(); + backgroundAction = true; + } + } + } catch (Exception e) { + System.out.println("Launcher AM execution failed"); + System.err.println("Launcher AM execution failed"); + e.printStackTrace(System.out); + e.printStackTrace(System.err); + launcherExecutedProperly = false; + if (!errorHolder.isPopulated()) { + errorHolder.setErrorCause(e); + errorHolder.setErrorMessage(e.getMessage()); + } + throw e; + } finally { + try { + ErrorHolder callbackErrorHolder = callbackHandler.getError(); + + if (launcherExecutedProperly) { + actionResult = backgroundAction ? OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED; + } + + if (!launcherExecutedProperly) { + updateActionDataWithFailure(errorHolder, actionData); + } else if (callbackErrorHolder != null) { // async error from the callback + actionResult = OozieActionResult.FAILED; + updateActionDataWithFailure(callbackErrorHolder, actionData); + } + + actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString()); + hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData); + } finally { + try { + unregisterWithRM(actionResult, errorHolder.getErrorMessage()); + } finally { + LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherJobConf); + cn.notifyURL(actionResult); + } + } + } + } + + @VisibleForTesting + Map<String, String> getActionData() { + return actionData; + } + + private void printDebugInfo() throws IOException { + localFsOperations.printContentsOfDir(new File(".")); + + System.out.println(); + System.out.println("Oozie Launcher Application Master 
configuration"); + System.out.println("==============================================="); + System.out.println("Workflow job id : " + launcherJobConf.get(OOZIE_JOB_ID)); + System.out.println("Workflow action id: " + launcherJobConf.get(OOZIE_ACTION_ID)); + System.out.println(); + System.out.println("Classpath :"); + System.out.println("------------------------"); + StringTokenizer st = new StringTokenizer(System.getProperty(JAVA_CLASS_PATH), ":"); + while (st.hasMoreTokens()) { + System.out.println(" " + st.nextToken()); + } + System.out.println("------------------------"); + System.out.println(); + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + System.out.println("Main class : " + mainClass); + System.out.println(); + System.out.println("Maximum output : " + + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024)); + System.out.println(); + + System.out.println(); + System.out.println("Java System Properties:"); + System.out.println("------------------------"); + System.getProperties().store(System.out, ""); + System.out.println("------------------------"); + System.out.println(); + + System.out.println("Environment variables"); + Map<String, String> env = System.getenv(); + System.out.println("------------------------"); + for (Map.Entry<String, String> entry : env.entrySet()) { + System.out.println(entry.getKey() + "=" + entry.getValue()); + } + System.out.println("------------------------"); + System.out.println("================================================================="); + System.out.println(); + System.out.println(">>> Invoking Main class now >>>"); + System.out.println(); + } + + private void registerWithRM() throws IOException, YarnException { + // TODO: OYA: make heartbeat interval configurable & make interval higher to put less load on RM, but lower than timeout + amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000); + amRmClientAsync.init(new Configuration(launcherJobConf)); + 
amRmClientAsync.start(); + + // hostname and tracking url are determined automatically + amRmClientAsync.registerApplicationMaster("", 0, ""); + } + + private void unregisterWithRM(OozieActionResult actionResult, String message) throws YarnException, IOException { + if (amRmClientAsync != null) { + System.out.println("Stopping AM"); + try { + message = (message == null) ? "" : message; + // tracking url is determined automatically + amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, ""); + } catch (Exception ex) { + System.out.println("Error un-registering AM client"); + throw ex; + } finally { + amRmClientAsync.stop(); + } + } + } + + // Method to execute the prepare actions + private void executePrepare(UserGroupInformation ugi, ErrorHolder errorHolder) throws Exception { + try { + System.out.println("\nStarting the execution of prepare actions"); + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML); + if (prepareXML != null) { + if (prepareXML.length() != 0) { + Configuration actionConf = new Configuration(launcherJobConf); + actionConf.addResource(ACTION_CONF_XML); + prepareHandler.prepareAction(prepareXML, actionConf); + } else { + System.out.println("There are no prepare actions to execute."); + } + } + return null; + } + }); + System.out.println("Completed the execution of prepare actions successfully"); + } catch (Exception ex) { + errorHolder.setErrorMessage("Prepare execution in the Launcher AM has failed"); + errorHolder.setErrorCause(ex); + throw ex; + } + } + + private void setupMainConfiguration() throws IOException { + System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID)); + System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID)); + System.setProperty(OOZIE_ACTION_CONF_XML, new 
File(ACTION_CONF_XML).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, + new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_STATS, new File(ACTION_DATA_STATS).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath()); + System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath()); + + System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis())); + } + + private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) throws Exception { + // using AtomicBoolean because we want to modify it inside run() + final AtomicBoolean actionMainExecutedProperly = new AtomicBoolean(false); + + ugi.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + try { + setRecoveryId(); + Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null); + Preconditions.checkNotNull(klass, "Launcher class should not be null"); + System.out.println("Launcher class: " + klass.toString()); + Method mainMethod = klass.getMethod("main", String[].class); + // Enable LauncherSecurityManager to catch System.exit calls + launcherSecurityManager.enable(); + mainMethod.invoke(null, (Object) mainArgs); + + System.out.println(); + System.out.println("<<< Invocation of Main class completed <<<"); + System.out.println(); + actionMainExecutedProperly.set(true); + } catch (InvocationTargetException ex) { + ex.printStackTrace(System.out); + // Get what actually caused the exception + Throwable cause = ex.getCause(); + // If we got a JavaMainException from JavaMain, then we need to unwrap it + if (JavaMain.JavaMainException.class.isInstance(cause)) { + 
cause = cause.getCause(); + } + if (LauncherMainException.class.isInstance(cause)) { + int errorCode = ((LauncherMainException) ex.getCause()).getErrorCode(); + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + + errorCode + "]"); + eHolder.setErrorCode(errorCode); + } else if (SecurityException.class.isInstance(cause)) { + if (launcherSecurityManager.getExitInvoked()) { + final int exitCode = launcherSecurityManager.getExitCode(); + System.out.println("Intercepting System.exit(" + exitCode + ")"); + // if 0 main() method finished successfully + // ignoring + eHolder.setErrorCode(exitCode); + if (exitCode != 0) { + String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS); + eHolder.setErrorMessage("Main Class [" + mainClass + "]," + + " exit code [" + eHolder.getErrorCode() + "]"); + } else { + actionMainExecutedProperly.set(true); + } + } else { + // just SecurityException, no exit was invoked + eHolder.setErrorCode(0); + eHolder.setErrorCause(cause); + eHolder.setErrorMessage(cause.getMessage()); + } + } else { + eHolder.setErrorMessage(cause.getMessage()); + eHolder.setErrorCause(cause); + } + } catch (Throwable t) { + t.printStackTrace(); + eHolder.setErrorMessage(t.getMessage()); + eHolder.setErrorCause(t); + } finally { + // Disable LauncherSecurityManager + launcherSecurityManager.disable(); + } + + return null; + } + }); + + return actionMainExecutedProperly.get(); + } + + private void setRecoveryId() throws LauncherException { + try { + ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId(); + String applicationIdStr = applicationId.toString(); + + String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(OOZIE_ACTION_RECOVERY_ID), + "RecoveryID should not be null"); + + Path path = new Path(actionDir, recoveryId); + if (!hdfsOperations.fileExists(path, launcherJobConf)) { + 
hdfsOperations.writeStringToFile(path, launcherJobConf, applicationIdStr); + } else { + String id = hdfsOperations.readFileContents(path, launcherJobConf); + + if (!applicationIdStr.equals(id)) { + throw new LauncherException(MessageFormat.format( + "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id, + applicationIdStr)); + } + } + } catch (RuntimeException | InterruptedException | IOException ex) { + throw new LauncherException("IO error", ex); + } + } + + private void handleActionData() throws IOException { + // external child IDs + processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null, + ACTION_DATA_EXTERNAL_CHILD_IDS, -1); + + // external stats + processActionData(ACTION_PREFIX + ACTION_DATA_STATS, CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, + ACTION_DATA_STATS, Integer.MAX_VALUE); + + // output data + processActionData(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, + ACTION_DATA_OUTPUT_PROPS, 2048); + + // id swap + processActionData(ACTION_PREFIX + ACTION_DATA_NEW_ID, null, + ACTION_DATA_NEW_ID, -1); + } + + private void processActionData(String propertyName, String maxSizePropertyName, String actionDataPropertyName, + int maxSizeDefault) throws IOException { + String propValue = System.getProperty(propertyName); + int maxSize = maxSizeDefault; + + if (maxSizePropertyName != null) { + maxSize = launcherJobConf.getInt(maxSizePropertyName, maxSizeDefault); + } + + if (propValue != null) { + File actionDataFile = new File(propValue); + if (localFsOperations.fileExists(actionDataFile)) { + actionData.put(actionDataPropertyName, localFsOperations.getLocalFileContentAsString(actionDataFile, + actionDataPropertyName, maxSize)); + } + } + } + + private void updateActionDataWithFailure(ErrorHolder eHolder, Map<String, String> actionData) { + if (eHolder.getErrorCause() != null && eHolder.getErrorCause().getMessage() != null) { + if (Objects.equal(eHolder.getErrorMessage(), 
eHolder.getErrorCause().getMessage())) { + eHolder.setErrorMessage(eHolder.getErrorMessage()); + } else { + eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage()); + } + } + + Properties errorProps = new Properties(); + errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode())); + String errorMessage = eHolder.getErrorMessage() == null ? "<empty>" : eHolder.getErrorMessage(); + errorProps.setProperty("error.reason", errorMessage); + if (eHolder.getErrorCause() != null) { + if (eHolder.getErrorCause().getMessage() != null) { + errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage()); + } + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + eHolder.getErrorCause().printStackTrace(pw); + pw.close(); + errorProps.setProperty("exception.stacktrace", sw.toString()); + } + + StringWriter sw = new StringWriter(); + try { + errorProps.store(sw, ""); + sw.close(); + actionData.put(LauncherAM.ACTION_DATA_ERROR_PROPS, sw.toString()); + + // external child IDs + String externalChildIdsProp = System.getProperty(LauncherAM.ACTION_PREFIX + LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS); + if (externalChildIdsProp != null) { + File externalChildIDs = new File(externalChildIdsProp); + if (localFsOperations.fileExists(externalChildIDs)) { + actionData.put(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS, + localFsOperations.getLocalFileContentAsString(externalChildIDs, ACTION_DATA_EXTERNAL_CHILD_IDS, -1)); + } + } + } catch (IOException ioe) { + System.out.println("A problem occured trying to fail the launcher"); + ioe.printStackTrace(); + } finally { + System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n"); + if (eHolder.getErrorCause() != null) { + eHolder.getErrorCause().printStackTrace(System.out); + } + } + } + + private String[] getMainArguments(Configuration conf) { + return LauncherMapper.getMainArguments(conf); + } + + public static class 
LauncherSecurityManager extends SecurityManager { + private boolean exitInvoked; + private int exitCode; + private SecurityManager originalSecurityManager; + + public LauncherSecurityManager() { + exitInvoked = false; + exitCode = 0; + originalSecurityManager = System.getSecurityManager(); + } + + @Override + public void checkPermission(Permission perm, Object context) { + if (originalSecurityManager != null) { + // check everything with the original SecurityManager + originalSecurityManager.checkPermission(perm, context); + } + } + + @Override + public void checkPermission(Permission perm) { + if (originalSecurityManager != null) { + // check everything with the original SecurityManager + originalSecurityManager.checkPermission(perm); + } + } + + @Override + public void checkExit(int status) throws SecurityException { + exitInvoked = true; + exitCode = status; + throw new SecurityException("Intercepted System.exit(" + status + ")"); + } + + public boolean getExitInvoked() { + return exitInvoked; + } + + public int getExitCode() { + return exitCode; + } + + public void enable() { + if (System.getSecurityManager() != this) { + System.setSecurityManager(this); + } + } + + public void disable() { + if (System.getSecurityManager() == this) { + System.setSecurityManager(originalSecurityManager); + } + } + } + + public enum OozieActionResult { + SUCCEEDED(FinalApplicationStatus.SUCCEEDED), + FAILED(FinalApplicationStatus.FAILED), + RUNNING(FinalApplicationStatus.SUCCEEDED); + + // YARN-equivalent status + private FinalApplicationStatus yarnStatus; + + OozieActionResult(FinalApplicationStatus yarnStatus) { + this.yarnStatus = yarnStatus; + } + + public FinalApplicationStatus getYarnStatus() { + return yarnStatus; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java ---------------------------------------------------------------------- 
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..2972658
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -0,0 +1,177 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.oozie.action.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.Proxy;
import java.net.URL;

// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
/**
 * This class sends back an HTTP GET callback to the configured URL.  It is meant for the {@link LauncherAM} to
 * notify the Oozie Server that it has finished.
 */
public class LauncherAMCallbackNotifier {
    private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback.";
    // Upper bound (ms) for the retry interval; also used as the default interval and the default timeout.
    private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;

    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval";
    public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts";
    public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout";
    public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url";
    public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = OOZIE_LAUNCHER_CALLBACK + "proxy";
    public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = "$jobStatus";

    protected String userUrl;        //Callback URL template as read from the config (may contain the status token)
    protected String proxyConf;      //Raw proxy setting as read from the config
    protected int numTries;          //Number of tries to attempt notification
    protected int waitInterval;      //Time (ms) to wait between retrying notification
    protected int timeout;           //Timeout (ms) on the connection and notification
    protected URL urlToNotify;       //URL used by the most recent notifyURL() call
    protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification


    /**
     * Parse the URL that needs to be notified of the end of the job, along
     * with the number of retries in case of failure, the amount of time to
     * wait between retries and proxy settings
     * <p>
     * An unusable proxy setting or a negative timeout never makes construction fail: the notifier falls back to
     * a direct connection and to the default timeout respectively.
     *
     * @param conf the configuration
     */
    public LauncherAMCallbackNotifier(Configuration conf) {
        numTries = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1,
                conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1));

        // The interval is capped at the maximum; a negative value also yields the maximum.
        waitInterval = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL,
                OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX), OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
        waitInterval = (waitInterval < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval;

        // HttpURLConnection rejects negative timeouts with an IllegalArgumentException, which would abort the
        // notification, so fall back to the default instead.
        timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
        timeout = (timeout < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : timeout;

        userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL);

        proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY);
        proxyToUse = parseProxy(proxyConf);
    }

    /**
     * Turns a proxy setting of the form {@code proxyType@proxyHostname:port} into a {@link Proxy}.
     * The {@code proxyType@} part is optional; the type is SOCKS when it equals "socks" (ignoring case) and
     * HTTP otherwise.
     *
     * @param proxyConf the configured proxy setting, may be {@code null}
     * @return the proxy to use, or {@link Proxy#NO_PROXY} when the setting is absent, has no port separator or
     *         cannot be parsed
     */
    private static Proxy parseProxy(String proxyConf) {
        if (proxyConf == null || proxyConf.equals("")) {
            return Proxy.NO_PROXY;
        }
        int portSeparator = proxyConf.lastIndexOf(":");
        if (portSeparator == -1) {
            return Proxy.NO_PROXY;
        }

        int typeIndex = proxyConf.indexOf("@");
        Proxy.Type proxyType = Proxy.Type.HTTP;
        if (typeIndex != -1
                && proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
            proxyType = Proxy.Type.SOCKS;
        }

        // An '@' located after the last ':' leaves no valid hostname range to extract.
        if (typeIndex + 1 > portSeparator) {
            System.err.println("Callback notification couldn't parse configured proxy "
                    + proxyConf + ". Not going to use a proxy");
            return Proxy.NO_PROXY;
        }

        String hostname = proxyConf.substring(typeIndex + 1, portSeparator);
        String portConf = proxyConf.substring(portSeparator + 1);
        try {
            int port = Integer.parseInt(portConf);
            Proxy proxy = new Proxy(proxyType, new InetSocketAddress(hostname, port));
            System.out.println("Callback notification using proxy type \"" + proxyType
                    + "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
            return proxy;
        } catch (IllegalArgumentException iae) {
            // Covers both a non-numeric port (NumberFormatException) and a port outside the valid range, which
            // InetSocketAddress rejects.
            System.err.println("Callback notification couldn't parse configured proxy's port "
                    + portConf + ". Not going to use a proxy");
            return Proxy.NO_PROXY;
        }
    }

    /**
     * Notify the URL just once. Use best effort.
     *
     * @return {@code true} if the server answered with HTTP 200, {@code false} for any other response code or
     *         when an {@link IOException} occurred
     */
    protected boolean notifyURLOnce() {
        boolean success = false;
        HttpURLConnection conn = null;
        try {
            System.out.println("Callback notification trying " + urlToNotify);
            conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
            conn.setConnectTimeout(timeout);
            conn.setReadTimeout(timeout);
            conn.setAllowUserInteraction(false);
            if (conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
                System.err.println("Callback notification to " + urlToNotify + " failed with code: "
                        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
                        + "\"");
            } else {
                success = true;
                System.out.println("Callback notification to " + urlToNotify + " succeeded");
            }
        } catch (IOException ioe) {
            System.err.println("Callback notification to " + urlToNotify + " failed");
            ioe.printStackTrace();
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
        return success;
    }

    /**
     * Notify a server of the completion of a submitted job.
     * <p>
     * Every occurrence of {@link #OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN} in the configured URL is replaced by
     * the action result. The configured URL and the number of tries are left untouched, so this method can be
     * called more than once on the same instance.
     *
     * @param actionResult The Action Result (failed/succeeded/running)
     *
     * @throws InterruptedException if the thread is interrupted while waiting between two attempts
     */
    public void notifyURL(OozieActionResult actionResult) throws InterruptedException {
        // Do we need job-end notification?
        if (userUrl == null) {
            System.out.println("Callback notification URL not set, skipping.");
            return;
        }

        //Do string replacements for final status (on a copy, to keep the configured template intact)
        String resolvedUrl = userUrl;
        if (resolvedUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
            resolvedUrl = resolvedUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, actionResult.toString());
        }

        // Create the URL, ensure sanity
        try {
            urlToNotify = new URL(resolvedUrl);
        } catch (MalformedURLException mue) {
            System.err.println("Callback notification couldn't parse " + resolvedUrl);
            mue.printStackTrace();
            return;
        }

        // Send notification
        boolean success = false;
        int attemptsLeft = numTries;
        while (attemptsLeft-- > 0 && !success) {
            System.out.println("Callback notification attempts left " + attemptsLeft);
            success = notifyURLOnce();
            if (!success) {
                Thread.sleep(waitInterval);
            }
        }
        if (!success) {
            System.err.println("Callback notification failed to notify : " + urlToNotify);
        } else {
            System.out.println("Callback notification succeeded");
        }
    }
}
