Fix a bug caused by looking up cached pvalues multiple times. The lookup is not idempotent: each call decrements the pvalue's refcount and may even evict it from the cache. Instead, perform the lookup exactly once per side input and store the resulting step names in a map that later code consults.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/246fda51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/246fda51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/246fda51 Branch: refs/heads/python-sdk Commit: 246fda517fd7d6abdbbd47882657e66c34a4ac51 Parents: 77f90ff Author: Robert Bradshaw <[email protected]> Authored: Tue Jul 12 10:43:29 2016 -0700 Committer: Robert Bradshaw <[email protected]> Committed: Tue Jul 12 12:15:25 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/runners/dataflow_runner.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/246fda51/sdks/python/apache_beam/runners/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 24edb05..5a3f6a5 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -361,7 +361,11 @@ class DataflowPipelineRunner(PipelineRunner): # Attach side inputs. si_dict = {} - lookup_label = lambda side_pval: self._cache.get_pvalue(side_pval).step_name + # We must call self._cache.get_pvalue exactly once due to refcounting. + si_labels = {} + for side_pval in transform_node.side_inputs: + si_labels[side_pval] = self._cache.get_pvalue(side_pval).step_name + lookup_label = lambda side_pval: si_labels[side_pval] for side_pval in transform_node.side_inputs: assert isinstance(side_pval, PCollectionView) si_label = lookup_label(side_pval)
