[FLINK-4412] [py] Chaining properly handles broadcast variables
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84d28ba0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84d28ba0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84d28ba0 Branch: refs/heads/master Commit: 84d28ba00f3e63a83132c1666c8dc8deec7800ba Parents: 1050847 Author: zentol <[email protected]> Authored: Wed Aug 17 13:47:30 2016 +0200 Committer: zentol <[email protected]> Committed: Wed Aug 17 13:52:02 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/python/api/flink/plan/DataSet.py | 1 - .../apache/flink/python/api/flink/plan/Environment.py | 14 ++++++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py index caa4ae7..06557ca 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py @@ -669,7 +669,6 @@ class OperatorSet(DataSet): child.other = set._info child.name = name self._info.bcvars.append(child) - set._info.children.append(child) self._env._broadcast.append(child) return self http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py index a54dac8..1e4ba1a 100644 --- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py +++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py @@ -227,7 +227,9 @@ class Environment(object): parent = child.parent # we can only chain to an actual python udf (=> operator is not None) # we may only chain if the parent has only 1 child - if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0: + # we may only chain if the parent is not used as a broadcast variable + # we may only chain if the parent does not use the child as a broadcast variable + if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0 and parent not in self._broadcast and child not in parent.bcvars: parent.chained_info = child parent.name += " -> " + child.name parent.types = child.types @@ -242,6 +244,14 @@ class Environment(object): else: grand_child.parent = parent parent.children.append(grand_child) + # if child is used as a broadcast variable the parent must now be used instead + for s in self._sets: + if child in s.bcvars: + s.bcvars.remove(child) + s.bcvars.append(parent) + for bcvar in self._broadcast: + if bcvar.other.id == child.id: + bcvar.other = parent # child sinks now belong to the parent for sink in child.sinks: sink.parent = parent @@ -256,7 +266,7 @@ class Environment(object): x -= 1 def _remove_set(self, set): - self._sets[:] = [s for s in self._sets if s.id!=set.id] + self._sets[:] = [s for s in self._sets if s.id != set.id] def _send_plan(self): self._send_parameters()
