http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java deleted file mode 100644 index 72ed2f1..0000000 --- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java +++ /dev/null @@ -1,345 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oozie.action.hadoop; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.math.BigInteger; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RunningJob; -import org.apache.hadoop.mapred.Counters; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.service.HadoopAccessorException; -import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.URIHandlerService; -import org.apache.oozie.service.UserGroupInformationService; -import org.apache.oozie.util.IOUtils; -import org.apache.oozie.util.PropertiesUtils; - -public class LauncherMapperHelper { - - public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag"; - - private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator(); - - public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId) - throws HadoopAccessorException, IOException { - String jobId = null; - Path recoveryFile = new Path(actionDir, recoveryId); - FileSystem fs = Services.get().get(HadoopAccessorService.class) - .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf); - - if (fs.exists(recoveryFile)) { - InputStream is = fs.open(recoveryFile); - BufferedReader reader = new BufferedReader(new InputStreamReader(is)); - jobId = reader.readLine(); - reader.close(); - } - 
return jobId; - - } - - public static void setupMainClass(Configuration launcherConf, String javaMainClass) { - // Only set the javaMainClass if its not null or empty string, this way the user can override the action's main class via - // <configuration> property - if (javaMainClass != null && !javaMainClass.equals("")) { - launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass); - } - } - - public static void setupLauncherURIHandlerConf(Configuration launcherConf) { - for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) { - launcherConf.set(entry.getKey(), entry.getValue()); - } - } - - public static void setupMainArguments(Configuration launcherConf, String[] args) { - launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length); - for (int i = 0; i < args.length; i++) { - launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]); - } - } - - public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) { - launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData); - } - - /** - * Set the maximum value of stats data - * - * @param launcherConf the oozie launcher configuration - * @param maxStatsData the maximum allowed size of stats data - */ - public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){ - launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData); - } - - /** - * Set the maximum number of globbed files/dirs - * - * @param launcherConf the oozie launcher configuration - * @param fsGlobMax the maximum number of files/dirs for FS operation - */ - public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){ - launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax); - } - - public static void setupLauncherInfo(JobConf launcherConf, String jobId, String actionId, Path actionDir, - String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException { - - launcherConf.setMapperClass(LauncherMapper.class); - launcherConf.setSpeculativeExecution(false); - launcherConf.setNumMapTasks(1); - launcherConf.setNumReduceTasks(0); - - launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); - launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); - launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString()); - launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId); - launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML); - - actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId); - actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId); - - if (Services.get().getConf().getBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", false)) { - List<String> purgedEntries = new ArrayList<String>(); - Collection<String> entries = actionConf.getStringCollection("mapreduce.job.cache.files"); - for (String entry : entries) { - if (entry.contains("#")) { - purgedEntries.add(entry); - } - } - actionConf.setStrings("mapreduce.job.cache.files", purgedEntries.toArray(new String[purgedEntries.size()])); - launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true); - } - - FileSystem fs = - Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"), - actionDir.toUri(), launcherConf); - fs.mkdirs(actionDir); - - OutputStream os = fs.create(new Path(actionDir, 
LauncherMapper.ACTION_CONF_XML)); - try { - actionConf.writeXml(os); - } finally { - IOUtils.closeSafely(os); - } - - launcherConf.setInputFormat(launcherInputFormatClassLocator.locateOrGet()); - launcherConf.setOutputFormat(OozieLauncherOutputFormat.class); - launcherConf.setOutputCommitter(OozieLauncherOutputCommitter.class); - } - - public static void setupYarnRestartHandling(JobConf launcherJobConf, Configuration actionConf, String launcherTag, - long launcherTime) - throws NoSuchAlgorithmException { - launcherJobConf.setLong(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, launcherTime); - // Tags are limited to 100 chars so we need to hash them to make sure (the actionId otherwise doesn't have a max length) - String tag = getTag(launcherTag); - // keeping the oozie.child.mapreduce.job.tags instead of mapreduce.job.tags to avoid killing launcher itself. - // mapreduce.job.tags should only go to child job launch by launcher. - actionConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, tag); - } - - public static String getTag(String launcherTag) throws NoSuchAlgorithmException { - MessageDigest digest = MessageDigest.getInstance("MD5"); - digest.update(launcherTag.getBytes(), 0, launcherTag.length()); - String md5 = "oozie-" + new BigInteger(1, digest.digest()).toString(16); - return md5; - } - - public static boolean isMainDone(RunningJob runningJob) throws IOException { - return runningJob.isComplete(); - } - - public static boolean isMainSuccessful(RunningJob runningJob) throws IOException { - boolean succeeded = runningJob.isSuccessful(); - if (succeeded) { - Counters counters = runningJob.getCounters(); - if (counters != null) { - Counters.Group group = counters.getGroup(LauncherMapper.COUNTER_GROUP); - if (group != null) { - succeeded = group.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0; - } - } - } - return succeeded; - } - - /** - * Determine whether action has external child jobs or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException { - return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS); - } - - /** - * Determine whether action has output data or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasOutputData(Map<String, String> actionData) throws IOException { - return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS); - } - - /** - * Determine whether action has external stats or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasStatsData(Map<String, String> actionData) throws IOException{ - return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS); - } - - /** - * Determine whether action has new id (id swap) or not - * @param actionData - * @return true/false - * @throws IOException - */ - public static boolean hasIdSwap(Map<String, String> actionData) throws IOException { - return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID); - } - - /** - * Get the sequence file path storing all action data - * @param actionDir - * @return - */ - public static Path getActionDataSequenceFilePath(Path actionDir) { - return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE); - } - - /** - * Utility function to load the contents of action data sequence file into - * memory object - * - * @param fs Action Filesystem - * @param actionDir Path - * @param conf Configuration - * @return 
Map action data - * @throws IOException - * @throws InterruptedException - */ - public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf) - throws IOException, InterruptedException { - UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class); - UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME)); - - return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() { - @Override - public Map<String, String> run() throws IOException { - Map<String, String> ret = new HashMap<String, String>(); - Path seqFilePath = getActionDataSequenceFilePath(actionDir); - if (fs.exists(seqFilePath)) { - SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf); - Text key = new Text(), value = new Text(); - while (seqFile.next(key, value)) { - ret.put(key.toString(), value.toString()); - } - seqFile.close(); - } - else { // maintain backward-compatibility. to be deprecated - org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir); - InputStream is; - BufferedReader reader = null; - Properties props; - if (files != null && files.length > 0) { - for (int x = 0; x < files.length; x++) { - Path file = files[x].getPath(); - if (file.equals(new Path(actionDir, "externalChildIds.properties"))) { - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS, - IOUtils.getReaderAsString(reader, -1)); - } - else if (file.equals(new Path(actionDir, "newId.properties"))) { - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - props = PropertiesUtils.readProperties(reader, -1); - ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id")); - } - else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) { - int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, - 2 * 1024); - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils - .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData))); - } - else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) { - int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, - Integer.MAX_VALUE); - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils - .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData))); - } - else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) { - is = fs.open(file); - reader = new BufferedReader(new InputStreamReader(is)); - ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1)); - } - } - } - } - return ret; - } - }); - } - - public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) { - String tag; - if ( conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) { - tag = conf.get(OOZIE_ACTION_YARN_TAG) + "@" + wfAction.getName(); - } else if (parentId != null) { - tag = parentId + "@" + wfAction.getName(); - } else { - tag = wfAction.getId(); - } - return tag; - } - -}
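For reference, the helper class removed above derived YARN application tags by MD5-hashing the launcher tag, because YARN caps tags at 100 characters while Oozie action IDs have no fixed upper bound. The following standalone sketch mirrors the deleted LauncherMapperHelper.getTag() logic; the class name and main() driver are illustrative only and not part of the Oozie API.

import java.math.BigInteger;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public final class YarnTagExample {

    // Mirrors the deleted LauncherMapperHelper.getTag(): hash the (potentially long)
    // launcher tag with MD5 and prefix it, so the result always fits YARN's 100-char limit.
    static String toYarnTag(String launcherTag) throws NoSuchAlgorithmException {
        MessageDigest digest = MessageDigest.getInstance("MD5");
        digest.update(launcherTag.getBytes(), 0, launcherTag.length());
        return "oozie-" + new BigInteger(1, digest.digest()).toString(16);
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        // Example input shaped like an Oozie action id (hypothetical value).
        String tag = toYarnTag("0000000-170101000000000-oozie-oozi-W@mr-node");
        System.out.println(tag); // e.g. "oozie-<32-hex-digit-md5>", well under 100 characters
    }
}

Note that the deleted setupYarnRestartHandling() stored this hashed value under oozie.child.mapreduce.job.tags rather than mapreduce.job.tags, so the tag is applied to the child job launched by the launcher instead of the launcher itself.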
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java index 55c9372..634a1cb 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java @@ -21,7 +21,11 @@ package org.apache.oozie.action.hadoop; import java.io.IOException; import java.io.StringReader; import java.util.ArrayList; +import java.util.Collections; import java.util.List; +import java.util.Map; + +import javax.annotation.Nonnull; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -31,6 +35,14 @@ import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.JobID; import org.apache.hadoop.mapred.RunningJob; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.oozie.action.ActionExecutor; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; @@ -39,7 +51,11 @@ import org.apache.oozie.util.XLog; import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace; -import org.json.simple.JSONObject; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.io.Closeables; public class MapReduceActionExecutor extends JavaActionExecutor { @@ -47,16 +63,16 @@ public class MapReduceActionExecutor extends JavaActionExecutor { public static final String HADOOP_COUNTERS = "hadoop.counters"; public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable"; private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain"; + public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url"; private XLog log = XLog.getLog(getClass()); public MapReduceActionExecutor() { super("map-reduce"); } - @SuppressWarnings("rawtypes") @Override - public List<Class> getLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); + public List<Class<?>> getLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); try { classes.add(Class.forName(STREAMING_MAIN_CLASS_NAME)); } @@ -97,9 +113,25 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } @Override - Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException { + Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) + throws ActionExecutorException { super.setupLauncherConf(conf, actionXml, appPath, context); conf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false); + + return 
conf; + } + + private void injectConfigClass(Configuration conf, Element actionXml) { + // Inject config-class for launcher to use for action + Element e = actionXml.getChild("config-class", actionXml.getNamespace()); + if (e != null) { + conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim()); + } + } + + @Override + protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) { + Configuration conf = super.createBaseHadoopConf(context, actionXml, loadResources); return conf; } @@ -108,6 +140,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor { Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) throws ActionExecutorException { boolean regularMR = false; + + injectConfigClass(actionConf, actionXml); Namespace ns = actionXml.getNamespace(); if (actionXml.getChild("streaming", ns) != null) { Element streamingXml = actionXml.getChild("streaming", ns); @@ -193,7 +227,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor { try { if (action.getStatus() == WorkflowAction.Status.OK) { Element actionXml = XmlUtils.parseXml(action.getConf()); - JobConf jobConf = createBaseHadoopConf(context, actionXml); + Configuration jobConf = createBaseHadoopConf(context, actionXml); jobClient = createJobClient(context, jobConf); RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalChildIDs())); if (runningJob == null) { @@ -248,7 +282,8 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } // Return the value of the specified configuration property - private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) throws ActionExecutorException { + private String evaluateConfigurationProperty(Element actionConf, String key, String defaultValue) + throws ActionExecutorException { try { String ret = defaultValue; if (actionConf != null) { @@ -267,26 +302,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor { } } - @SuppressWarnings("unchecked") - private JSONObject counterstoJson(Counters counters) { - - if (counters == null) { - return null; - } - - JSONObject groups = new JSONObject(); - for (String gName : counters.getGroupNames()) { - JSONObject group = new JSONObject(); - for (Counters.Counter counter : counters.getGroup(gName)) { - String cName = counter.getName(); - Long cValue = counter.getCounter(); - group.put(cName, cValue); - } - groups.put(gName, group); - } - return groups; - } - /** * Return the sharelib name for the action. * @@ -299,25 +314,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor { return (actionXml.getChild("streaming", ns) != null) ? "mapreduce-streaming" : null; } - @Override - JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, - Configuration actionConf) throws ActionExecutorException { - // If the user is using a regular MapReduce job and specified an uber jar, we need to also set it for the launcher; - // so we override createLauncherConf to call super and then to set the uber jar if specified. At this point, checking that - // uber jars are enabled and resolving the uber jar path is already done by setupActionConf() when it parsed the actionConf - // argument and we can just look up the uber jar in the actionConf argument. 
- JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf); - Namespace ns = actionXml.getNamespace(); - if (actionXml.getChild("streaming", ns) == null && actionXml.getChild("pipes", ns) == null) { - // Set for uber jar - String uberJar = actionConf.get(MapReduceMain.OOZIE_MAPREDUCE_UBER_JAR); - if (uberJar != null && uberJar.trim().length() > 0) { - launcherJobConf.setJar(uberJar); - } - } - return launcherJobConf; - } - public static void setStreaming(Configuration conf, String mapper, String reducer, String recordReader, String[] recordReaderMapping, String[] env) { if (mapper != null) { @@ -329,18 +325,93 @@ public class MapReduceActionExecutor extends JavaActionExecutor { if (recordReader != null) { conf.set("oozie.streaming.record-reader", recordReader); } - MapReduceMain.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping); - MapReduceMain.setStrings(conf, "oozie.streaming.env", env); + ActionUtils.setStrings(conf, "oozie.streaming.record-reader-mapping", recordReaderMapping); + ActionUtils.setStrings(conf, "oozie.streaming.env", env); } @Override - protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{ - RunningJob runningJob = null; - String jobId = getActualExternalId(action); - if (jobId != null) { - runningJob = jobClient.getJob(JobID.forName(jobId)); + protected void injectCallback(Context context, Configuration conf) { + // add callback for the MapReduce job + String callback = context.getCallbackUrl("$jobStatus"); + String originalCallbackURL = conf.get(JOB_END_NOTIFICATION_URL); + if (originalCallbackURL != null) { + LOG.warn("Overriding the action job end notification URI. Original value: {0}", originalCallbackURL); + } + conf.set(JOB_END_NOTIFICATION_URL, callback); + + super.injectCallback(context, conf); + } + + @Override + protected boolean needToAddMapReduceToClassPath() { + return true; + } + + @Override + public void check(Context context, WorkflowAction action) throws ActionExecutorException { + Map<String, String> actionData = Collections.emptyMap(); + Configuration jobConf = null; + + try { + FileSystem actionFs = context.getAppFileSystem(); + Element actionXml = XmlUtils.parseXml(action.getConf()); + jobConf = createBaseHadoopConf(context, actionXml); + Path actionDir = context.getActionDir(); + actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf); + } catch (Exception e) { + LOG.warn("Exception in check(). Message[{0}]", e.getMessage(), e); + throw convertException(e); + } + + final String newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID); + + // check the Hadoop job if newID is defined (which should be the case here) - otherwise perform the normal check() + if (newId != null) { + boolean jobCompleted; + JobClient jobClient = null; + boolean exception = false; + + try { + jobClient = createJobClient(context, new JobConf(jobConf)); + RunningJob runningJob = jobClient.getJob(JobID.forName(newId)); + + if (runningJob == null) { + context.setExternalStatus(FAILED); + throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017", + "Unknown hadoop job [{0}] associated with action [{1}]. 
Failing this action!", newId, + action.getId()); + } + + jobCompleted = runningJob.isComplete(); + } catch (Exception e) { + LOG.warn("Unable to check the state of a running MapReduce job -" + + " please check the health of the Job History Server!", e); + exception = true; + throw convertException(e); + } finally { + if (jobClient != null) { + try { + jobClient.close(); + } catch (Exception e) { + if (exception) { + LOG.error("JobClient error (not re-throwing due to a previous error): ", e); + } else { + throw convertException(e); + } + } + } + } + + // run original check() if the MR action is completed or there are errors - otherwise mark it as RUNNING + if (jobCompleted || actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) { + super.check(context, action); + } else { + context.setExternalStatus(RUNNING); + context.setExternalChildIDs(newId); + } + } else { + super.check(context, action); } - return runningJob; } @Override http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java index 581d3b3..d8b1f03 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/OozieJobInfo.java @@ -29,9 +29,7 @@ import org.apache.oozie.action.ActionExecutor.Context; import org.apache.oozie.action.oozie.SubWorkflowActionExecutor; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.WorkflowAction; -import org.apache.oozie.command.wf.JobXCommand; import org.apache.oozie.service.ConfigurationService; -import org.apache.oozie.service.Services; import org.apache.oozie.util.XConfiguration; import com.google.common.annotations.VisibleForTesting; http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java index 8b2dc16..8a24ac3 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/PigActionExecutor.java @@ -18,25 +18,21 @@ package org.apache.oozie.action.hadoop; +import java.util.ArrayList; +import java.util.List; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.apache.oozie.action.ActionExecutorException; -import org.apache.oozie.action.ActionExecutor.Context; -import org.apache.oozie.client.XOozieClient; import org.apache.oozie.client.WorkflowAction; +import org.apache.oozie.client.XOozieClient; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.service.HadoopAccessorService; -import org.apache.oozie.service.Services; -import org.apache.oozie.service.WorkflowAppService; import org.jdom.Element; -import org.jdom.Namespace; import org.jdom.JDOMException; +import org.jdom.Namespace; import org.json.simple.parser.JSONParser; -import java.util.ArrayList; -import java.util.List; - public class PigActionExecutor extends ScriptLanguageActionExecutor { private static final String PIG_MAIN_CLASS_NAME = 
"org.apache.oozie.action.hadoop.PigMain"; @@ -48,10 +44,9 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor { super("pig"); } - @SuppressWarnings("rawtypes") @Override - public List<Class> getLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); + public List<Class<?>> getLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); try { classes.add(Class.forName(PIG_MAIN_CLASS_NAME)); classes.add(JSONParser.class); @@ -73,7 +68,6 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor { } @Override - @SuppressWarnings("unchecked") Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) throws ActionExecutorException { super.setupActionConf(actionConf, context, actionXml, appPath); @@ -82,12 +76,14 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor { String script = actionXml.getChild("script", ns).getTextTrim(); String pigName = new Path(script).getName(); - List<Element> params = (List<Element>) actionXml.getChildren("param", ns); + @SuppressWarnings("unchecked") + List<Element> params = actionXml.getChildren("param", ns); String[] strParams = new String[params.size()]; for (int i = 0; i < params.size(); i++) { strParams[i] = params.get(i).getTextTrim(); } String[] strArgs = null; + @SuppressWarnings("unchecked") List<Element> eArgs = actionXml.getChildren("argument", ns); if (eArgs != null && eArgs.size() > 0) { strArgs = new String[eArgs.size()]; @@ -101,8 +97,8 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor { public static void setPigScript(Configuration conf, String script, String[] params, String[] args) { conf.set(PIG_SCRIPT, script); - MapReduceMain.setStrings(conf, PIG_PARAMS, params); - MapReduceMain.setStrings(conf, PIG_ARGS, args); + ActionUtils.setStrings(conf, PIG_PARAMS, params); + ActionUtils.setStrings(conf, PIG_ARGS, args); } @@ -127,10 +123,15 @@ public class PigActionExecutor extends ScriptLanguageActionExecutor { } @Override - protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) { + protected boolean needToAddMapReduceToClassPath() { + return true; + } + + @Override + protected Configuration loadHadoopDefaultResources(Context context, Element actionXml) { boolean loadDefaultResources = ConfigurationService .getBoolean(HadoopAccessorService.ACTION_CONFS_LOAD_DEFAULT_RESOURCES); - JobConf conf = super.createBaseHadoopConf(context, actionXml, loadDefaultResources); + Configuration conf = super.createBaseHadoopConf(context, actionXml, loadDefaultResources); return conf; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java index 92e149d..196f0b7 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/ScriptLanguageActionExecutor.java @@ -37,9 +37,8 @@ public abstract class ScriptLanguageActionExecutor extends JavaActionExecutor { super(type); } - @SuppressWarnings("rawtypes") @Override - public List<Class> getLauncherClasses() { + public List<Class<?>> getLauncherClasses() { return null; } 
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java index b9ffa7a..d44bbc5 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/ShellActionExecutor.java @@ -19,10 +19,13 @@ package org.apache.oozie.action.hadoop; +import java.io.File; import java.util.List; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.util.Apps; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.service.ConfigurationService; import org.jdom.Element; @@ -30,19 +33,12 @@ import org.jdom.Namespace; public class ShellActionExecutor extends JavaActionExecutor { - /** - * Config property name to set the child environment - */ - public String OOZIE_LAUNCHER_CHILD_ENV = "mapred.child.env"; - public String OOZIE_LAUNCHER_MAP_ENV = "mapreduce.map.env"; - public ShellActionExecutor() { super("shell"); } - @SuppressWarnings("rawtypes") @Override - public List<Class> getLauncherClasses() { + public List<Class<?>> getLauncherClasses() { return null; } @@ -51,7 +47,6 @@ public class ShellActionExecutor extends JavaActionExecutor { return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, ShellMain.class.getName()); } - @SuppressWarnings("unchecked") @Override Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) throws ActionExecutorException { @@ -103,6 +98,7 @@ public class ShellActionExecutor extends JavaActionExecutor { boolean checkKeyValue) throws ActionExecutorException { String[] strTagValue = null; Namespace ns = actionXml.getNamespace(); + @SuppressWarnings("unchecked") List<Element> eTags = actionXml.getChildren(tag, ns); if (eTags != null && eTags.size() > 0) { strTagValue = new String[eTags.size()]; @@ -113,7 +109,7 @@ public class ShellActionExecutor extends JavaActionExecutor { } } } - MapReduceMain.setStrings(actionConf, key, strTagValue); + ActionUtils.setStrings(actionConf, key, strTagValue); } /** @@ -130,23 +126,8 @@ public class ShellActionExecutor extends JavaActionExecutor { } @Override - protected Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) - throws ActionExecutorException { - super.setupLauncherConf(conf, actionXml, appPath, context); - addDefaultChildEnv(conf); - return conf; - } - - /** - * This method sets the PATH to current working directory for the launched - * map task from where shell command will run. 
- * - * @param conf - */ - protected void addDefaultChildEnv(Configuration conf) { - String envValues = "PATH=.:$PATH"; - updateProperty(conf, OOZIE_LAUNCHER_MAP_ENV, envValues); - updateProperty(conf, OOZIE_LAUNCHER_CHILD_ENV, envValues); + protected void addActionSpecificEnvVars(Map<String, String> env) { + Apps.setEnvFromInputString(env, "PATH=.:$PATH", File.pathSeparator); } /** http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java index 1a3197a..00497a7 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java @@ -21,7 +21,6 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.JobConf; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.service.ConfigurationService; @@ -32,12 +31,11 @@ import org.jdom.Namespace; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Properties; public class SparkActionExecutor extends JavaActionExecutor { public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain"; - public static final String TASK_USER_PRECEDENCE = "mapreduce.task.classpath.user.precedence"; // hadoop-2 - public static final String TASK_USER_CLASSPATH_PRECEDENCE = "mapreduce.user.classpath.first"; // hadoop-1 public static final String SPARK_MASTER = "oozie.spark.master"; public static final String SPARK_MODE = "oozie.spark.mode"; public static final String SPARK_OPTS = "oozie.spark.spark-opts"; @@ -78,7 +76,7 @@ public class SparkActionExecutor extends JavaActionExecutor { StringBuilder sparkOptsSb = new StringBuilder(); if (master.startsWith("yarn")) { - String resourceManager = actionConf.get(HADOOP_JOB_TRACKER); + String resourceManager = actionConf.get(HADOOP_YARN_RM); Properties sparkConfig = Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager); for (String property : sparkConfig.stringPropertyNames()) { @@ -102,20 +100,6 @@ public class SparkActionExecutor extends JavaActionExecutor { } @Override - JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, - Configuration actionConf) throws ActionExecutorException { - - JobConf launcherJobConf = super.createLauncherConf(actionFs, context, action, actionXml, actionConf); - if (launcherJobConf.get("oozie.launcher." + TASK_USER_PRECEDENCE) == null) { - launcherJobConf.set(TASK_USER_PRECEDENCE, "true"); - } - if (launcherJobConf.get("oozie.launcher." 
+ TASK_USER_CLASSPATH_PRECEDENCE) == null) { - launcherJobConf.set(TASK_USER_CLASSPATH_PRECEDENCE, "true"); - } - return launcherJobConf; - } - - @Override Configuration setupLauncherConf(Configuration conf, Element actionXml, Path appPath, Context context) throws ActionExecutorException { super.setupLauncherConf(conf, actionXml, appPath, context); @@ -136,8 +120,8 @@ public class SparkActionExecutor extends JavaActionExecutor { } @Override - public List<Class> getLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); + public List<Class<?>> getLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); try { classes.add(Class.forName(SPARK_MAIN_CLASS_NAME)); } catch (ClassNotFoundException e) { @@ -159,6 +143,16 @@ public class SparkActionExecutor extends JavaActionExecutor { } @Override + protected boolean needToAddMapReduceToClassPath() { + return true; + } + + @Override + protected void addActionSpecificEnvVars(Map<String, String> env) { + env.put("SPARK_HOME", "."); + } + + @Override protected String getLauncherMain(Configuration launcherConf, Element actionXml) { return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, SPARK_MAIN_CLASS_NAME); } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java index 22e2874..955f3b7 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java @@ -18,6 +18,12 @@ package org.apache.oozie.action.hadoop; +import java.io.IOException; +import java.io.StringReader; +import java.util.ArrayList; +import java.util.List; +import java.util.StringTokenizer; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapred.Counters; @@ -33,12 +39,6 @@ import org.apache.oozie.util.XmlUtils; import org.jdom.Element; import org.jdom.Namespace; -import java.io.IOException; -import java.io.StringReader; -import java.util.ArrayList; -import java.util.List; -import java.util.StringTokenizer; - public class SqoopActionExecutor extends JavaActionExecutor { public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write"; @@ -51,8 +51,8 @@ public class SqoopActionExecutor extends JavaActionExecutor { } @Override - public List<Class> getLauncherClasses() { - List<Class> classes = new ArrayList<Class>(); + public List<Class<?>> getLauncherClasses() { + List<Class<?>> classes = new ArrayList<Class<?>>(); try { classes.add(Class.forName(SQOOP_MAIN_CLASS_NAME)); } @@ -68,7 +68,6 @@ public class SqoopActionExecutor extends JavaActionExecutor { } @Override - @SuppressWarnings("unchecked") Configuration setupActionConf(Configuration actionConf, Context context, Element actionXml, Path appPath) throws ActionExecutorException { super.setupActionConf(actionConf, context, actionXml, appPath); @@ -96,6 +95,7 @@ public class SqoopActionExecutor extends JavaActionExecutor { } } else { + @SuppressWarnings("unchecked") List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns); for (Element elem : eArgs) { argList.add(elem.getTextTrim()); @@ -119,7 +119,7 @@ public class SqoopActionExecutor extends JavaActionExecutor { } private void 
setSqoopCommand(Configuration conf, String[] args) { - MapReduceMain.setStrings(conf, SQOOP_ARGS, args); + ActionUtils.setStrings(conf, SQOOP_ARGS, args); } /** @@ -141,7 +141,7 @@ public class SqoopActionExecutor extends JavaActionExecutor { try { if (action.getStatus() == WorkflowAction.Status.OK) { Element actionXml = XmlUtils.parseXml(action.getConf()); - JobConf jobConf = createBaseHadoopConf(context, actionXml); + Configuration jobConf = createBaseHadoopConf(context, actionXml); jobClient = createJobClient(context, jobConf); // Cumulative counters for all Sqoop mapreduce jobs @@ -236,6 +236,11 @@ public class SqoopActionExecutor extends JavaActionExecutor { } } + @Override + protected boolean needToAddMapReduceToClassPath() { + return true; + } + /** * Return the sharelib name for the action. * http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java index fb021bd..374c6ef 100644 --- a/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java +++ b/core/src/main/java/org/apache/oozie/client/rest/BulkResponseImpl.java @@ -18,7 +18,6 @@ package org.apache.oozie.client.rest; -import java.util.HashSet; import java.util.List; import java.util.Set; @@ -29,6 +28,8 @@ import org.apache.oozie.client.BulkResponse; import org.json.simple.JSONArray; import org.json.simple.JSONObject; +import com.google.common.collect.ImmutableSet; + /** * Server-side implementation class of the client interface BulkResponse * Declares all the bulk request specific user parameters and handling as JSON object @@ -48,20 +49,14 @@ public class BulkResponseImpl implements BulkResponse, JsonBean { public static final String BULK_FILTER_END_NOMINAL_EPOCH = "endscheduledtime"; public static final String BULK_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:SS'Z'"; - public static final Set<String> BULK_FILTER_NAMES = new HashSet<String>(); - - static { - - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_BUNDLE); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_COORD); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_LEVEL); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_STATUS); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH); - BULK_FILTER_NAMES.add(BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH); - - } + public static final Set<String> BULK_FILTER_NAMES = ImmutableSet.of(BulkResponseImpl.BULK_FILTER_BUNDLE, + BulkResponseImpl.BULK_FILTER_COORD, + BulkResponseImpl.BULK_FILTER_LEVEL, + BulkResponseImpl.BULK_FILTER_STATUS, + BulkResponseImpl.BULK_FILTER_START_CREATED_EPOCH, + BulkResponseImpl.BULK_FILTER_END_CREATED_EPOCH, + BulkResponseImpl.BULK_FILTER_START_NOMINAL_EPOCH, + BulkResponseImpl.BULK_FILTER_END_NOMINAL_EPOCH); /** * Construct JSON object using the bulk request object and the associated tags http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/XCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/XCommand.java b/core/src/main/java/org/apache/oozie/command/XCommand.java index bdf13f6..7b8f47c 
100644 --- a/core/src/main/java/org/apache/oozie/command/XCommand.java +++ b/core/src/main/java/org/apache/oozie/command/XCommand.java @@ -244,7 +244,10 @@ public abstract class XCommand<T> implements XCallable<T> { @Override public final T call() throws CommandException { setLogInfo(); - if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) && used.get()) { + CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); + Set<String> interruptTypes = callableQueueService.getInterruptTypes(); + + if (interruptTypes.contains(this.getType()) && used.get()) { LOG.debug("Command [{0}] key [{1}] already used for [{2}]", getName(), getEntityKey(), this.toString()); return null; } @@ -271,7 +274,7 @@ public abstract class XCommand<T> implements XCallable<T> { } if (!isLockRequired() || (lock != null) || this.inInterruptMode()) { - if (CallableQueueService.INTERRUPT_TYPES.contains(this.getType()) + if (interruptTypes.contains(this.getType()) && !used.compareAndSet(false, true)) { LOG.debug("Command [{0}] key [{1}] already executed for [{2}]", getName(), getEntityKey(), this.toString()); @@ -289,7 +292,6 @@ public abstract class XCommand<T> implements XCallable<T> { instrumentation.addCron(INSTRUMENTATION_GROUP, getName() + ".execute", executeCron); } if (commandQueue != null) { - CallableQueueService callableQueueService = Services.get().get(CallableQueueService.class); for (Map.Entry<Long, List<XCommand<?>>> entry : commandQueue.entrySet()) { LOG.debug("Queuing [{0}] commands with delay [{1}]ms", entry.getValue().size(), entry.getKey()); if (!callableQueueService.queueSerial(entry.getValue(), entry.getKey())) { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java index d2a2742..98d0f3c 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitHttpXCommand.java @@ -46,6 +46,8 @@ import org.apache.oozie.client.XOozieClient; import org.jdom.Element; import org.jdom.Namespace; +import com.google.common.collect.ImmutableSet; + import java.util.Date; import java.util.List; import java.util.Map; @@ -54,17 +56,8 @@ import java.util.HashSet; public abstract class SubmitHttpXCommand extends WorkflowXCommand<String> { - protected static final Set<String> MANDATORY_OOZIE_CONFS = new HashSet<String>(); - protected static final Set<String> OPTIONAL_OOZIE_CONFS = new HashSet<String>(); - - static { - MANDATORY_OOZIE_CONFS.add(XOozieClient.JT); - MANDATORY_OOZIE_CONFS.add(XOozieClient.NN); - MANDATORY_OOZIE_CONFS.add(OozieClient.LIBPATH); - - OPTIONAL_OOZIE_CONFS.add(XOozieClient.FILES); - OPTIONAL_OOZIE_CONFS.add(XOozieClient.ARCHIVES); - } + static final Set<String> MANDATORY_OOZIE_CONFS = ImmutableSet.of(XOozieClient.RM, XOozieClient.NN, OozieClient.LIBPATH); + static final Set<String> OPTIONAL_OOZIE_CONFS = ImmutableSet.of(XOozieClient.FILES, XOozieClient.ARCHIVES); private Configuration conf; http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java 
b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java index cc61d3d..05e7595 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitMRXCommand.java @@ -41,11 +41,10 @@ public class SubmitMRXCommand extends SubmitHttpXCommand { static { SKIPPED_CONFS.add(WorkflowAppService.HADOOP_USER); - SKIPPED_CONFS.add(XOozieClient.JT); + SKIPPED_CONFS.add(XOozieClient.RM); SKIPPED_CONFS.add(XOozieClient.NN); DEPRECATE_MAP.put(XOozieClient.NN, XOozieClient.NN_2); - DEPRECATE_MAP.put(XOozieClient.JT, XOozieClient.JT_2); DEPRECATE_MAP.put(WorkflowAppService.HADOOP_USER, "mapreduce.job.user.name"); } @@ -93,8 +92,7 @@ public class SubmitMRXCommand extends SubmitHttpXCommand { protected Element generateSection(Configuration conf, Namespace ns) { Element mapreduce = new Element("map-reduce", ns); Element jt = new Element("job-tracker", ns); - String newJTVal = conf.get(DEPRECATE_MAP.get(XOozieClient.JT)); - jt.addContent(newJTVal != null ? newJTVal : (conf.get(XOozieClient.JT))); + jt.addContent(conf.get(XOozieClient.RM)); mapreduce.addContent(jt); Element nn = new Element("name-node", ns); String newNNVal = conf.get(DEPRECATE_MAP.get(XOozieClient.NN)); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java index 9d41305..fab4398 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitScriptLanguageXCommand.java @@ -19,7 +19,7 @@ package org.apache.oozie.command.wf; import org.apache.hadoop.conf.Configuration; -import org.apache.oozie.action.hadoop.MapReduceMain; +import org.apache.oozie.action.hadoop.ActionUtils; import org.apache.oozie.client.XOozieClient; import org.apache.oozie.command.CommandException; import org.jdom.Element; @@ -50,7 +50,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand { String name = getWorkflowName(); Element ele = new Element(name, ns); Element jt = new Element("job-tracker", ns); - jt.addContent(conf.get(XOozieClient.JT)); + jt.addContent(conf.get(XOozieClient.RM)); ele.addContent(jt); Element nn = new Element("name-node", ns); nn.addContent(conf.get(XOozieClient.NN)); @@ -58,7 +58,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand { List<String> Dargs = new ArrayList<String>(); List<String> otherArgs = new ArrayList<String>(); - String[] args = MapReduceMain.getStrings(conf, getOptions()); + String[] args = ActionUtils.getStrings(conf, getOptions()); for (String arg : args) { if (arg.startsWith("-D")) { Dargs.add(arg); @@ -67,7 +67,7 @@ public abstract class SubmitScriptLanguageXCommand extends SubmitHttpXCommand { otherArgs.add(arg); } } - String [] params = MapReduceMain.getStrings(conf, getScriptParamters()); + String [] params = ActionUtils.getStrings(conf, getScriptParamters()); // configuration section if (Dargs.size() > 0) { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java index 51b739e..c5574c5 100644 --- a/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java +++ b/core/src/main/java/org/apache/oozie/command/wf/SubmitSqoopXCommand.java @@ -22,7 +22,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.oozie.client.XOozieClient; import org.apache.oozie.command.CommandException; -import org.apache.oozie.action.hadoop.MapReduceMain; +import org.apache.oozie.action.hadoop.ActionUtils; import org.jdom.Namespace; import org.jdom.Element; @@ -50,14 +50,14 @@ public class SubmitSqoopXCommand extends SubmitHttpXCommand { String name = "sqoop"; Element ele = new Element(name, ns); Element jt = new Element("job-tracker", ns); - jt.addContent(conf.get(XOozieClient.JT)); + jt.addContent(conf.get(XOozieClient.RM)); ele.addContent(jt); Element nn = new Element("name-node", ns); nn.addContent(conf.get(XOozieClient.NN)); ele.addContent(nn); List<String> Dargs = new ArrayList<String>(); - String[] args = MapReduceMain.getStrings(conf, getOptions()); + String[] args = ActionUtils.getStrings(conf, getOptions()); for (String arg : args) { if (arg.startsWith("-D")) { Dargs.add(arg); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java index ace120d..79355eb 100644 --- a/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java +++ b/core/src/main/java/org/apache/oozie/coord/input/dependency/AbstractCoordInputDependency.java @@ -111,15 +111,16 @@ public abstract class AbstractCoordInputDependency implements Writable, CoordInp missingDependenciesSet = new HashMap<String, List<String>>(); availableDependenciesSet = new HashMap<String, List<String>>(); - Set<String> keySets = dependencyMap.keySet(); - for (String key : keySets) { - for (CoordInputInstance coordInputInstance : dependencyMap.get(key)) + for (Entry<String, List<CoordInputInstance>> entry : dependencyMap.entrySet()) { + String key = entry.getKey(); + for (CoordInputInstance coordInputInstance : entry.getValue()) { if (coordInputInstance.isAvailable()) { addToAvailableDependencies(key, coordInputInstance); } else { addToMissingDependencies(key, coordInputInstance); } + } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java index 6f0abf6..a8b58d5 100644 --- a/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java +++ b/core/src/main/java/org/apache/oozie/dependency/hcat/EhcacheHCatDependencyCache.java @@ -492,13 +492,13 @@ public class EhcacheHCatDependencyCache implements HCatDependencyCache, CacheEve @Override public void removeNonWaitingCoordActions(Set<String> staleActions) { - Iterator<String> serverItr = 
missingDepsByServer.keySet().iterator(); - while (serverItr.hasNext()) { - String server = serverItr.next(); - Cache missingCache = missingDepsByServer.get(server); + for (Entry<String, Cache> entry : missingDepsByServer.entrySet()) { + Cache missingCache = entry.getValue(); + if (missingCache == null) { continue; } + synchronized (missingCache) { for (Object key : missingCache.getKeys()) { Element element = missingCache.get(key); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java index a86a8d0..cfd208a 100644 --- a/core/src/main/java/org/apache/oozie/service/CallableQueueService.java +++ b/core/src/main/java/org/apache/oozie/service/CallableQueueService.java @@ -48,6 +48,8 @@ import org.apache.oozie.util.PriorityDelayQueue.QueueElement; import org.apache.oozie.util.XCallable; import org.apache.oozie.util.XLog; +import com.google.common.collect.ImmutableSet; + /** * The callable queue service queues {@link XCallable}s for asynchronous execution. * <p> @@ -95,9 +97,9 @@ public class CallableQueueService implements Service, Instrumentable { private final Map<String, Date> uniqueCallables = new ConcurrentHashMap<String, Date>(); - private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<String, Set<XCallable<?>>>(); + private final ConcurrentHashMap<String, Set<XCallable<?>>> interruptCommandsMap = new ConcurrentHashMap<>(); - public static final HashSet<String> INTERRUPT_TYPES = new HashSet<String>(); + private Set<String> interruptTypes; private int interruptMapMaxSize; @@ -452,10 +454,12 @@ public class CallableQueueService implements Service, Instrumentable { int threads = ConfigurationService.getInt(conf, CONF_THREADS); boolean callableNextEligible = ConfigurationService.getBoolean(conf, CONF_CALLABLE_NEXT_ELIGIBLE); + interruptTypes = new HashSet<>(); for (String type : ConfigurationService.getStrings(conf, CONF_CALLABLE_INTERRUPT_TYPES)) { log.debug("Adding interrupt type [{0}]", type); - INTERRUPT_TYPES.add(type); + interruptTypes.add(type); } + interruptTypes = ImmutableSet.copyOf(interruptTypes); if (!callableNextEligible) { queue = new PriorityDelayQueue<CallableWrapper>(3, 1000 * 30, TimeUnit.MILLISECONDS, queueSize) { @@ -720,12 +724,12 @@ public class CallableQueueService implements Service, Instrumentable { public void checkInterruptTypes(XCallable<?> callable) { if ((callable instanceof CompositeCallable) && (((CompositeCallable) callable).getCallables() != null)) { for (XCallable<?> singleCallable : ((CompositeCallable) callable).getCallables()) { - if (INTERRUPT_TYPES.contains(singleCallable.getType())) { + if (interruptTypes.contains(singleCallable.getType())) { insertCallableIntoInterruptMap(singleCallable); } } } - else if (INTERRUPT_TYPES.contains(callable.getType())) { + else if (interruptTypes.contains(callable.getType())) { insertCallableIntoInterruptMap(callable); } } @@ -791,4 +795,8 @@ public class CallableQueueService implements Service, Instrumentable { return executor.invokeAll(tasks); } + public Set<String> getInterruptTypes() { + return interruptTypes; + } + } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java index 22c6fb0..a68f94f 100644 --- a/core/src/main/java/org/apache/oozie/service/EventHandlerService.java +++ b/core/src/main/java/org/apache/oozie/service/EventHandlerService.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.hadoop.conf.Configuration; @@ -170,14 +171,16 @@ public class EventHandlerService implements Service { @Override public void destroy() { eventsEnabled = false; - for (MessageType type : listenerMap.keySet()) { - Iterator<?> iter = listenerMap.get(type).iterator(); - while (iter.hasNext()) { + + for (Entry<MessageType, List<?>> entry : listenerMap.entrySet()) { + List<?> listeners = entry.getValue(); + MessageType type = entry.getKey(); + + for (Object listener : listeners) { if (type == MessageType.JOB) { - ((JobEventListener) iter.next()).destroy(); - } - else if (type == MessageType.SLA) { - ((SLAEventListener) iter.next()).destroy(); + ((JobEventListener) listener).destroy(); + } else if (type == MessageType.SLA) { + ((SLAEventListener) listener).destroy(); } } } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java index 23a9d92..9624104 100644 --- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java +++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java @@ -18,17 +18,22 @@ package org.apache.oozie.service; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.ErrorCode; import org.apache.oozie.action.hadoop.JavaActionExecutor; import org.apache.oozie.util.IOUtils; @@ -43,7 +48,7 @@ import java.io.FileInputStream; import java.io.FilenameFilter; import java.io.IOException; import java.io.InputStream; -import java.lang.reflect.InvocationTargetException; +import java.io.OutputStream; import java.lang.reflect.Method; import java.net.InetAddress; import java.net.URI; @@ -79,19 +84,16 @@ public class HadoopAccessorService implements Service { public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal"; public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token"); - protected static final String 
OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; /** The Kerberos principal for the job tracker.*/ protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; /** The Kerberos principal for the resource manager.*/ protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal"; - protected static final String HADOOP_JOB_TRACKER = "mapred.job.tracker"; - protected static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address"; protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; - private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>(); - - private static Configuration cachedConf; + private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; + private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>(); private static final String DEFAULT_ACTIONNAME = "default"; + private static Configuration cachedConf; private Set<String> jobTrackerWhitelist = new HashSet<String>(); private Set<String> nameNodeWhitelist = new HashSet<String>(); @@ -406,18 +408,20 @@ public class HadoopAccessorService implements Service { public boolean accept(File dir, String name) { return ActionConfFileType.isSupportedFileType(name); }}); - Arrays.sort(actionConfFiles, new Comparator<File>() { - @Override - public int compare(File o1, File o2) { - return o1.getName().compareTo(o2.getName()); - } - }); - for (File f : actionConfFiles) { - if (f.isFile() && f.canRead()) { - updateActionConfigWithFile(actionConf, f); + + if (actionConfFiles != null) { + Arrays.sort(actionConfFiles, new Comparator<File>() { + @Override + public int compare(File o1, File o2) { + return o1.getName().compareTo(o2.getName()); + } + }); + for (File f : actionConfFiles) { + if (f.isFile() && f.canRead()) { + updateActionConfigWithFile(actionConf, f); + } } } - } private Configuration readActionConfFile(File file) throws IOException { @@ -505,7 +509,7 @@ public class HadoopAccessorService implements Service { if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { throw new HadoopAccessorException(ErrorCode.E0903); } - String jobTracker = conf.get(JavaActionExecutor.HADOOP_JOB_TRACKER); + String jobTracker = conf.get(JavaActionExecutor.HADOOP_YARN_RM); validateJobTracker(jobTracker); try { UserGroupInformation ugi = getUGI(user); @@ -516,39 +520,60 @@ public class HadoopAccessorService implements Service { }); return jobClient; } - catch (InterruptedException ex) { - throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); - } - catch (IOException ex) { + catch (IOException | InterruptedException ex) { throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); } } /** - * Get the RM delegation token using jobClient and add it to conf + * Return a JobClient created with the provided user/group. * - * @param jobClient - * @param conf - * @throws HadoopAccessorException + * + * @param conf Configuration with all necessary information to create the + * JobClient. + * @return JobClient created with the provided user/group. + * @throws HadoopAccessorException if the client could not be created. 
*/ - public void addRMDelegationToken(JobClient jobClient, JobConf conf) throws HadoopAccessorException { - Token<DelegationTokenIdentifier> mrdt; - try { - mrdt = jobClient.getDelegationToken(getMRDelegationTokenRenewer(conf)); - } - catch (IOException e) { - throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e); + public JobClient createJobClient(String user, Configuration conf) throws HadoopAccessorException { + return createJobClient(user, new JobConf(conf)); + } + + /** + * Return a YarnClient created with the provided user and configuration. The caller is responsible for closing it when done. + * + * @param user The username to impersonate + * @param conf The conf + * @return a YarnClient with the provided user and configuration + * @throws HadoopAccessorException if the client could not be created. + */ + public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException { + ParamChecker.notEmpty(user, "user"); + if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { + throw new HadoopAccessorException(ErrorCode.E0903); } - catch (InterruptedException e) { - throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e); + String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM); + validateJobTracker(rm); + try { + UserGroupInformation ugi = getUGI(user); + YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() { + @Override + public YarnClient run() throws Exception { + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(conf); + yarnClient.start(); + return yarnClient; + } + }); + return yarnClient; + } catch (IOException | InterruptedException ex) { + throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); } - conf.getCredentials().addToken(MR_TOKEN_ALIAS, mrdt); } /** * Return a FileSystem created with the provided user for the specified URI. * - * + * @param user The username to impersonate * @param uri file system URI. * @param conf Configuration with all necessary information to create the FileSystem. * @return FileSystem created with the provided user/group. 
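A minimal caller-side sketch of the new createYarnClient API (illustration only, not part of this patch; the user, jobConf and appId variables are assumed to be in scope, and the application-report lookup is just one example of what a caller might do with the client):

    HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
    // createYarnClient() runs under UserGroupInformation.doAs() for the given user, so the
    // caller only supplies the user name and a Configuration; the conf must have been produced
    // through this service (see the OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED check above).
    // The javadoc makes the caller responsible for closing the client, hence try-with-resources.
    try (YarnClient yarnClient = has.createYarnClient(user, jobConf)) {
        ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(appId));
        System.out.println(report.getYarnApplicationState());
    }
    catch (YarnException | IOException e) {
        throw new HadoopAccessorException(ErrorCode.E0902, e.getMessage(), e);
    }
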
@@ -556,8 +581,14 @@ public class HadoopAccessorService implements Service { */ public FileSystem createFileSystem(String user, final URI uri, final Configuration conf) throws HadoopAccessorException { + return createFileSystem(user, uri, conf, true); + } + + private FileSystem createFileSystem(String user, final URI uri, final Configuration conf, boolean checkAccessorProperty) + throws HadoopAccessorException { ParamChecker.notEmpty(user, "user"); - if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { + + if (checkAccessorProperty && !conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) { throw new HadoopAccessorException(ErrorCode.E0903); } @@ -585,10 +616,7 @@ public class HadoopAccessorService implements Service { } }); } - catch (InterruptedException ex) { - throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); - } - catch (IOException ex) { + catch (IOException | InterruptedException ex) { throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex); } } @@ -639,10 +667,7 @@ public class HadoopAccessorService implements Service { renewer = mrTokenRenewers.get(servicePrincipal); if (renewer == null) { // Mimic org.apache.hadoop.mapred.Master.getMasterPrincipal() - String target = jobConf.get(HADOOP_YARN_RM, jobConf.get(HADOOP_JOB_TRACKER_2)); - if (target == null) { - target = jobConf.get(HADOOP_JOB_TRACKER); - } + String target = jobConf.get(HADOOP_YARN_RM); try { String addr = NetUtils.createSocketAddr(target).getHostName(); renewer = new Text(SecurityUtil.getServerPrincipal(servicePrincipal, addr)); @@ -705,4 +730,48 @@ public class HadoopAccessorService implements Service { return supportedSchemes; } -} + /** + * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container. This involves also writing it + * to HDFS. + * Example usage: + * * <pre> + * {@code + * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir); + * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir); + * ... + * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); + * localResources.put(filename1, res1); + * localResources.put(filename2, res2); + * ... + * containerLaunchContext.setLocalResources(localResources); + * } + * </pre> + * + * @param filename The filename to use on the remote filesystem and once it has been localized. + * @param user The user + * @param conf The configuration to process + * @param uri The URI of the remote filesystem (e.g. 
HDFS) + * @param dir The directory on the remote filesystem to write the file to + * @return The LocalResource pointing to the localized configuration file + * @throws IOException A problem occurred writing the file + * @throws HadoopAccessorException A problem occurred with Hadoop + * @throws URISyntaxException A problem occurred parsing the URI + */ + public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri, + Path dir) + throws IOException, HadoopAccessorException, URISyntaxException { + Path dst = new Path(dir, filename); + FileSystem fs = createFileSystem(user, uri, conf, false); + try (OutputStream os = fs.create(dst)){ + conf.writeXml(os); + } + LocalResource localResource = Records.newRecord(LocalResource.class); + localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst)); + FileStatus destStatus = fs.getFileStatus(dst); + localResource.setTimestamp(destStatus.getModificationTime()); + localResource.setSize(destStatus.getLen()); + return localResource; + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/service/Services.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/Services.java b/core/src/main/java/org/apache/oozie/service/Services.java index 829d5f5..7f47f88 100644 --- a/core/src/main/java/org/apache/oozie/service/Services.java +++ b/core/src/main/java/org/apache/oozie/service/Services.java @@ -204,7 +204,6 @@ public class Services { * * @throws ServiceException thrown if any of the services could not initialize. */ - @SuppressWarnings("unchecked") public void init() throws ServiceException { XLog log = new XLog(LogFactory.getLog(getClass())); log.trace("Initializing"); @@ -255,9 +254,9 @@ public class Services { * configuration. * @throws ServiceException thrown if a service class could not be loaded.
*/ - private void loadServices(Class[] classes, List<Service> list) throws ServiceException { + private void loadServices(Class<?>[] classes, List<Service> list) throws ServiceException { XLog log = new XLog(LogFactory.getLog(getClass())); - for (Class klass : classes) { + for (Class<?> klass : classes) { try { Service service = (Service) klass.newInstance(); log.debug("Loading service [{0}] implementation [{1}]", service.getInterface(), @@ -284,10 +283,10 @@ public class Services { private void loadServices() throws ServiceException { XLog log = new XLog(LogFactory.getLog(getClass())); try { - Map<Class, Service> map = new LinkedHashMap<Class, Service>(); - Class[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES); + Map<Class<?>, Service> map = new LinkedHashMap<Class<?>, Service>(); + Class<?>[] classes = ConfigurationService.getClasses(conf, CONF_SERVICE_CLASSES); log.debug("Services list obtained from property '" + CONF_SERVICE_CLASSES + "'"); - Class[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES); + Class<?>[] classesExt = ConfigurationService.getClasses(conf, CONF_SERVICE_EXT_CLASSES); log.debug("Services list obtained from property '" + CONF_SERVICE_EXT_CLASSES + "'"); List<Service> list = new ArrayList<Service>(); loadServices(classes, list); @@ -301,11 +300,12 @@ public class Services { } map.put(service.getInterface(), service); } - for (Map.Entry<Class, Service> entry : map.entrySet()) { + for (Map.Entry<Class<?>, Service> entry : map.entrySet()) { setService(entry.getValue().getClass()); } } catch (RuntimeException rex) { - log.fatal("Runtime Exception during Services Load. Check your list of '" + CONF_SERVICE_CLASSES + "' or '" + CONF_SERVICE_EXT_CLASSES + "'"); + log.fatal("Runtime Exception during Services Load. Check your list of '{0}' or '{1}'", + CONF_SERVICE_CLASSES, CONF_SERVICE_EXT_CLASSES, rex); throw new ServiceException(ErrorCode.E0103, rex.getMessage(), rex); } }
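A recurring cleanup in this patch is replacing keySet()-plus-get() loops with entrySet() iteration (AbstractCoordInputDependency, EhcacheHCatDependencyCache and EventHandlerService above). A generic before/after sketch of the pattern, using the AbstractCoordInputDependency types and a hypothetical process() callback standing in for the per-entry work:

    // Before: every key costs an extra hash lookup through dependencyMap.get(key).
    for (String key : dependencyMap.keySet()) {
        for (CoordInputInstance instance : dependencyMap.get(key)) {
            process(key, instance);
        }
    }

    // After: entrySet() yields key and value together, so the nested get() disappears.
    for (Map.Entry<String, List<CoordInputInstance>> entry : dependencyMap.entrySet()) {
        String key = entry.getKey();
        for (CoordInputInstance instance : entry.getValue()) {
            process(key, instance);
        }
    }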
