Repository: oozie Updated Branches: refs/heads/master 230e42611 -> f45b18791
OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f45b1879 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f45b1879 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f45b1879 Branch: refs/heads/master Commit: f45b18791a92201fb4959857cafbda7543942022 Parents: 230e426 Author: Gezapeti Cseh <[email protected]> Authored: Thu Jun 8 13:59:06 2017 +0200 Committer: Gezapeti Cseh <[email protected]> Committed: Thu Jun 8 13:59:17 2017 +0200 ---------------------------------------------------------------------- release-log.txt | 1 + .../oozie/action/hadoop/HadoopUriFinder.java | 66 +++ .../apache/oozie/action/hadoop/JarFilter.java | 122 +++++ .../oozie/action/hadoop/SparkArgsExtractor.java | 334 ++++++++++++ .../apache/oozie/action/hadoop/SparkMain.java | 520 +++---------------- .../action/hadoop/SparkOptionsSplitter.java | 64 +++ .../oozie/action/hadoop/TestJarFilter.java | 1 - .../action/hadoop/TestSparkArgsExtractor.java | 197 +++++++ .../oozie/action/hadoop/TestSparkMain.java | 13 - .../action/hadoop/TestSparkOptionsSplitter.java | 16 +- 10 files changed, 851 insertions(+), 483 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 5743ac9..5c5ae92 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.4.0 release (trunk - unreleased) +OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti) OOZIE-2886 Ensure consistent versioning of hadoop jars in sharelibs (dbist13 via rkanter) OOZIE-2875 Typo in ssh action twiki docs (Dongying Jiao via gezapeti) OOZIE-2387 Oozie is Unable to handle Spaces in file/archive tag. (asasvari via gezapeti) http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java new file mode 100644 index 0000000..3f9e982 --- /dev/null +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+class HadoopUriFinder {
+
+ static String getJarVersion(final File jarFile) throws IOException {
+ try (final JarFile openedJarFile = new JarFile(jarFile)) {
+ final Manifest manifest = openedJarFile.getManifest();
+ return manifest.getMainAttributes().getValue("Specification-Version");
+ }
+ }
+
+ static URI getFixedUri(final URI fileUri) throws URISyntaxException, IOException {
+ final FileSystem fs = FileSystem.get(new Configuration(true));
+ return getFixedUri(fs, fileUri);
+ }
+
+ /**
+ * Spark compares URIs based on scheme, host and port. Here we convert URIs
+ * into the default format so that Spark won't think those belong to
+ * different file systems. This avoids an extra copy of files that
+ * already exist on the same HDFS.
+ *
+ * @param fs the default file system
+ * @param fileUri the URI to convert
+ * @return fixed uri
+ * @throws URISyntaxException
+ */
+ static URI getFixedUri(final FileSystem fs, final URI fileUri) throws URISyntaxException {
+ if (fs.getUri().getScheme().equals(fileUri.getScheme())
+ && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+ && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+ || fs.getUri().getPort() == fileUri.getPort())) {
+ return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
+ fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+ }
+ return fileUri;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
new file mode 100644
index 0000000..d0b4b5e
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * This class is used for filtering out unwanted jars.
+ */ +class JarFilter { + private String sparkVersion = "1.X.X"; + private String sparkYarnJar; + private String applicationJar; + private Collection<URI> listUris = null; + + /** + * @param listUris List of URIs to be filtered + * @param jarPath Application jar + * @throws IOException + * @throws URISyntaxException + */ + JarFilter(final Collection<URI> listUris, final String jarPath) throws URISyntaxException, IOException { + this.listUris = listUris; + applicationJar = jarPath; + final Path p = new Path(jarPath); + if (p.isAbsolute()) { + applicationJar = HadoopUriFinder.getFixedUri(p.toUri()).toString(); + } + } + + /** + * Filters out the Spark yarn jar and application jar. Also records + * spark yarn jar's version. + * + * @throws OozieActionConfiguratorException + */ + void filter() throws OozieActionConfiguratorException { + final Iterator<URI> iterator = listUris.iterator(); + File matchedFile = null; + final Path applJarPath = new Path(applicationJar); + while (iterator.hasNext()) { + final URI uri = iterator.next(); + final Path p = new Path(uri); + if (SparkMain.SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) { + matchedFile = SparkMain.getMatchingFile(SparkMain.SPARK_YARN_JAR_PATTERN); + } + else if (SparkMain.SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) { + matchedFile = SparkMain.getMatchingFile(SparkMain.SPARK_ASSEMBLY_JAR_PATTERN); + } + if (matchedFile != null) { + sparkYarnJar = uri.toString(); + try { + sparkVersion = HadoopUriFinder.getJarVersion(matchedFile); + System.out.println("Spark Version " + sparkVersion); + } + catch (final IOException io) { + System.out.println( + "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion); + } + iterator.remove(); + matchedFile = null; + } + // Here we skip the application jar, because + // (if uris are same,) it will get distributed multiple times + // - one time with --files and another time as application jar. + if (isApplicationJar(p.getName(), uri, applJarPath)) { + final String fragment = uri.getFragment(); + applicationJar = fragment != null && fragment.length() > 0 ? 
fragment : uri.toString();
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Checks if a file is the application jar
+ *
+ * @param fileName name of the file
+ * @param fileUri URI of the file
+ * @param applJarPath path of the application jar
+ * @return true if fileName or fileUri is the application jar
+ */
+ private boolean isApplicationJar(final String fileName, final URI fileUri, final Path applJarPath) {
+ return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar)
+ || applJarPath.getName().equals(fileName)
+ || applicationJar.equals(fileUri.getFragment()));
+ }
+
+ String getApplicationJar() {
+ return applicationJar;
+ }
+
+ String getSparkYarnJar() {
+ return sparkYarnJar;
+ }
+
+ String getSparkVersion() {
+ return sparkVersion;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
new file mode 100644
index 0000000..ffc95f9
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.oozie.action.hadoop; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang.StringUtils; +import org.apache.directory.api.util.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.filecache.DistributedCache; +import org.apache.hadoop.fs.Path; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +class SparkArgsExtractor { + private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf"); + private static final String FILES_OPTION = "--files"; + private static final String ARCHIVES_OPTION = "--archives"; + private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration="; + private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled"; + private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled"; + private static final String PWD = "$PWD" + File.separator + "*"; + private static final String MASTER_OPTION = "--master"; + private static final String MODE_OPTION = "--deploy-mode"; + private static final String JOB_NAME_OPTION = "--name"; + private static final String CLASS_NAME_OPTION = "--class"; + private static final String VERBOSE_OPTION = "--verbose"; + private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path"; + private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath="; + private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath="; + private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions="; + private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions="; + private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*"); + private static final String SPARK_YARN_JAR = "spark.yarn.jar"; + private static final String SPARK_YARN_JARS = "spark.yarn.jars"; + private static final String OPT_SEPARATOR = "="; + private static final String OPT_VALUE_SEPARATOR = ","; + private static final String CONF_OPTION = "--conf"; + private static final String MASTER_OPTION_YARN_CLUSTER = "yarn-cluster"; + private static final String MASTER_OPTION_YARN_CLIENT = "yarn-client"; + private static final String MASTER_OPTION_YARN = "yarn"; + private static final String DEPLOY_MODE_CLUSTER = "cluster"; + private static final String DEPLOY_MODE_CLIENT = "client"; + private static final String SPARK_YARN_TAGS = "spark.yarn.tags"; + private static final String OPT_PROPERTIES_FILE = "--properties-file"; + + private boolean pySpark = false; + private final Configuration actionConf; + + SparkArgsExtractor(final Configuration actionConf) { + this.actionConf = actionConf; + } + + boolean isPySpark() { + return pySpark; + } + + List<String> extract(final String[] mainArgs) throws OozieActionConfiguratorException, IOException, URISyntaxException { + final List<String> sparkArgs = new ArrayList<>(); + + sparkArgs.add(MASTER_OPTION); + final String master = actionConf.get(SparkActionExecutor.SPARK_MASTER); + sparkArgs.add(master); + + // In local mode, everything runs here in the Launcher Job. + // In yarn-client mode, the driver runs here in the Launcher Job and the + // executor in Yarn. + // In yarn-cluster mode, the driver and executor run in Yarn. 
+ final String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE); + if (sparkDeployMode != null) { + sparkArgs.add(MODE_OPTION); + sparkArgs.add(sparkDeployMode); + } + final boolean yarnClusterMode = master.equals(MASTER_OPTION_YARN_CLUSTER) + || (master.equals(MASTER_OPTION_YARN) && sparkDeployMode != null && sparkDeployMode.equals(DEPLOY_MODE_CLUSTER)); + final boolean yarnClientMode = master.equals(MASTER_OPTION_YARN_CLIENT) + || (master.equals(MASTER_OPTION_YARN) && sparkDeployMode != null && sparkDeployMode.equals(DEPLOY_MODE_CLIENT)); + + sparkArgs.add(JOB_NAME_OPTION); + sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME)); + + final String className = actionConf.get(SparkActionExecutor.SPARK_CLASS); + if (className != null) { + sparkArgs.add(CLASS_NAME_OPTION); + sparkArgs.add(className); + } + + appendOoziePropertiesToSparkConf(sparkArgs); + + String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); + if (jarPath != null && jarPath.endsWith(".py")) { + pySpark = true; + } + boolean addedHiveSecurityToken = false; + boolean addedHBaseSecurityToken = false; + boolean addedLog4jDriverSettings = false; + boolean addedLog4jExecutorSettings = false; + final StringBuilder driverClassPath = new StringBuilder(); + final StringBuilder executorClassPath = new StringBuilder(); + final StringBuilder userFiles = new StringBuilder(); + final StringBuilder userArchives = new StringBuilder(); + final String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); + if (StringUtils.isNotEmpty(sparkOpts)) { + final List<String> sparkOptions = SparkOptionsSplitter.splitSparkOpts(sparkOpts); + for (int i = 0; i < sparkOptions.size(); i++) { + String opt = sparkOptions.get(i); + boolean addToSparkArgs = true; + if (yarnClusterMode || yarnClientMode) { + if (opt.startsWith(EXECUTOR_CLASSPATH)) { + appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath); + addToSparkArgs = false; + } + if (opt.startsWith(DRIVER_CLASSPATH)) { + appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath); + addToSparkArgs = false; + } + if (opt.equals(DRIVER_CLASSPATH_OPTION)) { + // we need the next element after this option + appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath); + // increase i to skip the next element. + i++; + addToSparkArgs = false; + } + } + if (opt.startsWith(HIVE_SECURITY_TOKEN)) { + addedHiveSecurityToken = true; + } + if (opt.startsWith(HBASE_SECURITY_TOKEN)) { + addedHBaseSecurityToken = true; + } + if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) { + if (!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) { + opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS; + } else { + System.out.println("Warning: Spark Log4J settings are overwritten." 
+ + " Child job IDs may not be available"); + } + if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) { + addedLog4jExecutorSettings = true; + } else { + addedLog4jDriverSettings = true; + } + } + if (opt.startsWith(FILES_OPTION)) { + final String userFile; + if (opt.contains(OPT_SEPARATOR)) { + userFile = opt.substring(opt.indexOf(OPT_SEPARATOR) + OPT_SEPARATOR.length()); + } + else { + userFile = sparkOptions.get(i + 1); + i++; + } + if (userFiles.length() > 0) { + userFiles.append(OPT_VALUE_SEPARATOR); + } + userFiles.append(userFile); + addToSparkArgs = false; + } + if (opt.startsWith(ARCHIVES_OPTION)) { + final String userArchive; + if (opt.contains(OPT_SEPARATOR)) { + userArchive = opt.substring(opt.indexOf(OPT_SEPARATOR) + OPT_SEPARATOR.length()); + } + else { + userArchive = sparkOptions.get(i + 1); + i++; + } + if (userArchives.length() > 0) { + userArchives.append(OPT_VALUE_SEPARATOR); + } + userArchives.append(userArchive); + addToSparkArgs = false; + } + if (addToSparkArgs) { + sparkArgs.add(opt); + } + else if (sparkArgs.get(sparkArgs.size() - 1).equals(CONF_OPTION)) { + sparkArgs.remove(sparkArgs.size() - 1); + } + } + } + + if ((yarnClusterMode || yarnClientMode)) { + // Include the current working directory (of executor container) + // in executor classpath, because it will contain localized + // files + appendWithPathSeparator(PWD, executorClassPath); + appendWithPathSeparator(PWD, driverClassPath); + + sparkArgs.add(CONF_OPTION); + sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString()); + + sparkArgs.add(CONF_OPTION); + sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString()); + } + + if (actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS) != null) { + sparkArgs.add(CONF_OPTION); + sparkArgs.add(SPARK_YARN_TAGS + OPT_SEPARATOR + actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS)); + } + + if (!addedHiveSecurityToken) { + sparkArgs.add(CONF_OPTION); + sparkArgs.add(HIVE_SECURITY_TOKEN + OPT_SEPARATOR + Boolean.toString(false)); + } + if (!addedHBaseSecurityToken) { + sparkArgs.add(CONF_OPTION); + sparkArgs.add(HBASE_SECURITY_TOKEN + OPT_SEPARATOR + Boolean.toString(false)); + } + if (!addedLog4jExecutorSettings) { + sparkArgs.add(CONF_OPTION); + sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS); + } + if (!addedLog4jDriverSettings) { + sparkArgs.add(CONF_OPTION); + sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS); + } + final File defaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN); + if (defaultConfFile != null) { + sparkArgs.add(OPT_PROPERTIES_FILE); + sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString()); + } + + if ((yarnClusterMode || yarnClientMode)) { + final Map<String, URI> fixedFileUrisMap = + SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf)); + fixedFileUrisMap.put(SparkMain.SPARK_LOG4J_PROPS, new Path(SparkMain.SPARK_LOG4J_PROPS).toUri()); + fixedFileUrisMap.put(SparkMain.HIVE_SITE_CONF, new Path(SparkMain.HIVE_SITE_CONF).toUri()); + addUserDefined(userFiles.toString(), fixedFileUrisMap); + final Collection<URI> fixedFileUris = fixedFileUrisMap.values(); + final JarFilter jarFilter = new JarFilter(fixedFileUris, jarPath); + jarFilter.filter(); + jarPath = jarFilter.getApplicationJar(); + + final String cachedFiles = StringUtils.join(fixedFileUris, OPT_VALUE_SEPARATOR); + if (cachedFiles != null && !cachedFiles.isEmpty()) { + sparkArgs.add(FILES_OPTION); + sparkArgs.add(cachedFiles); + } 
+ final Map<String, URI> fixedArchiveUrisMap = SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache. + getCacheArchives(actionConf)); + addUserDefined(userArchives.toString(), fixedArchiveUrisMap); + final String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), OPT_VALUE_SEPARATOR); + if (cachedArchives != null && !cachedArchives.isEmpty()) { + sparkArgs.add(ARCHIVES_OPTION); + sparkArgs.add(cachedArchives); + } + setSparkYarnJarsConf(sparkArgs, jarFilter.getSparkYarnJar(), jarFilter.getSparkVersion()); + } + + if (!sparkArgs.contains(VERBOSE_OPTION)) { + sparkArgs.add(VERBOSE_OPTION); + } + + sparkArgs.add(jarPath); + sparkArgs.addAll(Arrays.asList(mainArgs)); + + return sparkArgs; + } + + private void appendWithPathSeparator(final String what, final StringBuilder to) { + if (to.length() > 0) { + to.append(File.pathSeparator); + } + to.append(what); + } + + private void addUserDefined(final String userList, final Map<String, URI> urisMap) { + if (userList != null) { + for (final String file : userList.split(OPT_VALUE_SEPARATOR)) { + if (!Strings.isEmpty(file)) { + final Path p = new Path(file); + urisMap.put(p.getName(), p.toUri()); + } + } + } + } + + /* + * Get properties that needs to be passed to Spark as Spark configuration from actionConf. + */ + @VisibleForTesting + void appendOoziePropertiesToSparkConf(final List<String> sparkArgs) { + for (final Map.Entry<String, String> oozieConfig : actionConf + .getValByRegex("^oozie\\.(?!launcher|spark).+").entrySet()) { + sparkArgs.add(CONF_OPTION); + sparkArgs.add(String.format("spark.%s=%s", oozieConfig.getKey(), oozieConfig.getValue())); + } + } + + /** + * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X. + * + * @param sparkArgs + * @param sparkYarnJar + * @param sparkVersion + */ + private void setSparkYarnJarsConf(final List<String> sparkArgs, final String sparkYarnJar, final String sparkVersion) { + if (SPARK_VERSION_1.matcher(sparkVersion).find()) { + // In Spark 1.X.X, set spark.yarn.jar to avoid + // multiple distribution + sparkArgs.add(CONF_OPTION); + sparkArgs.add(SPARK_YARN_JAR + OPT_SEPARATOR + sparkYarnJar); + } else { + // In Spark 2.X.X, set spark.yarn.jars + sparkArgs.add(CONF_OPTION); + sparkArgs.add(SPARK_YARN_JARS + OPT_SEPARATOR + sparkYarnJar); + } + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java index 68f7a60..674839a 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java @@ -24,20 +24,13 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URI; import java.net.URISyntaxException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.jar.JarFile; -import java.util.jar.Manifest; import java.util.regex.Pattern; import org.apache.commons.io.FileUtils; -import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.filecache.DistributedCache; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.hive.conf.HiveConf; @@ -47,243 +40,56 @@ import org.apache.spark.deploy.SparkSubmit; import com.google.common.annotations.VisibleForTesting; public class SparkMain extends LauncherMain { - private static final String MASTER_OPTION = "--master"; - private static final String MODE_OPTION = "--deploy-mode"; - private static final String JOB_NAME_OPTION = "--name"; - private static final String CLASS_NAME_OPTION = "--class"; - private static final String VERBOSE_OPTION = "--verbose"; - private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path"; - private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath="; - private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath="; - private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions="; - private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions="; - private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration="; - private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled"; - private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled"; - private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir"; - private static final String PWD = "$PWD" + File.separator + "*"; - private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"), - Pattern.compile("pyspark.zip") }; - private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf"); - private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties"; + @VisibleForTesting static final Pattern[] SPARK_JOB_IDS_PATTERNS = { Pattern.compile("Submitted application (application[0-9_]*)") }; - public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern + @VisibleForTesting + static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern .compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$"); - public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern + @VisibleForTesting + static final Pattern SPARK_YARN_JAR_PATTERN = Pattern .compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$"); - private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*"); - private static final String SPARK_YARN_JAR = "spark.yarn.jar"; - private static final String SPARK_YARN_JARS = "spark.yarn.jars"; - public static final String HIVE_SITE_CONF = "hive-site.xml"; - public static final String FILES_OPTION = "--files"; - public static final String ARCHIVES_OPTION = "--archives"; - - public static void main(String[] args) throws Exception { + static final String HIVE_SITE_CONF = "hive-site.xml"; + static final String SPARK_LOG4J_PROPS = "spark-log4j.properties"; + + private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir"; + private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"), + Pattern.compile("pyspark.zip") }; + + public static void main(final String[] args) throws Exception { run(SparkMain.class, args); } @Override - protected void run(String[] args) throws Exception { - boolean isPyspark = false; - Configuration actionConf = loadActionConf(); + protected void run(final String[] args) throws Exception { + final Configuration actionConf = loadActionConf(); prepareHadoopConfig(actionConf); setYarnTag(actionConf); 
LauncherMain.killChildYarnJobs(actionConf); - String logFile = setUpSparkLog4J(actionConf); + final String logFile = setUpSparkLog4J(actionConf); setHiveSite(actionConf); - List<String> sparkArgs = new ArrayList<String>(); - - sparkArgs.add(MASTER_OPTION); - String master = actionConf.get(SparkActionExecutor.SPARK_MASTER); - sparkArgs.add(master); - - // In local mode, everything runs here in the Launcher Job. - // In yarn-client mode, the driver runs here in the Launcher Job and the - // executor in Yarn. - // In yarn-cluster mode, the driver and executor run in Yarn. - String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE); - if (sparkDeployMode != null) { - sparkArgs.add(MODE_OPTION); - sparkArgs.add(sparkDeployMode); - } - boolean yarnClusterMode = master.equals("yarn-cluster") - || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster")); - boolean yarnClientMode = master.equals("yarn-client") - || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client")); - - sparkArgs.add(JOB_NAME_OPTION); - sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME)); - - String className = actionConf.get(SparkActionExecutor.SPARK_CLASS); - if (className != null) { - sparkArgs.add(CLASS_NAME_OPTION); - sparkArgs.add(className); - } - - appendOoziePropertiesToSparkConf(sparkArgs, actionConf); - - String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR); - if(jarPath!=null && jarPath.endsWith(".py")){ - isPyspark = true; - } - boolean addedHiveSecurityToken = false; - boolean addedHBaseSecurityToken = false; - boolean addedLog4jDriverSettings = false; - boolean addedLog4jExecutorSettings = false; - StringBuilder driverClassPath = new StringBuilder(); - StringBuilder executorClassPath = new StringBuilder(); - String userFiles = null, userArchives = null; - String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); - if (StringUtils.isNotEmpty(sparkOpts)) { - List<String> sparkOptions = splitSparkOpts(sparkOpts); - for (int i = 0; i < sparkOptions.size(); i++) { - String opt = sparkOptions.get(i); - boolean addToSparkArgs = true; - if (yarnClusterMode || yarnClientMode) { - if (opt.startsWith(EXECUTOR_CLASSPATH)) { - appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath); - addToSparkArgs = false; - } - if (opt.startsWith(DRIVER_CLASSPATH)) { - appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath); - addToSparkArgs = false; - } - if (opt.equals(DRIVER_CLASSPATH_OPTION)) { - // we need the next element after this option - appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath); - // increase i to skip the next element. - i++; - addToSparkArgs = false; - } - } - if (opt.startsWith(HIVE_SECURITY_TOKEN)) { - addedHiveSecurityToken = true; - } - if (opt.startsWith(HBASE_SECURITY_TOKEN)) { - addedHBaseSecurityToken = true; - } - if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) { - if(!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) { - opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS; - }else{ - System.out.println("Warning: Spark Log4J settings are overwritten." 
+ - " Child job IDs may not be available"); - } - if(opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) { - addedLog4jExecutorSettings = true; - }else{ - addedLog4jDriverSettings = true; - } - } - if(opt.startsWith(FILES_OPTION)) { - userFiles = sparkOptions.get(i + 1); - i++; - addToSparkArgs = false; - } - if(opt.startsWith(ARCHIVES_OPTION)) { - userArchives = sparkOptions.get(i + 1); - i++; - addToSparkArgs = false; - } - if(addToSparkArgs) { - sparkArgs.add(opt); - } - } - } - - if ((yarnClusterMode || yarnClientMode)) { - // Include the current working directory (of executor container) - // in executor classpath, because it will contain localized - // files - appendWithPathSeparator(PWD, executorClassPath); - appendWithPathSeparator(PWD, driverClassPath); - - sparkArgs.add("--conf"); - sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString()); - - sparkArgs.add("--conf"); - sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString()); - } - - if (actionConf.get(MAPREDUCE_JOB_TAGS) != null) { - sparkArgs.add("--conf"); - sparkArgs.add("spark.yarn.tags=" + actionConf.get(MAPREDUCE_JOB_TAGS)); - } - if (!addedHiveSecurityToken) { - sparkArgs.add("--conf"); - sparkArgs.add(HIVE_SECURITY_TOKEN + "=false"); - } - if (!addedHBaseSecurityToken) { - sparkArgs.add("--conf"); - sparkArgs.add(HBASE_SECURITY_TOKEN + "=false"); - } - if(!addedLog4jExecutorSettings) { - sparkArgs.add("--conf"); - sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS); - } - if(!addedLog4jDriverSettings) { - sparkArgs.add("--conf"); - sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS); - } - File defaultConfFile = getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN); - if (defaultConfFile != null) { - sparkArgs.add("--properties-file"); - sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString()); - } - - if ((yarnClusterMode || yarnClientMode)) { - Map<String, URI> fixedFileUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf)); - fixedFileUrisMap.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri()); - fixedFileUrisMap.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri()); - addUserDefined(userFiles, fixedFileUrisMap); - Collection<URI> fixedFileUris = fixedFileUrisMap.values(); - JarFilter jarfilter = new JarFilter(fixedFileUris, jarPath); - jarfilter.filter(); - jarPath = jarfilter.getApplicationJar(); - - String cachedFiles = StringUtils.join(fixedFileUris, ","); - if (cachedFiles != null && !cachedFiles.isEmpty()) { - sparkArgs.add("--files"); - sparkArgs.add(cachedFiles); - } - Map<String, URI> fixedArchiveUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache. 
- getCacheArchives(actionConf)); - addUserDefined(userArchives, fixedArchiveUrisMap); - String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), ","); - if (cachedArchives != null && !cachedArchives.isEmpty()) { - sparkArgs.add("--archives"); - sparkArgs.add(cachedArchives); - } - setSparkYarnJarsConf(sparkArgs, jarfilter.getSparkYarnJar(), jarfilter.getSparkVersion()); - } + final SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf); + final List<String> sparkArgs = sparkArgsExtractor.extract(args); - if (!sparkArgs.contains(VERBOSE_OPTION)) { - sparkArgs.add(VERBOSE_OPTION); - } - - sparkArgs.add(jarPath); - for (String arg : args) { - sparkArgs.add(arg); - } - if (isPyspark){ + if (sparkArgsExtractor.isPySpark()){ createPySparkLibFolder(); } - System.out.println("Spark Action Main class : " + SparkSubmit.class.getName()); System.out.println(); System.out.println("Oozie Spark action configuration"); System.out.println("================================================================="); System.out.println(); - PasswordMasker passwordMasker = new PasswordMasker(); - for (String arg : sparkArgs) { + + final PasswordMasker passwordMasker = new PasswordMasker(); + for (final String arg : sparkArgs) { System.out.println(" " + passwordMasker.maskPasswordsIfNecessary(arg)); } System.out.println(); + try { runSpark(sparkArgs.toArray(new String[sparkArgs.size()])); } @@ -293,21 +99,12 @@ public class SparkMain extends LauncherMain { } } - private void addUserDefined(String userList, Map<String, URI> urisMap) { - if(userList != null) { - for (String file : userList.split(",")) { - Path p = new Path(file); - urisMap.put(p.getName(), p.toUri()); - } - } - } - - private void prepareHadoopConfig(Configuration actionConf) throws IOException { + private void prepareHadoopConfig(final Configuration actionConf) throws IOException { // Copying oozie.action.conf.xml into hadoop configuration *-site files. 
if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) { - String actionXml = System.getProperty("oozie.action.conf.xml"); + final String actionXml = System.getProperty("oozie.action.conf.xml"); if (actionXml != null) { - File currentDir = new File(actionXml).getParentFile(); + final File currentDir = new File(actionXml).getParentFile(); writeHadoopConfig(actionXml, currentDir); } } @@ -321,15 +118,15 @@ public class SparkMain extends LauncherMain { * @throws IOException if there is an error during file copy */ private void createPySparkLibFolder() throws OozieActionConfiguratorException, IOException { - File pythonLibDir = new File("python/lib"); + final File pythonLibDir = new File("python/lib"); if(!pythonLibDir.exists()){ pythonLibDir.mkdirs(); System.out.println("PySpark lib folder " + pythonLibDir.getAbsolutePath() + " folder created."); } - for(Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) { - File file = getMatchingPyFile(fileNamePattern); - File destination = new File(pythonLibDir, file.getName()); + for(final Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) { + final File file = getMatchingPyFile(fileNamePattern); + final File destination = new File(pythonLibDir, file.getName()); FileUtils.copyFile(file, destination); System.out.println("Copied " + file + " to " + destination.getAbsolutePath()); } @@ -342,8 +139,8 @@ public class SparkMain extends LauncherMain { * @return the file if there is one * @throws OozieActionConfiguratorException if there is are no files matching the pattern */ - private File getMatchingPyFile(Pattern fileNamePattern) throws OozieActionConfiguratorException { - File f = getMatchingFile(fileNamePattern); + private File getMatchingPyFile(final Pattern fileNamePattern) throws OozieActionConfiguratorException { + final File f = getMatchingFile(fileNamePattern); if (f != null) { return f; } @@ -359,22 +156,24 @@ public class SparkMain extends LauncherMain { * @param fileNamePattern the pattern to look for * @return the file if there is one else it returns null */ - private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException { - File localDir = new File("."); - String[] files = localDir.list(); - - if (files != null) { - for(String fileName : files){ - if(fileNamePattern.matcher(fileName).find()){ - return new File(fileName); - } + static File getMatchingFile(final Pattern fileNamePattern) { + final File localDir = new File("."); + + final String[] localFileNames = localDir.list(); + if (localFileNames == null) { + return null; + } + + for (final String fileName : localFileNames){ + if (fileNamePattern.matcher(fileName).find()){ + return new File(fileName); } } return null; } - private void runSpark(String[] args) throws Exception { + private void runSpark(final String[] args) throws Exception { System.out.println("================================================================="); System.out.println(); System.out.println(">>> Invoking Spark class now >>>"); @@ -383,49 +182,16 @@ public class SparkMain extends LauncherMain { SparkSubmit.main(args); } - /** - * Converts the options to be Spark-compatible. 
- * <ul> - * <li>Parameters are separated by whitespace and can be groupped using double quotes</li> - * <li>Quotes should be removed</li> - * <li>Adjacent whitespace separators are treated as one</li> - * </ul> - * @param sparkOpts the options for Spark - * @return the options parsed into a list - */ - static List<String> splitSparkOpts(String sparkOpts){ - List<String> result = new ArrayList<String>(); - StringBuilder currentWord = new StringBuilder(); - boolean insideQuote = false; - for (int i = 0; i < sparkOpts.length(); i++) { - char c = sparkOpts.charAt(i); - if (c == '"') { - insideQuote = !insideQuote; - } else if (Character.isWhitespace(c) && !insideQuote) { - if (currentWord.length() > 0) { - result.add(currentWord.toString()); - currentWord.setLength(0); - } - } else { - currentWord.append(c); - } - } - if(currentWord.length()>0) { - result.add(currentWord.toString()); - } - return result; - } - - private String setUpSparkLog4J(Configuration distcpConf) throws IOException { + private String setUpSparkLog4J(final Configuration actionConf) throws IOException { // Logfile to capture job IDs - String hadoopJobId = System.getProperty("oozie.launcher.job.id"); + final String hadoopJobId = System.getProperty("oozie.launcher.job.id"); if (hadoopJobId == null) { throw new RuntimeException("Launcher Hadoop Job ID system,property not set"); } - String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath(); + final String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath(); - String logLevel = distcpConf.get("oozie.spark.log.level", "INFO"); - String rootLogLevel = distcpConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO"); + final String logLevel = actionConf.get("oozie.spark.log.level", "INFO"); + final String rootLogLevel = actionConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO"); log4jProperties.setProperty("log4j.rootLogger", rootLogLevel + ", A"); log4jProperties.setProperty("log4j.logger.org.apache.spark", logLevel + ", A, jobid"); @@ -441,7 +207,7 @@ public class SparkMain extends LauncherMain { log4jProperties.setProperty("log4j.logger.org.apache.hadoop.mapreduce.Job", "INFO, jobid"); log4jProperties.setProperty("log4j.logger.org.apache.hadoop.yarn.client.api.impl.YarnClientImpl", "INFO, jobid"); - String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath(); + final String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath(); try (OutputStream os1 = new FileOutputStream(localProps)) { log4jProperties.store(os1, ""); } @@ -458,100 +224,27 @@ public class SparkMain extends LauncherMain { * @throws IOException * @throws URISyntaxException */ - static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(URI[] files) throws IOException, URISyntaxException { - Map<String, URI> map= new HashMap<>(); + static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] files) throws IOException, URISyntaxException { + final Map<String, URI> map= new LinkedHashMap<>(); if (files == null) { return map; } - FileSystem fs = FileSystem.get(new Configuration(true)); + final FileSystem fs = FileSystem.get(new Configuration(true)); for (int i = 0; i < files.length; i++) { - URI fileUri = files[i]; - Path p = new Path(fileUri); - map.put(p.getName(), getFixedUri(fs, fileUri)); + final URI fileUri = files[i]; + final Path p = new Path(fileUri); + map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, fileUri)); } return map; } /** - * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X. 
- * - * @param sparkArgs - * @param sparkYarnJar - * @param sparkVersion - */ - private void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) { - if (SPARK_VERSION_1.matcher(sparkVersion).find()) { - // In Spark 1.X.X, set spark.yarn.jar to avoid - // multiple distribution - sparkArgs.add("--conf"); - sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar); - } - else { - // In Spark 2.X.X, set spark.yarn.jars - sparkArgs.add("--conf"); - sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar); - } - } - - private static String getJarVersion(File jarFile) throws IOException { - @SuppressWarnings("resource") - Manifest manifest = new JarFile(jarFile).getManifest(); - return manifest.getMainAttributes().getValue("Specification-Version"); - } - - /* - * Get properties that needs to be passed to Spark as Spark configuration from actionConf. - */ - @VisibleForTesting - protected void appendOoziePropertiesToSparkConf(List<String> sparkArgs, Configuration actionConf) { - for (Map.Entry<String, String> oozieConfig : actionConf - .getValByRegex("^oozie\\.(?!launcher|spark).+").entrySet()) { - sparkArgs.add("--conf"); - sparkArgs.add(String.format("spark.%s=%s", oozieConfig.getKey(), oozieConfig.getValue())); - } - } - - private void appendWithPathSeparator(String what, StringBuilder to){ - if(to.length() > 0){ - to.append(File.pathSeparator); - } - to.append(what); - } - - private static URI getFixedUri(URI fileUri) throws URISyntaxException, IOException { - FileSystem fs = FileSystem.get(new Configuration(true)); - return getFixedUri(fs, fileUri); - } - - /** - * Spark compares URIs based on scheme, host and port. Here we convert URIs - * into the default format so that Spark won't think those belong to - * different file system. This will avoid an extra copy of files which - * already exists on same hdfs. - * - * @param fs - * @param fileUri - * @return fixed uri - * @throws URISyntaxException - */ - private static URI getFixedUri(FileSystem fs, URI fileUri) throws URISyntaxException { - if (fs.getUri().getScheme().equals(fileUri.getScheme()) - && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null) - && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1 - || fs.getUri().getPort() == fileUri.getPort())) { - return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(), - fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment()); - } - return fileUri; - } - - /** * Sets up hive-site.xml * * @param hiveConf * @throws IOException */ - private void setHiveSite(Configuration hiveConf) throws IOException { + private void setHiveSite(final Configuration hiveConf) throws IOException { // See https://issues.apache.org/jira/browse/HIVE-1411 hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG"); @@ -579,99 +272,4 @@ public class SparkMain extends LauncherMain { // to null. HiveConf.setHiveSiteLocation(HiveConf.class.getClassLoader().getResource("hive-site.xml")); } - - /** - * This class is used for filtering out unwanted jars. 
- */ - static class JarFilter { - private String sparkVersion = "1.X.X"; - private String sparkYarnJar; - private String applicationJar; - private Collection<URI> listUris = null; - - /** - * @param listUris List of URIs to be filtered - * @param jarPath Application jar - * @throws IOException - * @throws URISyntaxException - */ - public JarFilter(Collection<URI> listUris, String jarPath) throws URISyntaxException, IOException { - this.listUris = listUris; - applicationJar = jarPath; - Path p = new Path(jarPath); - if (p.isAbsolute()) { - applicationJar = getFixedUri(p.toUri()).toString(); - } - } - - /** - * Filters out the Spark yarn jar and application jar. Also records - * spark yarn jar's version. - * - * @throws OozieActionConfiguratorException - */ - public void filter() throws OozieActionConfiguratorException { - Iterator<URI> iterator = listUris.iterator(); - File matchedFile = null; - Path applJarPath = new Path(applicationJar); - while (iterator.hasNext()) { - URI uri = iterator.next(); - Path p = new Path(uri); - if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) { - matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN); - } - else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) { - matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN); - } - if (matchedFile != null) { - sparkYarnJar = uri.toString(); - try { - sparkVersion = getJarVersion(matchedFile); - System.out.println("Spark Version " + sparkVersion); - } - catch (IOException io) { - System.out.println( - "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion); - } - iterator.remove(); - matchedFile = null; - } - // Here we skip the application jar, because - // (if uris are same,) it will get distributed multiple times - // - one time with --files and another time as application jar. - if (isApplicationJar(p.getName(), uri, applJarPath)) { - String fragment = uri.getFragment(); - applicationJar = fragment != null && fragment.length() > 0 ? fragment : uri.toString(); - iterator.remove(); - } - } - } - - /** - * Checks if a file is application jar - * - * @param fileName fileName name of the file - * @param fileUri fileUri URI of the file - * @param applJarPath Path of application jar - * @return true if fileName or fileUri is the application jar - */ - private boolean isApplicationJar(String fileName, URI fileUri, Path applJarPath) { - return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar) - || applJarPath.getName().equals(fileName) - || applicationJar.equals(fileUri.getFragment())); - } - - public String getApplicationJar() { - return applicationJar; - } - - public String getSparkYarnJar() { - return sparkYarnJar; - } - - public String getSparkVersion() { - return sparkVersion; - } - - } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java new file mode 100644 index 0000000..30def6f --- /dev/null +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class SparkOptionsSplitter {
+
+ /**
+ * Converts the options to be Spark-compatible.
+ * <ul>
+ * <li>Parameters are separated by whitespace and can be grouped using double quotes</li>
+ * <li>Quotes should be removed</li>
+ * <li>Adjacent whitespace separators are treated as one</li>
+ * </ul>
+ *
+ * @param sparkOpts the options for Spark
+ * @return the options parsed into a list
+ */
+ static List<String> splitSparkOpts(final String sparkOpts) {
+ final List<String> result = new ArrayList<String>();
+ final StringBuilder currentWord = new StringBuilder();
+
+ boolean insideQuote = false;
+ for (int i = 0; i < sparkOpts.length(); i++) {
+ final char c = sparkOpts.charAt(i);
+ if (c == '"') {
+ insideQuote = !insideQuote;
+ }
+ else if (Character.isWhitespace(c) && !insideQuote) {
+ if (currentWord.length() > 0) {
+ result.add(currentWord.toString());
+ currentWord.setLength(0);
+ }
+ }
+ else {
+ currentWord.append(c);
+ }
+ }
+
+ if (currentWord.length() > 0) {
+ result.add(currentWord.toString());
+ }
+
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
index 2d4c83c..ff1b3ce 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
@@ -31,7 +31,6 @@ import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
-import org.apache.oozie.action.hadoop.SparkMain.JarFilter;
import org.junit.Test;
public class TestJarFilter {
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
new file mode 100644
index 0000000..7db26a6
--- /dev/null
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSparkArgsExtractor {
+
+    @Test
+    public void testAppendOoziePropertiesToSparkConf() throws Exception {
+        final List<String> sparkArgs = new ArrayList<>();
+        final Configuration actionConf = new Configuration();
+        actionConf.set("foo", "foo-not-to-include");
+        actionConf.set("oozie.launcher", "launcher-not-to-include");
+        actionConf.set("oozie.spark", "spark-not-to-include");
+        actionConf.set("oozie.bar", "bar");
+
+        new SparkArgsExtractor(actionConf).appendOoziePropertiesToSparkConf(sparkArgs);
+
+        assertEquals(Lists.newArrayList("--conf", "spark.oozie.bar=bar"), sparkArgs);
+    }
+
+    @Test
+    public void testLocalClientArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1042M " +
+                "--conf spark.executor.extraClassPath=aaa " +
+                "--conf user.property.after.spark.executor.extraClassPath=bbb " +
+                "--conf spark.driver.extraClassPath=ccc " +
+                "--conf user.property.after.spark.driver.extraClassPath=ddd " +
+                "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "local[*]",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--driver-memory", "1042M",
+                        "--conf", "spark.executor.extraClassPath=aaa",
+                        "--conf", "user.property.after.spark.executor.extraClassPath=bbb",
+                        "--conf", "spark.driver.extraClassPath=ccc",
+                        "--conf", "user.property.after.spark.driver.extraClassPath=ddd",
+                        "--conf", "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError " +
+                                "-XX:HeapDumpPath=/tmp -Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+
+    @Test
+    public void testYarnClientExecutorAndDriverExtraClasspathsArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1042M " +
+                "--conf spark.executor.extraClassPath=aaa " +
+                "--conf user.property.after.spark.executor.extraClassPath=bbb " +
+                "--conf spark.driver.extraClassPath=ccc " +
+                "--conf user.property.after.spark.driver.extraClassPath=ddd " +
+                "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--driver-memory", "1042M",
+                        "--conf", "user.property.after.spark.executor.extraClassPath=bbb",
+                        "--conf", "user.property.after.spark.driver.extraClassPath=ddd",
+                        "--conf", "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError " +
+                                "-XX:HeapDumpPath=/tmp -Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.executor.extraClassPath=aaa:$PWD/*",
+                        "--conf", "spark.driver.extraClassPath=ccc:$PWD/*",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--files", "spark-log4j.properties,hive-site.xml",
+                        "--conf", "spark.yarn.jar=null",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+
+    @Test
+    public void testYarnClientFilesAndArchivesArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--files aaa " +
+                "--archives bbb " +
+                "--files=ccc " +
+                "--archives=ddd");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--conf", "spark.executor.extraClassPath=$PWD/*",
+                        "--conf", "spark.driver.extraClassPath=$PWD/*",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--files", "spark-log4j.properties,hive-site.xml,aaa,ccc",
+                        "--archives", "bbb,ddd",
+                        "--conf", "spark.yarn.jar=null",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+
+    @Test
+    public void testDriverClassPathArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-class-path aaa");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--conf", "spark.executor.extraClassPath=$PWD/*",
+                        "--conf", "spark.driver.extraClassPath=aaa:$PWD/*",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--files", "spark-log4j.properties,hive-site.xml",
+                        "--conf", "spark.yarn.jar=null",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index 5f6a0cd..b9f37c8 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -127,19 +127,6 @@ public class TestSparkMain extends MainTestCase {
         }
     }
 
-    public void testAppendOoziePropertiesToSparkConf() throws Exception {
-        SparkMain instance = SparkMain.class.newInstance();
-        List<String> sparkArgs = new ArrayList<String>();
-        Configuration actionConf = new Configuration();
-        actionConf.set("foo", "foo-not-to-include");
-        actionConf.set("oozie.launcher", "launcher-not-to-include");
-        actionConf.set("oozie.spark", "spark-not-to-include");
-        actionConf.set("oozie.bar", "bar");
-
-        instance.appendOoziePropertiesToSparkConf(sparkArgs, actionConf);
-        assertEquals(Lists.newArrayList("--conf", "spark.oozie.bar=bar"), sparkArgs);
-    }
-
     public void testJobIDPattern() {
         List<String> lines = new ArrayList<String>();
         lines.add("Submitted application application_001");

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
index 31f53ac..02786a2 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
@@ -34,13 +34,13 @@ public class TestSparkOptionsSplitter {
 
     @Parameterized.Parameters
     public static List<Object[]> params() {
         return Arrays.asList(new Object[][]{
-                {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})},
-                {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})},
-                {" --option1 value1 ", Arrays.asList(new String[]{"--option1", "value1"})},
-                {"--conf special=value1", Arrays.asList(new String[]{"--conf", "special=value1"})},
-                {"--conf special=\"value1\"", Arrays.asList(new String[]{"--conf", "special=value1"})},
-                {"--conf special=\"value1 value2\"", Arrays.asList(new String[]{"--conf", "special=value1 value2"})},
-                {" --conf special=\"value1 value2\" ", Arrays.asList(new String[]{"--conf", "special=value1 value2"})},
+                {"--option1 value1", Arrays.asList("--option1", "value1")},
+                {"--option1 value1", Arrays.asList("--option1", "value1")},
+                {" --option1 value1 ", Arrays.asList("--option1", "value1")},
+                {"--conf special=value1", Arrays.asList("--conf", "special=value1")},
+                {"--conf special=\"value1\"", Arrays.asList("--conf", "special=value1")},
+                {"--conf special=\"value1 value2\"", Arrays.asList("--conf", "special=value1 value2")},
+                {" --conf special=\"value1 value2\" ", Arrays.asList("--conf", "special=value1 value2")},
         });
     }
@@ -55,6 +55,6 @@ public class TestSparkOptionsSplitter {
 
     @Test
     public void test() {
-        assertThat("Error for input >>" + input + "<<", SparkMain.splitSparkOpts(input), is(output));
+        assertThat("Error for input >>" + input + "<<", SparkOptionsSplitter.splitSparkOpts(input), is(output));
     }
 }
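----------------------------------------------------------------------
For illustration only, a minimal, self-contained sketch of the splitting contract that the parameterized cases above encode: spark-opts are split on whitespace, leading and trailing blanks are ignored, and a double-quoted value (e.g. --conf special="value1 value2") stays together as one token with the quotes stripped. The names below (SplitSparkOptsSketch, split) are hypothetical and this is not the SparkOptionsSplitter implementation shipped in this commit.

import java.util.ArrayList;
import java.util.List;

public class SplitSparkOptsSketch {

    // Split on whitespace, but keep a double-quoted value together and drop the quote
    // characters, so --conf special="value1 value2" yields the two tokens
    // "--conf" and "special=value1 value2".
    static List<String> split(final String sparkOpts) {
        final List<String> result = new ArrayList<>();
        final StringBuilder token = new StringBuilder();
        boolean insideQuotes = false;
        for (final char c : sparkOpts.toCharArray()) {
            if (c == '"') {
                insideQuotes = !insideQuotes;      // toggle quoting; the quote itself is dropped
            } else if (Character.isWhitespace(c) && !insideQuotes) {
                if (token.length() > 0) {          // token boundary outside quotes
                    result.add(token.toString());
                    token.setLength(0);
                }
            } else {
                token.append(c);
            }
        }
        if (token.length() > 0) {
            result.add(token.toString());
        }
        return result;
    }

    public static void main(final String[] args) {
        // Mirrors the last parameterized case above; prints [--conf, special=value1 value2]
        System.out.println(split(" --conf special=\"value1 value2\" "));
    }
}

Keeping quoted values intact matters for options whose value legitimately contains spaces, such as spark.executor.extraJavaOptions in the TestSparkArgsExtractor cases earlier in this diff.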
