Repository: flink Updated Branches: refs/heads/master 0ccbd2779 -> 0ccab95e7
[FLINK-2440][py] Expand Environment feature coverage This closes #1383 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ccab95e Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ccab95e Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ccab95e Branch: refs/heads/master Commit: 0ccab95e7fbbc53e94dabd1032d903428a47db24 Parents: 0ccbd27 Author: zentol <ches...@apache.org> Authored: Thu Nov 19 13:30:48 2015 +0100 Committer: zentol <ches...@apache.org> Committed: Wed Dec 2 14:40:38 2015 +0100 ---------------------------------------------------------------------- .../flink/python/api/PythonPlanBinder.java | 4 +-- .../python/api/flink/example/TPCHQuery10.py | 2 +- .../python/api/flink/example/TPCHQuery3.py | 2 +- .../api/flink/example/TriangleEnumeration.py | 2 +- .../python/api/flink/example/WebLogAnalysis.py | 2 +- .../flink/python/api/flink/example/WordCount.py | 2 +- .../flink/python/api/flink/plan/Environment.py | 38 ++++++++++++++------ .../org/apache/flink/python/api/test_main.py | 2 +- .../org/apache/flink/python/api/test_main2.py | 3 +- 9 files changed, 37 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java index f4f501a..a27a589 100644 --- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java +++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java @@ -278,9 +278,7 @@ public class PythonPlanBinder { } private void 
receiveParameters() throws IOException { - Integer parameterCount = (Integer) receiver.getRecord(true); - - for (int x = 0; x < parameterCount; x++) { + for (int x = 0; x < 4; x++) { Tuple value = (Tuple) receiver.getRecord(true); switch (Parameters.valueOf(((String) value.getField(0)).toUpperCase())) { case DOP: http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py index 032ef85..cc9e7cf 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery10.py @@ -110,6 +110,6 @@ if __name__ == "__main__": result.write_csv(sys.argv[5], '\n', '|', WriteMode.OVERWRITE) - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py index 5fafb01..3eb72c9 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TPCHQuery3.py @@ -99,7 +99,7 @@ if __name__ == "__main__": result.write_csv(sys.argv[4], 
'\n', '|', WriteMode.OVERWRITE) - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py index 2727635..b1b3ef4 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/TriangleEnumeration.py @@ -147,6 +147,6 @@ if __name__ == "__main__": triangles.output() - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py index d571cf9..676043f 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WebLogAnalysis.py @@ -82,6 +82,6 @@ if __name__ == "__main__": result.write_csv(sys.argv[4], '\n', '|', WriteMode.OVERWRITE) - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py index 8a89a6f..71c2e28 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/example/WordCount.py @@ -56,6 +56,6 @@ if __name__ == "__main__": else: result.output() - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index bea6212..169e31b 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -40,7 +40,10 @@ class Environment(object): self._counter = 0 #parameters - self._parameters = [] + self._dop = -1 + self._local_mode = False + self._debug_mode = False + self._retry = 0 #sets self._sources = [] @@ -114,15 +117,28 @@ class Environment(object): self._sources.append(child) return child_set - def set_degree_of_parallelism(self, degree): + def set_parallelism(self, parallelism): """ - Sets the degree of parallelism (DOP) for 
operations executed through this environment. + Sets the parallelism for operations executed through this environment. Setting a DOP of x here will cause all operators (such as join, map, reduce) to run with x parallel instances. - :param degreeOfParallelism: The degree of parallelism + :param parallelism: The degree of parallelism """ - self._parameters.append(("dop", degree)) + self._dop = parallelism + + def get_parallelism(self): + """ + Gets the parallelism with which operations are executed by default. + :return: The parallelism used by operations. + """ + return self._dop + + def set_number_of_execution_retries(self, count): + self._retry = count + + def get_number_of_execution_retries(self): + return self._retry def execute(self, local=False, debug=False): """ @@ -132,8 +148,8 @@ class Environment(object): """ if debug: local = True - self._parameters.append(("mode", local)) - self._parameters.append(("debug", debug)) + self._local_mode = local + self._debug_mode = debug self._optimize_plan() plan_mode = sys.stdin.readline().rstrip('\n') == "plan" @@ -243,9 +259,11 @@ class Environment(object): self._send_broadcast() def _send_parameters(self): - self._collector.collect(len(self._parameters)) - for parameter in self._parameters: - self._collector.collect(parameter) + collect = self._collector.collect + collect(("dop", self._dop)) + collect(("debug", self._debug_mode)) + collect(("mode", self._local_mode)) + collect(("retry", self._retry)) def _send_sources(self): for source in self._sources: http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py index 9a3a5e4..16e1a8c 100644 --- 
a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main.py @@ -227,6 +227,6 @@ if __name__ == "__main__": .map_partition(Verify([(4.3, 4.4, 1), (4.1, 4.1, 3)], "ChainedSortedGroupReduce"), STRING).output() #Execution - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True) http://git-wip-us.apache.org/repos/asf/flink/blob/0ccab95e/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index 2f30cda..56e3250 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -1,3 +1,4 @@ + # ############################################################################### # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file @@ -140,6 +141,6 @@ if __name__ == "__main__": .map_partition(Verify2([(1, 0.5, "hello", True), (2, 0.4, "world", False), (1, 0.5, "hello", True), (1, 0.4, "hello", False), (1, 0.5, "hello", True), (2, 0.4, "world", False)], "Union"), STRING).output() #Execution - env.set_degree_of_parallelism(1) + env.set_parallelism(1) env.execute(local=True)