Repository: spark
Updated Branches:
  refs/heads/master c44d140ca -> 01849da08


[SPARK-14110][CORE] PipedRDD to print the command ran on non zero exit

## What changes were proposed in this pull request?

If a subprocess launched by PipedRDD fails, the resulting exception only reads
“Subprocess exited with status XXX”. This is hard for users to debug, especially
when a Spark application contains multiple pipe() operations.

Changes done:
- Include the command that was run in the exception message when a non-zero
exit code is seen.
- If the reader or writer thread sees an exception, only log the command that
was run. The current model propagates the exception "as is" so that upstream
Spark logic can take the right action based on the exception type (e.g. retry
on a fetch failure, but fail the stage / job on a fatal exception). Wrapping the
exception in a generic exception would therefore break that behavior. Altering
the exception message would preserve it, but is ugly (and not every exception
class has a constructor that takes a string message).
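The reason for logging instead of wrapping can be illustrated with a minimal sketch, written as a Scala script. It assumes a child thread that stores its failure in an `AtomicReference`, loosely modeled on the `childThreadException` and `propagateChildException()` names in the PipedRDD diff; the `logError` stand-in, the simulated `IOException` and the caller's handling are hypothetical, not Spark's code.

```scala
import java.io.IOException
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a logger: collects messages so they can be inspected.
val logged = ArrayBuffer.empty[String]
def logError(msg: String): Unit = { logged += msg }

// Log the command, then rethrow the child's Throwable unchanged (no wrapping),
// so upstream code can still dispatch on its concrete type.
def propagateChildException(childException: AtomicReference[Throwable], command: Seq[String]): Unit = {
  val t = childException.get()
  if (t != null) {
    logError(s"Caught exception while running pipe() operator. " +
      s"Command ran: ${command.mkString(" ")}. Exception: ${t.getMessage}")
    throw t
  }
}

// Simulate a writer thread that fails while feeding the subprocess.
val childException = new AtomicReference[Throwable]()
val writer = new Thread(() => {
  try throw new IOException("Broken pipe")
  catch { case t: Throwable => childException.set(t) }
})
writer.start()
writer.join()

// Caller-side handling that depends on the exception type (hypothetical).
val outcome =
  try {
    propagateChildException(childException, Seq("cat", "nonexistent_file"))
    "no failure"
  } catch {
    case _: IOException => "handled as IOException"
    case _: Throwable   => "handled generically"
  }
```

Had `t` been wrapped in a generic exception before rethrowing, the `IOException` case would no longer match, which is the loss of information this change avoids; the command only goes to the log.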

## How was this patch tested?

- Added a new test case
- Ran all existing tests for PipedRDD
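The two failure modes covered by the tests can be reproduced outside Spark with `scala.sys.process`. This is a minimal sketch, not `PipedRDD` itself: `runChecked` and `failureMessage` are hypothetical helpers, `cat` is assumed to be on the `PATH`, and the launch-failure case relies on `java.lang.ProcessBuilder` naming the program in the `IOException` it throws when a command cannot be started.

```scala
import java.io.IOException
import scala.sys.process._

// Hypothetical helper: run `command`, capture stdout, and include the command
// in the exception message if the process exits with a non-zero status.
def runChecked(command: Seq[String]): String = {
  val out = new StringBuilder
  // Keep stdout lines; discard stderr (e.g. cat's "No such file" message).
  val logger = ProcessLogger(line => { out.append(line).append('\n'); () }, _ => ())
  val exitStatus = Process(command).!(logger)
  if (exitStatus != 0) {
    throw new IllegalStateException(
      s"Subprocess exited with status $exitStatus. Command ran: ${command.mkString(" ")}")
  }
  out.toString
}

// Returns the failure message for a command, or None if it succeeded.
def failureMessage(command: Seq[String]): Option[String] =
  try { runChecked(command); None }
  catch {
    case e: IllegalStateException => Some(e.getMessage) // launched, non-zero exit
    case e: IOException           => Some(e.getMessage) // could not be launched
  }

val launchFailure = failureMessage(Seq("some_nonexistent_command"))
val exitFailure = failureMessage(Seq("cat", "nonexistent_file"))
```

In both cases the message names the command, which is what the new test assertions check for through `exception.getMessage.contains(command.mkString(" "))`.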

Author: Tejas Patil

Closes #11927 from tejasapatil/SPARK-14110-piperdd-failure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01849da0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01849da0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01849da0

Branch: refs/heads/master
Commit: 01849da080439d1f2dbb90a8985c661522ed3d7a
Parents: c44d140
Author: Tejas Patil
Authored: Thu Mar 24 00:31:13 2016 -0700
Committer: Reynold Xin
Committed: Thu Mar 24 00:31:13 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  6 +++++-
 .../org/apache/spark/rdd/PipedRDDSuite.scala    | 22 +++++++++++++++-----
 2 files changed, 22 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01849da0/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 50b4184..dd8e46b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -184,7 +184,8 @@ private[spark] class PipedRDD[T: ClassTag](
           val exitStatus = proc.waitFor()
           cleanup()
           if (exitStatus != 0) {
-            throw new IllegalStateException(s"Subprocess exited with status $exitStatus")
+            throw new IllegalStateException(s"Subprocess exited with status $exitStatus. " +
+              s"Command ran: " + command.mkString(" "))
           }
           false
         }
@@ -205,6 +206,9 @@ private[spark] class PipedRDD[T: ClassTag](
       private def propagateChildException(): Unit = {
         val t = childThreadException.get()
         if (t != null) {
+          val commandRan = command.mkString(" ")
+          logError(s"Caught exception while running pipe() operator. Command ran: $commandRan. " +
+            s"Exception: ${t.getMessage}")
           proc.destroy()
           cleanup()
           throw t

http://git-wip-us.apache.org/repos/asf/spark/blob/01849da0/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index d13da38..e9cc819 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -134,15 +134,27 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
-  test("pipe with non-zero exit status") {
+  test("pipe with process which cannot be launched due to bad command") {
+    if (!testCommandAvailable("some_nonexistent_command")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+      val command = Seq("some_nonexistent_command")
+      val piped = nums.pipe(command)
+      val exception = intercept[SparkException] {
+        piped.collect()
+      }
+      assert(exception.getMessage.contains(command.mkString(" ")))
+    }
+  }
+
+  test("pipe with process which is launched but fails with non-zero exit status") {
     if (testCommandAvailable("cat")) {
       val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-      val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
-      intercept[SparkException] {
+      val command = Seq("cat", "nonexistent_file")
+      val piped = nums.pipe(command)
+      val exception = intercept[SparkException] {
         piped.collect()
       }
-    } else {
-      assert(true)
+      assert(exception.getMessage.contains(command.mkString(" ")))
     }
   }
 

