http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java new file mode 100644 index 0000000..688424b --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; + +public class LauncherAMCallbackNotifierFactory { + + public LauncherAMCallbackNotifier createCallbackNotifier(Configuration conf) { + return new LauncherAMCallbackNotifier(conf); + } +}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java index 9a411ac..0236e1b 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java @@ -30,7 +30,10 @@ import java.io.StringWriter; import java.net.URL; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; @@ -45,7 +48,15 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; public abstract class LauncherMain { @@ -56,6 +67,10 @@ public abstract class LauncherMain { public static final String OUTPUT_PROPERTIES = ACTION_PREFIX + "output.properties"; public static final String HADOOP_JOBS = "hadoopJobs"; public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags"; + + public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags"; + public static final String OOZIE_JOB_LAUNCH_TIME = "oozie.job.launch.time"; + public static final String TEZ_APPLICATION_TAGS = "tez.application.tags"; public static final String SPARK_YARN_TAGS = "spark.yarn.tags"; protected static String[] HADOOP_SITE_FILES = new String[] @@ -170,6 +185,81 @@ public abstract class LauncherMain { } } + public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) { + return getChildYarnJobs(actionConf, ApplicationsRequestScope.OWN); + } + + public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope, + long startTime) { + Set<ApplicationId> childYarnJobs = new HashSet<ApplicationId>(); + String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS); + if (tag == null) { + System.out.print("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS); + return childYarnJobs; + } + System.out.println("tag id : " + tag); + GetApplicationsRequest gar = GetApplicationsRequest.newInstance(); + gar.setScope(scope); + gar.setApplicationTags(Collections.singleton(tag)); + long endTime = System.currentTimeMillis(); + if (startTime > endTime) { + System.out.println("WARNING: Clock skew between the Oozie server host and this host detected. Please fix this. " + + "Attempting to work around..."); + // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an + // offset in both directions + long diff = 2 * (startTime - endTime); + startTime = startTime - diff; + endTime = endTime + diff; + } + gar.setStartRange(startTime, endTime); + try { + ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class); + GetApplicationsResponse apps = proxy.getApplications(gar); + List<ApplicationReport> appsList = apps.getApplicationList(); + for(ApplicationReport appReport : appsList) { + childYarnJobs.add(appReport.getApplicationId()); + } + } catch (YarnException | IOException ioe) { + throw new RuntimeException("Exception occurred while finding child jobs", ioe); + } + + System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ",")); + return childYarnJobs; + } + public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) { + System.out.println("Fetching child yarn jobs"); + + long startTime = 0L; + try { + startTime = Long.parseLong(System.getProperty(OOZIE_JOB_LAUNCH_TIME)); + } catch(NumberFormatException nfe) { + throw new RuntimeException("Could not find Oozie job launch time", nfe); + } + return getChildYarnJobs(actionConf, scope, startTime); + } + + public static void killChildYarnJobs(Configuration actionConf) { + try { + Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf); + if (!childYarnJobs.isEmpty()) { + System.out.println(); + System.out.println("Found [" + childYarnJobs.size() + "] Map-Reduce jobs from this launcher"); + System.out.println("Killing existing jobs and starting over:"); + YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(actionConf); + yarnClient.start(); + for (ApplicationId app : childYarnJobs) { + System.out.print("Killing job [" + app + "] ... "); + yarnClient.killApplication(app); + System.out.println("Done"); + } + System.out.println(); + } + } catch (IOException | YarnException ye) { + throw new RuntimeException("Exception occurred while killing child job(s)", ye); + } + } + protected abstract void run(String[] args) throws Exception; /** @@ -181,12 +271,13 @@ public abstract class LauncherMain { * @param conf Configuration/Properties object to dump to STDOUT * @throws IOException thrown if an IO error ocurred. */ - @SuppressWarnings("unchecked") - protected static void logMasking(String header, Collection<String> maskSet, Iterable conf) throws IOException { + + protected static void logMasking(String header, Collection<String> maskSet, Iterable<Map.Entry<String,String>> conf) + throws IOException { StringWriter writer = new StringWriter(); writer.write(header + "\n"); writer.write("--------------------\n"); - for (Map.Entry entry : (Iterable<Map.Entry>) conf) { + for (Map.Entry<String, String> entry : conf) { String name = (String) entry.getKey(); String value = (String) entry.getValue(); for (String mask : maskSet) { @@ -221,30 +312,6 @@ public abstract class LauncherMain { } /** - * Will run the user specified OozieActionConfigurator subclass (if one is provided) to update the action configuration. - * - * @param actionConf The action configuration to update - * @throws OozieActionConfiguratorException - */ - protected static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException { - String configClass = System.getProperty(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS); - if (configClass != null) { - try { - Class<?> klass = Class.forName(configClass); - Class<? extends OozieActionConfigurator> actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class); - OozieActionConfigurator actionConfigurator = actionConfiguratorKlass.newInstance(); - actionConfigurator.configure(actionConf); - } catch (ClassNotFoundException e) { - throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e); - } catch (InstantiationException e) { - throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e); - } catch (IllegalAccessException e) { - throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e); - } - } - } - - /** * Read action configuration passes through action xml file. * * @return action Configuration @@ -268,13 +335,13 @@ public abstract class LauncherMain { } protected static void setYarnTag(Configuration actionConf) { - if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) { + if(actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) != null) { // in case the user set their own tags, appending the launcher tag. if(actionConf.get(MAPREDUCE_JOB_TAGS) != null) { actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(MAPREDUCE_JOB_TAGS) + "," - + actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS)); + + actionConf.get(CHILD_MAPREDUCE_JOB_TAGS)); } else { - actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS)); + actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(CHILD_MAPREDUCE_JOB_TAGS)); } } } @@ -331,6 +398,27 @@ public abstract class LauncherMain { } copyFileMultiplex(actionXmlFile, dstFiles); } + /** + * Print arguments to standard output stream. Mask out argument values to option with name 'password' in them. + * @param banner source banner + * @param args arguments to be printed + */ + void printArgs(String banner, String[] args) { + System.out.println(banner); + boolean maskNextArg = false; + for (String arg : args) { + if (maskNextArg) { + System.out.println(" " + "********"); + maskNextArg = false; + } + else { + System.out.println(" " + arg); + if (arg.toLowerCase().contains("password")) { + maskNextArg = true; + } + } + } + } } class LauncherMainException extends Exception { http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java index 8657c67..912eba2 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java @@ -34,8 +34,8 @@ import java.security.Permission; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import java.util.Properties; -import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -49,9 +49,12 @@ import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reporter; +import org.xml.sax.SAXException; import com.google.common.base.Strings; +import javax.xml.parsers.ParserConfigurationException; +// TODO: OYA: Delete :) public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable { static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class"; @@ -238,7 +241,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R // Get what actually caused the exception Throwable cause = ex.getCause(); // If we got a JavaMainException from JavaMain, then we need to unwrap it - if (JavaMainException.class.isInstance(cause)) { + if (JavaMain.JavaMainException.class.isInstance(cause)) { cause = cause.getCause(); } if (LauncherMainException.class.isInstance(cause)) { @@ -348,9 +351,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R // loading action conf prepared by Oozie Configuration actionConf = LauncherMain.loadActionConf(); - if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) { + if(actionConf.get(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS) != null) { propagationConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, - actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS)); + actionConf.get(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS)); } propagationConf.writeXml(new FileWriter(PROPAGATION_CONF_XML)); @@ -432,9 +435,8 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R try { wr = SequenceFile.createWriter(fs, getJobConf(), finalPath, Text.class, Text.class); if (wr != null) { - Set<String> keys = actionData.keySet(); - for (String propsKey : keys) { - wr.append(new Text(propsKey), new Text(actionData.get(propsKey))); + for (Entry<String, String> entry : actionData.entrySet()) { + wr.append(new Text(entry.getKey()), new Text(entry.getValue())); } } else { @@ -469,9 +471,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath()); System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath()); - if (getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME) != null) { - System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, - getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME)); + if (getJobConf().get(LauncherMain.OOZIE_JOB_LAUNCH_TIME) != null) { + System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, + getJobConf().get(LauncherMain.OOZIE_JOB_LAUNCH_TIME)); } String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS); @@ -481,7 +483,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R } // Method to execute the prepare actions - private void executePrepare() throws IOException, LauncherException { + private void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException { String prepareXML = getJobConf().get(ACTION_PREPARE_XML); if (prepareXML != null) { if (!prepareXML.equals("")) { @@ -601,20 +603,26 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R System.out.println("======================"); File[] listOfFiles = folder.listFiles(); - for (File fileName : listOfFiles) { - if (fileName.isFile()) { - System.out.println("File: " + fileName.getName()); - } - else if (fileName.isDirectory()) { - System.out.println("Dir: " + fileName.getName()); - File subDir = new File(fileName.getName()); - File[] moreFiles = subDir.listFiles(); - for (File subFileName : moreFiles) { - if (subFileName.isFile()) { - System.out.println(" File: " + subFileName.getName()); - } - else if (subFileName.isDirectory()) { - System.out.println(" Dir: " + subFileName.getName()); + + if (listOfFiles != null) { + for (File fileName : listOfFiles) { + if (fileName.isFile()) { + System.out.println("File: " + fileName.getName()); + } + else if (fileName.isDirectory()) { + System.out.println("Dir: " + fileName.getName()); + File subDir = new File(fileName.getName()); + File[] moreFiles = subDir.listFiles(); + + if (moreFiles != null) { + for (File subFileName : moreFiles) { + if (subFileName.isFile()) { + System.out.println(" File: " + subFileName.getName()); + } + else if (subFileName.isDirectory()) { + System.out.println(" Dir: " + subFileName.getName()); + } + } } } } @@ -709,12 +717,3 @@ class LauncherSecurityManager extends SecurityManager { } } -/** - * Used by JavaMain to wrap a Throwable when an Exception occurs - */ -@SuppressWarnings("serial") -class JavaMainException extends Exception { - public JavaMainException(Throwable t) { - super(t); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java new file mode 100644 index 0000000..011ce93 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileVisitOption; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.EnumSet; + +import org.apache.hadoop.conf.Configuration; + +public class LocalFsOperations { + private static final int WALK_DEPTH = 2; + + /** + * Reads the launcher configuration "launcher.xml" + * @return Configuration object + */ + public Configuration readLauncherConf() { + File confFile = new File(LauncherAM.LAUNCHER_JOB_CONF_XML); + Configuration conf = new Configuration(false); + conf.addResource(new org.apache.hadoop.fs.Path(confFile.getAbsolutePath())); + return conf; + } + + /** + * Print files and directories in current directory. Will list files in the sub-directory (only 2 level deep) + * @throws IOException + */ + public void printContentsOfDir(File folder) throws IOException { + System.out.println(); + System.out.println("Files in current dir:" + folder.getAbsolutePath()); + System.out.println("======================"); + + final Path root = folder.toPath(); + Files.walkFileTree(root, EnumSet.of(FileVisitOption.FOLLOW_LINKS), WALK_DEPTH, new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + if (attrs.isRegularFile()) { + System.out.println(" File: " + root.relativize(file)); + } else if (attrs.isDirectory()) { + System.out.println(" Dir: " + root.relativize(file) + "/"); + } + + return FileVisitResult.CONTINUE; + } + }); + } + + /** + * Returns the contents of a file as string. + * + * @param file the File object which represents the file to be read + * @param type Type of the file + * @param maxLen Maximum allowed length + * @return The file contents as string + * @throws IOException if the file is bigger than maxLen or there is any I/O error + * @throws FileNotFoundException if the file does not exist + */ + public String getLocalFileContentAsString(File file, String type, int maxLen) throws IOException { + if (file.exists()) { + if (maxLen > -1 && file.length() > maxLen) { + throw new IOException(type + " data exceeds its limit [" + maxLen + "]"); + } + + return com.google.common.io.Files.toString(file, StandardCharsets.UTF_8); + } else { + throw new FileNotFoundException("File not found: " + file.toPath().toAbsolutePath()); + } + } + + /** + * Checks if a given File exists or not. This method helps writing unit tests. + */ + public boolean fileExists(File file) { + return file.exists(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java index d376057..e0974e8 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java @@ -52,7 +52,7 @@ public class MapReduceMain extends LauncherMain { JobConf jobConf = new JobConf(); addActionConf(jobConf, actionConf); - LauncherMainHadoopUtils.killChildYarnJobs(jobConf); + LauncherMain.killChildYarnJobs(jobConf); // Run a config class if given to update the job conf runConfigClass(jobConf); @@ -132,31 +132,27 @@ public class MapReduceMain extends LauncherMain { return runJob; } - @SuppressWarnings("unchecked") protected JobClient createJobClient(JobConf jobConf) throws IOException { return new JobClient(jobConf); } - // allows any character in the value, the conf.setStrings() does not allow - // commas - public static void setStrings(Configuration conf, String key, String[] values) { - if (values != null) { - conf.setInt(key + ".size", values.length); - for (int i = 0; i < values.length; i++) { - conf.set(key + "." + i, values[i]); - } - } - } - - public static String[] getStrings(Configuration conf, String key) { - String[] values = new String[conf.getInt(key + ".size", 0)]; - for (int i = 0; i < values.length; i++) { - values[i] = conf.get(key + "." + i); - if (values[i] == null) { - values[i] = ""; + /** + * Will run the user specified OozieActionConfigurator subclass (if one is provided) to update the action configuration. + * + * @param actionConf The action configuration to update + * @throws OozieActionConfiguratorException + */ + private static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException { + String configClass = actionConf.get(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS); + if (configClass != null) { + try { + Class<?> klass = Class.forName(configClass); + Class<? extends OozieActionConfigurator> actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class); + OozieActionConfigurator actionConfigurator = actionConfiguratorKlass.newInstance(); + actionConfigurator.configure(actionConf); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e); } } - return values; } - } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java index 21ae456..cb5b1ac 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java @@ -38,7 +38,9 @@ import javax.xml.parsers.ParserConfigurationException; * Utility class to perform operations on the prepare block of Workflow * */ +@Deprecated public class PrepareActionsDriver { + private static final PrepareActionsHandler prepareHandler = new PrepareActionsHandler(); /** * Method to parse the prepare XML and execute the corresponding prepare actions @@ -46,52 +48,9 @@ public class PrepareActionsDriver { * @param prepareXML Prepare XML block in string format * @throws LauncherException */ - static void doOperations(String prepareXML, Configuration conf) throws LauncherException { - try { - Document doc = getDocumentFromXML(prepareXML); - doc.getDocumentElement().normalize(); - - // Get the list of child nodes, basically, each one corresponding to a separate action - NodeList nl = doc.getDocumentElement().getChildNodes(); - LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf); - - for (int i = 0; i < nl.getLength(); ++i) { - Node n = nl.item(i); - String operation = n.getNodeName(); - if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) { - continue; - } - String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim(); - // use Path to avoid URIsyntax error caused by square bracket in glob - URI uri = new Path(pathStr).toUri(); - LauncherURIHandler handler = factory.getURIHandler(uri); - execute(operation, uri, handler, conf); - } - } catch (IOException ioe) { - throw new LauncherException(ioe.getMessage(), ioe); - } catch (SAXException saxe) { - throw new LauncherException(saxe.getMessage(), saxe); - } catch (ParserConfigurationException pce) { - throw new LauncherException(pce.getMessage(), pce); - } catch (IllegalArgumentException use) { - throw new LauncherException(use.getMessage(), use); - } - } - - /** - * Method to execute the prepare actions based on the command - * - * @param n Child node of the prepare XML - * @throws LauncherException - */ - private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf) - throws LauncherException { - if (operation.equals("delete")) { - handler.delete(uri, conf); - } - else if (operation.equals("mkdir")) { - handler.create(uri, conf); - } + static void doOperations(String prepareXML, Configuration conf) + throws IOException, SAXException, ParserConfigurationException, LauncherException { + prepareHandler.prepareAction(prepareXML, conf); } // Method to return the document from the prepare XML block http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java new file mode 100644 index 0000000..b5377b1 --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; +import javax.xml.parsers.ParserConfigurationException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.w3c.dom.Document; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.xml.sax.SAXException; + +public class PrepareActionsHandler { + + /** + * Method to parse the prepare XML and execute the corresponding prepare actions + * + * @param prepareXML Prepare XML block in string format + * @throws LauncherException + */ + public void prepareAction(String prepareXML, Configuration conf) + throws IOException, SAXException, ParserConfigurationException, LauncherException { + Document doc = getDocumentFromXML(prepareXML); + doc.getDocumentElement().normalize(); + + // Get the list of child nodes, basically, each one corresponding to a separate action + NodeList nl = doc.getDocumentElement().getChildNodes(); + LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf); + + for (int i = 0; i < nl.getLength(); ++i) { + Node n = nl.item(i); + String operation = n.getNodeName(); + if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) { + continue; + } + String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim(); + // use Path to avoid URIsyntax error caused by square bracket in glob + URI uri = new Path(pathStr).toUri(); + LauncherURIHandler handler = factory.getURIHandler(uri); + execute(operation, uri, handler, conf); + } + } + + private void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf) + throws LauncherException { + + switch (operation) { + case "delete": + handler.delete(uri, conf); + break; + + case "mkdir": + handler.create(uri, conf); + break; + + default: + System.out.println("Warning: unknown prepare operation " + operation + " -- skipping"); + } + } + + // Method to return the document from the prepare XML block + static Document getDocumentFromXML(String prepareXML) throws ParserConfigurationException, SAXException, + IOException { + DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); + docBuilderFactory.setNamespaceAware(true); + // support for includes in the xml file + docBuilderFactory.setXIncludeAware(true); + // ignore all comments inside the xml file + docBuilderFactory.setIgnoringComments(true); + docBuilderFactory.setExpandEntityReferences(false); + docBuilderFactory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true); + DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder(); + InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8")); + return docBuilder.parse(is); + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java new file mode 100644 index 0000000..8d986af --- /dev/null +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; + +public class SequenceFileWriterFactory { + + public SequenceFile.Writer createSequenceFileWriter(Configuration launcherJobConf, Path finalPath, + Class<?> keyClass, Class<?> valueClass) throws IOException { + return SequenceFile.createWriter(launcherJobConf, + SequenceFile.Writer.file(finalPath), + SequenceFile.Writer.keyClass(keyClass), + SequenceFile.Writer.valueClass(valueClass)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java index f109318..0ee35e8 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java @@ -24,10 +24,10 @@ import java.io.BufferedWriter; import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; -import java.io.PrintWriter; -import java.io.StringReader; import java.io.IOException; import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.io.StringReader; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -186,7 +186,7 @@ public class ShellMain extends LauncherMain { */ private Map<String, String> getEnvMap(Map<String, String> envp, Configuration actionConf) { // Adding user-specified environments - String[] envs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS); + String[] envs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS); for (String env : envs) { String[] varValue = env.split("=",2); // Error case is handled in // ShellActionExecutor @@ -339,7 +339,7 @@ public class ShellMain extends LauncherMain { */ protected List<String> getShellArguments(Configuration actionConf) { List<String> arguments = new ArrayList<String>(); - String[] scrArgs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS); + String[] scrArgs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS); for (String scrArg : scrArgs) { arguments.add(scrArg); } http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java new file mode 100644 index 0000000..718bf64 --- /dev/null +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +public class LauncherAMTestMainClass { + public static final String SECURITY_EXCEPTION = "security"; + public static final String LAUNCHER_EXCEPTION = "launcher"; + public static final String JAVA_EXCEPTION = "java"; + public static final String THROWABLE = "throwable"; + + public static final String JAVA_EXCEPTION_MESSAGE = "Java Exception"; + public static final String SECURITY_EXCEPTION_MESSAGE = "Security Exception"; + public static final String THROWABLE_MESSAGE = "Throwable"; + public static final int LAUNCHER_ERROR_CODE = 1234; + + public static void main(String args[]) throws Throwable { + System.out.println("Invocation of TestMain"); + + if (args != null && args.length == 1) { + switch (args[0]){ + case JAVA_EXCEPTION: + throw new JavaMain.JavaMainException(new RuntimeException(JAVA_EXCEPTION_MESSAGE)); + case LAUNCHER_EXCEPTION: + throw new LauncherMainException(LAUNCHER_ERROR_CODE); + case SECURITY_EXCEPTION: + throw new SecurityException(SECURITY_EXCEPTION_MESSAGE); + case THROWABLE: + throw new Throwable(THROWABLE_MESSAGE); + } + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java new file mode 100644 index 0000000..68c0f4b --- /dev/null +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import static org.junit.Assert.assertEquals; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class TestHdfsOperations { + @Mock + private SequenceFileWriterFactory seqFileWriterFactoryMock; + + @Mock + private SequenceFile.Writer writerMock; + + @Mock + private UserGroupInformation ugiMock; + + @Mock + private Configuration configurationMock; + + private Path path = new Path("."); + + private Map<String, String> actionData = new HashMap<>(); + + @InjectMocks + private HdfsOperations hdfsOperations; + + @Before + public void setup() throws Exception { + configureMocksForHappyPath(); + actionData.put("testKey", "testValue"); + } + + @Test + public void testActionDataUploadToHdfsSucceeds() throws Exception { + hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData); + + verify(seqFileWriterFactoryMock).createSequenceFileWriter(eq(configurationMock), + any(Path.class), eq(Text.class), eq(Text.class)); + ArgumentCaptor<Text> keyCaptor = ArgumentCaptor.forClass(Text.class); + ArgumentCaptor<Text> valueCaptor = ArgumentCaptor.forClass(Text.class); + verify(writerMock).append(keyCaptor.capture(), valueCaptor.capture()); + assertEquals("testKey", keyCaptor.getValue().toString()); + assertEquals("testValue", valueCaptor.getValue().toString()); + } + + @Test(expected = IOException.class) + public void testActionDataUploadToHdfsFailsWhenAppendingToWriter() throws Exception { + willThrow(new IOException()).given(writerMock).append(any(Text.class), any(Text.class)); + + hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData); + } + + @Test(expected = IOException.class) + public void testActionDataUploadToHdfsFailsWhenWriterIsNull() throws Exception { + given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock), + any(Path.class), eq(Text.class), eq(Text.class))).willReturn(null); + + hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData); + } + + @SuppressWarnings("unchecked") + private void configureMocksForHappyPath() throws Exception { + given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0]; + return action.run(); + } + }); + + given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock), + any(Path.class), eq(Text.class), eq(Text.class))).willReturn(writerMock); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java new file mode 100644 index 0000000..9cdedb7 --- /dev/null +++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java @@ -0,0 +1,641 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.action.hadoop; + +import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS; +import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_NEW_ID; +import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_OUTPUT_PROPS; +import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_STATS; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.JAVA_EXCEPTION; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.JAVA_EXCEPTION_MESSAGE; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.LAUNCHER_ERROR_CODE; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.LAUNCHER_EXCEPTION; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.SECURITY_EXCEPTION; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.SECURITY_EXCEPTION_MESSAGE; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.THROWABLE; +import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.THROWABLE_MESSAGE; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +import static org.mockito.BDDMockito.given; +import static org.mockito.BDDMockito.willReturn; +import static org.mockito.BDDMockito.willThrow; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +import java.io.File; +import java.io.IOException; +import java.io.StringReader; +import java.security.PrivilegedExceptionAction; +import java.text.MessageFormat; +import java.util.Map; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager; +import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; + +@RunWith(MockitoJUnitRunner.class) +public class TestLauncherAM { + private static final String DEFAULT_CONTAINER_ID = "container_1479473450392_0001_01_000001"; + private static final String ACTIONDATA_ERROR_PROPERTIES = "error.properties"; + private static final String ACTIONDATA_FINAL_STATUS_PROPERTY = "final.status"; + private static final String ERROR_CODE_PROPERTY = "error.code"; + private static final String EXCEPTION_STACKTRACE_PROPERTY = "exception.stacktrace"; + private static final String EXCEPTION_MESSAGE_PROPERTY = "exception.message"; + private static final String ERROR_REASON_PROPERTY = "error.reason"; + + private static final String EMPTY_STRING = ""; + private static final String EXIT_CODE_1 = "1"; + private static final String EXIT_CODE_0 = "0"; + private static final String DUMMY_XML = "<dummy>dummyXml</dummy>"; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Mock + private UserGroupInformation ugiMock; + + @Mock + private AMRMClientAsyncFactory amRMClientAsyncFactoryMock; + + @Mock + private AMRMClientAsync<?> amRmAsyncClientMock; + + @Mock + private AMRMCallBackHandler callbackHandlerMock; + + @Mock + private HdfsOperations hdfsOperationsMock; + + @Mock + private LocalFsOperations localFsOperationsMock; + + @Mock + private PrepareActionsHandler prepareHandlerMock; + + @Mock + private LauncherAMCallbackNotifierFactory launcherCallbackNotifierFactoryMock; + + @Mock + private LauncherAMCallbackNotifier launcherCallbackNotifierMock; + + @Mock + private LauncherSecurityManager launcherSecurityManagerMock; + + private Configuration launcherJobConfig = new Configuration(); + + private String containerId = DEFAULT_CONTAINER_ID; + + private String applicationId = ConverterUtils.toContainerId(containerId) + .getApplicationAttemptId().getApplicationId().toString(); + + private LauncherAM launcherAM; + + private ExpectedFailureDetails failureDetails = new ExpectedFailureDetails(); + + @Before + public void setup() throws Exception { + configureMocksForHappyPath(); + launcherJobConfig.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, "1"); + instantiateLauncher(); + } + + @Test + public void testMainIsSuccessfullyInvokedWithActionData() throws Exception { + setupActionOutputContents(); + + executeLauncher(); + + verifyZeroInteractions(prepareHandlerMock); + assertSuccessfulExecution(OozieActionResult.RUNNING); + assertActionOutputDataPresentAndCorrect(); + } + + @Test + public void testMainIsSuccessfullyInvokedWithoutActionData() throws Exception { + executeLauncher(); + + verifyZeroInteractions(prepareHandlerMock); + assertSuccessfulExecution(OozieActionResult.SUCCEEDED); + assertNoActionOutputData(); + } + + @Test + public void testActionHasPrepareXML() throws Exception { + launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, DUMMY_XML); + + executeLauncher(); + + verify(prepareHandlerMock).prepareAction(eq(DUMMY_XML), any(Configuration.class)); + assertSuccessfulExecution(OozieActionResult.SUCCEEDED); + } + + @Test + public void testActionHasEmptyPrepareXML() throws Exception { + launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, EMPTY_STRING); + + executeLauncher(); + + verifyZeroInteractions(prepareHandlerMock); + assertSuccessfulExecution(OozieActionResult.SUCCEEDED); + assertNoActionOutputData(); + } + + @Test + public void testLauncherClassNotDefined() throws Exception { + launcherJobConfig.unset(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS); + + executeLauncher(); + + failureDetails.expectedExceptionMessage("Launcher class should not be null") + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("Launcher class should not be null") + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testMainIsSuccessfullyInvokedAndAsyncErrorReceived() throws Exception { + ErrorHolder errorHolder = new ErrorHolder(); + errorHolder.setErrorCode(6); + errorHolder.setErrorMessage("dummy error"); + errorHolder.setErrorCause(new Exception()); + given(callbackHandlerMock.getError()).willReturn(errorHolder); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(null) + .expectedErrorCode("6") + .expectedErrorReason("dummy error") + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testMainClassNotFound() throws Exception { + launcherJobConfig.set(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, "org.apache.non.existing.Klass"); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(ClassNotFoundException.class.getCanonicalName()) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason(ClassNotFoundException.class.getCanonicalName()) + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testLauncherJobConfCannotBeLoaded() throws Exception { + given(localFsOperationsMock.readLauncherConf()).willThrow(new RuntimeException()); + thrown.expect(RuntimeException.class); + + try { + executeLauncher(); + } finally { + failureDetails.expectedExceptionMessage(null) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("Could not load the Launcher AM configuration file") + .withStackTrace(); + + assertFailedExecution(); + } + } + + @Test + public void testActionPrepareFails() throws Exception { + launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, DUMMY_XML); + willThrow(new IOException()).given(prepareHandlerMock).prepareAction(anyString(), any(Configuration.class)); + thrown.expect(IOException.class); + + try { + executeLauncher(); + } finally { + failureDetails.expectedExceptionMessage(null) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("Prepare execution in the Launcher AM has failed") + .withStackTrace(); + + assertFailedExecution(); + } + } + + @Test + public void testActionThrowsJavaMainException() throws Exception { + setupArgsForMainClass(JAVA_EXCEPTION); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(JAVA_EXCEPTION_MESSAGE) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason(JAVA_EXCEPTION_MESSAGE) + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testActionThrowsLauncherException() throws Exception { + setupArgsForMainClass(LAUNCHER_EXCEPTION); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(null) + .expectedErrorCode(String.valueOf(LAUNCHER_ERROR_CODE)) + .expectedErrorReason("exit code [" + LAUNCHER_ERROR_CODE + "]") + .withoutStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testActionThrowsSecurityExceptionWithExitCode0() throws Exception { + setupArgsForMainClass(SECURITY_EXCEPTION); + given(launcherSecurityManagerMock.getExitInvoked()).willReturn(true); + given(launcherSecurityManagerMock.getExitCode()).willReturn(0); + + executeLauncher(); + + assertSuccessfulExecution(OozieActionResult.SUCCEEDED); + } + + @Test + public void testActionThrowsSecurityExceptionWithExitCode1() throws Exception { + setupArgsForMainClass(SECURITY_EXCEPTION); + given(launcherSecurityManagerMock.getExitInvoked()).willReturn(true); + given(launcherSecurityManagerMock.getExitCode()).willReturn(1); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(null) + .expectedErrorCode(EXIT_CODE_1) + .expectedErrorReason("exit code ["+ EXIT_CODE_1 + "]") + .withoutStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testActionThrowsSecurityExceptionWithoutSystemExit() throws Exception { + setupArgsForMainClass(SECURITY_EXCEPTION); + given(launcherSecurityManagerMock.getExitInvoked()).willReturn(false); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(SECURITY_EXCEPTION_MESSAGE) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason(SECURITY_EXCEPTION_MESSAGE) + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testActionThrowsThrowable() throws Exception { + setupArgsForMainClass(THROWABLE); + + executeLauncher(); + + failureDetails.expectedExceptionMessage(THROWABLE_MESSAGE) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason(THROWABLE_MESSAGE) + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testActionThrowsThrowableAndAsyncErrorReceived() throws Exception { + setupArgsForMainClass(THROWABLE); + ErrorHolder errorHolder = new ErrorHolder(); + errorHolder.setErrorCode(6); + errorHolder.setErrorMessage("dummy error"); + errorHolder.setErrorCause(new Exception()); + given(callbackHandlerMock.getError()).willReturn(errorHolder); + + executeLauncher(); + + // sync problem overrides async problem + failureDetails.expectedExceptionMessage(THROWABLE_MESSAGE) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason(THROWABLE_MESSAGE) + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testYarnUnregisterFails() throws Exception { + willThrow(new IOException()).given(amRmAsyncClientMock).unregisterApplicationMaster(any(FinalApplicationStatus.class), + anyString(), anyString()); + thrown.expect(IOException.class); + + try { + executeLauncher(); + } finally { + // TODO: check if this behaviour is correct (url callback: successful, but unregister fails) + assertSuccessfulExecution(OozieActionResult.SUCCEEDED); + } + } + + @Test + public void testUpdateActionDataFailsWithActionError() throws Exception { + setupActionOutputContents(); + given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_EXTERNAL_CHILD_IDS), anyInt())) + .willThrow(new IOException()); + thrown.expect(IOException.class); + + try { + executeLauncher(); + } finally { + Map<String, String> actionData = launcherAM.getActionData(); + assertThat(actionData, not(hasKey(ACTION_DATA_EXTERNAL_CHILD_IDS))); + verify(launcherCallbackNotifierMock).notifyURL(OozieActionResult.FAILED); + } + } + + @Test + public void testRecoveryIdNotSet() throws Exception { + launcherJobConfig.unset(LauncherMapper.OOZIE_ACTION_RECOVERY_ID); + instantiateLauncher(); + + executeLauncher(); + + failureDetails.expectedExceptionMessage("IO error") + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("IO error") + .withStackTrace(); + + assertFailedExecution(); + } + + @Test + public void testRecoveryIdExistsAndRecoveryIsdMatch() throws Exception { + given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true); + given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(applicationId); + + executeLauncher(); + + verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig)); + } + + @Test + public void testRecoveryIdExistsAndRecoveryIdsDoNotMatch() throws Exception { + String newAppId = "not_matching_appid"; + given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true); + given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(newAppId); + + executeLauncher(); + + String errorMessage = MessageFormat.format( + "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", "dummy/1", + newAppId, + applicationId); + + failureDetails.expectedExceptionMessage(errorMessage) + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason(errorMessage) + .withStackTrace(); + + verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig)); + assertFailedExecution(); + } + + @Test + public void testReadingRecoveryIdFails() throws Exception { + willThrow(new IOException()).given(hdfsOperationsMock) + .writeStringToFile(any(Path.class), eq(launcherJobConfig), eq(applicationId)); + + executeLauncher(); + + failureDetails.expectedExceptionMessage("IO error") + .expectedErrorCode(EXIT_CODE_0) + .expectedErrorReason("IO error") + .withStackTrace(); + + assertFailedExecution(); + } + + private void instantiateLauncher() { + launcherAM = new LauncherAM(ugiMock, + amRMClientAsyncFactoryMock, + callbackHandlerMock, + hdfsOperationsMock, + localFsOperationsMock, + prepareHandlerMock, + launcherCallbackNotifierFactoryMock, + launcherSecurityManagerMock, + containerId); + } + + @SuppressWarnings("unchecked") + private void configureMocksForHappyPath() throws Exception { + launcherJobConfig.set(LauncherAM.OOZIE_ACTION_DIR_PATH, "dummy"); + launcherJobConfig.set(LauncherAM.OOZIE_JOB_ID, "dummy"); + launcherJobConfig.set(LauncherAM.OOZIE_ACTION_ID, "dummy"); + launcherJobConfig.set(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, LauncherAMTestMainClass.class.getCanonicalName()); + + given(localFsOperationsMock.readLauncherConf()).willReturn(launcherJobConfig); + given(localFsOperationsMock.fileExists(any(File.class))).willReturn(true); + willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock).createAMRMClientAsync(anyInt()); + given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0]; + return action.run(); + } + }); + given(launcherCallbackNotifierFactoryMock.createCallbackNotifier(any(Configuration.class))) + .willReturn(launcherCallbackNotifierMock); + } + + private void setupActionOutputContents() throws IOException { + // output files generated by an action + given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_EXTERNAL_CHILD_IDS), anyInt())) + .willReturn(ACTION_DATA_EXTERNAL_CHILD_IDS); + + given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_NEW_ID), anyInt())) + .willReturn(ACTION_DATA_NEW_ID); + + given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_OUTPUT_PROPS), anyInt())) + .willReturn(ACTION_DATA_OUTPUT_PROPS); + + given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_STATS), anyInt())) + .willReturn(ACTION_DATA_STATS); + } + + private void setupArgsForMainClass(final String... args) { + launcherJobConfig.set(String.valueOf(LauncherAM.CONF_OOZIE_ACTION_MAIN_ARG_COUNT), String.valueOf(args.length)); + + for (int i = 0; i < args.length; i++) { + launcherJobConfig.set(String.valueOf(LauncherAM.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i), args[i]); + } + } + + private void executeLauncher() throws Exception { + launcherAM.run(); + } + + @SuppressWarnings("unchecked") + private void assertSuccessfulExecution(OozieActionResult actionResult) throws Exception { + verify(amRmAsyncClientMock).registerApplicationMaster(anyString(), anyInt(), anyString()); + verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING); + verify(amRmAsyncClientMock).stop(); + verify(ugiMock, times(2)).doAs(any(PrivilegedExceptionAction.class)); // prepare & action main + verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class)); + verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class)); + verify(launcherCallbackNotifierMock).notifyURL(actionResult); + verify(hdfsOperationsMock).writeStringToFile(any(Path.class), any(Configuration.class), any(String.class)); + + Map<String, String> actionData = launcherAM.getActionData(); + verifyFinalStatus(actionData, actionResult); + verifyNoError(actionData); + } + + private void assertActionOutputDataPresentAndCorrect() { + Map<String, String> actionData = launcherAM.getActionData(); + String extChildId = actionData.get(ACTION_DATA_EXTERNAL_CHILD_IDS); + String stats = actionData.get(ACTION_DATA_STATS); + String output = actionData.get(ACTION_DATA_OUTPUT_PROPS); + String idSwap = actionData.get(ACTION_DATA_NEW_ID); + + assertThat("extChildID output", ACTION_DATA_EXTERNAL_CHILD_IDS, equalTo(extChildId)); + assertThat("stats output", ACTION_DATA_STATS, equalTo(stats)); + assertThat("action output", ACTION_DATA_OUTPUT_PROPS, equalTo(output)); + assertThat("idSwap output", ACTION_DATA_NEW_ID, equalTo(idSwap)); + } + + private void assertNoActionOutputData() { + Map<String, String> actionData = launcherAM.getActionData(); + String extChildId = actionData.get(ACTION_DATA_EXTERNAL_CHILD_IDS); + String stats = actionData.get(ACTION_DATA_STATS); + String output = actionData.get(ACTION_DATA_OUTPUT_PROPS); + String idSwap = actionData.get(ACTION_DATA_NEW_ID); + + assertThat("extChildId", extChildId, nullValue()); + assertThat("stats", stats, nullValue()); + assertThat("Output", output, nullValue()); + assertThat("idSwap", idSwap, nullValue()); + } + + private void assertFailedExecution() throws Exception { + Map<String, String> actionData = launcherAM.getActionData(); + verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class)); + verify(launcherCallbackNotifierMock).notifyURL(OozieActionResult.FAILED); + verifyFinalStatus(actionData, OozieActionResult.FAILED); + + // Note: actionData contains properties inside a property, so we have to extract them into a new Property object + String fullError = actionData.get(ACTIONDATA_ERROR_PROPERTIES); + Properties props = new Properties(); + props.load(new StringReader(fullError)); + + String errorReason = props.getProperty(ERROR_REASON_PROPERTY); + if (failureDetails.expectedErrorReason != null) { + assertThat("errorReason", errorReason, containsString(failureDetails.expectedErrorReason)); + } else { + assertThat("errorReason", errorReason, nullValue()); + } + + String exceptionMessage = props.getProperty(EXCEPTION_MESSAGE_PROPERTY); + if (failureDetails.expectedExceptionMessage != null) { + assertThat("exceptionMessage", exceptionMessage, containsString(failureDetails.expectedExceptionMessage)); + } else { + assertThat("exceptionMessage", exceptionMessage, nullValue()); + } + + String stackTrace = props.getProperty(EXCEPTION_STACKTRACE_PROPERTY); + if (failureDetails.hasStackTrace) { + assertThat("stackTrace", stackTrace, notNullValue()); + } else { + assertThat("stackTrace", stackTrace, nullValue()); + } + + String errorCode = props.getProperty(ERROR_CODE_PROPERTY); + assertThat("errorCode", errorCode, equalTo(failureDetails.expectedErrorCode)); + } + + private void verifyFinalStatus(Map<String, String> actionData, OozieActionResult actionResult) { + String finalStatus = actionData.get(ACTIONDATA_FINAL_STATUS_PROPERTY); + assertThat("actionResult", actionResult.toString(), equalTo(finalStatus)); + } + + private void verifyNoError(Map<String, String> actionData) { + String fullError = actionData.get(ACTIONDATA_ERROR_PROPERTIES); + assertThat("error properties", fullError, nullValue()); + } + + private class ExpectedFailureDetails { + String expectedExceptionMessage; + String expectedErrorCode; + String expectedErrorReason; + boolean hasStackTrace; + + public ExpectedFailureDetails expectedExceptionMessage(String expectedExceptionMessage) { + this.expectedExceptionMessage = expectedExceptionMessage; + return this; + } + + public ExpectedFailureDetails expectedErrorCode(String expectedErrorCode) { + this.expectedErrorCode = expectedErrorCode; + return this; + } + + public ExpectedFailureDetails expectedErrorReason(String expectedErrorReason) { + this.expectedErrorReason = expectedErrorReason; + return this; + } + + public ExpectedFailureDetails withStackTrace() { + this.hasStackTrace = true; + return this; + } + + public ExpectedFailureDetails withoutStackTrace() { + this.hasStackTrace = false; + return this; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/pom.xml ---------------------------------------------------------------------- diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml index 99148d7..8f74ded 100644 --- a/sharelib/pig/pom.xml +++ b/sharelib/pig/pom.xml @@ -82,9 +82,9 @@ </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - <scope>provided</scope> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <scope>test</scope> </dependency> <dependency> @@ -103,8 +103,8 @@ <scope>provided</scope> </dependency> <dependency> - <groupId>org.apache.oozie</groupId> - <artifactId>oozie-hadoop-utils</artifactId> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> <scope>provided</scope> </dependency> <dependency> @@ -170,18 +170,6 @@ <outputFile>${project.build.directory}/classpath</outputFile> </configuration> </execution> - <execution> - <id>create-mrapp-generated-classpath</id> - <phase>generate-test-resources</phase> - <goals> - <goal>build-classpath</goal> - </goals> - <configuration> - <!-- needed to run the unit test for DS to generate the required classpath - that is required in the env of the launch container in the mini mr/yarn cluster --> - <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile> - </configuration> - </execution> </executions> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java index 11cc7ee..5a9123a 100644 --- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java +++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java @@ -131,7 +131,8 @@ public class PigMain extends LauncherMain { pigProperties.store(os, ""); os.close(); - logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet()); + logMasking("pig.properties:", Arrays.asList("password"), + (Iterable<Map.Entry<String, String>>)(Iterable<?>) pigProperties.entrySet()); List<String> arguments = new ArrayList<String>(); String script = actionConf.get(PigActionExecutor.PIG_SCRIPT); @@ -148,7 +149,7 @@ public class PigMain extends LauncherMain { arguments.add("-file"); arguments.add(script); - String[] params = MapReduceMain.getStrings(actionConf, PigActionExecutor.PIG_PARAMS); + String[] params = ActionUtils.getStrings(actionConf, PigActionExecutor.PIG_PARAMS); for (String param : params) { arguments.add("-param"); arguments.add(param); @@ -193,7 +194,7 @@ public class PigMain extends LauncherMain { arguments.add("-logfile"); arguments.add(pigLog); - String[] pigArgs = MapReduceMain.getStrings(actionConf, PigActionExecutor.PIG_ARGS); + String[] pigArgs = ActionUtils.getStrings(actionConf, PigActionExecutor.PIG_ARGS); for (String pigArg : pigArgs) { if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) { throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported"); @@ -212,7 +213,7 @@ public class PigMain extends LauncherMain { System.out.println(" " + arg); } - LauncherMainHadoopUtils.killChildYarnJobs(actionConf); + LauncherMain.killChildYarnJobs(actionConf); System.out.println("================================================================="); System.out.println(); http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java ---------------------------------------------------------------------- diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java index 503d0eb..0ee4b0b 100644 --- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java +++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java @@ -135,7 +135,7 @@ public class PigMainWithOldAPI extends LauncherMain { arguments.add("-file"); arguments.add(script); - String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params"); + String[] params = ActionUtils.getStrings(actionConf, "oozie.pig.params"); for (String param : params) { arguments.add("-param"); arguments.add(param); @@ -178,7 +178,7 @@ public class PigMainWithOldAPI extends LauncherMain { arguments.add("-logfile"); arguments.add(pigLog); - String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args"); + String[] pigArgs = ActionUtils.getStrings(actionConf, "oozie.pig.args"); for (String pigArg : pigArgs) { if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) { throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
