Repository: flink
Updated Branches:
  refs/heads/release-1.2 9d59e008d -> edd1065c7


[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edd1065c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edd1065c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edd1065c

Branch: refs/heads/release-1.2
Commit: edd1065c73f3600929b1f731bc0b66255a56d26d
Parents: 9d59e00
Author: zentol <[email protected]>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <[email protected]>
Committed: Thu Mar 16 17:09:16 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 23 ++++++++++++-
 .../api/streaming/plan/PythonPlanStreamer.java  | 35 ++++++++++++++------
 2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..bab4904 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -89,6 +89,7 @@ public class PythonStreamer implements Serializable {
         */
        public void open() throws IOException {
                server = new ServerSocket(0);
+               server.setSoTimeout(50);
                startPython();
        }
 
@@ -145,11 +146,31 @@ public class PythonStreamer implements Serializable {
                } catch (IllegalThreadStateException ise) { //process still active -> start receiving data
                }
 
-               socket = server.accept();
+               while (true) {
+                       try {
+                               socket = server.accept();
+                               break;
+                       } catch (SocketTimeoutException ignored) {
+                               checkPythonProcessHealth();
+                       }
+               }
                in = new DataInputStream(socket.getInputStream());
                out = new DataOutputStream(socket.getOutputStream());
        }
 
+       private void checkPythonProcessHealth() {
+               try {
+                       int value = process.exitValue();
+                       if (value != 0) {
+                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+                       }
+                       if (value == 0) {
+                               throw new RuntimeException("Plan file exited prematurely without an error.");
+                       }
+               } catch (IllegalThreadStateException ise) {//Process still running
+               }
+       }
+
        /**
         * Closes this streamer.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index ecbc7f4..eafbdc1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -16,6 +16,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -47,8 +48,16 @@ public class PythonPlanStreamer implements Serializable {
 
        public void open(String tmpPath, String args) throws IOException {
                server = new ServerSocket(0);
+               server.setSoTimeout(50);
                startPython(tmpPath, args);
-               socket = server.accept();
+               while (true) {
+                       try {
+                               socket = server.accept();
+                               break;
+                       } catch (SocketTimeoutException ignored) {
+                               checkPythonProcessHealth();
+                       }
+               }
                sender = new PythonPlanSender(socket.getOutputStream());
                receiver = new PythonPlanReceiver(socket.getInputStream());
        }
@@ -71,16 +80,7 @@ public class PythonPlanStreamer implements Serializable {
                } catch (InterruptedException ex) {
                }
 
-               try {
-                       int value = process.exitValue();
-                       if (value != 0) {
-                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-                       }
-                       if (value == 0) {
-                               throw new RuntimeException("Plan file exited prematurely without an error.");
-                       }
-               } catch (IllegalThreadStateException ise) {//Process still running
-               }
+               checkPythonProcessHealth();
 
                process.getOutputStream().write("plan\n".getBytes());
                process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
@@ -95,4 +95,17 @@ public class PythonPlanStreamer implements Serializable {
                        process.destroy();
                }
        }
+
+       private void checkPythonProcessHealth() {
+               try {
+                       int value = process.exitValue();
+                       if (value != 0) {
+                               throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+                       }
+                       if (value == 0) {
+                               throw new RuntimeException("Plan file exited prematurely without an error.");
+                       }
+               } catch (IllegalThreadStateException ise) {//Process still running
+               }
+       }
 }

Reply via email to