Only encode PCollection outputs in Runner API protos.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c9ff44af Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c9ff44af Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c9ff44af Branch: refs/heads/master Commit: c9ff44afa3fcb47b7f0c4288f4f7d520f063d442 Parents: 0749982 Author: Robert Bradshaw <[email protected]> Authored: Fri Mar 31 16:57:01 2017 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Fri Mar 31 21:44:21 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/pipeline.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c9ff44af/sdks/python/apache_beam/pipeline.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index ee5904b..0841e5f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -494,6 +494,10 @@ class AppliedPTransform(object): return {str(ix): input for ix, input in enumerate(self.inputs) if isinstance(input, pvalue.PCollection)} + def named_outputs(self): + return {str(tag): output for tag, output in self.outputs.items() + if isinstance(output, pvalue.PCollection)} + def to_runner_api(self, context): from apache_beam.runners.api import beam_runner_api_pb2 return beam_runner_api_pb2.PTransform( @@ -507,7 +511,7 @@ class AppliedPTransform(object): inputs={tag: context.pcollections.get_id(pc) for tag, pc in self.named_inputs().items()}, outputs={str(tag): context.pcollections.get_id(out) - for tag, out in self.outputs.items()}, + for tag, out in self.named_outputs().items()}, # TODO(BEAM-115): display_data display_data=None)
