Repository: oozie Updated Branches: refs/heads/master 50fef427d -> a7f39456e
OOZIE-2391 spark-opts value in workflow.xml is not parsed properly (gezapeti via rkanter) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a7f39456 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a7f39456 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a7f39456 Branch: refs/heads/master Commit: a7f39456ebff7db88ccc5f94202e5bbca8c391d1 Parents: 50fef42 Author: Robert Kanter <[email protected]> Authored: Fri Apr 15 17:46:54 2016 -0700 Committer: Robert Kanter <[email protected]> Committed: Fri Apr 15 17:46:54 2016 -0700 ---------------------------------------------------------------------- .../site/twiki/DG_SparkActionExtension.twiki | 5 +- release-log.txt | 1 + .../apache/oozie/action/hadoop/SparkMain.java | 50 +++++++++++++--- .../oozie/action/hadoop/TestSparkMain.java | 4 +- .../action/hadoop/TestSparkOptionsSplitter.java | 60 ++++++++++++++++++++ 5 files changed, 110 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/docs/src/site/twiki/DG_SparkActionExtension.twiki ---------------------------------------------------------------------- diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki index ee11229..18b84b7 100644 --- a/docs/src/site/twiki/DG_SparkActionExtension.twiki +++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki @@ -99,7 +99,7 @@ The =jar= element indicates a comma separated list of jars or python files. The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver. Spark configuration options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations= -in oozie-site.xml. The =spark-opts= configs have priority. +in oozie-site.xml. 
Values containing whitespaces should be enclosed by double quotes. The =spark-opts= configs have priority. The =arg= element if present, contains arguments that can be passed to spark application. @@ -129,7 +129,8 @@ expressions. <name>Spark Example</name> <class>org.apache.spark.examples.mllib.JavaALS</class> <jar>/lib/spark-examples_2.10-1.1.0.jar</jar> - <spark-opts>--executor-memory 20G --num-executors 50</spark-opts> + <spark-opts>--executor-memory 20G --num-executors 50 + --conf spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"</spark-opts> <arg>inputpath=hdfs://localhost/input/file.txt</arg> <arg>value=2</arg> </spark> http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index a30a245..d9abb4b 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 4.3.0 release (trunk - unreleased) +OOZIE-2391 spark-opts value in workflow.xml is not parsed properly (gezapeti via rkanter) OOZIE-2489 XML parsing is vulnerable (fdenes via rkanter) OOZIE-2485 Oozie client keeps trying to use expired auth token (rkanter) OOZIE-2490 Oozie can't set hadoop.security.token.service.use_ip (rkanter) http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java index 5624951..4587ab4 100644 --- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java +++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java @@ -37,7 +37,6 @@ public class SparkMain extends LauncherMain { private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath="; 
private static final String DIST_FILES = "spark.yarn.dist.files="; private static final String JARS_OPTION = "--jars"; - private static final String DELIM = "\\s+"; private String sparkJars = null; private String sparkClasspath = null; @@ -107,16 +106,20 @@ public class SparkMain extends LauncherMain { boolean addedJars = false; String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS); if (StringUtils.isNotEmpty(sparkOpts)) { - String[] sparkOptions = sparkOpts.split(DELIM); - for (int i = 0; i < sparkOptions.length; i++) { - String opt = sparkOptions[i]; + List<String> sparkOptions = splitSparkOpts(sparkOpts); + for (int i = 0; i < sparkOptions.size(); i++) { + String opt = sparkOptions.get(i); if (sparkJars != null) { if (opt.equals(JARS_OPTION)) { sparkArgs.add(opt); i++; - opt = sparkOptions[i]; - opt = opt + "," + sparkJars; - addedJars = true; + if(i < sparkOptions.size()) { + opt = sparkOptions.get(i); + opt = opt + "," + sparkJars; + addedJars = true; + } else { + throw new OozieActionConfiguratorException(JARS_OPTION + " missing a parameter."); + } } else if (yarnClientMode && opt.startsWith(DIST_FILES)) { opt = opt + "," + sparkJars; addedDistFiles = true; @@ -227,4 +230,37 @@ public class SparkMain extends LauncherMain { sparkJars = jars.toString(); } } + + /** + * Converts the options to be Spark-compatible. 
+ * <ul> + * <li>Parameters are separated by whitespace and can be grouped using double quotes</li> + * <li>Quotes should be removed</li> + * <li>Adjacent whitespace separators are treated as one</li> + * </ul> + * @param sparkOpts the options for Spark + * @return the options parsed into a list + */ + static List<String> splitSparkOpts(String sparkOpts){ + List<String> result = new ArrayList<String>(); + StringBuilder currentWord = new StringBuilder(); + boolean insideQuote = false; + for (int i = 0; i < sparkOpts.length(); i++) { + char c = sparkOpts.charAt(i); + if (c == '"') { + insideQuote = !insideQuote; + } else if (Character.isWhitespace(c) && !insideQuote) { + if (currentWord.length() > 0) { + result.add(currentWord.toString()); + currentWord.setLength(0); + } + } else { + currentWord.append(c); + } + } + if(currentWord.length()>0) { + result.add(currentWord.toString()); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java index 7707622..f3ec899 100644 --- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java @@ -58,7 +58,9 @@ public class TestSparkMain extends MainTestCase { jobConf.set(SparkActionExecutor.SPARK_MODE, "client"); jobConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy"); jobConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File"); - jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1024M"); + jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1042M " + + "--conf 
spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\""); + jobConf.set(SparkActionExecutor.SPARK_JAR, getFsTestCaseDir() + "/lib/test.jar"); http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java ---------------------------------------------------------------------- diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java new file mode 100644 index 0000000..3c154cc --- /dev/null +++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static org.junit.Assert.*; +import static org.hamcrest.CoreMatchers.*; + +import java.util.Arrays; +import java.util.List; + +@RunWith(Parameterized.class) +public class TestSparkOptionsSplitter { + + @Parameterized.Parameters + public static List<Object[]> params() { + return Arrays.asList(new Object[][]{ + {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})}, + {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})}, + {" --option1 value1 ", Arrays.asList(new String[]{"--option1", "value1"})}, + {"--conf special=value1", Arrays.asList(new String[]{"--conf", "special=value1"})}, + {"--conf special=\"value1\"", Arrays.asList(new String[]{"--conf", "special=value1"})}, + {"--conf special=\"value1 value2\"", Arrays.asList(new String[]{"--conf", "special=value1 value2"})}, + {" --conf special=\"value1 value2\" ", Arrays.asList(new String[]{"--conf", "special=value1 value2"})}, + }); + } + + private String input; + + private List<String> output; + + public TestSparkOptionsSplitter(String input, List<String> result) { + this.input = input; + this.output = result; + } + + @Test + public void test() { + assertThat("Error for input >>" + input + "<<", SparkMain.splitSparkOpts(input), is(output)); + } +}
