OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/fea512cf Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/fea512cf Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/fea512cf Branch: refs/heads/oya Commit: fea512cf66aec92d867e13c200978fd103868ab1 Parents: a37835f Author: Robert Kanter <[email protected]> Authored: Mon Jul 25 18:24:35 2016 -0700 Committer: Robert Kanter <[email protected]> Committed: Mon Jul 25 18:24:35 2016 -0700 ---------------------------------------------------------------------- core/pom.xml | 17 - .../action/hadoop/DistcpActionExecutor.java | 7 +- .../action/hadoop/Hive2ActionExecutor.java | 5 +- .../oozie/action/hadoop/HiveActionExecutor.java | 5 +- .../oozie/action/hadoop/JavaActionExecutor.java | 593 ++++++++--------- .../action/hadoop/LauncherMapperHelper.java | 12 - .../action/hadoop/SparkActionExecutor.java | 5 +- .../action/hadoop/SqoopActionExecutor.java | 9 +- .../oozie/service/HadoopAccessorService.java | 97 ++- .../org/apache/oozie/util/ClasspathUtils.java | 145 +++++ core/src/main/resources/oozie-default.xml | 25 - .../java/org/apache/oozie/QueryServlet.java | 40 ++ .../action/hadoop/TestJavaActionExecutor.java | 531 ++-------------- .../oozie/action/hadoop/TestLauncherAM.java | 46 ++ .../hadoop/TestLauncherAMCallbackNotifier.java | 170 +++++ .../action/hadoop/TestPrepareActionsDriver.java | 15 +- .../action/hadoop/TestShellActionExecutor.java | 21 - .../apache/oozie/command/wf/HangServlet.java | 19 +- .../oozie/service/TestConfigurationService.java | 3 - .../service/TestHadoopAccessorService.java | 121 +++- .../java/org/apache/oozie/test/XTestCase.java | 4 + .../apache/oozie/util/TestClasspathUtils.java | 110 ++++ release-log.txt | 1 + sharelib/distcp/pom.xml | 12 - sharelib/hcatalog/pom.xml | 12 - sharelib/hive/pom.xml | 12 - sharelib/hive2/pom.xml | 12 - sharelib/oozie/pom.xml | 12 - .../apache/oozie/action/hadoop/LauncherAM.java | 636 
+++++++++++++++++++ .../hadoop/LauncherAMCallbackNotifier.java | 175 +++++ .../oozie/action/hadoop/LauncherMapper.java | 6 +- .../action/hadoop/PrepareActionsDriver.java | 43 +- sharelib/pig/pom.xml | 12 - sharelib/spark/pom.xml | 12 - sharelib/sqoop/pom.xml | 12 - sharelib/streaming/pom.xml | 33 - 36 files changed, 1899 insertions(+), 1091 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/pom.xml ---------------------------------------------------------------------- diff --git a/core/pom.xml b/core/pom.xml index 6584af8..86feea0 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -453,23 +453,6 @@ </configuration> </plugin> <plugin> - <artifactId>maven-dependency-plugin</artifactId> - <executions> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> - </executions> - </plugin> - <plugin> <groupId>org.apache.openjpa</groupId> <artifactId>openjpa-maven-plugin</artifactId> <executions> http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java index 96726da..99652e8 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java @@ -26,13 +26,10 @@ import 
java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.action.ActionExecutorException; -import org.apache.oozie.action.ActionExecutor.Context; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorException; -import org.apache.oozie.service.Services; import org.apache.oozie.util.XLog; import org.jdom.Element; import org.jdom.JDOMException; @@ -126,9 +123,9 @@ public class DistcpActionExecutor extends JavaActionExecutor{ } @Override - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { - super.getActionData(actionFs, runningJob, action, context); + super.getActionData(actionFs, action, context); readExternalChildIDs(action, context); } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java index b5b1bf9..9ba6318 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java @@ -28,7 +28,6 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import 
org.apache.oozie.service.HadoopAccessorException; @@ -134,9 +133,9 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor { } @Override - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { - super.getActionData(actionFs, runningJob, action, context); + super.getActionData(actionFs, action, context); readExternalChildIDs(action, context); } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java index c74e9e6..a850957 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java @@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.XOozieClient; @@ -125,9 +124,9 @@ public class HiveActionExecutor extends ScriptLanguageActionExecutor { } @Override - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { - super.getActionData(actionFs, runningJob, action, context); + 
super.getActionData(actionFs, action, context); readExternalChildIDs(action, context); } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 99e3344..d573fc3 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -18,42 +18,38 @@ package org.apache.oozie.action.hadoop; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.StringReader; -import java.net.ConnectException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.text.MessageFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Properties; -import java.util.Set; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AccessControlException; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import 
org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; +import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.WorkflowActionBean; import org.apache.oozie.WorkflowJobBean; import org.apache.oozie.action.ActionExecutor; @@ -69,18 +65,41 @@ import org.apache.oozie.service.Services; import org.apache.oozie.service.ShareLibService; import org.apache.oozie.service.URIHandlerService; import org.apache.oozie.service.WorkflowAppService; +import org.apache.oozie.util.ClasspathUtils; import org.apache.oozie.util.ELEvaluationException; import org.apache.oozie.util.ELEvaluator; -import org.apache.oozie.util.XLog; -import org.apache.oozie.util.XConfiguration; -import org.apache.oozie.util.XmlUtils; import org.apache.oozie.util.JobUtils; import org.apache.oozie.util.LogUtils; import org.apache.oozie.util.PropertiesUtils; +import org.apache.oozie.util.XConfiguration; +import org.apache.oozie.util.XLog; +import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import 
org.jdom.JDOMException; import org.jdom.Namespace; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.StringReader; +import java.net.ConnectException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.text.MessageFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + public class JavaActionExecutor extends ActionExecutor { @@ -94,7 +113,6 @@ public class JavaActionExecutor extends ActionExecutor { public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job"; public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job"; public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled"; - public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable"; public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs"; public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb"; public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts"; @@ -117,7 +135,6 @@ public class JavaActionExecutor extends ActionExecutor { protected XLog LOG = XLog.getLog(getClass()); private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])"); private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir="; - public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." 
+ HADOOP_YARN_UBER_MODE; public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; @@ -138,10 +155,11 @@ public class JavaActionExecutor extends ActionExecutor { public static List<Class> getCommonLauncherClasses() { List<Class> classes = new ArrayList<Class>(); - classes.add(LauncherMapper.class); classes.add(OozieLauncherInputFormat.class); classes.add(LauncherMain.class); classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher()); + classes.add(LauncherAM.class); + classes.add(LauncherAMCallbackNotifier.class); return classes; } @@ -159,7 +177,7 @@ public class JavaActionExecutor extends ActionExecutor { @Override public void initActionType() { super.initActionType(); - maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA); + maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA); //Get the limit for the maximum allowed size of action stats maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE); maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize; @@ -257,8 +275,6 @@ public class JavaActionExecutor extends ActionExecutor { } catch (URISyntaxException ex) { throw convertException(ex); } - // Inject use uber mode for launcher - injectLauncherUseUberMode(launcherConf); XConfiguration.copy(launcherConf, conf); checkForDisallowedProps(launcherConf, "launcher configuration"); // Inject config-class for launcher to use for action @@ -273,25 +289,6 @@ public class JavaActionExecutor extends ActionExecutor { } } - void injectLauncherUseUberMode(Configuration launcherConf) { - // Set Uber Mode for the launcher (YARN only, ignored by MR1) - // Priority: - // 1. action's <configuration> - // 2. 
oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable - // 3. oozie.action.launcher.mapreduce.job.ubertask.enable - if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) { - if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) { - if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) { - launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); - } - } else { - if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) { - launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true); - } - } - } - } - void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) { // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service. if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null @@ -303,104 +300,6 @@ public class JavaActionExecutor extends ActionExecutor { } } - void updateConfForUberMode(Configuration launcherConf) { - - // child.env - boolean hasConflictEnv = false; - String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV); - if (launcherMapEnv == null) { - launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV); - } - String amEnv = launcherConf.get(YARN_AM_ENV); - StringBuffer envStr = new StringBuffer(); - HashMap<String, List<String>> amEnvMap = null; - HashMap<String, List<String>> launcherMapEnvMap = null; - if (amEnv != null) { - envStr.append(amEnv); - amEnvMap = populateEnvMap(amEnv); - } - if (launcherMapEnv != null) { - launcherMapEnvMap = populateEnvMap(launcherMapEnv); - if (amEnvMap != null) { - Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator(); - while (envKeyItr.hasNext()) { - String envKey = envKeyItr.next(); - if (amEnvMap.containsKey(envKey)) { - List<String> amValList = amEnvMap.get(envKey); - List<String> launcherValList = launcherMapEnvMap.get(envKey); - Iterator<String> 
valItr = launcherValList.iterator(); - while (valItr.hasNext()) { - String val = valItr.next(); - if (!amValList.contains(val)) { - hasConflictEnv = true; - break; - } - else { - valItr.remove(); - } - } - if (launcherValList.isEmpty()) { - envKeyItr.remove(); - } - } - } - } - } - if (hasConflictEnv) { - launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false); - } - else { - if (launcherMapEnvMap != null) { - for (String key : launcherMapEnvMap.keySet()) { - List<String> launcherValList = launcherMapEnvMap.get(key); - for (String val : launcherValList) { - if (envStr.length() > 0) { - envStr.append(","); - } - envStr.append(key).append("=").append(val); - } - } - } - - launcherConf.set(YARN_AM_ENV, envStr.toString()); - - // memory.mb - int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536); - int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536); - // YARN_MEMORY_MB_MIN to provide buffer. - // suppose launcher map aggressively use high memory, need some - // headroom for AM - int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN; - // limit to 4096 in case of 32 bit - if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) { - memoryMB = 4096; - } - launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB); - - // We already made mapred.child.java.opts and - // mapreduce.map.java.opts equal, so just start with one of them - String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, ""); - String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS); - StringBuilder optsStr = new StringBuilder(); - int heapSizeForMap = extractHeapSizeMB(launcherMapOpts); - int heapSizeForAm = extractHeapSizeMB(amChildOpts); - int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN; - // limit to 3584 in case of 32 bit - if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) { - heapSize = 3584; - } - if (amChildOpts != null) { - optsStr.append(amChildOpts); - } - optsStr.append(" 
").append(launcherMapOpts.trim()); - if (heapSize > 0) { - // append calculated total heap size to the end - optsStr.append(" ").append("-Xmx").append(heapSize).append("m"); - } - launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim()); - } - } - void updateConfForJavaTmpDir(Configuration conf) { String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS); String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp"; @@ -868,7 +767,7 @@ public class JavaActionExecutor extends ActionExecutor { protected String getLauncherMain(Configuration launcherConf, Element actionXml) { - return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName()); + return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName()); } private void setJavaMain(Configuration actionConf, Element actionXml) { @@ -1004,15 +903,6 @@ public class JavaActionExecutor extends ActionExecutor { launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim()); launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim()); - // setting for uber mode - if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) { - if (checkPropertiesToDisableUber(launcherJobConf)) { - launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false); - } - else { - updateConfForUberMode(launcherJobConf); - } - } updateConfForJavaTmpDir(launcherJobConf); injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf); @@ -1027,23 +917,9 @@ public class JavaActionExecutor extends ActionExecutor { } } - private boolean checkPropertiesToDisableUber(Configuration launcherConf) { - boolean disable = false; - if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) { - disable = true; - } - else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) { - disable = true; - } - return disable; - } - private void injectCallback(Context context, Configuration conf) { - String callback = context.getCallbackUrl("$jobStatus"); - if (conf.get("job.end.notification.url") 
!= null) { - LOG.warn("Overriding the action job end notification URI"); - } - conf.set("job.end.notification.url", callback); + String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN); + conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback); } void injectActionCallback(Context context, Configuration actionConf) { @@ -1062,7 +938,7 @@ public class JavaActionExecutor extends ActionExecutor { } } - public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException { + public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException { JobClient jobClient = null; boolean exception = false; try { @@ -1119,14 +995,17 @@ public class JavaActionExecutor extends ActionExecutor { } } } - JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); - LOG.debug("Creating Job Client for action " + action.getId()); - jobClient = createJobClient(context, launcherJobConf); - String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context - .getRecoveryId()); - boolean alreadyRunning = launcherId != null; + boolean alreadyRunning = false; + String launcherId = null; + String consoleUrl = null; + // TODO: OYA: equivalent of this? (recovery, alreadyRunning) When does this happen? 
+// LOG.debug("Creating Job Client for action " + action.getId()); +// jobClient = createJobClient(context, launcherJobConf); +// launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context +// .getRecoveryId()); +// alreadyRunning = launcherId != null; RunningJob runningJob; // if user-retry is on, always submit new launcher @@ -1141,13 +1020,13 @@ public class JavaActionExecutor extends ActionExecutor { } } else { - LOG.debug("Submitting the job through Job Client for action " + action.getId()); - - // setting up propagation of the delegation token. - HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); - Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has - .getMRDelegationTokenRenewer(launcherJobConf)); - launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); + // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS +// // setting up propagation of the delegation token. 
+// Token<DelegationTokenIdentifier> mrdt = null; +// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); +// mrdt = jobClient.getDelegationToken(has +// .getMRDelegationTokenRenewer(launcherJobConf)); +// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); // insert credentials tokens to launcher job conf if needed if (needInjectCredentials() && credentialsConf != null) { @@ -1173,17 +1052,36 @@ public class JavaActionExecutor extends ActionExecutor { else { LOG.info("No need to inject credentials."); } - runningJob = jobClient.submitJob(launcherJobConf); - if (runningJob == null) { - throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", - "Error submitting launcher for action [{0}]", action.getId()); + + YarnClient yarnClient = null; + try { + String user = context.getWorkflow().getUser(); + + // Create application + yarnClient = createYarnClient(context, launcherJobConf); + YarnClientApplication newApp = yarnClient.createApplication(); + ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); + + // Create launch context for app master + ApplicationSubmissionContext appContext = + createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf); + + // Submit the launcher AM + yarnClient.submitApplication(appContext); + + launcherId = appId.toString(); + LOG.debug("After submission get the launcherId [{0}]", launcherId); + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + consoleUrl = appReport.getTrackingUrl(); + } finally { + if (yarnClient != null) { + yarnClient.close(); + yarnClient = null; + } } - launcherId = runningJob.getID().toString(); - LOG.debug("After submission get the launcherId " + launcherId); } String jobTracker = launcherJobConf.get(HADOOP_YARN_RM); - String consoleUrl = runningJob.getTrackingURL(); context.setStartData(launcherId, jobTracker, consoleUrl); } catch (Exception ex) { @@ -1206,6 
+1104,91 @@ public class JavaActionExecutor extends ActionExecutor { } } } + + private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String user, + Context context, Configuration actionConf) + throws IOException, HadoopAccessorException, URISyntaxException { + // Create launch context for app master + ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class); + + // set the application id + appContext.setApplicationId(appId); + + // set the application name + appContext.setApplicationName(launcherJobConf.getJobName()); + appContext.setApplicationType("Oozie Launcher"); + + // Set the priority for the application master + Priority pri = Records.newRecord(Priority.class); + int priority = 0; // TODO: OYA: Add a constant or a config + pri.setPriority(priority); + appContext.setPriority(pri); + + // Set the queue to which this application is to be submitted in the RM + appContext.setQueue(launcherJobConf.getQueueName()); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + // Set the resources to localize + Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf); + MRApps.setupDistributedCache(launcherJobConf, localResources); + // Add the Launcher and Action configs as Resources + HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user, + launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir()); + localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR); + LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf, + 
context.getAppFileSystem().getUri(), context.getActionDir()); + localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR); + amContainer.setLocalResources(localResources); + + // Set the environment variables + Map<String, String> env = new HashMap<String, String>(); + // This adds the Hadoop jars to the classpath in the Launcher JVM + ClasspathUtils.setupClasspath(env, launcherJobConf); + if (false) { // TODO: OYA: config to add MR jars? Probably also needed for MR Action + ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf); + } + amContainer.setEnvironment(env); + + // Set the command + List<String> vargs = new ArrayList<String>(6); + vargs.add(MRApps.crossPlatformifyMREnv(launcherJobConf, ApplicationConstants.Environment.JAVA_HOME) + + "/bin/java"); + // TODO: OYA: remove attach debugger to AM; useful for debugging +// vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"); + MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs); + vargs.add(LauncherAM.class.getName()); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDERR); + List<String> vargsFinal = new ArrayList<String>(6); + StringBuilder mergedCommand = new StringBuilder(); + for (CharSequence str : vargs) { + mergedCommand.append(str).append(" "); + } + vargsFinal.add(mergedCommand.toString()); + LOG.debug("Command to launch container for ApplicationMaster is : " + + mergedCommand); + amContainer.setCommands(vargsFinal); + appContext.setAMContainerSpec(amContainer); + + // Set tokens + DataOutputBuffer dob = new DataOutputBuffer(); + launcherJobConf.getCredentials().writeTokenStorageToStream(dob); + amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); + + // Set Resources + // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory 
MB, vcores) + Resource resource = Resource.newInstance(2048, 1); + appContext.setResource(resource); + + return appContext; + } + private boolean needInjectCredentials() { boolean methodExists = true; @@ -1409,6 +1392,19 @@ public class JavaActionExecutor extends ActionExecutor { return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf); } + /** + * Create yarn client object + * + * @param context + * @param jobConf + * @return YarnClient + * @throws HadoopAccessorException + */ + protected YarnClient createYarnClient(Context context, JobConf jobConf) throws HadoopAccessorException { + String user = context.getWorkflow().getUser(); + return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf); + } + protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId())); return runningJob; @@ -1425,129 +1421,112 @@ public class JavaActionExecutor extends ActionExecutor { @Override public void check(Context context, WorkflowAction action) throws ActionExecutorException { - JobClient jobClient = null; - boolean exception = false; + boolean fallback = false; + LOG = XLog.resetPrefix(LOG); LogUtils.setLogInfo(action); + YarnClient yarnClient = null; try { Element actionXml = XmlUtils.parseXml(action.getConf()); - FileSystem actionFs = context.getAppFileSystem(); JobConf jobConf = createBaseHadoopConf(context, actionXml); - jobClient = createJobClient(context, jobConf); - RunningJob runningJob = getRunningJob(context, action, jobClient); - if (runningJob == null) { - context.setExecutionData(FAILED, null); - throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", - "Could not lookup launched hadoop Job ID [{0}] which was associated with " + - " action [{1}]. 
Failing this action!", getActualExternalId(action), action.getId()); - } - if (runningJob.isComplete()) { + FileSystem actionFs = context.getAppFileSystem(); + yarnClient = createYarnClient(context, jobConf); + FinalApplicationStatus appStatus = null; + try { + ApplicationReport appReport = + yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId())); + YarnApplicationState appState = appReport.getYarnApplicationState(); + if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED + || appState == YarnApplicationState.KILLED) { + appStatus = appReport.getFinalApplicationStatus(); + } + + } catch (Exception ye) { + LOG.debug("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye); + // Fallback to action data file if we can't find the Launcher AM (maybe it got purged) + fallback = true; + } + if (appStatus != null || fallback) { Path actionDir = context.getActionDir(); String newId = null; // load sequence file into object Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf); - if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) { - newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID); - String launcherId = action.getExternalId(); - runningJob = jobClient.getJob(JobID.forName(newId)); - if (runningJob == null) { - context.setExternalStatus(FAILED); + if (fallback) { + String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS); + if (finalStatus != null) { + appStatus = FinalApplicationStatus.valueOf(finalStatus); + } else { + context.setExecutionData(FAILED, null); throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", - "Unknown hadoop job [{0}] associated with action [{1}]. Failing this action!", newId, - action.getId()); + "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" + + " action data. 
Failing this action!", action.getExternalId(), action.getId()); } - context.setExternalChildIDs(newId); - LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId, - newId); } - else { - String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); - if (externalIDs != null) { - context.setExternalChildIDs(externalIDs); - LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs); - } + String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS); + if (externalIDs != null) { + context.setExternalChildIDs(externalIDs); + LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs); } - if (runningJob.isComplete()) { - // fetching action output and stats for the Map-Reduce action. - if (newId != null) { - actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf); + LOG.info(XLog.STD, "action completed, external ID [{0}]", + action.getExternalId()); + context.setExecutionData(appStatus.toString(), null); + if (appStatus == FinalApplicationStatus.SUCCEEDED) { + if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) { + context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData + .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS))); + LOG.info(XLog.STD, "action produced output"); } - LOG.info(XLog.STD, "action completed, external ID [{0}]", - action.getExternalId()); - if (LauncherMapperHelper.isMainSuccessful(runningJob)) { - if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) { - context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData - .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS))); - LOG.info(XLog.STD, "action produced output"); + else { + context.setExecutionData(SUCCEEDED, null); + } + if (LauncherMapperHelper.hasStatsData(actionData)) { + context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS)); + LOG.info(XLog.STD, "action produced stats"); + } + 
getActionData(actionFs, action, context); + } + else { + String errorReason; + if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) { + Properties props = PropertiesUtils.stringToProperties(actionData + .get(LauncherAM.ACTION_DATA_ERROR_PROPS)); + String errorCode = props.getProperty("error.code"); + if ("0".equals(errorCode)) { + errorCode = "JA018"; } - else { - context.setExecutionData(SUCCEEDED, null); + if ("-1".equals(errorCode)) { + errorCode = "JA019"; } - if (LauncherMapperHelper.hasStatsData(actionData)) { - context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS)); - LOG.info(XLog.STD, "action produced stats"); + errorReason = props.getProperty("error.reason"); + LOG.warn("Launcher ERROR, reason: {0}", errorReason); + String exMsg = props.getProperty("exception.message"); + String errorInfo = (exMsg != null) ? exMsg : errorReason; + context.setErrorInfo(errorCode, errorInfo); + String exStackTrace = props.getProperty("exception.stacktrace"); + if (exMsg != null) { + LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); } - getActionData(actionFs, runningJob, action, context); } else { - String errorReason; - if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) { - Properties props = PropertiesUtils.stringToProperties(actionData - .get(LauncherMapper.ACTION_DATA_ERROR_PROPS)); - String errorCode = props.getProperty("error.code"); - if ("0".equals(errorCode)) { - errorCode = "JA018"; - } - if ("-1".equals(errorCode)) { - errorCode = "JA019"; - } - errorReason = props.getProperty("error.reason"); - LOG.warn("Launcher ERROR, reason: {0}", errorReason); - String exMsg = props.getProperty("exception.message"); - String errorInfo = (exMsg != null) ? 
exMsg : errorReason; - context.setErrorInfo(errorCode, errorInfo); - String exStackTrace = props.getProperty("exception.stacktrace"); - if (exMsg != null) { - LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace); - } - } - else { - errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action - .getTrackerUri(), action.getExternalId()); - LOG.warn(errorReason); - } - context.setExecutionData(FAILED_KILLED, null); + errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action + .getTrackerUri(), action.getExternalId()); + LOG.warn(errorReason); } - } - else { - context.setExternalStatus("RUNNING"); - LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]", - runningJob.getID()); + context.setExecutionData(FAILED_KILLED, null); } } else { - context.setExternalStatus("RUNNING"); + context.setExternalStatus(YarnApplicationState.RUNNING.toString()); LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]", - runningJob.getID()); + action.getExternalId()); } } catch (Exception ex) { LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex); - exception = true; throw convertException(ex); } finally { - if (jobClient != null) { - try { - jobClient.close(); - } - catch (Exception e) { - if (exception) { - LOG.error("JobClient error: ", e); - } - else { - throw convertException(e); - } - } + if (yarnClient != null) { + IOUtils.closeQuietly(yarnClient); } } } @@ -1555,14 +1534,12 @@ public class JavaActionExecutor extends ActionExecutor { /** * Get the output data of an action. Subclasses should override this method * to get action specific output data. 
- * * @param actionFs the FileSystem object - * @param runningJob the runningJob * @param action the Workflow action * @param context executor context * */ - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { } @@ -1585,38 +1562,28 @@ public class JavaActionExecutor extends ActionExecutor { @Override public void kill(Context context, WorkflowAction action) throws ActionExecutorException { - JobClient jobClient = null; - boolean exception = false; + YarnClient yarnClient = null; try { Element actionXml = XmlUtils.parseXml(action.getConf()); + String user = context.getWorkflow().getUser(); JobConf jobConf = createBaseHadoopConf(context, actionXml); - jobClient = createJobClient(context, jobConf); - RunningJob runningJob = getRunningJob(context, action, jobClient); - if (runningJob != null) { - runningJob.killJob(); - } + yarnClient = createYarnClient(context, jobConf); + yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId())); context.setExternalStatus(KILLED); context.setExecutionData(KILLED, null); - } - catch (Exception ex) { - exception = true; + } catch (Exception ex) { + LOG.error("Error: ", ex); throw convertException(ex); - } - finally { + } finally { try { FileSystem actionFs = context.getAppFileSystem(); cleanUpActionDir(actionFs, context); - if (jobClient != null) { - jobClient.close(); - } - } - catch (Exception ex) { - if (exception) { - LOG.error("Error: ", ex); - } - else { - throw convertException(ex); + if (yarnClient != null) { + yarnClient.close(); } + } catch (Exception ex) { + LOG.error("Error: ", ex); + throw convertException(ex); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java index 69e1044..07d1262 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java @@ -145,18 +145,6 @@ public class LauncherMapperHelper { launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); } - FileSystem fs = - Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), - actionDir.toUri(), launcherConf); - fs.mkdirs(actionDir); - - OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML)); - try { - actionConf.writeXml(os); - } finally { - IOUtils.closeSafely(os); - } - launcherConf.setInputFormat(OozieLauncherInputFormat.class); launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString()); } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index 252f461..6a41235 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import 
org.apache.oozie.service.HadoopAccessorException; @@ -157,9 +156,9 @@ public class SparkActionExecutor extends JavaActionExecutor { } @Override - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, URISyntaxException { - super.getActionData(actionFs, runningJob, action, context); + super.getActionData(actionFs, action, context); readExternalChildIDs(action, context); } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java index 6813a37..82e5f0c 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java @@ -23,7 +23,6 @@ import java.io.StringReader; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; -import java.util.Properties; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; @@ -232,17 +231,15 @@ public class SqoopActionExecutor extends JavaActionExecutor { /** * Get the stats and external child IDs - * - * @param actionFs the FileSystem object - * @param runningJob the runningJob + * @param actionFs the FileSystem object * @param action the Workflow action * @param context executor context * */ @Override - protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context) + protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context) throws HadoopAccessorException, JDOMException, IOException, 
URISyntaxException{ - super.getActionData(actionFs, runningJob, action, context); + super.getActionData(actionFs, action, context); readExternalChildIDs(action, context); } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java index 794e825..0177241 100644 --- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java +++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java @@ -18,6 +18,7 @@ package org.apache.oozie.service; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; @@ -29,7 +30,14 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.hadoop.JavaActionExecutor; import org.apache.oozie.util.ParamChecker; import org.apache.oozie.util.XConfiguration; @@ -39,6 +47,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowAppParser; import java.io.File; import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; @@ -511,9 +520,43 @@ public class 
HadoopAccessorService implements Service { } /** - * Return a FileSystem created with the provided user for the specified URI. + * Return a YarnClient created with the provided user and configuration. * + * @param user The username to impersonate + * @param conf The conf + * @return a YarnClient with the provided user and configuration + * @throws HadoopAccessorException if the client could not be created. + */ + public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException { + ParamChecker.notEmpty(user, "user"); + if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { + throw new HadoopAccessorException(ErrorCode.E0903); + } + String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM); + validateJobTracker(rm); + try { + UserGroupInformation ugi = getUGI(user); + YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() { + @Override + public YarnClient run() throws Exception { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + }); + return yarnClient; + } catch (InterruptedException ex) { + throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); + } catch (IOException ex) { + throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); + } + } + + /** + * Return a FileSystem created with the provided user for the specified URI. * + * @param user The username to impersonate * @param uri file system URI. * @param conf Configuration with all necessary information to create the FileSystem. * @return FileSystem created with the provided user/group. @@ -667,4 +710,56 @@ public class HadoopAccessorService implements Service { return supportedSchemes; } + /** + * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This involves also writing it + * to HDFS. 
+ * Example usage: + * * <pre> + * {@code + * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir); + * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir); + * ... + * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + * localResources.put(filename1, res1); + * localResources.put(filename2, res2); + * ... + * containerLaunchContext.setLocalResources(localResources); + * } + * </pre> + * + * @param filename The filename to use on the remote filesystem and once it has been localized. + * @param user The user + * @param conf The configuration to process + * @param uri The URI of the remote filesystem (e.g. HDFS) + * @param dir The directory on the remote filesystem to write the file to + * @return The LocalResource for the localized configuration file + * @throws IOException A problem occurred writing the file + * @throws HadoopAccessorException A problem occurred with Hadoop + * @throws URISyntaxException A problem occurred parsing the URI + */ + public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri, + Path dir) + throws IOException, HadoopAccessorException, URISyntaxException { + File f = File.createTempFile(filename, ".tmp"); + FileOutputStream fos = null; + try { + fos = new FileOutputStream(f); + conf.writeXml(fos); + } finally { + if (fos != null) { + fos.close(); + } + } + FileSystem fs = createFileSystem(user, uri, conf); + Path dst = new Path(dir, filename); + fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst); + LocalResource localResource = Records.newRecord(LocalResource.class); + localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + FileStatus destStatus = fs.getFileStatus(dst); + localResource.setTimestamp(destStatus.getModificationTime()); + localResource.setSize(destStatus.getLen()); + return
localResource; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java new file mode 100644 index 0000000..8533371 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.util; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import java.io.IOException; +import java.net.URI; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +public class ClasspathUtils { + private static boolean usingMiniYarnCluster = false; + private static final List<String> CLASSPATH_ENTRIES = Arrays.asList( + ApplicationConstants.Environment.PWD.$(), + MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR, + MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR, + MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*", + ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + "*" + ); + + @VisibleForTesting + public static void setUsingMiniYarnCluster(boolean useMiniYarnCluster) { + usingMiniYarnCluster = useMiniYarnCluster; + } + + // Adapted from MRApps#setClasspath. Adds Yarn, HDFS, Common, and distributed cache jars. 
+ public static void setupClasspath(Map<String, String> env, Configuration conf) throws IOException { + // Propagate the system classpath when using the mini cluster + if (usingMiniYarnCluster) { + MRApps.addToEnvironment( + env, + ApplicationConstants.Environment.CLASSPATH.name(), + System.getProperty("java.class.path"), conf); + } + + for (String entry : CLASSPATH_ENTRIES) { + MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), entry, conf); + } + + // a * in the classpath will only find a .jar, so we need to filter out + // all .jars and add everything else + addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getFileClassPaths(conf), + org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheFiles(conf), + conf, + env, ApplicationConstants.Environment.PWD.$()); + addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getArchiveClassPaths(conf), + org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheArchives(conf), + conf, + env, ApplicationConstants.Environment.PWD.$()); + + + boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, + MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM); + + for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH, + crossPlatform + ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH + : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { + MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), + c.trim(), conf); + } + } + + // Adapted from MRApps#setClasspath + public static void addMapReduceToClasspath(Map<String, String> env, Configuration conf) { + boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM, + MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM); + + for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH, + crossPlatform ? 
+ StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH) + : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) { + MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), + c.trim(), conf); + } + } + + // Borrowed from MRApps#addToClasspathIfNotJar + private static void addToClasspathIfNotJar(Path[] paths, + URI[] withLinks, Configuration conf, + Map<String, String> environment, + String classpathEnvVar) throws IOException { + if (paths != null) { + HashMap<Path, String> linkLookup = new HashMap<Path, String>(); + if (withLinks != null) { + for (URI u: withLinks) { + Path p = new Path(u); + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + String name = (null == u.getFragment()) + ? p.getName() : u.getFragment(); + if (!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) { + linkLookup.put(p, name); + } + } + } + + for (Path p : paths) { + FileSystem remoteFS = p.getFileSystem(conf); + p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(), + remoteFS.getWorkingDirectory())); + String name = linkLookup.get(p); + if (name == null) { + name = p.getName(); + } + if(!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) { + MRApps.addToEnvironment( + environment, + classpathEnvVar, + ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + name, conf); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml index 6c2f7d8..5f4645c 100644 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@ -1782,31 +1782,6 @@ will be the requeue interval for the actions which are waiting for a long time w </property> <property> - 
<name>oozie.action.launcher.mapreduce.job.ubertask.enable</name> - <value>true</value> - <description> - Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default. - This can be overridden on a per-action-type basis by setting - oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action - type; for example, "pig"). And that can be overridden on a per-action basis by setting - oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow. In summary, the - priority is this: - 1. action's configuration section in a workflow - 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site - 3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site - </description> - </property> - - <property> - <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name> - <value>false</value> - <description> - The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by - default for it. See oozie.action.launcher.mapreduce.job.ubertask.enable - </description> - </property> - - <property> <name>oozie.action.shell.setup.hadoop.conf.dir</name> <value>false</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/QueryServlet.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/QueryServlet.java b/core/src/test/java/org/apache/oozie/QueryServlet.java new file mode 100644 index 0000000..8789438 --- /dev/null +++ b/core/src/test/java/org/apache/oozie/QueryServlet.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URLDecoder; + +/** + * Servlet that keeps track of the last query string it received + */ +public class QueryServlet extends HttpServlet { + + public static String lastQueryString = null; + + protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { + lastQueryString = URLDecoder.decode(request.getQueryString(), "UTF-8"); + response.setStatus(HttpServletResponse.SC_OK); + } + +}
