Repository: flink
Updated Branches:
  refs/heads/master 3e767b5a4 -> 2ce080da6
[FLINK-5183] [py] Support multiple jobs per plan file

This closes #3232.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ce080da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ce080da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ce080da

Branch: refs/heads/master
Commit: 2ce080da627cbc77527b69bfb877d4904e9a0701
Parents: 3e767b5
Author: Geoffrey Mon <[email protected]>
Authored: Thu Jan 26 09:15:55 2017 -0500
Committer: zentol <[email protected]>
Committed: Sun Mar 19 21:20:55 2017 +0100

----------------------------------------------------------------------
 .../flink/python/api/PythonOperationInfo.java   |   5 +-
 .../flink/python/api/PythonPlanBinder.java      |  61 ++++++-----
 .../python/api/functions/PythonCoGroup.java     |   4 +-
 .../api/functions/PythonMapPartition.java       |   4 +-
 .../api/streaming/data/PythonStreamer.java      |  15 +--
 .../api/streaming/plan/PythonPlanStreamer.java  |  66 ++++++++----
 .../flink/python/api/flink/plan/Environment.py  | 101 ++++++++++++++-----
 .../flink/python/api/test_multiple_jobs.py      |  47 +++++++++
 8 files changed, 226 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
index 5f3f9f1..694c1b4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonOperationInfo.java
@@ -44,8 +44,9 @@ public class PythonOperationInfo {
 	public String name;
 	public boolean usesUDF;
 	public int parallelism;
+	public int envID;
 
-	public PythonOperationInfo(PythonPlanStreamer streamer) throws IOException {
+	public PythonOperationInfo(PythonPlanStreamer streamer, int environmentID) throws IOException {
 		identifier = (String) streamer.getRecord();
 		parentID = (Integer) streamer.getRecord(true);
 		otherID = (Integer) streamer.getRecord(true);
@@ -92,6 +93,8 @@ public class PythonOperationInfo {
 			values[x] = streamer.getRecord();
 		}
 		parallelism = (Integer) streamer.getRecord(true);
+
+		envID = environmentID;
 	}
 
 	@Override
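The new envID field tags each received operation with the environment (i.e. the job) it belongs to. Further down, PythonStreamer combines envID and setID into the names of the memory-mapped exchange files, so operators from different jobs in the same plan file cannot collide. A minimal sketch of that naming scheme, assuming the layout used in PythonStreamer.startPython() (the helper itself is illustrative, not part of the commit):

    # Illustrative helper: mirrors FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID
    # + subtaskIndex + "input"/"output" from PythonStreamer.startPython().
    def exchange_file_paths(tmp_dir, env_id, set_id, subtask_index):
        base = "%s/%d_%d%d" % (tmp_dir, env_id, set_id, subtask_index)
        return base + "input", base + "output"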
http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index cc63ef4..a3cae4a 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -94,6 +94,7 @@ public class PythonPlanBinder {
 	private HashMap<Integer, Object> sets = new HashMap<>();
 	public ExecutionEnvironment env;
+	private int currentEnvironmentID = 0;
 	private PythonPlanStreamer streamer;
 
 	public static final int MAPPED_FILE_SIZE = 1024 * 1024 * 64;
@@ -126,8 +127,6 @@ public class PythonPlanBinder {
 	}
 
 	private void runPlan(String[] args) throws Exception {
-		env = ExecutionEnvironment.getExecutionEnvironment();
-
 		int split = 0;
 		for (int x = 0; x < args.length; x++) {
 			if (args[x].compareTo("-") == 0) {
@@ -139,15 +138,23 @@ public class PythonPlanBinder {
 			String tmpPath = FLINK_PYTHON_FILE_PATH + r.nextInt();
 			prepareFiles(tmpPath, Arrays.copyOfRange(args, 0, split == 0 ? args.length : split));
 			startPython(tmpPath, Arrays.copyOfRange(args, split == 0 ? args.length : split + 1, args.length));
-			receivePlan();
 
-			if (env instanceof LocalEnvironment) {
-				FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
+			// Python process should terminate itself when all jobs have been run
+			while (streamer.preparePlanMode()) {
+				receivePlan();
+
+				if (env instanceof LocalEnvironment) {
+					FLINK_HDFS_PATH = "file:" + System.getProperty("java.io.tmpdir") + File.separator + "flink";
+				}
+
+				distributeFiles(tmpPath, env);
+				JobExecutionResult jer = env.execute();
+				sendResult(jer);
+
+				streamer.finishPlanMode();
 			}
 
-			distributeFiles(tmpPath, env);
-			JobExecutionResult jer = env.execute();
-			sendResult(jer);
+			clearPath(tmpPath);
 			close();
 		} catch (Exception e) {
 			close();
@@ -200,7 +207,6 @@ public class PythonPlanBinder {
 		clearPath(FLINK_HDFS_PATH);
 		FileCache.copy(new Path(tmpPath), new Path(FLINK_HDFS_PATH), true);
 		env.registerCachedFile(FLINK_HDFS_PATH, FLINK_PYTHON_DC_ID);
-		clearPath(tmpPath);
 	}
 
 	private void startPython(String tempPath, String[] args) throws IOException {
@@ -234,6 +240,9 @@ public class PythonPlanBinder {
 	//====Plan==========================================================================================================
 	private void receivePlan() throws IOException {
+		env = ExecutionEnvironment.getExecutionEnvironment();
+		//IDs used in HashMap of sets are only unique for each environment
+		sets.clear();
 		receiveParameters();
 		receiveOperations();
 	}
@@ -245,11 +254,12 @@ public class PythonPlanBinder {
 	private enum Parameters {
 		DOP,
 		MODE,
-		RETRY
+		RETRY,
+		ID
 	}
 
 	private void receiveParameters() throws IOException {
-		for (int x = 0; x < 3; x++) {
+		for (int x = 0; x < 4; x++) {
 			Tuple value = (Tuple) streamer.getRecord(true);
 			switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) {
 				case DOP:
@@ -263,6 +273,9 @@ public class PythonPlanBinder {
 					int retry = (Integer) value.getField(1);
 					env.setRestartStrategy(RestartStrategies.fixedDelayRestart(retry, 10000L));
 					break;
+				case ID:
+					currentEnvironmentID = (Integer) value.getField(1);
+					break;
 			}
 		}
 		if (env.getParallelism() < 0) {
@@ -285,7 +298,7 @@ public class PythonPlanBinder {
 	private void receiveOperations() throws IOException {
 		Integer operationCount = (Integer) streamer.getRecord(true);
 		for (int x = 0; x < operationCount; x++) {
-			PythonOperationInfo info = new PythonOperationInfo(streamer);
+			PythonOperationInfo info = new PythonOperationInfo(streamer, currentEnvironmentID);
 			Operation op;
 			try {
 				op = Operation.valueOf(info.identifier.toUpperCase());
@@ -518,7 +531,7 @@ public class PythonPlanBinder {
 		DataSet op2 = (DataSet) sets.get(info.otherID);
 		Keys.ExpressionKeys<?> key1 = new Keys.ExpressionKeys(info.keys1, op1.getType());
 		Keys.ExpressionKeys<?> key2 = new Keys.ExpressionKeys(info.keys2, op2.getType());
-		PythonCoGroup pcg = new PythonCoGroup(info.setID, info.types);
+		PythonCoGroup pcg = new PythonCoGroup(info.envID, info.setID, info.types);
 		sets.put(info.setID, new CoGroupRawOperator(op1, op2, key1, key2, pcg, info.types, info.name).setParallelism(getParallelism(info)));
 	}
 
@@ -544,7 +557,7 @@ public class PythonPlanBinder {
 		defaultResult.setParallelism(getParallelism(info));
 		if (info.usesUDF) {
-			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
+			sets.put(info.setID, defaultResult.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 		} else {
 			sets.put(info.setID, defaultResult.name("DefaultCross"));
 		}
@@ -553,13 +566,13 @@ public class PythonPlanBinder {
 	@SuppressWarnings("unchecked")
 	private void createFilterOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	@SuppressWarnings("unchecked")
 	private void createFlatMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createGroupReduceOperation(PythonOperationInfo info) {
@@ -580,19 +593,19 @@ public class PythonPlanBinder {
 	@SuppressWarnings("unchecked")
 	private DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).name("PythonGroupReducePreStep").setParallelism(getParallelism(info))
-			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	@SuppressWarnings("unchecked")
 	private DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
-			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	@SuppressWarnings("unchecked")
 	private DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonGroupReducePreStep")
-			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	@SuppressWarnings("unchecked")
@@ -602,7 +615,7 @@ public class PythonPlanBinder {
 
 		if (info.usesUDF) {
 			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info))
-				.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
+				.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 		} else {
 			sets.put(info.setID, createDefaultJoin(op1, op2, info.keys1, info.keys2, mode, getParallelism(info)));
 		}
@@ -628,13 +641,13 @@ public class PythonPlanBinder {
 	@SuppressWarnings("unchecked")
 	private void createMapOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	@SuppressWarnings("unchecked")
 	private void createMapPartitionOperation(PythonOperationInfo info) {
 		DataSet op1 = (DataSet) sets.get(info.parentID);
-		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
+		sets.put(info.setID, op1.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name));
 	}
 
 	private void createReduceOperation(PythonOperationInfo info) {
@@ -651,12 +664,12 @@ public class PythonPlanBinder {
 	@SuppressWarnings("unchecked")
 	private DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
-			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 
 	@SuppressWarnings("unchecked")
 	private DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new IdentityGroupReduce()).setCombinable(false).setParallelism(getParallelism(info)).name("PythonReducePreStep")
-			.mapPartition(new PythonMapPartition(info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
+			.mapPartition(new PythonMapPartition(info.envID, info.setID, info.types)).setParallelism(getParallelism(info)).name(info.name);
 	}
 }
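The while (streamer.preparePlanMode()) loop is the core of the change: the binder now executes one job per received plan and only stops once the Python process terminates itself. On the plan-file side this means get_environment()/execute() can simply be called once per job, which is what the new test at the end of this commit does. Condensed:

    from flink.plan.Environment import get_environment

    env = get_environment()    # assigned _env_id 0
    env.from_elements(1, 6, 12).output()
    env.execute(local=True)    # first job runs to completion

    env2 = get_environment()   # assigned _env_id 1
    env2.from_elements(1, 1, 12).output()
    env2.execute(local=True)   # second job, same plan file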
http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
index 9da5a4c..2065b98 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonCoGroup.java
@@ -34,9 +34,9 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2, OUT> {
 	private final PythonStreamer<IN1, IN2, OUT> streamer;
 	private final transient TypeInformation<OUT> typeInformation;
 
-	public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) {
+	public PythonCoGroup(int envID, int setID, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer<>(this, id, true);
+		streamer = new PythonStreamer<>(this, envID, setID, true);
 	}
 
 	/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
index c596d6c..dc21c7c 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/functions/PythonMapPartition.java
@@ -35,9 +35,9 @@ public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OUT> {
 	private final PythonStreamer<IN, IN, OUT> streamer;
 	private final transient TypeInformation<OUT> typeInformation;
 
-	public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) {
+	public PythonMapPartition(int envId, int setId, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer<>(this, id, typeInformation instanceof PrimitiveArrayTypeInfo);
+		streamer = new PythonStreamer(this, envId, setId, typeInformation instanceof PrimitiveArrayTypeInfo);
 	}
 
 	/**
http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
index 3409960..136bb69 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonStreamer.java
@@ -56,7 +56,8 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 	private static final int SIGNAL_ERROR = -2;
 	private static final byte SIGNAL_LAST = 32;
 
-	private final int id;
+	private final int envID;
+	private final int setID;
 
 	private final boolean usePython3;
 	private final String planArguments;
@@ -78,8 +79,9 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 	protected transient Thread outPrinter;
 	protected transient Thread errorPrinter;
 
-	public PythonStreamer(AbstractRichFunction function, int id, boolean usesByteArray) {
-		this.id = id;
+	public PythonStreamer(AbstractRichFunction function, int envID, int setID, boolean usesByteArray) {
+		this.envID = envID;
+		this.setID = setID;
 		this.usePython3 = PythonPlanBinder.usePython3;
 		planArguments = PythonPlanBinder.arguments.toString();
 		sender = new PythonSender();
@@ -99,8 +101,8 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 	}
 
 	private void startPython() throws IOException {
-		String outputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
-		String inputFilePath = FLINK_TMP_DATA_DIR + "/" + id + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
+		String outputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "output";
+		String inputFilePath = FLINK_TMP_DATA_DIR + "/" + envID + "_" + setID + this.function.getRuntimeContext().getIndexOfThisSubtask() + "input";
 
 		sender.open(inputFilePath);
 		receiver.open(outputFilePath);
@@ -136,8 +138,9 @@ public class PythonStreamer<IN1, IN2, OUT> implements Serializable {
 		OutputStream processOutput = process.getOutputStream();
 		processOutput.write("operator\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((envID + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+		processOutput.write((setID + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.write(("" + server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
-		processOutput.write((id + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.write((this.function.getRuntimeContext().getIndexOfThisSubtask() + "\n")
 			.getBytes(ConfigConstants.DEFAULT_CHARSET));
 		processOutput.write((inputFilePath + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
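Note the reordering: the operator handshake now sends envID and setID before the port. The reads in Environment.py (later in this commit) consume the same sequence; sketched here for reference, assuming the initial "operator" mode line has already been read (the function name is illustrative, not part of the commit):

    import sys

    # Read order must mirror the writes in PythonStreamer.startPython():
    # envID, setID, port, subtask index, input path, output path.
    def read_operator_handshake():
        env_id = int(sys.stdin.readline())
        set_id = int(sys.stdin.readline())
        port = int(sys.stdin.readline())
        subtask_index = int(sys.stdin.readline())
        input_path = sys.stdin.readline().rstrip('\n')
        output_path = sys.stdin.readline().rstrip('\n')
        return env_id, set_id, port, subtask_index, input_path, output_path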
http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
index d97cf69..4eb0f51 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/plan/PythonPlanStreamer.java
@@ -54,19 +54,7 @@ public class PythonPlanStreamer {
 	}
 
 	public void open(String tmpPath, String args) throws IOException {
-		server = new ServerSocket(0);
-		server.setSoTimeout(50);
 		startPython(tmpPath, args);
-		while (true) {
-			try {
-				socket = server.accept();
-				break;
-			} catch (SocketTimeoutException ignored) {
-				checkPythonProcessHealth();
-			}
-		}
-		sender = new PythonPlanSender(socket.getOutputStream());
-		receiver = new PythonPlanReceiver(socket.getInputStream());
 	}
 
 	private void startPython(String tmpPath, String args) throws IOException {
@@ -82,11 +70,48 @@ public class PythonPlanStreamer {
 		new StreamPrinter(process.getInputStream()).start();
 		new StreamPrinter(process.getErrorStream()).start();
 
+		server = new ServerSocket(0);
+		server.setSoTimeout(50);
+
 		process.getOutputStream().write("plan\n".getBytes(ConfigConstants.DEFAULT_CHARSET));
-		process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
 		process.getOutputStream().flush();
 	}
 
+	public boolean preparePlanMode() throws IOException {
+		try {
+			process.getOutputStream().write((server.getLocalPort() + "\n").getBytes(ConfigConstants.DEFAULT_CHARSET));
+			process.getOutputStream().flush();
+		} catch (IOException ignored) {
+			// the python process most likely shutdown in the meantime
+			return false;
+		}
+		while (true) {
+			try {
+				socket = server.accept();
+				sender = new PythonPlanSender(socket.getOutputStream());
+				receiver = new PythonPlanReceiver(socket.getInputStream());
+				return true;
+			} catch (SocketTimeoutException ignored) {
+				switch (checkPythonProcessHealth()) {
+					case RUNNING:
+						continue;
+					case STOPPED:
+						return false;
+					case FAILED:
+						throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+				}
+			}
+		}
+	}
+
+	public void finishPlanMode() {
+		try {
+			socket.close();
+		} catch (IOException e) {
+			LOG.error("Failed to close socket.", e);
+		}
+	}
+
 	public void close() {
 		try {
 			process.exitValue();
@@ -95,22 +120,29 @@ public class PythonPlanStreamer {
 			process.destroy();
 		} finally {
 			try {
-				socket.close();
+				server.close();
 			} catch (IOException e) {
 				LOG.error("Failed to close socket.", e);
 			}
 		}
 	}
 
-	private void checkPythonProcessHealth() {
+	private ProcessState checkPythonProcessHealth() {
 		try {
 			int value = process.exitValue();
 			if (value != 0) {
-				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
+				return ProcessState.FAILED;
 			} else {
-				throw new RuntimeException("Plan file exited prematurely without an error.");
+				return ProcessState.STOPPED;
 			}
 		} catch (IllegalThreadStateException ignored) {
 			//Process still running
+			return ProcessState.RUNNING;
 		}
 	}
+
+	private enum ProcessState {
+		RUNNING,
+		FAILED,
+		STOPPED
+	}
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index 1e4ba1a..6e496de 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -27,17 +27,63 @@ import copy
 import sys
 from struct import pack
 
+
+class EnvironmentContainer(object):
+    """Keeps track of which ExecutionEnvironment is active."""
+
+    _environment_counter = 0
+    _environment_id_to_execute = None
+    _plan_mode = None
+
+    def create_environment(self):
+        """Creates a new environment with a unique id."""
+        env = Environment(self, self._environment_counter)
+        self._environment_counter += 1
+        return env
+
+    def is_planning(self):
+        """
+        Checks whether we are generating the plan or executing an operator.
+
+        :return: True, if the plan is generated, false otherwise
+        """
+        if self._plan_mode is None:
+            mode = sys.stdin.readline().rstrip('\n')
+            if mode == "plan":
+                self._plan_mode = True
+            elif mode == "operator":
+                self._plan_mode = False
+            else:
+                raise ValueError("Invalid mode specified: " + mode)
+        return self._plan_mode
+
+    def should_execute(self, environment):
+        """
+        Checks whether the given ExecutionEnvironment should run the contained plan.
+
+        :param: ExecutionEnvironment to check
+        :return: True, if the environment should run the contained plan, false otherwise
+        """
+        if self._environment_id_to_execute is None:
+            self._environment_id_to_execute = int(sys.stdin.readline().rstrip('\n'))
+
+        return environment._env_id == self._environment_id_to_execute
+
+
+container = EnvironmentContainer()
+
+
 def get_environment():
     """
     Creates an execution environment that represents the context in which the program is currently executed.
-
+
     :return: The execution environment of the context in which the program is executed.
     """
-    return Environment()
+    return container.create_environment()
 
 
 class Environment(object):
-    def __init__(self):
+    def __init__(self, container, env_id):
         # util
         self._counter = 0
 
@@ -46,6 +92,9 @@ class Environment(object):
         self._local_mode = False
         self._retry = 0
 
+        self._container = container
+        self._env_id = env_id
+
         #sets
         self._sources = []
         self._sets = []
@@ -166,9 +215,7 @@ class Environment(object):
         self._local_mode = local
         self._optimize_plan()
 
-        plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
-
-        if plan_mode:
+        if self._container.is_planning():
             port = int(sys.stdin.readline().rstrip('\n'))
             self._connection = Connection.PureTCPConnection(port)
             self._iterator = Iterator.PlanIterator(self._connection, self)
@@ -180,31 +227,34 @@ class Environment(object):
         else:
             import struct
             operator = None
+            port = None
             try:
-                port = int(sys.stdin.readline().rstrip('\n'))
-
-                id = int(sys.stdin.readline().rstrip('\n'))
-                subtask_index = int(sys.stdin.readline().rstrip('\n'))
-                input_path = sys.stdin.readline().rstrip('\n')
-                output_path = sys.stdin.readline().rstrip('\n')
-
-                used_set = None
-                operator = None
-                for set in self._sets:
-                    if set.id == id:
-                        used_set = set
-                        operator = set.operator
-                operator._configure(input_path, output_path, port, self, used_set, subtask_index)
-                operator._go()
-                operator._close()
-                sys.stdout.flush()
-                sys.stderr.flush()
+                if self._container.should_execute(self):
+                    id = int(sys.stdin.readline().rstrip('\n'))
+
+                    port = int(sys.stdin.readline().rstrip('\n'))
+                    subtask_index = int(sys.stdin.readline().rstrip('\n'))
+                    input_path = sys.stdin.readline().rstrip('\n')
+                    output_path = sys.stdin.readline().rstrip('\n')
+
+                    used_set = None
+                    operator = None
+
+                    for set in self._sets:
+                        if set.id == id:
+                            used_set = set
+                            operator = set.operator
+                    operator._configure(input_path, output_path, port, self, used_set, subtask_index)
+                    operator._go()
+                    operator._close()
+                    sys.stdout.flush()
+                    sys.stderr.flush()
             except:
                 sys.stdout.flush()
                 sys.stderr.flush()
                 if operator is not None:
                     operator._connection._socket.send(struct.pack(">i", -2))
-                else:
+                elif port is not None:
                     socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
                     socket.connect((SOCKET.gethostbyname("localhost"), port))
                     socket.send(struct.pack(">i", -2))
@@ -277,6 +327,7 @@ class Environment(object):
         collect(("dop", self._dop))
         collect(("mode", self._local_mode))
         collect(("retry", self._retry))
+        collect(("id", self._env_id))
 
     def _send_operations(self):
         self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast))
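The module-level EnvironmentContainer is the piece that lets several environments coexist in one plan file: it hands out consecutive ids, caches whether the process is in plan or operator mode, and in operator mode selects the single environment whose id the binder announced on stdin. For intuition (the comments describe the behavior as assumed from this diff):

    from flink.plan.Environment import get_environment

    a = get_environment()   # a._env_id == 0
    b = get_environment()   # b._env_id == 1
    # In an operator process the binder might announce "1" on stdin;
    # should_execute(a) is then False and should_execute(b) is True, so
    # only b runs its UDF while a's _execute() call falls through.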
""" - return Environment() + return container.create_environment() class Environment(object): - def __init__(self): + def __init__(self, container, env_id): # util self._counter = 0 @@ -46,6 +92,9 @@ class Environment(object): self._local_mode = False self._retry = 0 + self._container = container + self._env_id = env_id + #sets self._sources = [] self._sets = [] @@ -166,9 +215,7 @@ class Environment(object): self._local_mode = local self._optimize_plan() - plan_mode = sys.stdin.readline().rstrip('\n') == "plan" - - if plan_mode: + if self._container.is_planning(): port = int(sys.stdin.readline().rstrip('\n')) self._connection = Connection.PureTCPConnection(port) self._iterator = Iterator.PlanIterator(self._connection, self) @@ -180,31 +227,34 @@ class Environment(object): else: import struct operator = None + port = None try: - port = int(sys.stdin.readline().rstrip('\n')) - - id = int(sys.stdin.readline().rstrip('\n')) - subtask_index = int(sys.stdin.readline().rstrip('\n')) - input_path = sys.stdin.readline().rstrip('\n') - output_path = sys.stdin.readline().rstrip('\n') - - used_set = None - operator = None - for set in self._sets: - if set.id == id: - used_set = set - operator = set.operator - operator._configure(input_path, output_path, port, self, used_set, subtask_index) - operator._go() - operator._close() - sys.stdout.flush() - sys.stderr.flush() + if self._container.should_execute(self): + id = int(sys.stdin.readline().rstrip('\n')) + + port = int(sys.stdin.readline().rstrip('\n')) + subtask_index = int(sys.stdin.readline().rstrip('\n')) + input_path = sys.stdin.readline().rstrip('\n') + output_path = sys.stdin.readline().rstrip('\n') + + used_set = None + operator = None + + for set in self._sets: + if set.id == id: + used_set = set + operator = set.operator + operator._configure(input_path, output_path, port, self, used_set, subtask_index) + operator._go() + operator._close() + sys.stdout.flush() + sys.stderr.flush() except: sys.stdout.flush() sys.stderr.flush() if operator is not None: operator._connection._socket.send(struct.pack(">i", -2)) - else: + elif port is not None: socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM) socket.connect((SOCKET.gethostbyname("localhost"), port)) socket.send(struct.pack(">i", -2)) @@ -277,6 +327,7 @@ class Environment(object): collect(("dop", self._dop)) collect(("mode", self._local_mode)) collect(("retry", self._retry)) + collect(("id", self._env_id)) def _send_operations(self): self._collector.collect(len(self._sources) + len(self._sets) + len(self._sinks) + len(self._broadcast)) http://git-wip-us.apache.org/repos/asf/flink/blob/2ce080da/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_multiple_jobs.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_multiple_jobs.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_multiple_jobs.py new file mode 100644 index 0000000..2e8d1a0 --- /dev/null +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_multiple_jobs.py @@ -0,0 +1,47 @@ + +# ############################################################################### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
+# Test multiple jobs in one Python plan file
+if __name__ == "__main__":
+    env = get_environment()
+    env.set_parallelism(1)
+
+    d1 = env.from_elements(1, 6, 12)
+    d1 \
+        .first(1) \
+        .map_partition(Verify([1], "First with multiple jobs in one Python plan file")).output()
+
+    env.execute(local=True)
+
+    env2 = get_environment()
+    env2.set_parallelism(1)
+
+    d2 = env2.from_elements(1, 1, 12)
+    d2 \
+        .map(lambda x: x * 2) \
+        .map_partition(Verify([2, 2, 24], "Lambda Map with multiple jobs in one Python plan file")).output()
+
+    env2.execute(local=True)
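The test exercises two jobs; the protocol itself imposes no upper bound on the number of jobs per plan file. For illustration, a third job could be appended in the same style (a sketch reusing only helpers already imported above, not part of the commit):

    env3 = get_environment()
    env3.set_parallelism(1)

    d3 = env3.from_elements(3, 6, 9)
    d3 \
        .map(lambda x: x + 1) \
        .map_partition(Verify([4, 7, 10], "Third job in the same plan file")).output()

    env3.execute(local=True)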
