[FLINK-4412] [py] Chaining properly handles broadcast variables

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84d28ba0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84d28ba0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84d28ba0

Branch: refs/heads/master
Commit: 84d28ba00f3e63a83132c1666c8dc8deec7800ba
Parents: 1050847
Author: zentol <[email protected]>
Authored: Wed Aug 17 13:47:30 2016 +0200
Committer: zentol <[email protected]>
Committed: Wed Aug 17 13:52:02 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/python/api/flink/plan/DataSet.py |  1 -
 .../apache/flink/python/api/flink/plan/Environment.py | 14 ++++++++++++--
 2 files changed, 12 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
index caa4ae7..06557ca 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/DataSet.py
@@ -669,7 +669,6 @@ class OperatorSet(DataSet):
         child.other = set._info
         child.name = name
         self._info.bcvars.append(child)
-        set._info.children.append(child)
         self._env._broadcast.append(child)
         return self
 

http://git-wip-us.apache.org/repos/asf/flink/blob/84d28ba0/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
index a54dac8..1e4ba1a 100644
--- a/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
+++ b/flink-libraries/flink-python/src/main/python/org/apache/flink/python/api/flink/plan/Environment.py
@@ -227,7 +227,9 @@ class Environment(object):
                 parent = child.parent
                 # we can only chain to an actual python udf (=> operator is not None)
                 # we may only chain if the parent has only 1 child
-                if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0:
+                # we may only chain if the parent is not used as a broadcast variable
+                # we may only chain if the parent does not use the child as a broadcast variable
+                if parent.operator is not None and len(parent.children) == 1 and len(parent.sinks) == 0 and parent not in self._broadcast and child not in parent.bcvars:
                     parent.chained_info = child
                     parent.name += " -> " + child.name
                     parent.types = child.types
@@ -242,6 +244,14 @@ class Environment(object):
                         else:
                             grand_child.parent = parent
                         parent.children.append(grand_child)
+                    # if child is used as a broadcast variable the parent must now be used instead
+                    for s in self._sets:
+                        if child in s.bcvars:
+                            s.bcvars.remove(child)
+                            s.bcvars.append(parent)
+                    for bcvar in self._broadcast:
+                        if bcvar.other.id == child.id:
+                            bcvar.other = parent
                     # child sinks now belong to the parent
                     for sink in child.sinks:
                         sink.parent = parent
@@ -256,7 +266,7 @@ class Environment(object):
             x -= 1
 
     def _remove_set(self, set):
-        self._sets[:] = [s for s in self._sets if s.id!=set.id]
+        self._sets[:] = [s for s in self._sets if s.id != set.id]
 
     def _send_plan(self):
         self._send_parameters()

Reply via email to