Repository: spark
Updated Branches:
  refs/heads/branch-1.2 432ceca2a -> 6be36d5a8
Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be36d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be36d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be36d5a

Branch: refs/heads/branch-1.2
Commit: 6be36d5a88c172b446cc69ebde6176e606cf09f1
Parents: 432ceca
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Feb 17 07:51:37 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++-----------------
 python/pyspark/worker.py | 1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b513fb8..0d508d6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -145,24 +145,11 @@ private[spark] class PythonRDD(
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
               }
-              // Check whether the worker is ready to be re-used.
-              if (reuse_worker) {
-                // It has a high possibility that the ending mark is already available,
-                // And current task should not be blocked by checking it
-
-                if (stream.available() >= 4) {
-                  val ending = stream.readInt()
-                  if (ending == SpecialLengths.END_OF_STREAM) {
-                    env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                    released = true
-                    logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
-                  } else {
-                    logInfo(s"Communication with worker did not end cleanly " +
-                      s"(ending with $ending), close it: $worker")
-                  }
-                } else {
-                  logInfo(s"The ending mark from worker is not available, close it: $worker")
+              if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+                if (reuse_worker) {
+                  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+                  released = true
                 }
               }
               null

http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c2ddd4d..7e5343c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -127,7 +127,6 @@ def main(infile, outfile):
         write_int(len(_accumulatorRegistry), outfile)
         for (aid, accum) in _accumulatorRegistry.items():
             pickleSer._write_with_length((aid, accum._value), outfile)
-        outfile.flush()
 
         # check end of stream
         if read_int(infile) == SpecialLengths.END_OF_STREAM:

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org