Repository: spark
Updated Branches:
  refs/heads/branch-1.0 0759ee790 -> 82c8e89c9


[SPARK-1688] Propagate PySpark worker stderr to driver

When at least one of the following conditions is true, PySpark cannot be loaded:

1. PYTHONPATH is not set
2. PYTHONPATH does not contain the python directory (or jar, in the case of 
YARN)
3. The jar does not contain pyspark files (YARN)
4. The jar does not contain py4j files (YARN)

However, every one of these cases currently surfaces as the same generic 
`java.io.EOFException`, thrown when we try to read from the python daemon's 
output. That exception gives no indication of the underlying cause.

This PR includes the python stderr and the PYTHONPATH in the exception 
propagated to the driver. Now, the exception message looks something like:

```
Error from python worker:
  : No module named pyspark
PYTHONPATH was:
  /path/to/spark/python:/path/to/some/jar
java.io.EOFException
  <stack trace>
```

whereas before it was just

```
java.io.EOFException
  <stack trace>
```

Author: Andrew Or <andrewo...@gmail.com>

Closes #603 from andrewor14/pyspark-exception and squashes the following 
commits:

10d65d3 [Andrew Or] Throwable -> Exception, worker -> daemon
862d1d7 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
a5ed798 [Andrew Or] Use block string and interpolation instead of var (minor)
cc09c45 [Andrew Or] Account for the fact that the python daemon may not have 
terminated yet
444f019 [Andrew Or] Use the new RedirectThread + include system PYTHONPATH
aab00ae [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
0cc2402 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
783efe2 [Andrew Or] Make python daemon stderr indentation consistent
9524172 [Andrew Or] Avoid potential NPE / error stream contention + Move things 
around
29f9688 [Andrew Or] Add back original exception type
e92d36b [Andrew Or] Include python worker stderr in the exception propagated to 
the driver
7c69360 [Andrew Or] Merge branch 'master' of github.com:apache/spark into 
pyspark-exception
cdbc185 [Andrew Or] Fix python attribute not found exception when PYTHONPATH is 
not set
dcc0353 [Andrew Or] Check both python and system environment variables for 
PYTHONPATH
6c09c21 [Andrew Or] Validate PYTHONPATH and PySpark modules before starting 
python workers

(cherry picked from commit 5200872243aa5906dc8a06772e61d75f19557aac)
Signed-off-by: Aaron Davidson <aa...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/82c8e89c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/82c8e89c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/82c8e89c

Branch: refs/heads/branch-1.0
Commit: 82c8e89c9581c45c7878b8f406cf3d90d4b0d74c
Parents: 0759ee7
Author: Andrew Or <andrewo...@gmail.com>
Authored: Wed May 7 14:35:22 2014 -0700
Committer: Aaron Davidson <aa...@databricks.com>
Committed: Wed May 7 14:35:37 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/api/python/PythonUtils.scala   |  27 +++-
 .../spark/api/python/PythonWorkerFactory.scala  | 136 ++++++++-----------
 .../org/apache/spark/deploy/PythonRunner.scala  |  24 +---
 .../scala/org/apache/spark/util/Utils.scala     |  37 +++++
 4 files changed, 123 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
index cf69fa1..6d3e257 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.python
 
-import java.io.File
+import java.io.{File, InputStream, IOException, OutputStream}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -40,3 +40,28 @@ private[spark] object PythonUtils {
     paths.filter(_ != "").mkString(File.pathSeparator)
   }
 }
