Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 bd032586f -> 53b372b19
Fix PValue input in _PubSubReadEvaluator

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/389e15a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/389e15a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/389e15a1

Branch: refs/heads/release-2.1.0
Commit: 389e15a16a4b2324ac67f0586b73eabff5fdcb15
Parents: bd03258
Author: Charles Chen <[email protected]>
Authored: Wed Jul 5 16:18:51 2017 -0700
Committer: Ahmet Altay <[email protected]>
Committed: Wed Jul 5 22:10:38 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/389e15a1/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 641291d..cb2ace2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -436,8 +436,9 @@ class _PubSubReadEvaluator(_TransformEvaluator):
       bundles = [bundle]
     else:
       bundles = []
-    input_pvalue = self._applied_ptransform.inputs
-    if not input_pvalue:
+    if self._applied_ptransform.inputs:
+      input_pvalue = self._applied_ptransform.inputs[0]
+    else:
       input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
     unprocessed_bundle = self._evaluation_context.create_bundle(
         input_pvalue)
