On Fri, Feb 6, 2026 at 4:43 PM Danny McCormick <[email protected]> wrote:
> On Fri, Feb 6, 2026 at 4:22 PM Joey Tran <[email protected]> wrote:
>
>>
>> On Fri, Feb 6, 2026 at 1:34 PM Danny McCormick via dev <[email protected]> wrote:
>>
>>> > But that is exactly the behavior our current DoFns have (if one does
>>> > not call with_outputs, the TaggedOutputs are ignored). And even if one
>>> > does, there's nothing that forces you to name all the outputs, or use
>>> > them even if they're named.
>>>
>>> I missed that this is our default behavior for DoFns. While I disagree
>>> with that choice, that ship has sailed long ago, and it could influence how
>>> I think about this - thanks for calling it out.
>>>
>>> Unifying our behavior across DoFns and composites matters more to me
>>> than avoiding the non-obvious dropping of a PCollection. But I don't think
>>> this proposal quite does that since we're not talking about changing the
>>> default behavior.
>>>
>>
>> This is an interesting idea, though it does increase the scope of the
>> proposal quite a bit, at least in terms of implementation. We could extend
>> `PTransform` to have a `.with_outputs()` method so `ParDo`s and
>> `PTransforms` have the same interface. Then `PTransform.expand` can
>> optionally return a `PCollectionWithSideOutputs` (I agree with Robert that
>> `ForkedPCollections` doesn't really make sense).
>>
>> ```
>> class MyFilter(beam.PTransform):
>>     def expand(self, pcoll):
>>         results = pcoll | beam.ParDo(FilterDoFn()).with_outputs(
>>             'filtered', main='main')
>>         return PCollectionWithSideOutputs(main=results.main,
>>                                           filtered=results.filtered)
>> ```
>> Then you'd have similar behavior for both ParDos and PTransforms:
>> ```
>> # ParDo
>> pcoll | ParDo(fn)  # PCollection
>> pcoll | ParDo(fn).with_outputs('side', main='main')  # DoOutputsTuple
>>
>> # Composite PTransform
>> pcoll | MyTransform()  # PCollection
>> pcoll | MyTransform().with_outputs('filtered', main='main')  # PCollectionOutputsTuple
>> ```
>>
>> I recognize this still doesn't really address your concerns about making
>> it easier to hide dropped outputs though.
>>
>
> If we can unify things I'm much more amenable to this, because it at least
> helps with a different problem (inconsistency across transform
> experiences). I think we'd still have different default behaviors in this
> case, though, right? If we wanted to really unify things, we'd need to wrap
> things and always return a PCollectionWithSideOutputs, but that probably
> doesn't work since composites don't necessarily have a "main" PCollection
> in the same way.

Arguably, it's always the first PCollection.

>
I don't quite follow what you mean about different default behaviors. The default for both ParDo and MyTransform when they're applied to a PCollection is to return a PCollection, regardless of whether or not they had tagged/side outputs. Do you mean that ParDo().with_outputs() and PTransform().with_outputs() return different types?

>
>>
>>> Secondarily, it is optimizing for writability over readability; you save
>>> a few lines of code, but the reader might not have any idea that there was
>>> a dropped output (whether that's a harmless Filter secondary collection or
>>> an error collection).
>>>
>>
>> FWIW, much of the value of this proposal to me is the better readability
>> from not having to consider multiple versions of transforms and not having
>> to break up chains to extract main outputs. I appreciate though that we'd
>> be making a trade-off of readability of the "sad path" for readability of
>> the "happy path".
>>
>
> Yeah, that makes sense; what do you think of the other alternative
> mentioned as an option for optimizing for both kinds of readability?
> Specifically, allowing:
>
> pcoll | Partition(...)['main'] | ChainedParDo()
>
> I guess the downside there is education (all pipeline authors need to know
> this is an option as opposed to only one expert transform author), but I'm
> curious if it is sufficient for your context.
>

Is the suggestion here to implement `__getitem__` on PTransform/ParDo so a particular PCollection can be specified? This would definitely be an improvement over the current state. I think one further improvement would be if we could specify the PCollection by attribute rather than by key/string, so `Partition(...).main` instead, but that risks collisions between PCollection names and PTransform methods. I'm still partial toward the other suggestions, particularly implementing `PTransform.with_outputs`, but this is probably sufficient for my context.

>
> > pcoll | Partition(...)['main'] | ChainedParDo()
>>>
>>> We definitely could introduce this syntax FWIW, and I'd be supportive if
>>> it solves the underlying problem sufficiently. It is a little less clean
>>> than the proposed syntax, but much less ambiguous in intent. It should be
>>> straightforward to expand it to SelectOutput('main').
>>>
>>> Thanks,
>>> Danny
>>>
>>> On Fri, Feb 6, 2026 at 11:54 AM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> On Fri, Feb 6, 2026 at 7:21 AM Danny McCormick <[email protected]> wrote:
>>>> >
>>>> > On Thu, Feb 5, 2026 at 6:06 PM Robert Bradshaw <[email protected]> wrote:
>>>> >>
>>>> >> On Thu, Feb 5, 2026 at 1:44 PM Joey Tran <[email protected]> wrote:
>>>> >>>
>>>> >>> Thanks for such quick feedback!
>>>> >>>
>>>> >>>> On Thu, Feb 5, 2026 at 3:27 PM Danny McCormick via dev <[email protected]> wrote:
>>>> >>>>>
>>>> >>>>> Would you mind opening up the doc for comments?
>>>> >>>>>
>>>> >>>>> At a high level, I'm skeptical of the pattern; it seems to me like it moves the burden of choosing the correct behavior from authors to consumers in non-obvious ways which range from harmless to potentially causing silent data loss. I think if a user wants to drop a PCollection, that should always be an active choice since the risk of data loss is much greater than the EoU benefit of extra code.
>>>> >>>>>
>>>> >>> I think perhaps I poorly chose a few motivating examples, but it was at least helpful in clarifying two distinct patterns.
>>>> >>> - Filters/Samplers/Deduplicators
>>>> >>> - Transforms that may run into issues with certain inputs
>>>> >>
>>>> >> One use case that comes to mind is running some computation with a (safely ignorable) side output that has statistics about what was computed/encountered.
>>>> >>
>>>> >>>>>
>>>> >>>>> I'd argue that a better pattern than having a single transform which handles this is to either have a Filter or a Partition transform which a user can use as needed. These are different transforms because they have different purposes/core functionalities.
>>>> >>>>>
>>>> >>> This can become unwieldy for a large library of filtering / sampling / data processing transforms.
>>>> >>> At Schrodinger, for example, we have maybe a dozen transforms, some of which...
>>>> >>> - are samplers where most consumers will just be interested in the "sample", while other consumers may be interested in both the sample and the remaining elements
>>>> >>> - are data processing transforms with a concept of processed outputs and "dropped for a well-understood reason"
>>>> >>
>>>> >> I prefer transforms that can be used multiple ways to distinct transforms that do the same thing.
>>>> >>
>>>> >> FWIW, our existing DoFn has somewhat this behavior: by default one consumes only the "main" output, but the caller can ask for the side outputs if desired. This extends it a bit further in that
>>>> >>
>>>> >> 1. Composite operations are supported (though I guess technically nothing is stopping someone from manually constructing a DoOutputsTuple), and
>>>> >> 2. Side outputs are always returned and available for consumption, without inconveniencing use of the main output, rather than having to explicitly switch modes at application time.
>>>> >>
>>>> >> This seems like a strict win to me.
>>>> >
>>>> > I guess fundamentally, the question is: should we make it easier to drop outputs with no indication to a consumer (and more importantly IMO, future readers of a consumer's code)?
>>>> >
>>>> > For example:
>>>> >
>>>> > pcoll | Filter(...) | ChainedParDo()
>>>> >
>>>> > gives no indication of there being multiple outputs from Filter. In some cases, this is fine, but in others it is dangerous.
>>>>
>>>> But that is exactly the behavior our current DoFns have (if one does
>>>> not call with_outputs, the TaggedOutputs are ignored). And even if one
>>>> does, there's nothing that forces you to name all the outputs, or use
>>>> them even if they're named.
>>>>
>>>> Or, with the Filter example here, it's not considered "data loss" that
>>>> only some of the elements make it through, but unfortunate that you
>>>> can't peek at the elements that were dropped if you want. Making this
>>>> a "side output" that is optionally inspected seems strictly better.
>>>> (In fact, we could (should) update our Filter DoFn to do this today,
>>>> though getting at the dropped elements would force one to write
>>>> Filter(...).with_outputs('filtered', main='main').)
>>>>
>>>> >>>>> > A parser that routes malformed input to a dead-letter output
>>>> >>>>> > A validator that routes violations separately
>>>> >>>>> > An enrichment that routes lookup failures aside
>>>> >>>>>
>>>> >>>>> These are the ones I'm really worried about. In all of these cases, we are silently dropping error output in a way that might be non-obvious to a user. As a user, if I use a parser that returns a single output, I would assume that any parse failures would lead to exceptions.
>>>> >>>>>
>>>> >>> I agree that it'd be an antipattern for these types of transforms to silently capture and drop these erroneous records, but there is nothing preventing an author of a parser/validator/enrichment transform from doing this today even without ForkedPCollections. With ForkedPCollections, I think we can and still should discourage transform authors from silently handling errors without some active user configuration (e.g. by requiring a keyword arg such as `error_handling_pcoll_name="failed"` to enable any error capturing at all), e.g.
>>>> >>> ```
>>>> >>> parsed = pcoll | ParseData()
>>>> >>> # parsed.failed --> should not exist, ParseData should not automatically do this
>>>> >>>
>>>> >>> parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>>> >>> # parsed.failed --> exists now but only with user intent
>>>> >>> ```
>>>> >>
>>>> >> +1. Errors should fail the pipeline unless one explicitly requests they be passed somewhere else.
>>>> >
>>>> > I'm glad we're generally aligned on failing the pipeline by default - I'll note that this is exactly what with_exception_handling is doing, but I'd still be hesitant to apply this to with_exception_handling, with something like
>>>> >
>>>> > parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>>>> >
>>>> > Should we really be helping transform authors make it easier for consumers to swallow unhandled error output?
>>>>
>>>> Yeah, for error handling, I would encourage using the existing error
>>>> handling mechanisms (including raising an exception rather than
>>>> emitting an output).
>>>>
>>>> >>>>> With all that said, I am aligned with the goal of making pipelines like this easier to chain. Maybe an in-between option would be adding a DoFn utility like:
>>>> >>>>>
>>>> >>>>> ```
>>>> >>>>> pcoll | Partition(...).keep_tag('main') | ChainedParDo()
>>>> >>>>> ```
>>>> >>>>>
>>>> >>>>> Where `keep_tag` forces an expansion where all tags other than main are dropped. What do you think?
>>>> >>
>>>> >> One can already write
>>>> >>
>>>> >> pcoll | Partition(...)['main'] | ChainedParDo()
>>>> >
>>>> > This is a good point, though as a nit I think it needs to be:
>>>> >
>>>> > (pcoll | Partition(...))['main'] | ChainedParDo()
>>>> >
>>>> > since the tag is applied to the PCollection, not the ParDo. But this does give us the expressivity we need here IMO.
>>>>
>>>> Good point. And that makes it a bit uglier. We could introduce a new
>>>> trivial transform
>>>>
>>>> pcoll | Partition(...) | SelectOutput('main') | ChainedParDo()
>>>>
>>>> but that is cumbersome to write compared to
>>>>
>>>> pcoll | Filter(...) | ChainedParDo()
>>>>
>>>> when 90% of the time you just want to filter out the elements, but 10%
>>>> of the time you might be interested in what was filtered out (and
>>>> why).
>>>>
>>>
