Updated Branches: refs/heads/master c66a2ef1c -> c40619d48
Fix for SPARK-1025: PySpark hang on missing files.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f8306849
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f8306849
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f8306849

Branch: refs/heads/master
Commit: f83068497ba42c5ea5c636efebca81f684e96177
Parents: 6156990
Author: Josh Rosen <joshro...@apache.org>
Authored: Thu Jan 23 18:10:16 2014 -0800
Committer: Josh Rosen <joshro...@apache.org>
Committed: Thu Jan 23 18:24:51 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala |  9 +++++++++
 python/pyspark/tests.py                               | 11 +++++++++++
 2 files changed, 20 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f8306849/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 57bde8d..70516bd 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -52,6 +52,8 @@ private[spark] class PythonRDD[T: ClassTag](
     val env = SparkEnv.get
     val worker = env.createPythonWorker(pythonExec, envVars.toMap)
 
+    @volatile var readerException: Exception = null
+
     // Start a thread to feed the process input from our parent's iterator
     new Thread("stdin writer for " + pythonExec) {
       override def run() {
@@ -82,6 +84,10 @@ private[spark] class PythonRDD[T: ClassTag](
           dataOut.flush()
           worker.shutdownOutput()
         } catch {
+          case e: java.io.FileNotFoundException =>
+            readerException = e
+            // Kill the Python worker process:
+            worker.shutdownOutput()
           case e: IOException =>
             // This can happen for legitimate reasons if the Python code stops returning data before we are done
             // passing elements through, e.g., for take(). Just log a message to say it happened.
@@ -106,6 +112,9 @@ private[spark] class PythonRDD[T: ClassTag](
       }
 
       private def read(): Array[Byte] = {
+        if (readerException != null) {
+          throw readerException
+        }
         try {
           stream.readInt() match {
             case length if length > 0 =>

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f8306849/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index acd1ca5..5271045 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -168,6 +168,17 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEqual("Hello World!", x.strip())
         self.assertEqual("Hello World!", y.strip())
 
+    def test_deleting_input_files(self):
+        # Regression test for SPARK-1025
+        tempFile = NamedTemporaryFile(delete=False)
+        tempFile.write("Hello World!")
+        tempFile.close()
+        data = self.sc.textFile(tempFile.name)
+        filtered_data = data.filter(lambda x: True)
+        self.assertEqual(1, filtered_data.count())
+        os.unlink(tempFile.name)
+        self.assertRaises(Exception, lambda: filtered_data.count())
+
 
 class TestIO(PySparkTestCase):
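
----------------------------------------------------------------------

For context, the shape of this fix is a cross-thread exception handoff: the stdin-writer thread cannot throw into the task's reader thread directly, so it records the failure in a @volatile field and shuts down the worker's stdin (which makes the worker exit and unblocks any pending read), and the reader re-throws the recorded exception at the top of read() instead of hanging on a stream that will never be fed. Below is a minimal, self-contained sketch of just that pattern, not Spark code; the names ReaderExceptionHandoff and writerException are illustrative.

// Sketch only: the @volatile exception-handoff pattern used by this patch.
// None of these names are Spark APIs.
object ReaderExceptionHandoff {
  @volatile private var writerException: Exception = null

  def main(args: Array[String]): Unit = {
    val writer = new Thread("stdin writer (sketch)") {
      override def run(): Unit = {
        try {
          // Simulate the writer failing, e.g. because an input file
          // was deleted out from under the job (the SPARK-1025 case).
          throw new java.io.FileNotFoundException("input file was deleted")
        } catch {
          case e: Exception =>
            // Publish the failure; @volatile guarantees the reader
            // thread sees this write without further synchronization.
            writerException = e
        }
      }
    }
    writer.start()
    writer.join()

    // Reader side: check the flag before blocking on the stream. The real
    // patch re-throws here so the task fails loudly instead of hanging.
    if (writerException != null) {
      println(s"reader would re-throw: $writerException")
    }
  }
}

Note that the check alone is not enough if the reader is already parked in stream.readInt(); that is why the FileNotFoundException handler also calls worker.shutdownOutput(), so the Python worker exits, its end of the pipe closes, and the reader gets control back to observe readerException.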