Repository: flink Updated Branches: refs/heads/master da10a9b5f -> 3e767b5a4
[FLINK-6111] [py] Remove unnecessary sleeps Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e767b5a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e767b5a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e767b5a Branch: refs/heads/master Commit: 3e767b5a4a0846c8d53a8167d6236dbd9657eb2f Parents: da10a9b Author: zentol <[email protected]> Authored: Sat Mar 18 13:10:32 2017 +0100 Committer: zentol <[email protected]> Committed: Sat Mar 18 16:11:36 2017 +0100 ---------------------------------------------------------------------- .../api/streaming/data/PythonStreamer.java | 43 ++++++++++++-------- .../api/streaming/plan/PythonPlanStreamer.java | 7 ---- 2 files changed, 25 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 56ebf5b..3409960 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -75,6 +75,9 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable { protected final AbstractRichFunction function; + protected transient Thread outPrinter; + protected transient Thread errorPrinter; + public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) { this.id = id; this.usePython3 = PythonPlanBinder.usePython3; @@ -114,8 +117,10 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable { } process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments); - new StreamPrinter(process.getInputStream()).start(); - new StreamPrinter(process.getErrorStream(), true, msg).start(); + outPrinter = new StreamPrinter(process.getInputStream()); + outPrinter.start(); + errorPrinter = new StreamPrinter(process.getErrorStream(), true, msg); + errorPrinter.start(); shutdownThread = new Thread() { @Override @@ -139,16 +144,6 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable { processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); processOutput.flush(); - try { // wait a bit to catch syntax errors - Thread.sleep(2000); - } catch (InterruptedException ignored) { - } - try { - process.exitValue(); - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } catch (IllegalThreadStateException ignored) { //process still active -> start receiving data - } - while (true) { try { socket = server.accept(); @@ -285,9 +280,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable { case SIGNAL_FINISHED: return; case SIGNAL_ERROR: - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ignored) { + try { + outPrinter.join(1000); + } catch (InterruptedException e) { + outPrinter.interrupt(); + } + try { + errorPrinter.join(1000); + } catch (InterruptedException e) { + errorPrinter.interrupt(); } throw new RuntimeException( "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg); @@ -333,9 +334,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable { case SIGNAL_FINISHED: return; case SIGNAL_ERROR: - try { //wait before terminating to ensure that the complete error message is printed - Thread.sleep(2000); - } catch (InterruptedException ignored) { + try { + outPrinter.join(1000); + } catch (InterruptedException e) { + outPrinter.interrupt(); + } + try { + errorPrinter.join(1000); + } catch (InterruptedException e) { + errorPrinter.interrupt(); } throw new RuntimeException( "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg); http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java index c27776b..d97cf69 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java @@ -82,13 +82,6 @@ public class PythonPlanStreamer { new StreamPrinter(process.getInputStream()).start(); new StreamPrinter(process.getErrorStream()).start(); - try { - Thread.sleep(2000); - } catch (InterruptedException ignored) { - } - - checkPythonProcessHealth(); - process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET)); process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET)); process.getOutputStream().flush();
