Repository: flink
Updated Branches:
  refs/heads/master b19648eb4 -> 84d28ba00
[FLINK-4411] [py] Properly propagate chained dual input children Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10508477 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10508477 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10508477 Branch: refs/heads/master Commit: 1050847787b416399a6c03c0568969df93ed4822 Parents: b19648e Author: zentol <[email protected]> Authored: Wed Aug 17 12:15:37 2016 +0200 Committer: zentol <[email protected]> Committed: Wed Aug 17 13:51:55 2016 +0200 ---------------------------------------------------------------------- .../flink/python/api/flink/plan/Environment.py | 15 ++++++++++++--- .../python/org/apache/flink/python/api/test_main2.py | 4 ++-- 2 files changed, 14 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index 9d08baf..a54dac8 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -219,15 +219,21 @@ class Environment(object): dual_input = set([_Identifier.JOIN, _Identifier.JOINH, _Identifier.JOINT, _Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST, _Identifier.COGROUP, _Identifier.UNION]) x = len(self._sets) - 1 while x > -1: + # CHAIN(parent -> child) -> grand_child + # for all intents and purposes the child set ceases to exist; it is merged into the parent child = self._sets[x] 
child_type = child.identifier if child_type in chainable: parent = child.parent + # we can only chain to an actual python udf (=> operator is not None) + # we may only chain if the parent has only 1 child if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0: parent.chained_info = child parent.name += " -> " + child.name parent.types = child.types + # grand_children now belong to the parent for grand_child in child.children: + # dual_input operations have 2 parents; hence we have to change the correct one if grand_child.identifier in dual_input: if grand_child.parent.id == child.id: grand_child.parent = parent @@ -235,15 +241,18 @@ class Environment(object): grand_child.other = parent else: grand_child.parent = parent - parent.children.append(grand_child) - parent.children.remove(child) + parent.children.append(grand_child) + # child sinks now belong to the parent for sink in child.sinks: sink.parent = parent parent.sinks.append(sink) + # child broadcast variables now belong to the parent for bcvar in child.bcvars: bcvar.parent = parent parent.bcvars.append(bcvar) - self._remove_set((child)) + # remove child set as it has been merged into the parent + parent.children.remove(child) + self._remove_set(child) x -= 1 def _remove_set(self, set): http://git-wip-us.apache.org/repos/asf/flink/blob/10508477/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py index ceb26d0..f1d40e1 100644 --- a/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py +++ b/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/test_main2.py @@ -22,14 +22,14 @@ from flink.functions.CrossFunction import CrossFunction from 
flink.functions.JoinFunction import JoinFunction from flink.functions.CoGroupFunction import CoGroupFunction from flink.functions.Aggregation import Max, Min, Sum -from utils import Verify, Verify2 +from utils import Verify, Verify2, Id if __name__ == "__main__": env = get_environment() d1 = env.from_elements(1, 6, 12) - d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False)) + d2 = env.from_elements((1, 0.5, "hello", True), (2, 0.4, "world", False)).map(Id()).map(Id()) # force map chaining d3 = env.from_elements(("hello",), ("world",))
