Repository: spark Updated Branches: refs/heads/branch-1.2 a39da171c -> 0df26bb97
[SPARK-5363] [PySpark] check ending mark in non-block way There is chance of dead lock that the Python process is waiting for ending mark from JVM, but which is eaten by corrupted stream. This PR checks the ending mark from Python in non-block way, so it will not blocked by Python process. There is a small chance that the ending mark is sent by Python process but not available right now, then Python process will not be used. cc JoshRosen pwendell Author: Davies Liu <dav...@databricks.com> Closes #4601 from davies/freeze and squashes the following commits: e15a8c3 [Davies Liu] update logging 890329c [Davies Liu] Merge branch 'freeze' of github.com:davies/spark into freeze 2bd2228 [Davies Liu] add more logging 656d544 [Davies Liu] Update PythonRDD.scala 05e1085 [Davies Liu] check ending mark in non-block way (cherry picked from commit ac6fe67e1d8bf01ee565f9cc09ad48d88a275829) Signed-off-by: Josh Rosen <joshro...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0df26bb9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0df26bb9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0df26bb9 Branch: refs/heads/branch-1.2 Commit: 0df26bb976e3b3af7f6924fad5b1bc510993102d Parents: a39da17 Author: Davies Liu <dav...@databricks.com> Authored: Mon Feb 16 20:32:03 2015 -0800 Committer: Josh Rosen <joshro...@databricks.com> Committed: Mon Feb 16 20:37:24 2015 -0800 ---------------------------------------------------------------------- .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++++++++++++++---- python/pyspark/worker.py | 1 + 2 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/0df26bb9/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0d508d6..ba085c5 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -145,11 +145,24 @@ private[spark] class PythonRDD( stream.readFully(update) accumulator += Collections.singletonList(update) } + // Check whether the worker is ready to be re-used. - if (stream.readInt() == SpecialLengths.END_OF_STREAM) { - if (reuse_worker) { - env.releasePythonWorker(pythonExec, envVars.toMap, worker) - released = true + if (reuse_worker) { + // It has a high possibility that the ending mark is already available, + // And current task should not be blocked by checking it + + if (stream.available() >= 4) { + val ending = stream.readInt() + if (ending == SpecialLengths.END_OF_STREAM) { + env.releasePythonWorker(pythonExec, envVars.toMap, worker) + released = true + logInfo(s"Communication with worker ended cleanly, re-use it: $worker") + } else { + logInfo(s"Communication with worker did not end cleanly (ending with $ending), " + + s"close it: $worker") + } + } else { + logInfo(s"The ending mark from worker is not available, close it: $worker") } } null http://git-wip-us.apache.org/repos/asf/spark/blob/0df26bb9/python/pyspark/worker.py ---------------------------------------------------------------------- diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 7e5343c..c2ddd4d 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -127,6 +127,7 @@ def main(infile, outfile): write_int(len(_accumulatorRegistry), outfile) for (aid, accum) in _accumulatorRegistry.items(): pickleSer._write_with_length((aid, accum._value), outfile) + outfile.flush() # check end of stream if read_int(infile) == SpecialLengths.END_OF_STREAM: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org