Merge branch 'master' into oya
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/80854d15 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/80854d15 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/80854d15 Branch: refs/heads/oya Commit: 80854d151d67666132a02797de4efdbf3b2af175 Parents: 40ee320 1103882 Author: Gezapeti Cseh <[email protected]> Authored: Wed May 10 14:09:31 2017 -0700 Committer: Gezapeti Cseh <[email protected]> Committed: Wed May 10 14:09:31 2017 -0700 ---------------------------------------------------------------------- .../org/apache/oozie/client/OozieClient.java | 4 +- .../apache/oozie/client/rest/RestConstants.java | 2 +- .../main/java/org/apache/oozie/BaseEngine.java | 18 +- .../org/apache/oozie/BaseLocalOozieClient.java | 601 +++++++++++++++++++ .../java/org/apache/oozie/LocalOozieClient.java | 245 +------- .../apache/oozie/LocalOozieClientBundle.java | 93 +++ .../org/apache/oozie/LocalOozieClientCoord.java | 328 ++-------- .../oozie/OozieClientOperationHandler.java | 173 ++++++ .../java/org/apache/oozie/OozieJsonFactory.java | 55 ++ .../oozie/action/hadoop/JavaActionExecutor.java | 15 +- .../oozie/action/hadoop/LauncherHelper.java | 2 + .../hadoop/LauncherInputFormatClassLocator.java | 84 +++ .../command/coord/CoordSubmitXCommand.java | 25 - .../java/org/apache/oozie/local/LocalOozie.java | 93 ++- .../org/apache/oozie/servlet/V0JobsServlet.java | 12 +- .../org/apache/oozie/servlet/V1JobsServlet.java | 184 ++---- core/src/main/resources/oozie-default.xml | 8 + .../apache/oozie/TestLocalOozieClientCoord.java | 72 ++- .../TestLauncherInputFormatClassLocator.java | 57 ++ .../command/coord/TestCoordSubmitXCommand.java | 30 - .../resources/coord-invalid-el-function.xml | 35 -- .../resources/coord-invalid-output-instance.xml | 58 -- .../coord-multiple-input-instance4.xml | 2 +- .../coord-multiple-input-start-instance2.xml | 2 +- .../coord-multiple-output-instance4.xml | 2 +- docs/src/site/twiki/AG_Install.twiki | 4 +- docs/src/site/twiki/AG_Monitoring.twiki | 2 +- .../site/twiki/CoordinatorFunctionalSpec.twiki | 2 +- .../site/twiki/DG_ActionAuthentication.twiki | 4 +- docs/src/site/twiki/DG_SLAMonitoring.twiki | 2 +- docs/src/site/twiki/WebServicesAPI.twiki | 6 +- .../src/site/twiki/WorkflowFunctionalSpec.twiki | 2 +- release-log.txt | 4 +- 33 files changed, 1412 insertions(+), 814 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index f3e5e32,d60a5c7..7a762ab --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@@ -49,14 -48,13 +49,15 @@@ import org.apache.hadoop.fs.FileStatus import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AccessControlException; -import org.apache.oozie.hadoop.utils.HadoopShims; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.JobID; -import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.mapred.TaskLog; +import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.security.Credentials; + import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; @@@ -144,11 -119,18 +148,12 @@@ public class JavaActionExecutor extend private static int maxActionOutputLen; private static int maxExternalStatsSize; private static int maxFSGlobMax; - private static final String SUCCEEDED = "SUCCEEDED"; - private static final String KILLED = "KILLED"; - private static final String FAILED = "FAILED"; - private static final String FAILED_KILLED = "FAILED/KILLED"; + + protected static final String HADOOP_USER = "user.name"; + protected XLog LOG = XLog.getLog(getClass()); - private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])"); private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir="; - public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE; - public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader"; - public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first"; - public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip"; + private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); public XConfiguration workflowConf = null; @@@ -166,15 -150,15 +171,15 @@@ super(type); } - public static List<Class> getCommonLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); - classes.add(LauncherMapper.class); + public static List<Class<?>> getCommonLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); - classes.add(OozieLauncherInputFormat.class); + classes.add(LauncherMain.class); + classes.add(launcherInputFormatClassLocator.locateOrGet()); classes.add(OozieLauncherOutputFormat.class); classes.add(OozieLauncherOutputCommitter.class); - classes.add(LauncherMainHadoopUtils.class); - classes.add(HadoopShims.class); classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher()); + classes.add(LauncherAM.class); + classes.add(LauncherAMCallbackNotifier.class); return classes; } http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java index 5ac1a16,0000000..442f9a3 mode 100644,000000..100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java @@@ -1,321 -1,0 +1,323 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.math.BigInteger; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Counters; +import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.oozie.client.OozieClient; +import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.URIHandlerService; +import org.apache.oozie.service.UserGroupInformationService; +import org.apache.oozie.util.IOUtils; +import org.apache.oozie.util.PropertiesUtils; + +// TODO: we're no longer using Launcher Mapper -- give this class a better name +public class LauncherHelper { + + public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; + ++ private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); ++ + public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) + throws HadoopAccessorException, IOException { + String jobId = null; + Path recoveryFile = new Path(actionDir, recoveryId); + FileSystem fs = Services.get().get(HadoopAccessorService.class) + .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); + + if (fs.exists(recoveryFile)) { + InputStream is = fs.open(recoveryFile); + BufferedReader reader = new BufferedReader(new InputStreamReader(is)); + jobId = reader.readLine(); + reader.close(); + } + return jobId; + + } + + public static void setupMainClass(Configuration launcherConf, String javaMainClass) { + // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via + // <configuration> property + if (javaMainClass != null && !javaMainClass.equals("")) { + launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); + } + } + + public static void setupLauncherURIHandlerConf(Configuration launcherConf) { + for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { + launcherConf.set(entry.getKey(), entry.getValue()); + } + } + + public static void setupMainArguments(Configuration launcherConf, String[] args) { + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); + for (int i = 0; i < args.length; i++) { + launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); + } + } + + public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); + } + + /** + * Set the maximum value of stats data + * + * @param launcherConf the oozie launcher configuration + * @param maxStatsData the maximum allowed size of stats data + */ + public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ + launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); + } + + /** + * Set the maximum number of globbed files/dirs + * + * @param launcherConf the oozie launcher configuration + * @param fsGlobMax the maximum number of files/dirs for FS operation + */ + public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ + launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); + } + + public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir, + String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { + + launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); + launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); + launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); + launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); + launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); + + actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); + actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); + + if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { + List<String> purgedEntries = new ArrayList<String>(); + Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); + for (String entry : entries) { + if (entry.contains("#")) { + purgedEntries.add(entry); + } + } + actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); + launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); + } + } + + public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf, String launcherTag, + long launcherTime) + throws NoSuchAlgorithmException { + launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime); + // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) + String tag = getTag(launcherTag); + // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself. + // mapreduce.job.tags should only go to child job launch by launcher. + actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, tag); + } + + public static String getTag(String launcherTag) throws NoSuchAlgorithmException { + MessageDigest digest = MessageDigest.getInstance("MD5"); + digest.update(launcherTag.getBytes(), 0, launcherTag.length()); + String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); + return md5; + } + + public static boolean isMainDone(RunningJob runningJob) throws IOException { + return runningJob.isComplete(); + } + + public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { + boolean succeeded = runningJob.isSuccessful(); + if (succeeded) { + Counters counters = runningJob.getCounters(); + if (counters != null) { + Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); + if (group != null) { + succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; + } + } + } + return succeeded; + } + + /** + * Determine whether action has external child jobs or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException { + return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); + } + + /** + * Determine whether action has output data or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasOutputData(Map<String, String> actionData) throws IOException { + return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS); + } + + /** + * Determine whether action has external stats or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasStatsData(Map<String, String> actionData) throws IOException{ + return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS); + } + + /** + * Determine whether action has new id (id swap) or not + * @param actionData + * @return true/false + * @throws IOException + */ + public static boolean hasIdSwap(Map<String, String> actionData) throws IOException { + return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID); + } + + /** + * Get the sequence file path storing all action data + * @param actionDir + * @return + */ + public static Path getActionDataSequenceFilePath(Path actionDir) { + return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE); + } + + /** + * Utility function to load the contents of action data sequence file into + * memory object + * + * @param fs Action Filesystem + * @param actionDir Path + * @param conf Configuration + * @return Map action data + * @throws IOException + * @throws InterruptedException + */ + public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf) + throws IOException, InterruptedException { + UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); + UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME)); + + return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() { + @Override + public Map<String, String> run() throws IOException { + Map<String, String> ret = new HashMap<String, String>(); + Path seqFilePath = getActionDataSequenceFilePath(actionDir); + if (fs.exists(seqFilePath)) { + SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf); + Text key = new Text(), value = new Text(); + while (seqFile.next(key, value)) { + ret.put(key.toString(), value.toString()); + } + seqFile.close(); + } + else { // maintain backward-compatibility. to be deprecated + org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir); + InputStream is; + BufferedReader reader = null; + Properties props; + if (files != null && files.length > 0) { + for (int x = 0; x < files.length; x++) { + Path file = files[x].getPath(); + if (file.equals(new Path(actionDir, "externalChildIds.properties"))) { + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, + IOUtils.getReaderAsString(reader, -1)); + } + else if (file.equals(new Path(actionDir, "newId.properties"))) { + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + props = PropertiesUtils.readProperties(reader, -1); + ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id")); + } + else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) { + int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, + 2 * 1024); + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils + .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData))); + } + else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) { + int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, + Integer.MAX_VALUE); + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils + .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData))); + } + else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) { + is = fs.open(file); + reader = new BufferedReader(new InputStreamReader(is)); + ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1)); + } + } + } + } + return ret; + } + }); + } + + public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) { + String tag; + if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { + tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName(); + } else if (parentId != null) { + tag = parentId + "@" + wfAction.getName(); + } else { + tag = wfAction.getId(); + } + return tag; + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/core/src/main/resources/oozie-default.xml ---------------------------------------------------------------------- diff --cc core/src/main/resources/oozie-default.xml index 4ab7512,076401d..61b1d88 --- a/core/src/main/resources/oozie-default.xml +++ b/core/src/main/resources/oozie-default.xml @@@ -1835,6 -1835,39 +1835,14 @@@ will be the requeue interval for the ac </property> <property> - <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name> - <value>true</value> - <description> - Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default. - This can be overridden on a per-action-type basis by setting - oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action - type; for example, "pig"). And that can be overridden on a per-action basis by setting - oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow. In summary, the - priority is this: - 1. action's configuration section in a workflow - 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site - 3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site - </description> - </property> - - <property> - <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name> - <value>false</value> - <description> - The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by - default for it. See oozie.action.launcher.mapreduce.job.ubertask.enable - </description> - </property> - - <property> + <name>oozie.action.launcher.mapreduce.input.format.class</name> + <value>org.apache.oozie.action.hadoop.OozieLauncherInputFormat</value> + <description> + Make the Launcher Mapper map-only job's InputFormat class pluggable in order to provide alternative implementations. + </description> + </property> + + <property> <name>oozie.action.spark.setup.hadoop.conf.dir</name> <value>false</value> <description> http://git-wip-us.apache.org/repos/asf/oozie/blob/80854d15/release-log.txt ----------------------------------------------------------------------
