Visit composite nodes when checking for picklability.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/90cc2bcf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/90cc2bcf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/90cc2bcf Branch: refs/heads/gearpump-runner Commit: 90cc2bcfdf2256d09ed2bc155c65c5a011c42026 Parents: b47e41d Author: Robert Bradshaw <[email protected]> Authored: Wed Jun 28 16:34:47 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Wed Jun 28 17:58:06 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/90cc2bcf/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 724c87d..fe36d85 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -477,6 +477,9 @@ class Pipeline(object): class Visitor(PipelineVisitor): # pylint: disable=used-before-assignment ok = True # Really a nonlocal. + def enter_composite_transform(self, transform_node): + self.visit_transform(transform_node) + def visit_transform(self, transform_node): if transform_node.side_inputs: # No side inputs (yet). @@ -555,7 +558,7 @@ class PipelineVisitor(object): pass def visit_transform(self, transform_node): - """Callback for visiting a transform node in the pipeline DAG.""" + """Callback for visiting a transform leaf node in the pipeline DAG.""" pass def enter_composite_transform(self, transform_node):
