On Thu, Feb 5, 2026 at 6:06 PM Robert Bradshaw <[email protected]> wrote:
> On Thu, Feb 5, 2026 at 1:44 PM Joey Tran <[email protected]> wrote:
>
>> Thanks for such quick feedback!
>>
>> On Thu, Feb 5, 2026 at 3:27 PM Danny McCormick via dev <[email protected]> wrote:
>>
>>> Would you mind opening up the doc for comments?
>>>
>>> At a high level, I'm skeptical of the pattern; it seems to me like it moves the burden of choosing the correct behavior from authors to consumers in non-obvious ways which range from harmless to potentially causing silent data loss. I think if a user wants to drop a PCollection, that should always be an active choice since the risk of data loss is much greater than the EoU benefit of extra code.
>>>
>> I think perhaps I poorly chose a few motivating examples, but it was at least helpful in clarifying two distinct patterns:
>> - Filters/Samplers/Deduplicators
>> - Transforms that may run into issues with certain inputs
>>
> One use case that comes to mind is running some computation with a (safely ignorable) side output that has statistics about what was computed/encountered.
>
>>> I'd argue that a better pattern than having a single transform which handles this is to either have a *Filter* or a *Partition* transform which a user can use as needed. These are different transforms because they have different purposes/core functionalities.
>>>
>> This can become unwieldy for a large library of filtering / sampling / data processing transforms. At Schrodinger, for example, we have maybe a dozen transforms, some of which...
>> - are samplers where most consumers will just be interested in the "sample", while other consumers may be interested in both the sample and the remaining elements
>> - are data processing transforms with a concept of processed outputs and "dropped for a well-understood reason"
>>
> I prefer transforms that can be used multiple ways over distinct transforms that do the same thing.
>
> FWIW, our existing DoFn has somewhat this behavior: by default one consumes only the "main" output, but the caller can ask for the side outputs if desired. This extends it a bit further in that
>
> 1. Composite operations are supported (though I guess technically nothing is stopping someone from manually constructing a DoFnTuple), and
> 2. Side outputs are always returned and available for consumption, without inconveniencing use of the main output, rather than having to explicitly switch modes at application time.
>
> This seems like a strict win to me.
>
I guess fundamentally, the question is: should we make it easier to drop outputs with no indication to a consumer (and, more importantly IMO, to future readers of a consumer's code)? For example:

pcoll | Filter(...) | ChainedParDo()

gives no indication of there being multiple outputs from Filter. In some cases this is fine, but in others it is dangerous. I think we can draw on existing language design principles here. IMO, this is essentially the same as allowing

def foo():
    return out1, out2, out3

to be successfully unpacked with

out1, out2 = foo()

This is convenient if you only need out1/out2, but it makes mistakes less obvious in the name of removing a line of code. In my experience, people already find Beam's pipe operator surprising, and this seems like it would make it worse - we're changing the meaning of the pipe operator with a transform-level change.
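For contrast, here's roughly what the explicit opt-in Robert describes looks like today (ParseFn, the tag names, and the sample data are all made up for illustration):

```
import apache_beam as beam


class ParseFn(beam.DoFn):
  """Illustrative parser: ints go to the main output, bad input to a 'failed' tag."""
  def process(self, element):
    try:
      yield int(element)
    except ValueError:
      yield beam.pvalue.TaggedOutput('failed', element)


with beam.Pipeline() as p:
  lines = p | beam.Create(['1', '2', 'oops'])

  # Default: the result is just the main PCollection; the 'failed' tag is not surfaced.
  parsed_only = lines | 'ParseMainOnly' >> beam.ParDo(ParseFn())

  # Opt-in: the caller explicitly asks for the extra output at application time.
  results = lines | 'ParseWithFailures' >> beam.ParDo(ParseFn()).with_outputs(
      'failed', main='parsed')
  parsed = results.parsed
  failed = results.failed
```

The extra output only shows up when the caller asks for it at application time, so there is a visible cue in the consumer's code.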
With all that said, my core claim is: *As much as possible, there should be some indication in the code of a consumer if they are, or might be, dropping output from a function or transformation.*

>> We'd likely need to double the size of our library in order to have both Filter and Partition versions of these transforms.
>>
>>> > A parser that routes malformed input to a dead-letter output
>>> > A validator that routes violations separately
>>> > An enrichment that routes lookup failures aside
>>>
>>> These are the ones I'm really worried about. In all of these cases, we are silently dropping error output in a way that might be non-obvious to a user. As a user, if I use a parser that returns a single output, I would assume that any parse failures would lead to exceptions.
>>>
>> I agree that it'd be an antipattern for these types of transforms to silently capture and drop these erroneous records, but there is nothing preventing the author of a parser/validator/enrichment transform from doing this today, even without ForkedPCollections. With ForkedPCollections, I think we can and still should discourage transform authors from silently handling errors without some active user configuration (e.g. by requiring a keyword arg like `error_handling_pcoll_name="failed"` to enable any error capturing at all). e.g.
>> ```
>> parsed = pcoll | ParseData()
>> # parsed.failed --> should not exist, ParseData should not automatically do this
>>
>> parsed = pcoll | ParseData(failed_pcoll_tag="failed")
>> # parsed.failed --> exists now but only with user intent
>> ```
>>
> +1. Errors should fail the pipeline unless one explicitly requests they be passed somewhere else.
>
I'm glad we're generally aligned on failing the pipeline by default. I'll note that this is exactly what with_exception_handling does today, but I'd still be hesitant to apply this to with_exception_handling with something like

parsed = pcoll | ParseData(failed_pcoll_tag="failed")

Should we really be helping transform authors make it easier for consumers to swallow unhandled error output?

>>> With all that said, I am aligned with the goal of making pipelines like this easier to chain. Maybe an in-between option would be adding a DoFn utility like:
>>>
>>> ```
>>> pcoll | Partition(...).keep_tag('main') | ChainedParDo()
>>> ```
>>>
>>> Where `keep_tag` forces an expansion where all tags other than main are dropped. What do you think?
>>>
> One can already write
>
> pcoll | Partition(...)['main'] | ChainedParDo()
>
This is a good point, though as a nit I think it needs to be

(pcoll | Partition(...))['main'] | ChainedParDo()

since the tag is applied to the PCollection, not the ParDo. But this does give us the expressivity we need here IMO.

>> This would help but this solution would be limited to ParDos. If you have a composite transform like a sophisticated `CompositeFilter` or `CompositeSampler`, then you wouldn't be able to use `.keep_tag`.
>>
> I don't think there's any reason we couldn't apply the same to composites.
>
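For concreteness, I believe a composite can already offer this if its expand() returns a dict (or tuple) of PCollections; the consumer still has to index into the result explicitly. A rough sketch, where CompositeSampler, _SampleFn, and the tag names are all made up:

```
import apache_beam as beam


class _SampleFn(beam.DoFn):
  """Made-up DoFn: keeps even-length elements as the 'sample', routes the rest aside."""
  def process(self, element):
    if len(element) % 2 == 0:
      yield element
    else:
      yield beam.pvalue.TaggedOutput('rest', element)


class CompositeSampler(beam.PTransform):
  """Hypothetical composite whose expand() returns a dict of PCollections keyed by tag."""
  def expand(self, pcoll):
    results = pcoll | beam.ParDo(_SampleFn()).with_outputs('rest', main='sample')
    return {'sample': results.sample, 'rest': results.rest}


with beam.Pipeline() as p:
  pcoll = p | beam.Create(['aa', 'bbb', 'cccc'])
  outputs = pcoll | CompositeSampler()
  # The consumer names the output they want, so nothing is dropped without
  # some indication in their code.
  sample = outputs['sample']
  rest = outputs['rest']
```

Returning a dict is just one way to spell it; the point is that the consumer still writes ['sample'] (or ['rest']) explicitly.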
Thanks,
Danny