Repository: spark Updated Branches: refs/heads/master 953031688 -> e41786c77
[SPARK-4088] [PySpark] Python worker should exit after socket is closed by JVM In case of take() or exception in Python, python worker may exit before JVM read() all the response, then the write thread may raise "Connection reset" exception. Python should always wait JVM to close the socket first. cc JoshRosen This is a warm fix, or the tests will be flaky, sorry for that. Author: Davies Liu <[email protected]> Closes #2941 from davies/fix_exit and squashes the following commits: 9d4d21e [Davies Liu] fix race Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e41786c7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e41786c7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e41786c7 Branch: refs/heads/master Commit: e41786c77482d3f9e3c01cfd583c8899815c3106 Parents: 9530316 Author: Davies Liu <[email protected]> Authored: Sat Oct 25 01:20:39 2014 -0700 Committer: Josh Rosen <[email protected]> Committed: Sat Oct 25 01:20:39 2014 -0700 ---------------------------------------------------------------------- python/pyspark/daemon.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e41786c7/python/pyspark/daemon.py ---------------------------------------------------------------------- diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py index dbb3477..f09587f 100644 --- a/python/pyspark/daemon.py +++ b/python/pyspark/daemon.py @@ -62,8 +62,7 @@ def worker(sock): exit_code = compute_real_exit_code(exc.code) finally: outfile.flush() - if exit_code: - os._exit(exit_code) + return exit_code # Cleanup zombie children @@ -160,10 +159,13 @@ def manager(): outfile.flush() outfile.close() while True: - worker(sock) - if not reuse: + code = worker(sock) + if not reuse or code: # wait for closing - while sock.recv(1024): + try: + while sock.recv(1024): + pass + except Exception: pass break gc.collect() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
