Repository: spark
Updated Branches:
  refs/heads/branch-1.1 78e3c036e -> 54ccd93e6


[HOTFIX] Wait for EOF only for the PySpark shell

In `SparkSubmitDriverBootstrapper`, we wait for the parent process to send us 
an `EOF` before finishing the application. This is applicable for the PySpark 
shell, because we terminate the application the same way. However, if we run a 
Python application, for instance, the JVM never exits unless it receives a 
manual EOF from the user. This is causing a few tests to time out.

We only need to do this for the PySpark shell, because only in that case does 
Spark submit run as a Python subprocess. Thus, the normal Spark shell doesn't 
need to go through this path even though it is also a REPL.
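
The termination pattern at issue works roughly as follows: a thread pumps this 
process's stdin into the child JVM's stdin; when the parent Python process 
exits, stdin hits EOF, the pump thread finishes, and the child is destroyed. 
A minimal sketch of that pattern (illustrative only, not the exact Spark code; 
names and buffer size are assumptions):

    import java.io.{InputStream, OutputStream}

    // Minimal sketch of broken-pipe termination (illustrative, not Spark's code).
    object StdinPumpSketch {
      def pumpAndTerminate(child: Process): Unit = {
        val pump = new Thread("redirect stdin") {
          override def run(): Unit = {
            val in: InputStream = System.in
            val out: OutputStream = child.getOutputStream
            val buf = new Array[Byte](1024)
            var n = in.read(buf)
            while (n != -1) {      // read() returns -1 on EOF, i.e. the parent exited
              out.write(buf, 0, n)
              out.flush()
              n = in.read(buf)
            }
          }
        }
        pump.start()
        pump.join()                // blocks until stdin reaches EOF
        child.destroy()            // then take the child JVM down with us
      }
    }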

Thanks davies for reporting this.

Author: Andrew Or <[email protected]>

Closes #2170 from andrewor14/bootstrap-hotfix and squashes the following 
commits:

42963f5 [Andrew Or] Do not wait for EOF unless this is the pyspark shell
(cherry picked from commit dafe343499bbc688e266106e4bb897f9e619834e)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/54ccd93e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/54ccd93e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/54ccd93e

Branch: refs/heads/branch-1.1
Commit: 54ccd93e621c1bc4afc709a208b609232ab701d1
Parents: 78e3c03
Author: Andrew Or <[email protected]>
Authored: Wed Aug 27 23:03:46 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed Aug 27 23:04:28 2014 -0700

----------------------------------------------------------------------
 bin/pyspark                                     |  2 ++
 .../deploy/SparkSubmitDriverBootstrapper.scala  | 26 +++++++++++---------
 2 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/54ccd93e/bin/pyspark
----------------------------------------------------------------------
diff --git a/bin/pyspark b/bin/pyspark
index 01d4202..6687648 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -102,6 +102,8 @@ if [[ "$1" =~ \.py$ ]]; then
   gatherSparkSubmitOpts "$@"
   exec $FWDIR/bin/spark-submit "${SUBMISSION_OPTS[@]}" $primary "${APPLICATION_OPTS[@]}"
 else
+  # PySpark shell requires special handling downstream
+  export PYSPARK_SHELL=1
   # Only use ipython if no command line arguments were provided [SPARK-1134]
   if [[ "$IPYTHON" = "1" ]]; then
     exec ipython $IPYTHON_OPTS
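
Because `bin/pyspark` exports PYSPARK_SHELL before exec'ing the Python 
interpreter, every descendant process (including the bootstrapper JVM that 
Spark submit eventually launches) inherits the variable, so the Scala side 
only needs an environment lookup. A minimal, hypothetical standalone 
illustration:

    // Hypothetical snippet: an environment variable exported by bin/pyspark
    // is inherited by this JVM, so detecting the shell case reduces to a
    // simple env lookup, as the patch below does.
    object PySparkShellCheck extends App {
      val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
      println(s"Running under the PySpark shell? $isPySparkShell")
    }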

http://git-wip-us.apache.org/repos/asf/spark/blob/54ccd93e/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index 7ca96ed..38b5d8e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -132,25 +132,29 @@ private[spark] object SparkSubmitDriverBootstrapper {
     val builder = new ProcessBuilder(filteredCommand)
     val process = builder.start()
 
-    // Redirect stdin, stdout, and stderr to/from the child JVM
+    // Redirect stdout and stderr from the child JVM
     val stdoutThread = new RedirectThread(process.getInputStream, System.out, "redirect stdout")
     val stderrThread = new RedirectThread(process.getErrorStream, System.err, "redirect stderr")
     stdoutThread.start()
     stderrThread.start()
 
-    // In Windows, the subprocess reads directly from our stdin, so we should avoid spawning
-    // a thread that contends with the subprocess in reading from System.in.
-    if (Utils.isWindows) {
-      // For the PySpark shell, the termination of this process is handled in java_gateway.py
-      process.waitFor()
-    } else {
-      // Terminate on broken pipe, which signals that the parent process has exited. This is
-      // important for the PySpark shell, where Spark submit itself is a python subprocess.
+    // Redirect stdin to child JVM only if we're not running Windows. This is because the
+    // subprocess there already reads directly from our stdin, so we should avoid spawning a
+    // thread that contends with the subprocess in reading from System.in.
+    val isWindows = Utils.isWindows
+    val isPySparkShell = sys.env.contains("PYSPARK_SHELL")
+    if (!isWindows) {
       val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
       stdinThread.start()
-      stdinThread.join()
-      process.destroy()
+      // For the PySpark shell, Spark submit itself runs as a python subprocess, and so this JVM
+      // should terminate on broken pipe, which signals that the parent process has exited. In
+      // Windows, the termination logic for the PySpark shell is handled in java_gateway.py
+      if (isPySparkShell) {
+        stdinThread.join()
+        process.destroy()
+      }
     }
+    process.waitFor()
   }
 
 }
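
Putting the pieces together, the patched logic reduces to the following 
control flow (condensed from the diff above, not a verbatim excerpt):

    // Condensed sketch of the patched bootstrapper logic:
    if (!isWindows) {
      val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
      stdinThread.start()
      if (isPySparkShell) {
        stdinThread.join()   // shell case: block until the parent Python process sends EOF
        process.destroy()    // then tear down the child JVM
      }
    }
    process.waitFor()        // all cases: finish when the child JVM exits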

