Repository: flink
Updated Branches:
  refs/heads/master da10a9b5f -> 3e767b5a4


[FLINK-6111] [py] Remove unnecessary sleeps


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e767b5a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e767b5a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e767b5a

Branch: refs/heads/master
Commit: 3e767b5a4a0846c8d53a8167d6236dbd9657eb2f
Parents: da10a9b
Author: zentol <[email protected]>
Authored: Sat Mar 18 13:10:32 2017 +0100
Committer: zentol <[email protected]>
Committed: Sat Mar 18 16:11:36 2017 +0100

----------------------------------------------------------------------
 .../api/streaming/data/PythonStreamer.java      | 43 ++++++++++++--------
 .../api/streaming/plan/PythonPlanStreamer.java  |  7 ----
 2 files changed, 25 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 56ebf5b..3409960 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -75,6 +75,9 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 
        protected final AbstractRichFunction function;
 
+       protected transient Thread outPrinter;
+       protected transient Thread errorPrinter;
+
        public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) {
                this.id = id;
                this.usePython3 = PythonPlanBinder.usePython3;
@@ -114,8 +117,10 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
                }
 
                process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
-               new StreamPrinter(process.getInputStream()).start();
-               new StreamPrinter(process.getErrorStream(), true, msg).start();
+               outPrinter = new StreamPrinter(process.getInputStream());
+               outPrinter.start();
+               errorPrinter = new StreamPrinter(process.getErrorStream(), true, msg);
+               errorPrinter.start();
 
                shutdownThread = new Thread() {
                        @Override
@@ -139,16 +144,6 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
                processOutput.write((outputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
                processOutput.flush();
 
-               try { // wait a bit to catch syntax errors
-                       Thread.sleep(2000);
-               } catch (InterruptedException ignored) {
-               }
-               try {
-                       process.exitValue();
-                       throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg);
-               } catch (IllegalThreadStateException ignored) { //process still active -> start receiving data
-               }
-
                while (true) {
                        try {
                                socket = server.accept();
@@ -285,9 +280,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
                                                case SIGNAL_FINISHED:
                                                        return;
                                                case SIGNAL_ERROR:
-                                                       try { //wait before terminating to ensure that the complete error message is printed
-                                                               Thread.sleep(2000);
-                                                       } catch (InterruptedException ignored) {
+                                                       try {
+                                                               outPrinter.join(1000);
+                                                       } catch (InterruptedException e) {
+                                                               outPrinter.interrupt();
+                                                       }
+                                                       try {
+                                                               errorPrinter.join(1000);
+                                                       } catch (InterruptedException e) {
+                                                               errorPrinter.interrupt();
                                                        }
                                                        throw new RuntimeException(
                                                                        "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);
@@ -333,9 +334,15 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
                                                case SIGNAL_FINISHED:
                                                        return;
                                                case SIGNAL_ERROR:
-                                                       try { //wait before terminating to ensure that the complete error message is printed
-                                                               Thread.sleep(2000);
-                                                       } catch (InterruptedException ignored) {
+                                                       try {
+                                                               outPrinter.join(1000);
+                                                       } catch (InterruptedException e) {
+                                                               outPrinter.interrupt();
+                                                       }
+                                                       try {
+                                                               errorPrinter.join(1000);
+                                                       } catch (InterruptedException e) {
+                                                               errorPrinter.interrupt();
                                                        }
                                                        throw new RuntimeException(
                                                                        "External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely due to an error." + msg);

http://git-wip-us.apache.org/repos/asf/flink/blob/3e767b5a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index c27776b..d97cf69 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -82,13 +82,6 @@ public class PythonPlanStreamer {
                new StreamPrinter(process.getInputStream()).start();
                new StreamPrinter(process.getErrorStream()).start();
 
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException ignored) {
-               }
-
-               checkPythonProcessHealth();
-
                process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
                process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
                process.getOutputStream().flush();

Reply via email to