Repository: oozie
Updated Branches:
  refs/heads/master 50fef427d -> a7f39456e


OOZIE-2391 spark-opts value in workflow.xml is not parsed properly (gezapeti 
via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/a7f39456
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/a7f39456
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/a7f39456

Branch: refs/heads/master
Commit: a7f39456ebff7db88ccc5f94202e5bbca8c391d1
Parents: 50fef42
Author: Robert Kanter <[email protected]>
Authored: Fri Apr 15 17:46:54 2016 -0700
Committer: Robert Kanter <[email protected]>
Committed: Fri Apr 15 17:46:54 2016 -0700

----------------------------------------------------------------------
 .../site/twiki/DG_SparkActionExtension.twiki    |  5 +-
 release-log.txt                                 |  1 +
 .../apache/oozie/action/hadoop/SparkMain.java   | 50 +++++++++++++---
 .../oozie/action/hadoop/TestSparkMain.java      |  4 +-
 .../action/hadoop/TestSparkOptionsSplitter.java | 60 ++++++++++++++++++++
 5 files changed, 110 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki 
b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index ee11229..18b84b7 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -99,7 +99,7 @@ The =jar= element indicates a comma separated list of jars or 
python files.
 
 The =spark-opts= element if present, contains a list of spark options that can 
be passed to spark driver. Spark configuration
 options can be passed by specifying '--conf key=value' here, or from 
=oozie.service.SparkConfigurationService.spark.configurations=
-in oozie-site.xml.  The =spark-opts= configs have priority.
+in oozie-site.xml. Values containing whitespace should be enclosed in double 
quotes. The =spark-opts= configs have priority.
 
 The =arg= element if present, contains arguments that can be passed to spark 
application.
 
@@ -129,7 +129,8 @@ expressions.
             <name>Spark Example</name>
             <class>org.apache.spark.examples.mllib.JavaALS</class>
             <jar>/lib/spark-examples_2.10-1.1.0.jar</jar>
-            <spark-opts>--executor-memory 20G --num-executors 50</spark-opts>
+            <spark-opts>--executor-memory 20G --num-executors 50
+             --conf 
spark.executor.extraJavaOptions="-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp"</spark-opts>
             <arg>inputpath=hdfs://localhost/input/file.txt</arg>
             <arg>value=2</arg>
         </spark>

http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index a30a245..d9abb4b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2391 spark-opts value in workflow.xml is not parsed properly (gezapeti 
via rkanter)
 OOZIE-2489 XML parsing is vulnerable (fdenes via rkanter)
 OOZIE-2485 Oozie client keeps trying to use expired auth token (rkanter)
 OOZIE-2490 Oozie can't set hadoop.security.token.service.use_ip (rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java 
b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 5624951..4587ab4 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -37,7 +37,6 @@ public class SparkMain extends LauncherMain {
     private static final String DRIVER_CLASSPATH = 
"spark.driver.extraClassPath=";
     private static final String DIST_FILES = "spark.yarn.dist.files=";
     private static final String JARS_OPTION = "--jars";
-    private static final String DELIM = "\\s+";
 
     private String sparkJars = null;
     private String sparkClasspath = null;
@@ -107,16 +106,20 @@ public class SparkMain extends LauncherMain {
         boolean addedJars = false;
         String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
         if (StringUtils.isNotEmpty(sparkOpts)) {
-            String[] sparkOptions = sparkOpts.split(DELIM);
-            for (int i = 0; i < sparkOptions.length; i++) {
-                String opt = sparkOptions[i];
+            List<String> sparkOptions = splitSparkOpts(sparkOpts);
+            for (int i = 0; i < sparkOptions.size(); i++) {
+                String opt = sparkOptions.get(i);
                 if (sparkJars != null) {
                     if (opt.equals(JARS_OPTION)) {
                         sparkArgs.add(opt);
                         i++;
-                        opt = sparkOptions[i];
-                        opt = opt + "," + sparkJars;
-                        addedJars = true;
+                        if(i < sparkOptions.size()) {
+                            opt = sparkOptions.get(i);
+                            opt = opt + "," + sparkJars;
+                            addedJars = true;
+                        } else {
+                            throw new 
OozieActionConfiguratorException(JARS_OPTION + " missing a parameter.");
+                        }
                     } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
                         opt = opt + "," + sparkJars;
                         addedDistFiles = true;
@@ -227,4 +230,37 @@ public class SparkMain extends LauncherMain {
             sparkJars = jars.toString();
         }
     }
+
+    /**
+     * Converts the options to be Spark-compatible.
+     * <ul>
+     *     <li>Parameters are separated by whitespace and can be groupped 
using double quotes</li>
+     *     <li>Quotes should be removed</li>
+     *     <li>Adjacent whitespace separators are treated as one</li>
+     * </ul>
+     * @param sparkOpts the options for Spark
+     * @return the options parsed into a list
+     */
+    static List<String> splitSparkOpts(String sparkOpts){
+        List<String> result = new ArrayList<String>();
+        StringBuilder currentWord = new StringBuilder();
+        boolean insideQuote = false;
+        for (int i = 0; i < sparkOpts.length(); i++) {
+            char c = sparkOpts.charAt(i);
+            if (c == '"') {
+                insideQuote = !insideQuote;
+            } else if (Character.isWhitespace(c) && !insideQuote) {
+                if (currentWord.length() > 0) {
+                    result.add(currentWord.toString());
+                    currentWord.setLength(0);
+                }
+            } else {
+                currentWord.append(c);
+            }
+        }
+        if(currentWord.length()>0) {
+            result.add(currentWord.toString());
+        }
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index 7707622..f3ec899 100644
--- 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -58,7 +58,9 @@ public class TestSparkMain extends MainTestCase {
         jobConf.set(SparkActionExecutor.SPARK_MODE, "client");
         jobConf.set(SparkActionExecutor.SPARK_CLASS, 
"org.apache.oozie.example.SparkFileCopy");
         jobConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
-        jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1024M");
+        jobConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory  1042M " +
+                "--conf 
spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError 
-XX:HeapDumpPath=/tmp\"");
+
         jobConf.set(SparkActionExecutor.SPARK_JAR, getFsTestCaseDir() + 
"/lib/test.jar");
 
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/a7f39456/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
----------------------------------------------------------------------
diff --git 
a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
new file mode 100644
index 0000000..3c154cc
--- /dev/null
+++ 
b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TestSparkOptionsSplitter {
+
+    @Parameterized.Parameters
+    public static List<Object[]> params() {
+        return Arrays.asList(new Object[][]{
+                {"--option1 value1", Arrays.asList(new String[]{"--option1", 
"value1"})},
+                {"--option1   value1", Arrays.asList(new String[]{"--option1", 
"value1"})},
+                {"   --option1 value1   ", Arrays.asList(new 
String[]{"--option1", "value1"})},
+                {"--conf special=value1", Arrays.asList(new String[]{"--conf", 
"special=value1"})},
+                {"--conf special=\"value1\"", Arrays.asList(new 
String[]{"--conf", "special=value1"})},
+                {"--conf special=\"value1 value2\"", Arrays.asList(new 
String[]{"--conf", "special=value1 value2"})},
+                {" --conf special=\"value1 value2\"  ", Arrays.asList(new 
String[]{"--conf", "special=value1 value2"})},
+        });
+    }
+
+    private String input;
+
+    private List<String> output;
+
+    public TestSparkOptionsSplitter(String input, List<String> result) {
+        this.input = input;
+        this.output = result;
+    }
+
+    @Test
+    public void test() {
+        assertThat("Error for input >>" + input + "<<", 
SparkMain.splitSparkOpts(input), is(output));
+    }
+}

Reply via email to