+
+
+/**
+ * A utility class to redirect the child process's stdout or stderr.
+ */
+private[spark] class RedirectThread(
+    in: InputStream,
+    out: OutputStream,
+    name: String)
+  extends Thread(name) {
+
+  setDaemon(true)
+  override def run() {
+    scala.util.control.Exception.ignoring(classOf[IOException]) {
+      // FIXME: We copy the stream on the level of bytes to avoid encoding 
problems.
+      val buf = new Array[Byte](1024)
+      var len = in.read(buf)
+      while (len != -1) {
+        out.write(buf, 0, len)
+        out.flush()
+        len = in.read(buf)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index b0bf4e0..002f2ac 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -17,15 +17,18 @@
 
 package org.apache.spark.api.python
 
-import java.io.{DataInputStream, File, IOException, OutputStreamWriter}
+import java.io.{DataInputStream, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
 
 import scala.collection.JavaConversions._
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 private[spark] class PythonWorkerFactory(pythonExec: String, envVars: 
Map[String, String])
-    extends Logging {
+  extends Logging {
+
+  import PythonWorkerFactory._
 
   // Because forking processes from Java is expensive, we prefer to launch a 
single Python daemon
   // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This 
daemon currently
@@ -38,7 +41,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, 
envVars: Map[String
   var daemonPort: Int = 0
 
   val pythonPath = PythonUtils.mergePythonPaths(
-    PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+    PythonUtils.sparkPythonPath,
+    envVars.getOrElse("PYTHONPATH", ""),
+    sys.env.getOrElse("PYTHONPATH", ""))
 
   def create(): Socket = {
     if (useDaemon) {
@@ -61,12 +66,11 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
       try {
         new Socket(daemonHost, daemonPort)
       } catch {
-        case exc: SocketException => {
+        case exc: SocketException =>
           logWarning("Python daemon unexpectedly quit, attempting to restart")
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        }
         case e: Throwable => throw e
       }
     }
@@ -87,39 +91,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, 
envVars: Map[String
       workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()
 
-      // Redirect the worker's stderr to ours
-      new Thread("stderr reader for " + pythonExec) {
-        setDaemon(true)
-        override def run() {
-          scala.util.control.Exception.ignoring(classOf[IOException]) {
-            // FIXME: We copy the stream on the level of bytes to avoid 
encoding problems.
-            val in = worker.getErrorStream
-            val buf = new Array[Byte](1024)
-            var len = in.read(buf)
-            while (len != -1) {
-              System.err.write(buf, 0, len)
-              len = in.read(buf)
-            }
-          }
-        }
-      }.start()
-
-      // Redirect worker's stdout to our stderr
-      new Thread("stdout reader for " + pythonExec) {
-        setDaemon(true)
-        override def run() {
-          scala.util.control.Exception.ignoring(classOf[IOException]) {
-            // FIXME: We copy the stream on the level of bytes to avoid 
encoding problems.
-            val in = worker.getInputStream
-            val buf = new Array[Byte](1024)
-            var len = in.read(buf)
-            while (len != -1) {
-              System.err.write(buf, 0, len)
-              len = in.read(buf)
-            }
-          }
-        }
-      }.start()
+      // Redirect worker stdout and stderr
+      redirectStreamsToStderr(worker.getInputStream, worker.getErrorStream)
 
       // Tell the worker our port
       val out = new OutputStreamWriter(worker.getOutputStream)
@@ -142,10 +115,6 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
     null
   }
 
-  def stop() {
-    stopDaemon()
-  }
-
   private def startDaemon() {
     synchronized {
       // Is it already running?
@@ -161,46 +130,38 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
         workerEnv.put("PYTHONPATH", pythonPath)
         daemon = pb.start()
 
-        // Redirect the stderr to ours
-        new Thread("stderr reader for " + pythonExec) {
-          setDaemon(true)
-          override def run() {
-            scala.util.control.Exception.ignoring(classOf[IOException]) {
-              // FIXME: We copy the stream on the level of bytes to avoid 
encoding problems.
-              val in = daemon.getErrorStream
-              val buf = new Array[Byte](1024)
-              var len = in.read(buf)
-              while (len != -1) {
-                System.err.write(buf, 0, len)
-                len = in.read(buf)
-              }
-            }
-          }
-        }.start()
-
         val in = new DataInputStream(daemon.getInputStream)
         daemonPort = in.readInt()
 
-        // Redirect further stdout output to our stderr
-        new Thread("stdout reader for " + pythonExec) {
-          setDaemon(true)
-          override def run() {
-            scala.util.control.Exception.ignoring(classOf[IOException]) {
-              // FIXME: We copy the stream on the level of bytes to avoid 
encoding problems.
-              val buf = new Array[Byte](1024)
-              var len = in.read(buf)
-              while (len != -1) {
-                System.err.write(buf, 0, len)
-                len = in.read(buf)
-              }
-            }
-          }
-        }.start()
+        // Redirect daemon stdout and stderr
+        redirectStreamsToStderr(in, daemon.getErrorStream)
+
       } catch {
-        case e: Throwable => {
+        case e: Exception =>
+
+          // If the daemon exists, wait for it to finish and get its stderr
+          val stderr = Option(daemon)
+            .flatMap { d => Utils.getStderr(d, PROCESS_WAIT_TIMEOUT_MS) }
+            .getOrElse("")
+
           stopDaemon()
-          throw e
-        }
+
+          if (stderr != "") {
+            val formattedStderr = stderr.replace("\n", "\n  ")
+            val errorMessage = s"""
+              |Error from python worker:
+              |  $formattedStderr
+              |PYTHONPATH was:
+              |  $pythonPath
+              |$e"""
+
+            // Append error message from python daemon, but keep original 
stack trace
+            val wrappedException = new SparkException(errorMessage.stripMargin)
+            wrappedException.setStackTrace(e.getStackTrace)
+            throw wrappedException
+          } else {
+            throw e
+          }
       }
 
       // Important: don't close daemon's stdin (daemon.getOutputStream) so it 
can correctly
@@ -208,6 +169,19 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
     }
   }
 
+  /**
+   * Redirect the given streams to our stderr in separate threads.
+   */
+  private def redirectStreamsToStderr(stdout: InputStream, stderr: 
InputStream) {
+    try {
+      new RedirectThread(stdout, System.err, "stdout reader for " + 
pythonExec).start()
+      new RedirectThread(stderr, System.err, "stderr reader for " + 
pythonExec).start()
+    } catch {
+      case e: Exception =>
+        logError("Exception in redirecting streams", e)
+    }
+  }
+
   private def stopDaemon() {
     synchronized {
       // Request shutdown of existing daemon by sending SIGTERM
@@ -219,4 +193,12 @@ private[spark] class PythonWorkerFactory(pythonExec: 
String, envVars: Map[String
       daemonPort = 0
     }
   }
+
+  def stop() {
+    stopDaemon()
+  }
+}
+
+private object PythonWorkerFactory {
+  val PROCESS_WAIT_TIMEOUT_MS = 10000
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index f2e7c7a..e20d448 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -17,13 +17,10 @@
 
 package org.apache.spark.deploy
 
-import java.io.{IOException, File, InputStream, OutputStream}
-
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 
-import org.apache.spark.SparkContext
-import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.api.python.{PythonUtils, RedirectThread}
 
 /**
  * A main class used by spark-submit to launch Python applications. It 
executes python as a
@@ -62,23 +59,4 @@ object PythonRunner {
 
     System.exit(process.waitFor())
   }
-
-  /**
-   * A utility class to redirect the child process's stdout or stderr
-   */
-  class RedirectThread(in: InputStream, out: OutputStream, name: String) 
extends Thread(name) {
-    setDaemon(true)
-    override def run() {
-      scala.util.control.Exception.ignoring(classOf[IOException]) {
-        // FIXME: We copy the stream on the level of bytes to avoid encoding 
problems.
-        val buf = new Array[Byte](1024)
-        var len = in.read(buf)
-        while (len != -1) {
-          out.write(buf, 0, len)
-          out.flush()
-          len = in.read(buf)
-        }
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/82c8e89c/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 202bd46..3f0ed61 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1088,4 +1088,41 @@ private[spark] object Utils extends Logging {
   def stripDirectory(path: String): String = {
     path.split(File.separator).last
   }
+
+  /**
+   * Wait for a process to terminate for at most the specified duration.
+   * Return true if the process terminated within the given timeout, false otherwise.
+   */
+  def waitForProcess(process: Process, timeoutMs: Long): Boolean = {
+    var terminated = false
+    val startTime = System.currentTimeMillis
+    while (!terminated) {
+      try {
+        process.exitValue
+        terminated = true
+      } catch {
+        case e: IllegalThreadStateException =>
+          // Process not terminated yet
+          if (System.currentTimeMillis - startTime > timeoutMs) {
+            return false
+          }
+          Thread.sleep(100)
+      }
+    }
+    true
+  }
+
+  /**
+   * Return the stderr of a process after waiting for the process to terminate.
+   * If the process does not terminate within the specified timeout, return 
None.
+   */
+  def getStderr(process: Process, timeoutMs: Long): Option[String] = {
+    val terminated = Utils.waitForProcess(process, timeoutMs)
+    if (terminated) {
+      
Some(Source.fromInputStream(process.getErrorStream).getLines().mkString("\n"))
+    } else {
+      None
+    }
+  }
+
 }

Reply via email to