Repository: spark
Updated Branches:
  refs/heads/master 1d03a26a4 -> 28dcbb531


[SPARK-2898] [PySpark] fix bugs in daemon.py

1. Do not use a signal handler for SIGCHLD; it can easily cause a deadlock
2. Handle EINTR during accept()
3. Pass the errno to the JVM
4. Handle EAGAIN during fork()

It can now pass the 50k-task tests in 180 seconds.

Author: Davies Liu <[email protected]>

Closes #1842 from davies/qa and squashes the following commits:

f0ea451 [Davies Liu] fix lint
03a2e8c [Davies Liu] cleanup dead children every second
32cb829 [Davies Liu] fix lint
0cd0817 [Davies Liu] fix bugs in daemon.py


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28dcbb53
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28dcbb53
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28dcbb53

Branch: refs/heads/master
Commit: 28dcbb531ae57dc50f15ad9df6c31022731669c9
Parents: 1d03a26
Author: Davies Liu <[email protected]>
Authored: Sun Aug 10 13:00:38 2014 -0700
Committer: Josh Rosen <[email protected]>
Committed: Sun Aug 10 13:00:38 2014 -0700

----------------------------------------------------------------------
 .../spark/api/python/PythonWorkerFactory.scala  |  2 +-
 python/pyspark/daemon.py                        | 78 ++++++++++++--------
 2 files changed, 48 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28dcbb53/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 7af260d..bf716a8 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -68,7 +68,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       val socket = new Socket(daemonHost, daemonPort)
       val pid = new DataInputStream(socket.getInputStream).readInt()
       if (pid < 0) {
-        throw new IllegalStateException("Python daemon failed to launch worker")
+        throw new IllegalStateException("Python daemon failed to launch worker with code " + pid)
       }
       daemonWorkers.put(socket, pid)
       socket

http://git-wip-us.apache.org/repos/asf/spark/blob/28dcbb53/python/pyspark/daemon.py
----------------------------------------------------------------------
diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e73538b..22ab8d3 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -22,7 +22,8 @@ import select
 import socket
 import sys
 import traceback
-from errno import EINTR, ECHILD
+import time
+from errno import EINTR, ECHILD, EAGAIN
 from socket import AF_INET, SOCK_STREAM, SOMAXCONN
 from signal import SIGHUP, SIGTERM, SIGCHLD, SIG_DFL, SIG_IGN
 from pyspark.worker import main as worker_main
@@ -80,6 +81,17 @@ def worker(sock):
         os._exit(compute_real_exit_code(exit_code))
 
 
+# Cleanup zombie children
+def cleanup_dead_children():
+    try:
+        while True:
+            pid, _ = os.waitpid(0, os.WNOHANG)
+            if not pid:
+                break
+    except:
+        pass
+
+
 def manager():
     # Create a new process group to corral our children
     os.setpgid(0, 0)
@@ -102,29 +114,21 @@ def manager():
     signal.signal(SIGTERM, handle_sigterm)  # Gracefully exit on SIGTERM
     signal.signal(SIGHUP, SIG_IGN)  # Don't die on SIGHUP
 
-    # Cleanup zombie children
-    def handle_sigchld(*args):
-        try:
-            pid, status = os.waitpid(0, os.WNOHANG)
-            if status != 0:
-                msg = "worker %s crashed abruptly with exit status %s" % (pid, status)
-                print >> sys.stderr, msg
-        except EnvironmentError as err:
-            if err.errno not in (ECHILD, EINTR):
-                raise
-    signal.signal(SIGCHLD, handle_sigchld)
-
     # Initialization complete
     sys.stdout.close()
     try:
         while True:
             try:
-                ready_fds = select.select([0, listen_sock], [], [])[0]
+                ready_fds = select.select([0, listen_sock], [], [], 1)[0]
             except select.error as ex:
                 if ex[0] == EINTR:
                     continue
                 else:
                     raise
+
+            # cleanup in signal handler will cause deadlock
+            cleanup_dead_children()
+
             if 0 in ready_fds:
                 try:
                     worker_pid = read_int(sys.stdin)
@@ -137,29 +141,41 @@ def manager():
                     pass  # process already died
 
             if listen_sock in ready_fds:
-                sock, addr = listen_sock.accept()
+                try:
+                    sock, _ = listen_sock.accept()
+                except OSError as e:
+                    if e.errno == EINTR:
+                        continue
+                    raise
+
                 # Launch a worker process
                 try:
                     pid = os.fork()
-                    if pid == 0:
-                        listen_sock.close()
-                        try:
-                            worker(sock)
-                        except:
-                            traceback.print_exc()
-                            os._exit(1)
-                        else:
-                            os._exit(0)
+                except OSError as e:
+                    if e.errno in (EAGAIN, EINTR):
+                        time.sleep(1)
+                        pid = os.fork()  # error here will shutdown daemon
                     else:
+                        outfile = sock.makefile('w')
+                        write_int(e.errno, outfile)  # Signal that the fork failed
+                        outfile.flush()
+                        outfile.close()
                         sock.close()
-
-                except OSError as e:
-                    print >> sys.stderr, "Daemon failed to fork PySpark worker: %s" % e
-                    outfile = os.fdopen(os.dup(sock.fileno()), "a+", 65536)
-                    write_int(-1, outfile)  # Signal that the fork failed
-                    outfile.flush()
-                    outfile.close()
+                        continue
+
+                if pid == 0:
+                    # in child process
+                    listen_sock.close()
+                    try:
+                        worker(sock)
+                    except:
+                        traceback.print_exc()
+                        os._exit(1)
+                    else:
+                        os._exit(0)
+                else:
                     sock.close()
+
     finally:
         shutdown(1)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to