Repository: flink Updated Branches: refs/heads/master a66458eac -> 401837bdf
[FLINK-3308] [py] Remove debug mode Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/401837bd Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/401837bd Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/401837bd Branch: refs/heads/master Commit: 401837bdfce40b2401aaf19586a8ae3612c08d8b Parents: a66458e Author: zentol <s.mo...@web.de> Authored: Mon Feb 1 10:08:30 2016 +0100 Committer: zentol <s.mo...@web.de> Committed: Wed Feb 17 09:29:33 2016 +0100 ---------------------------------------------------------------------- docs/apis/batch/python.md | 11 -------- .../flink/python/api/PythonPlanBinder.java | 16 +++-------- .../api/streaming/data/PythonStreamer.java | 29 ++++++-------------- .../flink/python/api/flink/plan/Environment.py | 7 +---- 4 files changed, 13 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/docs/apis/batch/python.md ---------------------------------------------------------------------- diff --git a/docs/apis/batch/python.md b/docs/apis/batch/python.md index 7e2060b..359d2ed 100644 --- a/docs/apis/batch/python.md +++ b/docs/apis/batch/python.md @@ -603,14 +603,3 @@ arguments that will be fed to the script. {% endhighlight %} {% top %} - -Debugging ---------------- - -If you are running Flink programs locally, you can debug your program following this guide. -First you have to enable debugging by setting the debug switch in the `env.execute(debug=True)` call. After -submitting your program, open the jobmanager log file, and look for a line that says -`Waiting for external Process : <taskname>. Run python /tmp/flink/executor.py <port>` Now open `/tmp/flink` in your python -IDE and run the `executor.py <port>`. - -{% top %} http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index 91e2369..289c84b 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -91,8 +91,6 @@ public class PythonPlanBinder { private static String FLINK_HDFS_PATH = "hdfs:/tmp"; public static final String FLINK_TMP_DATA_DIR = System.getProperty("java.io.tmpdir") + File.separator + "flink_data"; - public static boolean DEBUG = false; - private HashMap<Integer, Object> sets = new HashMap(); public ExecutionEnvironment env; private PythonPlanStreamer streamer; @@ -218,10 +216,8 @@ public class PythonPlanBinder { private void close() { try { //prevent throwing exception so that previous exceptions aren't hidden. - if (!DEBUG) { - FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH)); - hdfs.delete(new Path(FLINK_HDFS_PATH), true); - } + FileSystem hdfs = FileSystem.get(new URI(FLINK_HDFS_PATH)); + hdfs.delete(new Path(FLINK_HDFS_PATH), true); FileSystem local = FileSystem.getLocalFileSystem(); local.delete(new Path(FLINK_PYTHON_FILE_PATH), true); @@ -247,12 +243,11 @@ public class PythonPlanBinder { private enum Parameters { DOP, MODE, - RETRY, - DEBUG + RETRY } private void receiveParameters() throws IOException { - for (int x = 0; x < 4; x++) { + for (int x = 0; x < 3; x++) { Tuple value = (Tuple) streamer.getRecord(true); switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { case DOP: @@ -266,9 +261,6 @@ public class PythonPlanBinder { int retry = (Integer) value.getField(1); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L)); break; - case DEBUG: - DEBUG = (Boolean) value.getField(1); - break; } } if (env.getParallelism() < 0) { http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java index 5568051..e67099e 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java @@ -26,7 +26,6 @@ import java.util.Iterator; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.configuration.Configuration; import org.apache.flink.python.api.PythonPlanBinder; -import static org.apache.flink.python.api.PythonPlanBinder.DEBUG; import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON2_BINARY_PATH; import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON3_BINARY_PATH; import static org.apache.flink.python.api.PythonPlanBinder.FLINK_PYTHON_DC_ID; @@ -54,7 +53,6 @@ public class PythonStreamer implements Serializable { private final int id; private final boolean usePython3; - private final boolean debug; private final String planArguments; private String inputFilePath; @@ -78,7 +76,6 @@ public class PythonStreamer implements Serializable { public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) { this.id = id; this.usePython3 = PythonPlanBinder.usePython3; - this.debug = DEBUG; planArguments = PythonPlanBinder.arguments.toString(); sender = new PythonSender(); receiver = new PythonReceiver(usesByteArray); @@ -113,15 +110,9 @@ public class PythonStreamer implements Serializable { throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary."); } - if (debug) { - socket.setSoTimeout(0); - LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName() - + " Run python " + planPath + planArguments); - } else { - process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments); - new StreamPrinter(process.getInputStream()).start(); - new StreamPrinter(process.getErrorStream(), true, msg).start(); - } + process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments); + new StreamPrinter(process.getInputStream()).start(); + new StreamPrinter(process.getErrorStream(), true, msg).start(); shutdownThread = new Thread() { @Override @@ -147,12 +138,10 @@ public class PythonStreamer implements Serializable { Thread.sleep(2000); } catch (InterruptedException ex) { } - if (!debug) { - try { - process.exitValue(); - throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); - } catch (IllegalThreadStateException ise) { //process still active -> start receiving data - } + try { + process.exitValue(); + throw new RuntimeException("External process for task " + function.getRuntimeContext().getTaskName() + " terminated prematurely." + msg); + } catch (IllegalThreadStateException ise) { //process still active -> start receiving data } socket = server.accept(); @@ -173,9 +162,7 @@ public class PythonStreamer implements Serializable { } catch (Exception e) { LOG.error("Exception occurred while closing Streamer. :" + e.getMessage()); } - if (!debug) { - destroyProcess(); - } + destroyProcess(); if (shutdownThread != null) { Runtime.getRuntime().removeShutdownHook(shutdownThread); } http://git-wip-us.apache.org/repos/asf/flink/blob/401837bd/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index a9f7f14..d0f28dc 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -44,7 +44,6 @@ class Environment(object): #parameters self._dop = -1 self._local_mode = False - self._debug_mode = False self._retry = 0 #sets @@ -142,16 +141,13 @@ class Environment(object): def get_number_of_execution_retries(self): return self._retry - def execute(self, local=False, debug=False): + def execute(self, local=False): """ Triggers the program execution. The environment will execute all parts of the program that have resulted in a "sink" operation. """ - if debug: - local = True self._local_mode = local - self._debug_mode = debug self._optimize_plan() plan_mode = sys.stdin.readline().rstrip('\n') == "plan" @@ -243,7 +239,6 @@ class Environment(object): def _send_parameters(self): collect = self._collector.collect collect(("dop", self._dop)) - collect(("debug", self._debug_mode)) collect(("mode", self._local_mode)) collect(("retry", self._retry))