Repository: flink
Updated Branches:
  refs/heads/master a66458eac -> 401837bdf


[FLINK-3308] [py] Remove debug mode


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/401837bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/401837bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/401837bd

Branch: refs/heads/master
Commit: 401837bdfce40b2401aaf19586a8ae3612c08d8b
Parents: a66458e
Author: zentol <s.mo...@web.de>
Authored: Mon Feb 1 10:08:30 2016 +0100
Committer: zentol <s.mo...@web.de>
Committed: Wed Feb 17 09:29:33 2016 +0100

----------------------------------------------------------------------
 docs/apis/batch/python.md                       | 11 --------
 .../flink/python/api/PythonPlanBinder.java      | 16 +++--------
 .../api/streaming/data/PythonStreamer.java      | 29 ++++++--------------
 .../flink/python/api/flink/plan/Environment.py  |  7 +----
 4 files changed, 13 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/docs/apis/batch/python.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md
index 7e2060b..359d2ed 100644
--- a/docs/apis/batch/python.md
+++ b/docs/apis/batch/python.md
@@ -603,14 +603,3 @@ arguments that will be fed to the script.
 {% endhighlight %}
 
 {% top %}
-
-Debugging
----------------
-
-If you are running Flink programs locally, you can debug your program following this guide.
-First you have to enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After
-submitting your program, open the jobmanager log file, and look for a line that says
-`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>` Now open `/tmp/flink` in your python
-IDE and run the `executor.py <port>`.
-
-{% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index 91e2369..289c84b 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -91,8 +91,6 @@ public class PythonPlanBinder {
        private static String FLINK_HDFS_PATH = "hdfs:/tmp";
        public static final String FLINK_TMP_DATA_DIR = 
System.getProperty("java.io.tmpdir") + File.separator + "flink_data";
 
-       public static boolean DEBUG = false;
-
        private HashMap<Integer, Object> sets = new HashMap();
        public ExecutionEnvironment env;
        private PythonPlanStreamer streamer;
@@ -218,10 +216,8 @@ public class PythonPlanBinder {
 
        private void close() {
                try { //prevent throwing exception so that previous exceptions 
aren't hidden.
-                       if (!DEBUG) {
-                               FileSystem hdfs = FileSystem.get(new 
URI(FLINK_HDFS_PATH));
-                               hdfs.delete(new Path(FLINK_HDFS_PATH), true);
-                       }
+                       FileSystem hdfs = FileSystem.get(new 
URI(FLINK_HDFS_PATH));
+                       hdfs.delete(new Path(FLINK_HDFS_PATH), true);
 
                        FileSystem local = FileSystem.getLocalFileSystem();
                        local.delete(new Path(FLINK_PYTHON_FILE_PATH), true);
@@ -247,12 +243,11 @@ public class PythonPlanBinder {
        private enum Parameters {
                DOP,
                MODE,
-               RETRY,
-               DEBUG
+               RETRY
        }
 
        private void receiveParameters() throws IOException {
-               for (int x = 0; x < 4; x++) {
+               for (int x = 0; x < 3; x++) {
                        Tuple value = (Tuple) streamer.getRecord(true);
                        switch (Parameters.valueOf(((String) 
value.getField(0)).toUpperCase())) {
                                case DOP:
@@ -266,9 +261,6 @@ public class PythonPlanBinder {
                                        int retry = (Integer) value.getField(1);
                                        
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
                                        break;
-                               case DEBUG:
-                                       DEBUG = (Boolean) value.getField(1);
-                                       break;
                        }
                }
                if (env.getParallelism() < 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 5568051..e67099e 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -26,7 +26,6 @@ import java.util.Iterator;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.python.api.PythonPlanBinder;
-import static org.apache.flink.python.api.PythonPlanBinder.DEBUG;
 import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH;
 import static 
org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH;
 import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID;
@@ -54,7 +53,6 @@ public class PythonStreamer implements Serializable {
 
        private final int id;
        private final boolean usePython3;
-       private final boolean debug;
        private final String planArguments;
 
        private String inputFilePath;
@@ -78,7 +76,6 @@ public class PythonStreamer implements Serializable {
        public PythonStreamer(AbstractRichFunction function, int id, boolean 
usesByteArray) {
                this.id = id;
                this.usePython3 = PythonPlanBinder.usePython3;
-               this.debug = DEBUG;
                planArguments = PythonPlanBinder.arguments.toString();
                sender = new PythonSender();
                receiver = new PythonReceiver(usesByteArray);
@@ -113,15 +110,9 @@ public class PythonStreamer implements Serializable {
                        throw new RuntimeException(pythonBinaryPath + " does 
not point to a valid python binary.");
                }
 
-               if (debug) {
-                       socket.setSoTimeout(0);
-                       LOG.info("Waiting for Python Process : " + 
function.getRuntimeContext().getTaskName()
-                                       + " Run python " + planPath + 
planArguments);
-               } else {
-                       process = Runtime.getRuntime().exec(pythonBinaryPath + 
" -O -B " + planPath + planArguments);
-                       new StreamPrinter(process.getInputStream()).start();
-                       new StreamPrinter(process.getErrorStream(), true, 
msg).start();
-               }
+               process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B 
" + planPath + planArguments);
+               new StreamPrinter(process.getInputStream()).start();
+               new StreamPrinter(process.getErrorStream(), true, msg).start();
 
                shutdownThread = new Thread() {
                        @Override
@@ -147,12 +138,10 @@ public class PythonStreamer implements Serializable {
                        Thread.sleep(2000);
                } catch (InterruptedException ex) {
                }
-               if (!debug) {
-                       try {
-                               process.exitValue();
-                               throw new RuntimeException("External process 
for task " + function.getRuntimeContext().getTaskName() + " terminated 
prematurely." + msg);
-                       } catch (IllegalThreadStateException ise) { //process 
still active -> start receiving data
-                       }
+               try {
+                       process.exitValue();
+                       throw new RuntimeException("External process for task " 
+ function.getRuntimeContext().getTaskName() + " terminated prematurely." + 
msg);
+               } catch (IllegalThreadStateException ise) { //process still 
active -> start receiving data
                }
 
                socket = server.accept();
@@ -173,9 +162,7 @@ public class PythonStreamer implements Serializable {
                } catch (Exception e) {
                        LOG.error("Exception occurred while closing Streamer. 
:" + e.getMessage());
                }
-               if (!debug) {
-                       destroyProcess();
-               }
+               destroyProcess();
                if (shutdownThread != null) {
                        Runtime.getRuntime().removeShutdownHook(shutdownThread);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index a9f7f14..d0f28dc 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -44,7 +44,6 @@ class Environment(object):
         #parameters
         self._dop = -1
         self._local_mode = False
-        self._debug_mode = False
         self._retry = 0
 
         #sets
@@ -142,16 +141,13 @@ class Environment(object):
     def get_number_of_execution_retries(self):
         return self._retry
 
-    def execute(self, local=False, debug=False):
+    def execute(self, local=False):
         """
         Triggers the program execution.
 
         The environment will execute all parts of the program that have 
resulted in a "sink" operation.
         """
-        if debug:
-            local = True
         self._local_mode = local
-        self._debug_mode = debug
         self._optimize_plan()
 
         plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
@@ -243,7 +239,6 @@ class Environment(object):
     def _send_parameters(self):
         collect = self._collector.collect
         collect(("dop", self._dop))
-        collect(("debug", self._debug_mode))
         collect(("mode", self._local_mode))
         collect(("retry", self._retry))
 

Reply via email to