Hi All,

In the Python SDK there are three ways of representing the output of a
PTransform with multiple PCollections:

   - dictionary: PCollection tag --> PCollection
   - tuple: index --> PCollection
   - DoOutputsTuple: tag, index, or field name --> PCollection

I find this inconsistency in how multiple outputs are accessed confusing.
Say that you have an arbitrary PTransform with multiple outputs. How do you
know how to access an individual output without looking at the source
code? *You can't!* Remember, there are three representations of multiple
outputs. So, you need to look at the output type and determine what the
output actually is.

What purpose does it serve to have three different ways of representing a
single concept of multiple output PCollections?
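
To make the problem concrete, here is a rough sketch of the type dispatch a
caller ends up writing today. It deliberately does not use Beam: plain strings
stand in for PCollections, a SimpleNamespace stands in for an object with
field-name access (such as a DoOutputsTuple), and get_output is a hypothetical
helper, not part of any SDK.

```python
from types import SimpleNamespace


def get_output(result, key):
    """Hypothetical helper: fetch one output from an unknown container type."""
    if isinstance(result, dict):
        return result[key]        # dictionary: looked up by tag
    if isinstance(result, tuple):
        return result[key]        # tuple: looked up by integer index
    return getattr(result, key)   # anything else: looked up by field name


# Three stand-ins for the same logical pair of outputs.
as_dict = {'number': 'numbers_pcoll', 'string': 'strings_pcoll'}
as_tuple = ('numbers_pcoll', 'strings_pcoll')
as_fields = SimpleNamespace(number='numbers_pcoll', string='strings_pcoll')

# The caller must already know which kind of key each container expects.
from_dict = get_output(as_dict, 'number')
from_tuple = get_output(as_tuple, 0)
from_fields = get_output(as_fields, 'number')
```

Note that the key itself changes ('number' versus 0) depending on which
representation the transform happened to return.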

My proposal is to have a single representation analogous to Java's
PCollectionTuple. With this new type you will be able to access PCollections
by tag with the "[ ]" operator or by field name. It should also up-convert
returned tuples, dicts, and DoOutputsTuples from composites into this new
type.
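
As a minimal sketch of the access semantics I have in mind (not an
implementation against Beam), the class below uses plain strings in place of
PCollections. PTupleSketch is a hypothetical name, and tagging positional
outputs as '0', '1', ... is my own assumption. Up-converting a DoOutputsTuple
is left out because it would need the real SDK.

```python
class PTupleSketch:
    """Hypothetical stand-in for the proposed type; not a Beam class."""

    def __init__(self, outputs):
        if isinstance(outputs, PTupleSketch):
            # Already the unified type: copy its tag mapping.
            outputs = outputs._by_tag
        elif isinstance(outputs, (tuple, list)):
            # Assumption: positional outputs get string tags '0', '1', ...
            outputs = {str(i): pc for i, pc in enumerate(outputs)}
        self._by_tag = dict(outputs)

    def __getitem__(self, tag):
        # Access by tag with the "[ ]" operator.
        return self._by_tag[tag]

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails.
        if name.startswith('_'):
            raise AttributeError(name)
        try:
            return self._by_tag[name]
        except KeyError:
            raise AttributeError(name) from None


# Up-convert a dict and a tuple into the same type.
tagged = PTupleSketch({'number': 'numbers_pcoll', 'string': 'strings_pcoll'})
positional = PTupleSketch(('first_pcoll', 'second_pcoll'))

by_field = tagged.number        # field-name access
by_tag = tagged['string']       # tag access
by_position = positional['0']   # positional output under its assumed tag
```

Whatever a composite returns, the caller then always gets the same type back
and can use either "[ ]" or a field name.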

Full example:

import apache_beam as beam
from apache_beam import pvalue


class SomeCustomComposite(beam.PTransform):
  def expand(self, pcoll):
    # Route each element to a tagged output based on its type.
    def my_multi_do_fn(x):
      if isinstance(x, int):
        yield pvalue.TaggedOutput('number', x)
      if isinstance(x, str):
        yield pvalue.TaggedOutput('string', x)

    def printer(x):
      yield x

    outputs = pcoll | beam.ParDo(my_multi_do_fn).with_outputs()

    # pvalue.PTuple is the proposed new type; it does not exist yet.
    return pvalue.PTuple({
        'number': outputs.number | beam.ParDo(printer),
        'string': outputs.string | beam.ParDo(printer)
    })

p = beam.Pipeline()
main = p | SomeCustomComposite()

# Access PCollection by field name.
numbers = main.number | beam.ParDo(...)

# Access PCollection by tag.
strings = main['string'] | beam.ParDo(...)

What do you think? Does this clear up the confusion of using multiple
output PCollections in Python?

