Repository: beam Updated Branches: refs/heads/master e827642ef -> 0cabdf6e7
Populate PBegin input when decoding from Runner API

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4519681e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4519681e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4519681e

Branch: refs/heads/master
Commit: 4519681ec3d2fb723a514128d7c9c531c8de9dbf
Parents: e827642
Author: Charles Chen <[email protected]>
Authored: Thu Jun 15 15:27:18 2017 -0700
Committer: Robert Bradshaw <[email protected]>
Committed: Fri Jun 16 13:53:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/pipeline.py | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/4519681e/sdks/python/apache_beam/pipeline.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index ab77956..d84a2b7 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -515,7 +515,18 @@ class Pipeline(object):
     p.applied_labels = set([
         t.unique_name for t in proto.components.transforms.values()])
     for id in proto.components.pcollections:
-      context.pcollections.get_by_id(id).pipeline = p
+      pcollection = context.pcollections.get_by_id(id)
+      pcollection.pipeline = p
+
+    # Inject PBegin input where necessary.
+    from apache_beam.io.iobase import Read
+    from apache_beam.transforms.core import Create
+    has_pbegin = [Read, Create]
+    for id in proto.components.transforms:
+      transform = context.transforms.get_by_id(id)
+      if not transform.inputs and transform.transform.__class__ in has_pbegin:
+        transform.inputs = (pvalue.PBegin(p),)
+
     return p
