Fix multi-input named PTransforms. The naming wrapper now delegates its __ror__ logic entirely to the wrapped transform.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937cf69e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937cf69e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937cf69e

Branch: refs/heads/python-sdk
Commit: 937cf69e958d4a82fb274f311de248930298db69
Parents: 9fe102a
Author: Robert Bradshaw <rober...@google.com>
Authored: Fri Jul 22 14:32:33 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Sat Jul 23 16:43:45 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/transforms/ptransform.py | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937cf69e/sdks/python/apache_beam/transforms/ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da8b671..b652bca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -400,7 +400,7 @@ class PTransform(WithTypeHints):
     else:
       return NotImplemented
 
-  def __ror__(self, left):
+  def __ror__(self, left, label=None):
     """Used to apply this PTransform to non-PValues, e.g., a tuple."""
     pvalueish, pvalues = self._extract_input_pvalues(left)
     pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
@@ -434,7 +434,7 @@ class PTransform(WithTypeHints):
                       if not isinstance(v, pvalue.PValue) and v is not None}
       pvalueish = _SetInputPValues().visit(pvalueish, replacements)
     self.pipeline = p
-    result = p.apply(self, pvalueish)
+    result = p.apply(self, pvalueish, label)
     if deferred:
       return result
     else:
@@ -720,5 +720,8 @@ class _NamedPTransform(PTransform):
     super(_NamedPTransform, self).__init__(label)
     self.transform = transform
 
+  def __ror__(self, pvalueish):
+    return self.transform.__ror__(pvalueish, self.label)
+
   def apply(self, pvalue):
     raise RuntimeError("Should never be applied directly.")