Repository: spark
Updated Branches:
  refs/heads/branch-0.9 87e4dd58c -> ef74e44e0


SPARK-1019: pyspark RDD take() throws an NPE

Author: Patrick Wendell <[email protected]>

Closes #112 from pwendell/pyspark-take and squashes the following commits:

daae80e [Patrick Wendell] SPARK-1019: pyspark RDD take() throws an NPE
(cherry picked from commit 4ea23db0efff2f39ac5b8f0bd1d9a6ffa3eceb0d)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef74e44e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef74e44e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef74e44e

Branch: refs/heads/branch-0.9
Commit: ef74e44e04517abd4c7058c87abd1f4e8fbaa09d
Parents: 87e4dd5
Author: Patrick Wendell <[email protected]>
Authored: Wed Mar 12 23:16:59 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Wed Mar 12 23:17:17 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/TaskContext.scala       | 3 ++-
 .../main/scala/org/apache/spark/api/python/PythonRDD.scala   | 8 ++++++++
 2 files changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ef74e44e/core/src/main/scala/org/apache/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index cae983e..be53ca2 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -46,6 +46,7 @@ class TaskContext(
   }
 
   def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
+    // Process complete callbacks in the reverse order of registration
+    onCompleteCallbacks.reverse.foreach{_()}
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ef74e44e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 9cbd26b..e03c6f9 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -99,6 +99,14 @@ private[spark] class PythonRDD[T: ClassTag](
       }
     }.start()
 
+    /*
+     * Partial fix for SPARK-1019: Attempts to stop reading the input stream since
+     * other completion callbacks might invalidate the input. Because interruption
+     * is not synchronous this still leaves a potential race where the interruption is
+     * processed only after the stream becomes invalid.
+     */
+    context.addOnCompleteCallback(() => context.interrupted = true)
+
     // Return an iterator that read lines from the process's stdout
     val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
     val stdoutIterator = new Iterator[Array[Byte]] {

Reply via email to