Fix PValue input in _PubSubReadEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/da3206c6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/da3206c6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/da3206c6

Branch: refs/heads/DSL_SQL
Commit: da3206c61d3e0c59ef8ac2cac85e2097f5db116a
Parents: d4fa33e
Author: Charles Chen <[email protected]>
Authored: Wed Jul 5 16:18:51 2017 -0700
Committer: Tyler Akidau <[email protected]>
Committed: Wed Jul 12 20:00:59 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/da3206c6/sdks/python/apache_beam/runners/direct/transform_evaluator.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 641291d..cb2ace2 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -436,8 +436,9 @@ class _PubSubReadEvaluator(_TransformEvaluator):
       bundles = [bundle]
     else:
       bundles = []
-    input_pvalue = self._applied_ptransform.inputs
-    if not input_pvalue:
+    if self._applied_ptransform.inputs:
+      input_pvalue = self._applied_ptransform.inputs[0]
+    else:
       input_pvalue = pvalue.PBegin(self._applied_ptransform.transform.pipeline)
     unprocessed_bundle = self._evaluation_context.create_bundle(
         input_pvalue)
