Repository: oozie
Updated Branches:
  refs/heads/master 16234592b -> ea434d2f8
OOZIE-2350 Package changes for release (shwethags)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/ea434d2f
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/ea434d2f
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/ea434d2f

Branch: refs/heads/master
Commit: ea434d2f812b6b9a1aed4537691953728bba13a2
Parents: 1623459
Author: Shwetha GS <[email protected]>
Authored: Thu Sep 3 10:37:57 2015 +0530
Committer: Shwetha GS <[email protected]>
Committed: Thu Sep 3 10:37:57 2015 +0530

----------------------------------------------------------------------
 pom.xml                                       |  15 +-
 release-log.txt                               |   1 +
 .../SparkMain.java                            | 230 -------------------
 .../apache/oozie/action/hadoop/SparkMain.java | 230 +++++++++++++++++++
 4 files changed, 232 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/ea434d2f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index c84aad1..9e72bf2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <groupId>org.apache</groupId>
         <artifactId>apache</artifactId>
-        <version>16</version>
+        <version>17</version>
     </parent>
 
     <modelVersion>4.0.0</modelVersion>
@@ -120,19 +120,6 @@
         <module>zookeeper-security-tests</module>
     </modules>
 
-    <distributionManagement>
-        <repository>
-            <id>apache.staging.https</id>
-            <name>Apache Release Distribution Repository</name>
-            <url>https://repository.apache.org/service/local/staging/deploy/maven2</url>
-        </repository>
-        <snapshotRepository>
-            <id>apache.snapshots.https</id>
-            <name>${distMgmtSnapshotsName}</name>
-            <url>${distMgmtSnapshotsUrl}</url>
-        </snapshotRepository>
-    </distributionManagement>
-
     <repositories>
         <repository>
             <id>central</id>
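
(The <distributionManagement> block can be dropped here presumably because the org.apache:apache parent POM, bumped from 16 to 17 in the hunk above, already declares the standard ASF release-staging and snapshot repositories, so the build now inherits them instead of redeclaring them.)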

http://git-wip-us.apache.org/repos/asf/oozie/blob/ea434d2f/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b2cc29b..3ecf02e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2350 Package changes for release (shwethags)
 OOZIE-2347 Remove unnecessary new Configuration()/new jobConf() calls from oozie (puru)
 OOZIE-2348 Recovery service keeps on recovering coord action of suspended jobs (puru)
 OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)


http://git-wip-us.apache.org/repos/asf/oozie/blob/ea434d2f/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
deleted file mode 100644
index 5624951..0000000
--- a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.spark.deploy.SparkSubmit;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-public class SparkMain extends LauncherMain {
-    private static final String MASTER_OPTION = "--master";
-    private static final String MODE_OPTION = "--deploy-mode";
-    private static final String JOB_NAME_OPTION = "--name";
-    private static final String CLASS_NAME_OPTION = "--class";
-    private static final String VERBOSE_OPTION = "--verbose";
-    private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
-    private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
-    private static final String DIST_FILES = "spark.yarn.dist.files=";
-    private static final String JARS_OPTION = "--jars";
-    private static final String DELIM = "\\s+";
-
-    private String sparkJars = null;
-    private String sparkClasspath = null;
-
-    public static void main(String[] args) throws Exception {
-        run(SparkMain.class, args);
-    }
-
-    @Override
-    protected void run(String[] args) throws Exception {
-        Configuration actionConf = loadActionConf();
-        setYarnTag(actionConf);
-        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
-
-        List<String> sparkArgs = new ArrayList<String>();
-
-        sparkArgs.add(MASTER_OPTION);
-        String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
-        sparkArgs.add(master);
-
-        String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
-        if (sparkDeployMode != null) {
-            sparkArgs.add(MODE_OPTION);
-            sparkArgs.add(sparkDeployMode);
-        }
-        boolean yarnClusterMode = master.equals("yarn-cluster")
-                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster"));
-        boolean yarnClientMode = master.equals("yarn-client")
-                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client"));
-
-        sparkArgs.add(JOB_NAME_OPTION);
-        sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
-
-        String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
-        if (className != null) {
-            sparkArgs.add(CLASS_NAME_OPTION);
-            sparkArgs.add(className);
-        }
-
-        String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
-
-        // In local mode, everything runs here in the Launcher Job.
-        // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
-        // In yarn-cluster mode, the driver and executor run in Yarn.
-        // Due to this, configuring Spark's classpath is not straightforward (see below)
-
-        // Parse the spark opts. We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark. The way we do
-        // this depends on what mode is being used:
-        //      local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
-        //      yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
-        //      yarn-client: Passed as comma-separated list via spark.yarn.dist.files
-        //
-        // --jars will cause the jars to be uploaded to HDFS and localized. To prevent the Sharelib and user jars from being
-        // unnecessarily reuploaded to HDFS, we use the HDFS paths for these. The hadoop jars are needed as well, but we'll have
-        // to use local paths for these because they're not in the Sharelib.
-        //
-        // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
-        // localized jars. --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
-        // names for the classpaths, as they'll be found in the working directory.
-        //
-        // This part looks more complicated than it is because we need to append the jars if the user already set things for
-        // these options
-        determineSparkJarsAndClasspath(actionConf, jarPath);
-        boolean addedExecutorClasspath = false;
-        boolean addedDriverClasspath = false;
-        boolean addedDistFiles = false;
-        boolean addedJars = false;
-        String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
-        if (StringUtils.isNotEmpty(sparkOpts)) {
-            String[] sparkOptions = sparkOpts.split(DELIM);
-            for (int i = 0; i < sparkOptions.length; i++) {
-                String opt = sparkOptions[i];
-                if (sparkJars != null) {
-                    if (opt.equals(JARS_OPTION)) {
-                        sparkArgs.add(opt);
-                        i++;
-                        opt = sparkOptions[i];
-                        opt = opt + "," + sparkJars;
-                        addedJars = true;
-                    } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
-                        opt = opt + "," + sparkJars;
-                        addedDistFiles = true;
-                    }
-                }
-                if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
-                    if (opt.startsWith(EXECUTOR_CLASSPATH)) {
-                        opt = opt + File.pathSeparator + sparkClasspath;
-                        addedExecutorClasspath = true;
-                    }
-                    if (opt.startsWith(DRIVER_CLASSPATH)) {
-                        opt = opt + File.pathSeparator + sparkClasspath;
-                        addedDriverClasspath = true;
-                    }
-                }
-                sparkArgs.add(opt);
-            }
-        }
-        if (!addedJars && sparkJars != null) {
-            sparkArgs.add("--jars");
-            sparkArgs.add(sparkJars);
-        }
-        if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
-            if (!addedExecutorClasspath) {
-                sparkArgs.add("--conf");
-                sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
-            }
-            if (!addedDriverClasspath) {
-                sparkArgs.add("--conf");
-                sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
-            }
-        }
-        if (yarnClientMode && !addedDistFiles && sparkJars != null) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(DIST_FILES + sparkJars);
-        }
-
-        if (!sparkArgs.contains(VERBOSE_OPTION)) {
-            sparkArgs.add(VERBOSE_OPTION);
-        }
-
-        sparkArgs.add(jarPath);
-
-        for (String arg : args) {
-            sparkArgs.add(arg);
-        }
-
-        System.out.println("Spark Action Main class : " + SparkSubmit.class.getName());
-        System.out.println();
-        System.out.println("Oozie Spark action configuration");
-        System.out.println("=================================================================");
-        System.out.println();
-        for (String arg : sparkArgs) {
-            System.out.println(" " + arg);
-        }
-        System.out.println();
-        runSpark(sparkArgs.toArray(new String[sparkArgs.size()]));
-    }
-
-    private void runSpark(String[] args) throws Exception {
-        System.out.println("=================================================================");
-        System.out.println();
-        System.out.println(">>> Invoking Spark class now >>>");
-        System.out.println();
-        System.out.flush();
-        SparkSubmit.main(args);
-    }
-
-    private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
-        // distCache gets all of the Sharelib and user jars (from the Distributed Cache)
-        // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache
-        // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths
-        // sparkClasspath becomes all of the jar names in sparkJars (without paths)
-        // We also remove the Spark job jar and job.jar
-        String[] distCache = new String[]{};
-        String dCache = actionConf.get("mapreduce.job.classpath.files");
-        if (dCache != null) {
-            distCache = dCache.split(",");
-        }
-        String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
-        StringBuilder cp = new StringBuilder();
-        StringBuilder jars = new StringBuilder();
-        HashSet<String> distCacheJars = new HashSet<String>(distCache.length);
-        for (String path : distCache) {
-            // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
-            if (!path.equals(jarPath)) {
-                String name = path.substring(path.lastIndexOf("/") + 1);
-                distCacheJars.add(name);
-                cp.append(name).append(File.pathSeparator);
-                jars.append(path).append(",");
-            }
-        }
-        for (String path : classpath) {
-            if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
-                String name = path.substring(path.lastIndexOf("/") + 1);
-                if (!distCacheJars.contains(name)) {
-                    jars.append(path).append(",");
-                }
-                cp.append(name).append(File.pathSeparator);
-            }
-        }
-        if (cp.length() > 0) {
-            cp.setLength(cp.length() - 1);
-            sparkClasspath = cp.toString();
-        }
-        if (jars.length() > 0) {
-            jars.setLength(jars.length() - 1);
-            sparkJars = jars.toString();
-        }
-    }
-}
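
(The two SparkMain.java hunks are a straight file move: the blob hash 5624951 on the deleted file's index line reappears on the new file's index line below, so the 230 lines are byte-identical. The file previously sat under a single literal directory named "org.apache.oozie.action.hadoop"; the commit relocates it to the proper nested package path org/apache/oozie/action/hadoop.)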

http://git-wip-us.apache.org/repos/asf/oozie/blob/ea434d2f/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
new file mode 100644
index 0000000..5624951
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -0,0 +1,230 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.deploy.SparkSubmit;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+
+public class SparkMain extends LauncherMain {
+    private static final String MASTER_OPTION = "--master";
+    private static final String MODE_OPTION = "--deploy-mode";
+    private static final String JOB_NAME_OPTION = "--name";
+    private static final String CLASS_NAME_OPTION = "--class";
+    private static final String VERBOSE_OPTION = "--verbose";
+    private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
+    private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
+    private static final String DIST_FILES = "spark.yarn.dist.files=";
+    private static final String JARS_OPTION = "--jars";
+    private static final String DELIM = "\\s+";
+
+    private String sparkJars = null;
+    private String sparkClasspath = null;
+
+    public static void main(String[] args) throws Exception {
+        run(SparkMain.class, args);
+    }
+
+    @Override
+    protected void run(String[] args) throws Exception {
+        Configuration actionConf = loadActionConf();
+        setYarnTag(actionConf);
+        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+
+        List<String> sparkArgs = new ArrayList<String>();
+
+        sparkArgs.add(MASTER_OPTION);
+        String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
+        sparkArgs.add(master);
+
+        String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
+        if (sparkDeployMode != null) {
+            sparkArgs.add(MODE_OPTION);
+            sparkArgs.add(sparkDeployMode);
+        }
+        boolean yarnClusterMode = master.equals("yarn-cluster")
+                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster"));
+        boolean yarnClientMode = master.equals("yarn-client")
+                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client"));
+
+        sparkArgs.add(JOB_NAME_OPTION);
+        sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
+
+        String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
+        if (className != null) {
+            sparkArgs.add(CLASS_NAME_OPTION);
+            sparkArgs.add(className);
+        }
+
+        String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
+
+        // In local mode, everything runs here in the Launcher Job.
+        // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
+        // In yarn-cluster mode, the driver and executor run in Yarn.
+        // Due to this, configuring Spark's classpath is not straightforward (see below)
+
+        // Parse the spark opts. We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark. The way we do
+        // this depends on what mode is being used:
+        //      local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
+        //      yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
+        //      yarn-client: Passed as comma-separated list via spark.yarn.dist.files
+        //
+        // --jars will cause the jars to be uploaded to HDFS and localized. To prevent the Sharelib and user jars from being
+        // unnecessarily reuploaded to HDFS, we use the HDFS paths for these. The hadoop jars are needed as well, but we'll have
+        // to use local paths for these because they're not in the Sharelib.
+        //
+        // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
+        // localized jars. --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
+        // names for the classpaths, as they'll be found in the working directory.
+        //
+        // This part looks more complicated than it is because we need to append the jars if the user already set things for
+        // these options
+        determineSparkJarsAndClasspath(actionConf, jarPath);
+        boolean addedExecutorClasspath = false;
+        boolean addedDriverClasspath = false;
+        boolean addedDistFiles = false;
+        boolean addedJars = false;
+        String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
+        if (StringUtils.isNotEmpty(sparkOpts)) {
+            String[] sparkOptions = sparkOpts.split(DELIM);
+            for (int i = 0; i < sparkOptions.length; i++) {
+                String opt = sparkOptions[i];
+                if (sparkJars != null) {
+                    if (opt.equals(JARS_OPTION)) {
+                        sparkArgs.add(opt);
+                        i++;
+                        opt = sparkOptions[i];
+                        opt = opt + "," + sparkJars;
+                        addedJars = true;
+                    } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
+                        opt = opt + "," + sparkJars;
+                        addedDistFiles = true;
+                    }
+                }
+                if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+                    if (opt.startsWith(EXECUTOR_CLASSPATH)) {
+                        opt = opt + File.pathSeparator + sparkClasspath;
+                        addedExecutorClasspath = true;
+                    }
+                    if (opt.startsWith(DRIVER_CLASSPATH)) {
+                        opt = opt + File.pathSeparator + sparkClasspath;
+                        addedDriverClasspath = true;
+                    }
+                }
+                sparkArgs.add(opt);
+            }
+        }
+        if (!addedJars && sparkJars != null) {
+            sparkArgs.add("--jars");
+            sparkArgs.add(sparkJars);
+        }
+        if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+            if (!addedExecutorClasspath) {
+                sparkArgs.add("--conf");
+                sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
+            }
+            if (!addedDriverClasspath) {
+                sparkArgs.add("--conf");
+                sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
+            }
+        }
+        if (yarnClientMode && !addedDistFiles && sparkJars != null) {
+            sparkArgs.add("--conf");
+            sparkArgs.add(DIST_FILES + sparkJars);
+        }
+
+        if (!sparkArgs.contains(VERBOSE_OPTION)) {
+            sparkArgs.add(VERBOSE_OPTION);
+        }
+
+        sparkArgs.add(jarPath);
+
+        for (String arg : args) {
+            sparkArgs.add(arg);
+        }
+
+        System.out.println("Spark Action Main class : " + SparkSubmit.class.getName());
+        System.out.println();
+        System.out.println("Oozie Spark action configuration");
+        System.out.println("=================================================================");
+        System.out.println();
+        for (String arg : sparkArgs) {
+            System.out.println(" " + arg);
+        }
+        System.out.println();
+        runSpark(sparkArgs.toArray(new String[sparkArgs.size()]));
+    }
+
+    private void runSpark(String[] args) throws Exception {
+        System.out.println("=================================================================");
+        System.out.println();
+        System.out.println(">>> Invoking Spark class now >>>");
+        System.out.println();
+        System.out.flush();
+        SparkSubmit.main(args);
+    }
+
+    private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
+        // distCache gets all of the Sharelib and user jars (from the Distributed Cache)
+        // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache
+        // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths
+        // sparkClasspath becomes all of the jar names in sparkJars (without paths)
+        // We also remove the Spark job jar and job.jar
+        String[] distCache = new String[]{};
+        String dCache = actionConf.get("mapreduce.job.classpath.files");
+        if (dCache != null) {
+            distCache = dCache.split(",");
+        }
+        String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
+        StringBuilder cp = new StringBuilder();
+        StringBuilder jars = new StringBuilder();
+        HashSet<String> distCacheJars = new HashSet<String>(distCache.length);
+        for (String path : distCache) {
+            // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
+            if (!path.equals(jarPath)) {
+                String name = path.substring(path.lastIndexOf("/") + 1);
+                distCacheJars.add(name);
+                cp.append(name).append(File.pathSeparator);
+                jars.append(path).append(",");
+            }
+        }
+        for (String path : classpath) {
+            if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
+                String name = path.substring(path.lastIndexOf("/") + 1);
+                if (!distCacheJars.contains(name)) {
+                    jars.append(path).append(",");
+                }
+                cp.append(name).append(File.pathSeparator);
+            }
+        }
+        if (cp.length() > 0) {
+            cp.setLength(cp.length() - 1);
+            sparkClasspath = cp.toString();
+        }
+        if (jars.length() > 0) {
+            jars.setLength(jars.length() - 1);
+            sparkJars = jars.toString();
+        }
+    }
+}
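
(As a reading aid for the mode-handling logic above, here is a minimal, hypothetical sketch, not part of the commit, of the argument list run() assembles for a yarn-client action whose spark-opts contain none of the jar or classpath options. The class name SparkMainArgsSketch and every path, class name, and jar name below are invented; at runtime the real values come from the action configuration and determineSparkJarsAndClasspath().)

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical walk-through of SparkMain.run() for a yarn-client action.
    // All values are invented stand-ins for what loadActionConf() and
    // determineSparkJarsAndClasspath() would produce at runtime.
    public class SparkMainArgsSketch {
        public static void main(String[] args) {
            // HDFS paths are kept for --jars so nothing is re-uploaded;
            // bare names (localized by --jars) are used for the classpaths.
            String sparkJars = "hdfs:///share/lib/spark/spark-assembly.jar,hdfs:///user/app/my-app.jar";
            String sparkClasspath = "spark-assembly.jar:my-app.jar"; // ':' = File.pathSeparator on Linux

            List<String> sparkArgs = new ArrayList<String>();
            sparkArgs.add("--master");
            sparkArgs.add("yarn-client");
            sparkArgs.add("--name");
            sparkArgs.add("MySparkAction");     // SparkActionExecutor.SPARK_JOB_NAME
            sparkArgs.add("--class");
            sparkArgs.add("com.example.MyApp"); // SparkActionExecutor.SPARK_CLASS
            // spark-opts set none of these, so SparkMain appends them all itself:
            sparkArgs.add("--jars");
            sparkArgs.add(sparkJars);
            sparkArgs.add("--conf");
            sparkArgs.add("spark.executor.extraClassPath=" + sparkClasspath);
            sparkArgs.add("--conf");
            sparkArgs.add("spark.driver.extraClassPath=" + sparkClasspath);
            sparkArgs.add("--conf");
            sparkArgs.add("spark.yarn.dist.files=" + sparkJars);   // yarn-client only
            sparkArgs.add("--verbose");
            sparkArgs.add("hdfs:///user/app/my-app.jar");          // the action's jar
            System.out.println(sparkArgs);   // what gets handed to SparkSubmit.main()
        }
    }

(Since none of the added* flags were tripped while scanning spark-opts, all four jar-related options are appended by SparkMain itself; had the action supplied its own --jars list, the sharelib jars would instead have been appended to that list.)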
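
(Likewise, a self-contained sketch, again not part of the commit and with invented values, of the name/path split determineSparkJarsAndClasspath() applies to each distributed-cache entry: the full HDFS path feeds the --jars list, while only the bare file name feeds the extraClassPath values, because --jars localizes every jar into the container working directory where the bare name resolves.)

    import java.io.File;

    // Hypothetical dist-cache entries (e.g. from mapreduce.job.classpath.files).
    public class JarSplitSketch {
        public static void main(String[] args) {
            String jarPath = "hdfs:///user/app/my-app.jar";   // the action's own jar is skipped
            String[] distCache = {
                    "hdfs:///user/oozie/share/lib/spark/spark-yarn.jar",
                    "hdfs:///user/app/my-app.jar",
            };
            StringBuilder cp = new StringBuilder();
            StringBuilder jars = new StringBuilder();
            for (String path : distCache) {
                if (!path.equals(jarPath)) {                     // same skip as SparkMain
                    String name = path.substring(path.lastIndexOf("/") + 1);
                    cp.append(name).append(File.pathSeparator);  // bare name -> classpath
                    jars.append(path).append(",");               // full path -> --jars
                }
            }
            System.out.println("classpath: " + cp); // spark-yarn.jar: (SparkMain trims the trailing separator)
            System.out.println("jars: " + jars);    // hdfs:///user/oozie/share/lib/spark/spark-yarn.jar, (ditto)
        }
    }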
