Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e355b54de -> baad6b3cf


[SPARK-5363] [PySpark] check ending mark in non-block way

There is chance of dead lock that the Python process is waiting for ending mark 
from JVM, but which is eaten by corrupted stream.

This PR checks the ending mark from Python in non-block way, so it will not 
blocked by Python process.

There is a small chance that the ending mark is sent by Python process but not 
available right now, then Python process will not be used.

cc JoshRosen pwendell

Author: Davies Liu <dav...@databricks.com>

Closes #4601 from davies/freeze and squashes the following commits:

e15a8c3 [Davies Liu] update logging
890329c [Davies Liu] Merge branch 'freeze' of github.com:davies/spark into 
freeze
2bd2228 [Davies Liu] add more logging
656d544 [Davies Liu] Update PythonRDD.scala
05e1085 [Davies Liu] check ending mark in non-block way

(cherry picked from commit ac6fe67e1d8bf01ee565f9cc09ad48d88a275829)
Signed-off-by: Josh Rosen <joshro...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baad6b3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baad6b3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baad6b3c

Branch: refs/heads/branch-1.3
Commit: baad6b3cf26ba5b939fde20f26ccd3c4b15bd0c1
Parents: e355b54
Author: Davies Liu <dav...@databricks.com>
Authored: Mon Feb 16 20:32:03 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Mon Feb 16 20:32:50 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++++++++++++++----
 python/pyspark/worker.py                        |  1 +
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/baad6b3c/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 2527211..c3c8336 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -144,11 +144,24 @@ private[spark] class PythonRDD(
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
               }
+
               // Check whether the worker is ready to be re-used.
-              if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
-                if (reuse_worker) {
-                  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                  released = true
+              if (reuse_worker) {
+                // It has a high possibility that the ending mark is already 
available,
+                // And current task should not be blocked by checking it
+
+                if (stream.available() >= 4) {
+                  val ending = stream.readInt()
+                  if (ending == SpecialLengths.END_OF_STREAM) {
+                    env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+                    released = true
+                    logInfo(s"Communication with worker ended cleanly, re-use 
it: $worker")
+                  } else {
+                    logInfo(s"Communication with worker did not end cleanly 
(ending with $ending), " +
+                      s"close it: $worker")
+                  }
+                } else {
+                  logInfo(s"The ending mark from worker is not available, 
close it: $worker")
                 }
               }
               null

http://git-wip-us.apache.org/repos/asf/spark/blob/baad6b3c/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 8a93c32..180bdbb 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -121,6 +121,7 @@ def main(infile, outfile):
     write_int(len(_accumulatorRegistry), outfile)
     for (aid, accum) in _accumulatorRegistry.items():
         pickleSer._write_with_length((aid, accum._value), outfile)
+    outfile.flush()
 
     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to