Hi All,

*Problem*
I would like to discuss BEAM-9322
<https://issues.apache.org/jira/projects/BEAM/issues/BEAM-9322> and the
correct way to set the output tags of a transform that returns nested
PCollections, e.g. a dict of PCollections or a tuple of dicts of
PCollections. Before BEAM-1833
<https://issues.apache.org/jira/browse/BEAM-1833> was fixed, the Python SDK
would auto-generate the output tags of the output PCollections when
applying a PTransform, even if the user had set them manually:

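import apache_beam as beam
from apache_beam.pvalue import PCollection

class MyComposite(beam.PTransform):
  def expand(self, pcoll):
    # Create two outputs from the input and tag each one manually.
    a = PCollection.from_(pcoll)
    a.tag = 'a'

    b = PCollection.from_(pcoll)
    b.tag = 'b'
    return (a, b)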

would yield a PTransform with two output PCollections tagged 'None' and
'0' instead of 'a' and 'b'. The fix for BEAM-1833 corrected simple cases
like this one. However, it fails when the output PCollections share the
same output tag, which can happen like so:

class MyComposite(beam.PTransform):
  def expand(self, pcoll):
    partition_1 = beam.Partition(pcoll, ...)
    partition_2 = beam.Partition(pcoll, ...)
    return (partition_1[0], partition_2[0])

With the new code, this leads to an error because both output PCollections
have an output tag of '0'.
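
To illustrate the failure mode, here is a rough sketch in plain Python. It
is not the actual SDK code: FakePCollection and outputs_by_tag are
hypothetical stand-ins, and I am only assuming that the outputs end up
keyed by their tag, so that two outputs with the same tag cannot both be
registered.

```python
class FakePCollection(object):
  """Hypothetical stand-in for a PCollection; only the tag matters here."""

  def __init__(self, tag):
    self.tag = tag


def outputs_by_tag(outputs):
  """Keys each output by its own tag and rejects duplicate tags."""
  by_tag = {}
  for pcoll in outputs:
    if pcoll.tag in by_tag:
      raise ValueError('duplicate output tag: %r' % (pcoll.tag,))
    by_tag[pcoll.tag] = pcoll
  return by_tag


# Distinct tags, as in the first example: no conflict.
distinct = outputs_by_tag((FakePCollection('a'), FakePCollection('b')))

# Same tag twice, as in the Partition example: this is the conflict.
try:
  outputs_by_tag((FakePCollection('0'), FakePCollection('0')))
  collided = False
except ValueError:
  collided = True
```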

*Proposal*
When applying a PTransform to a pipeline (pipeline.py:550), we would name
each output PCollection by its position in the tree, joined to the
PCollection's tag with a delimiter. In the first example, the outputs of
the applied transform would be '0.a' and '1.b', because the transform
returns a tuple of PCollections. In the second example, the outputs would
be '0.0' and '1.0'. For a dict of PCollections, the output tags would
simply be the keys of the dict.
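
To make the naming scheme concrete, here is a minimal sketch in plain
Python (no Beam dependency). FakePCollection and proposed_output_tags are
hypothetical names used only for illustration, '.' is assumed as the
delimiter, and the handling of deeper nesting (e.g. a tuple of dicts) is
just one possible reading of the proposal, not a settled design.

```python
DELIMITER = '.'


class FakePCollection(object):
  """Hypothetical stand-in for a PCollection; only the tag matters here."""

  def __init__(self, tag):
    self.tag = tag


def proposed_output_tags(outputs, path=()):
  """Maps each proposed output tag to its leaf in a nested tuple/list/dict."""
  if isinstance(outputs, dict):
    children = [(str(key), value) for key, value in outputs.items()]
    append_leaf_tag = False  # dict keys alone name the outputs
  elif isinstance(outputs, (tuple, list)):
    children = [(str(i), value) for i, value in enumerate(outputs)]
    append_leaf_tag = True  # position, then the PCollection's own tag
  else:
    # A single, un-nested output keeps its own tag.
    return {DELIMITER.join(path + (str(outputs.tag),)): outputs}

  tagged = {}
  for name, child in children:
    child_path = path + (name,)
    if isinstance(child, (dict, tuple, list)):
      tagged.update(proposed_output_tags(child, child_path))
    elif append_leaf_tag:
      tagged[DELIMITER.join(child_path + (str(child.tag),))] = child
    else:
      tagged[DELIMITER.join(child_path)] = child
  return tagged


# First example: a tuple of PCollections tagged 'a' and 'b'.
first = proposed_output_tags((FakePCollection('a'), FakePCollection('b')))
# Second example: two PCollections that both carry the tag '0'.
second = proposed_output_tags((FakePCollection('0'), FakePCollection('0')))
# A dict of PCollections: the keys are used as-is.
as_dict = proposed_output_tags(
    {'x': FakePCollection('0'), 'y': FakePCollection('0')})
# A tuple of dicts (assumed behavior): position, then key.
nested = proposed_output_tags(
    ({'x': FakePCollection('0')}, {'x': FakePCollection('0')}))
```

With this reading, the tuple-of-dicts case comes out as '0.x' and '1.x',
which is exactly the kind of edge case I would like feedback on.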

What do you think? Am I missing edge cases? Will this be unexpected to
users? Will this break people who rely on the generated PCollection output
tags?

Regards,
Sam
