Repository: spark
Updated Branches:
  refs/heads/branch-1.0 9cd12f33d -> 318739a07


[SPARK-1808] Route bin/pyspark through Spark submit

**Problem.** For `bin/pyspark`, there is currently no way to specify Spark 
configuration properties other than through `SPARK_JAVA_OPTS` in 
`conf/spark-env.sh`. However, this mechanism is deprecated. Instead, 
`bin/pyspark` should pick up configurations explicitly specified in 
`conf/spark-defaults.conf`.
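
For example, a property that previously had to be smuggled in through 
`SPARK_JAVA_OPTS` can now come from `conf/spark-defaults.conf` or from the 
command line (a minimal sketch; the values are illustrative):

```
# Deprecated: JVM system properties set in conf/spark-env.sh
export SPARK_JAVA_OPTS="-Dspark.executor.memory=2g"

# Preferred: put "spark.executor.memory  2g" in conf/spark-defaults.conf,
# or pass the equivalent Spark submit option directly:
./bin/pyspark --executor-memory 2g
```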

**Solution.** Have `bin/pyspark` invoke `bin/spark-submit`, like all of its 
counterparts in Scala land (i.e. `bin/spark-shell` and `bin/run-example`). This 
has the additional benefit of making the invocation of all the user-facing 
Spark scripts consistent.
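
With this change, the same submit options work for the Python shell as for its 
Scala counterparts (illustrative invocations):

```
# Identical option handling across the user-facing scripts:
./bin/spark-shell --master yarn-client --driver-memory 1g
./bin/pyspark     --master yarn-client --driver-memory 1g
```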

**Details.** `bin/pyspark` handles two cases: (1) running Python applications 
and (2) running the Python shell. For (1), Spark submit already knows how to 
run Python applications, so when `bin/pyspark` is given a Python file, we 
simply pass the file through to Spark submit and let it handle the rest.
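
For instance, giving `bin/pyspark` a Python file now simply forwards it (the 
example application ships with Spark; the forwarding prints a deprecation 
warning):

```
# Both lines run the application through Spark submit; the first one warns
# that this usage is deprecated and defers to the second.
./bin/pyspark      examples/src/main/python/pi.py
./bin/spark-submit examples/src/main/python/pi.py
```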

For case (2), `bin/pyspark` starts a Python process as before, which launches 
the JVM as a subprocess. The existing code already provides this code path; 
the only change needed is to launch the JVM through `bin/spark-submit` instead 
of `bin/spark-class`. This requires modifications to Spark submit to handle the 
pyspark shell as a special case.
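
Roughly, the shell case now flows as follows (a sketch of the launch chain, 
not the exact code; the `--master` value is just an example):

```
# bin/pyspark stashes its arguments for the Python process to pick up later,
# then starts the Python shell:
export PYSPARK_SUBMIT_ARGS="--master local[4]"
exec "$PYSPARK_PYTHON"

# python/pyspark/java_gateway.py then launches the JVM through Spark submit,
# passing the special "pyspark-shell" primary resource plus the stashed args:
./bin/spark-submit pyspark-shell --master local[4]
```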

This has been tested locally (OS X and Windows 7), on a standalone cluster, and 
on a YARN cluster. Running IPython also works as before, except that it now 
accepts Spark submit arguments as well.
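
For example (assuming IPython is installed):

```
# Launch the IPython shell; Spark submit options are still picked up through
# the PYSPARK_SUBMIT_ARGS variable exported by bin/pyspark.
IPYTHON=1 ./bin/pyspark --master local[2]
```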

Author: Andrew Or <[email protected]>

Closes #799 from andrewor14/pyspark-submit and squashes the following commits:

bf37e36 [Andrew Or] Minor changes
01066fa [Andrew Or] bin/pyspark for Windows
c8cb3bf [Andrew Or] Handle perverse app names (with escaped quotes)
1866f85 [Andrew Or] Windows is not cooperating
456d844 [Andrew Or] Guard against shlex hanging if PYSPARK_SUBMIT_ARGS is not set
7eebda8 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
b7ba0d8 [Andrew Or] Address a few comments (minor)
06eb138 [Andrew Or] Use shlex instead of writing our own parser
05879fa [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
a823661 [Andrew Or] Fix --die-on-broken-pipe not propagated properly
6fba412 [Andrew Or] Deal with quotes + address various comments
fe4c8a7 [Andrew Or] Update --help for bin/pyspark
afe47bf [Andrew Or] Fix spark shell
f04aaa4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into pyspark-submit
a371d26 [Andrew Or] Route bin/pyspark through Spark submit
(cherry picked from commit 4b8ec6fcfd7a7ef0857d5b21917183c181301c95)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/318739a0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/318739a0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/318739a0

Branch: refs/heads/branch-1.0
Commit: 318739a0794c9d2994901a5d3b16c4c133d293c6
Parents: 9cd12f3
Author: Andrew Or <[email protected]>
Authored: Fri May 16 22:34:38 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Fri May 16 22:35:02 2014 -0700

----------------------------------------------------------------------
 bin/pyspark                                     | 35 +++++++++++--
 bin/pyspark2.cmd                                | 21 ++++++--
 bin/spark-shell                                 |  6 +--
 bin/spark-shell.cmd                             |  2 +-
 .../org/apache/spark/deploy/PythonRunner.scala  |  2 +-
 .../org/apache/spark/deploy/SparkSubmit.scala   | 55 +++++++++++++++-----
 .../spark/deploy/SparkSubmitArguments.scala     |  6 ++-
 .../scala/org/apache/spark/util/Utils.scala     |  2 +-
 python/pyspark/java_gateway.py                  | 10 ++--
 python/pyspark/shell.py                         |  2 +-
 10 files changed, 107 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 10e35e0..9e1364e 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -25,6 +25,12 @@ export SPARK_HOME="$FWDIR"
 
 SCALA_VERSION=2.10
 
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
+  echo "Usage: ./bin/pyspark [options]"
+  ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
+  exit 0
+fi
+
 # Exit if the user hasn't compiled Spark
 if [ ! -f "$FWDIR/RELEASE" ]; then
   # Exit if the user hasn't compiled Spark
@@ -52,13 +58,34 @@ export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
 export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
 
+# If IPython options are specified, assume user wants to run IPython
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-# Only use ipython if no command line arguments were provided [SPARK-1134]
-if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
-  exec ipython $IPYTHON_OPTS
+# Build up arguments list manually to preserve quotes and backslashes.
+# We export Spark submit arguments as an environment variable because shell.py must run as a
+# PYTHONSTARTUP script, which does not take in arguments. This is required for IPython notebooks.
+
+PYSPARK_SUBMIT_ARGS=""
+whitespace="[[:space:]]"
+for i in "$@"; do
+  if [[ $i =~ \" ]]; then i=$(echo $i | sed 's/\"/\\\"/g'); fi
+  if [[ $i =~ $whitespace ]]; then i=\"$i\"; fi
+  PYSPARK_SUBMIT_ARGS="$PYSPARK_SUBMIT_ARGS $i"
+done
+export PYSPARK_SUBMIT_ARGS
+
+# If a python file is provided, directly run spark-submit.
+if [[ "$1" =~ \.py$ ]]; then
+  echo -e "\nWARNING: Running python applications through ./bin/pyspark is deprecated as of Spark 1.0." 1>&2
+  echo -e "Use ./bin/spark-submit <python file>\n" 1>&2
+  exec $FWDIR/bin/spark-submit "$@"
 else
-  exec "$PYSPARK_PYTHON" "$@"
+  # Only use ipython if no command line arguments were provided [SPARK-1134]
+  if [[ "$IPYTHON" = "1" ]]; then
+    exec ipython $IPYTHON_OPTS
+  else
+    exec "$PYSPARK_PYTHON"
+  fi
 fi

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/bin/pyspark2.cmd
----------------------------------------------------------------------
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index d7cfd5e..0ef9eea 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -31,7 +31,7 @@ set FOUND_JAR=0
 for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
   set FOUND_JAR=1
 )
-if "%FOUND_JAR%"=="0" (
+if [%FOUND_JAR%] == [0] (
   echo Failed to find Spark assembly JAR.
   echo You need to build Spark with sbt\sbt assembly before running this program.
   goto exit
@@ -42,15 +42,30 @@ rem Load environment variables from conf\spark-env.cmd, if it exists
 if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
 
 rem Figure out which Python to use.
-if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
+if [%PYSPARK_PYTHON%] == [] set PYSPARK_PYTHON=python
 
 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
 set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
+set PYSPARK_SUBMIT_ARGS=%*
 
 echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
 
-"%PYSPARK_PYTHON%" %*
+rem Check whether the argument is a file
+for /f %%i in ('echo %1^| findstr /R "\.py"') do (
+  set PYTHON_FILE=%%i
+)
+
+if [%PYTHON_FILE%] == [] (
+  %PYSPARK_PYTHON%
+) else (
+  echo.
+  echo WARNING: Running python applications through ./bin/pyspark.cmd is deprecated as of Spark 1.0.
+  echo Use ./bin/spark-submit ^<python file^>
+  echo.
+  "%FWDIR%\bin\spark-submit.cmd" %PYSPARK_SUBMIT_ARGS%
+)
+
 :exit

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/bin/spark-shell
----------------------------------------------------------------------
diff --git a/bin/spark-shell b/bin/spark-shell
index 7f03349..c158683 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -28,7 +28,7 @@ esac
 # Enter posix mode for bash
 set -o posix
 
-if [[ "$@" == *--help* ]]; then
+if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
   echo "Usage: ./bin/spark-shell [options]"
   ./bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
   exit 0
@@ -46,11 +46,11 @@ function main(){
         # (see https://github.com/sbt/sbt/issues/562).
         stty -icanon min 1 -echo > /dev/null 2>&1
         export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
-        $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+        $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
         stty icanon echo > /dev/null 2>&1
     else
         export SPARK_SUBMIT_OPTS
-        $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+        $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
     fi
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/bin/spark-shell.cmd
----------------------------------------------------------------------
diff --git a/bin/spark-shell.cmd b/bin/spark-shell.cmd
index ca0c722..4b9708a 100755
--- a/bin/spark-shell.cmd
+++ b/bin/spark-shell.cmd
@@ -19,4 +19,4 @@ rem
 
 set SPARK_HOME=%~dp0..
 
-cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-internal %* --class org.apache.spark.repl.Main
+cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index e20d448..2dfa02b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -42,7 +42,7 @@ object PythonRunner {
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
     val pathElements = new ArrayBuffer[String]
-    pathElements ++= pyFiles.split(",")
+    pathElements ++= Option(pyFiles).getOrElse("").split(",")
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e86182e..a99b217 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
   private var clusterManager: Int = LOCAL
 
   /**
-   * A special jar name that indicates the class being run is inside of Spark itself,
-   * and therefore no user jar is needed.
+   * Special primary resource names that represent shells rather than application jars.
    */
-  private val RESERVED_JAR_NAME = "spark-internal"
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@ object SparkSubmit {
    *         entries for the child, a list of system properties, a list of env vars
    *         and the main class for the child
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
-      ArrayBuffer[String], Map[String, String], String) = {
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
     if (args.master.startsWith("local")) {
       clusterManager = LOCAL
     } else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
-    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
-    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    // If we're running a python app, set the main class to our specific python runner
     if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
-      args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
+      if (args.primaryResource == PYSPARK_SHELL) {
+        args.mainClass = "py4j.GatewayServer"
+        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+      } else {
+        // If a python file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.PythonRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.files = mergeFileLists(args.files, args.primaryResource)
+      }
       val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
-      args.primaryResource = RESERVED_JAR_NAME
+      args.files = mergeFileLists(args.files, pyFiles)
       sysProps("spark.submit.pyFiles") = pyFiles
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = args.mainClass
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
     // For python files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
@@ -293,7 +299,7 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(new URI(localJar).getPath())
+    val localJarFile = new File(new URI(localJar).getPath)
     if (!localJarFile.exists()) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
@@ -303,6 +309,27 @@ object SparkSubmit {
   }
 
   /**
+   * Return whether the given primary resource represents a user jar.
+   */
+  private def isUserJar(primaryResource: String): Boolean = {
+    !isShell(primaryResource) && !isPython(primaryResource)
+  }
+
+  /**
+   * Return whether the given primary resource represents a shell.
+   */
+  private def isShell(primaryResource: String): Boolean = {
+    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running python.
+   */
+  private[spark] def isPython(primaryResource: String): Boolean = {
+    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
    * Merge a sequence of comma-separated file lists, some of which may be null to indicate
    * no files, into a single comma-separated string.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 2d327aa..264d454 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -298,11 +298,13 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
             case v =>
               primaryResource = v
               inSparkOpts = false
-              isPython = v.endsWith(".py")
+              isPython = SparkSubmit.isPython(v)
               parse(tail)
           }
         } else {
-          childArgs += value
+          if (!value.isEmpty) {
+            childArgs += value
+          }
           parse(tail)
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 388f722..0c7cff0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1101,7 +1101,7 @@ private[spark] object Utils extends Logging {
    * Strip the directory from a path name
    */
   def stripDirectory(path: String): String = {
-    path.split(File.separator).last
+    new File(path).getName
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 3d0936f..91ae826 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -18,12 +18,12 @@
 import os
 import sys
 import signal
+import shlex
 import platform
 from subprocess import Popen, PIPE
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
-
 def launch_gateway():
     SPARK_HOME = os.environ["SPARK_HOME"]
 
@@ -34,9 +34,11 @@ def launch_gateway():
         # Launch the Py4j gateway using Spark's run command so that we pick up the
         # proper classpath and settings from spark-env.sh
         on_windows = platform.system() == "Windows"
-        script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class"
-        command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
-                   "--die-on-broken-pipe", "0"]
+        script = "./bin/spark-submit.cmd" if on_windows else "./bin/spark-submit"
+        submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS")
+        submit_args = submit_args if submit_args is not None else ""
+        submit_args = shlex.split(submit_args)
+        command = [os.path.join(SPARK_HOME, script), "pyspark-shell"] + submit_args
         if not on_windows:
             # Don't send ctrl-c / SIGINT to the Java gateway:
             def preexec_func():

http://git-wip-us.apache.org/repos/asf/spark/blob/318739a0/python/pyspark/shell.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 52df22d..0a63cfc 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -40,7 +40,7 @@ add_files = os.environ.get("ADD_FILES").split(',') if os.environ.get("ADD_FILES"
 if os.environ.get("SPARK_EXECUTOR_URI"):
     SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
 
-sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
+sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
 
 print("""Welcome to
       ____              __
