Repository: spark
Updated Branches:
  refs/heads/branch-1.2 432ceca2a -> 6be36d5a8


Revert "[SPARK-5363] [PySpark] check ending mark in non-block way"

This reverts commits ac6fe67e1d8bf01ee565f9cc09ad48d88a275829 and 
c06e42f2c1e5fcf123b466efd27ee4cb53bbed3f.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6be36d5a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6be36d5a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6be36d5a

Branch: refs/heads/branch-1.2
Commit: 6be36d5a88c172b446cc69ebde6176e606cf09f1
Parents: 432ceca
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Feb 17 07:48:27 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Feb 17 07:51:37 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala | 21 ++++----------------
 python/pyspark/worker.py                        |  1 -
 2 files changed, 4 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index b513fb8..0d508d6 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -145,24 +145,11 @@ private[spark] class PythonRDD(
                 stream.readFully(update)
                 accumulator += Collections.singletonList(update)
               }
-
               // Check whether the worker is ready to be re-used.
-              if (reuse_worker) {
-                // It has a high possibility that the ending mark is already available,
-                // And current task should not be blocked by checking it
-
-                if (stream.available() >= 4) {
-                  val ending = stream.readInt()
-                  if (ending == SpecialLengths.END_OF_STREAM) {
-                    env.releasePythonWorker(pythonExec, envVars.toMap, worker)
-                    released = true
-                    logInfo(s"Communication with worker ended cleanly, re-use it: $worker")
-                  } else {
-                    logInfo(s"Communication with worker did not end cleanly " +
-                      s"(ending with $ending), close it: $worker")
-                  }
-                } else {
-                  logInfo(s"The ending mark from worker is not available, close it: $worker")
+              if (stream.readInt() == SpecialLengths.END_OF_STREAM) {
+                if (reuse_worker) {
+                  env.releasePythonWorker(pythonExec, envVars.toMap, worker)
+                  released = true
                 }
               }
               null

http://git-wip-us.apache.org/repos/asf/spark/blob/6be36d5a/python/pyspark/worker.py
----------------------------------------------------------------------
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index c2ddd4d..7e5343c 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -127,7 +127,6 @@ def main(infile, outfile):
     write_int(len(_accumulatorRegistry), outfile)
     for (aid, accum) in _accumulatorRegistry.items():
         pickleSer._write_with_length((aid, accum._value), outfile)
-    outfile.flush()
 
     # check end of stream
     if read_int(infile) == SpecialLengths.END_OF_STREAM:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to