Repository: flink
Updated Branches:
  refs/heads/release-1.2 9d59e008d -> edd1065c7

[FLINK-5650] [py] Continuously check PyProcess health while waiting for incoming connection

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/edd1065c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/edd1065c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/edd1065c

Branch: refs/heads/release-1.2
Commit: edd1065c73f3600929b1f731bc0b66255a56d26d
Parents: 9d59e00
Author: zentol <[email protected]>
Authored: Wed Mar 15 17:06:12 2017 +0100
Committer: zentol <[email protected]>
Committed: Thu Mar 16 17:09:16 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 23 ++++++++++++-
 .../api/streaming/plan/PythonPlanStreamer.java  | 35 ++++++++++++++------
 2 files changed, 46 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 10aded8..bab4904 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -89,6 +89,7 @@ public class PythonStreamer implements Serializable {
 	 */
 	public void open() throws IOException {
 		server = new ServerSocket(0);
+		server.setSoTimeout(50);
 		startPython();
 	}
 
@@ -145,11 +146,31 @@ public class PythonStreamer implements Serializable {
 		} catch (IllegalThreadStateException ise) {
 			//process still active -> start receiving data
 		}
-		socket = server.accept();
+		while (true) {
+			try {
+				socket = server.accept();
+				break;
+			} catch (SocketTimeoutException ignored) {
+				checkPythonProcessHealth();
+			}
+		}
 		in = new DataInputStream(socket.getInputStream());
 		out = new DataOutputStream(socket.getOutputStream());
 	}
 
+	private void checkPythonProcessHealth() {
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+	}
+
 	/**
 	 * Closes this streamer.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/edd1065c/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index ecbc7f4..eafbdc1 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -16,6 +16,7 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import org.apache.flink.python.api.streaming.util.StreamPrinter;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
@@ -47,8 +48,16 @@ public class PythonPlanStreamer implements Serializable {
 
 	public void open(String tmpPath, String args) throws IOException {
 		server = new ServerSocket(0);
+		server.setSoTimeout(50);
 		startPython(tmpPath, args);
-		socket = server.accept();
+		while (true) {
+			try {
+				socket = server.accept();
+				break;
+			} catch (SocketTimeoutException ignored) {
+				checkPythonProcessHealth();
+			}
+		}
 		sender = new PythonPlanSender(socket.getOutputStream());
 		receiver = new PythonPlanReceiver(socket.getInputStream());
 	}
@@ -71,16 +80,7 @@ public class PythonPlanStreamer implements Serializable {
 		} catch (InterruptedException ex) {
 		}
 
-		try {
-			int value = process.exitValue();
-			if (value != 0) {
-				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
-			}
-			if (value == 0) {
-				throw new RuntimeException("Plan file exited prematurely without an error.");
-			}
-		} catch (IllegalThreadStateException ise) {//Process still running
-		}
+		checkPythonProcessHealth();
 
 		process.getOutputStream().write("plan\n".getBytes());
 		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes());
@@ -95,4 +95,17 @@ public class PythonPlanStreamer implements Serializable {
 			process.destroy();
 		}
 	}
+
+	private void checkPythonProcessHealth() {
+		try {
+			int value = process.exitValue();
+			if (value != 0) {
+				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
+		} catch (IllegalThreadStateException ise) {//Process still running
+		}
+	}
 }
