On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> I heard mention that there is a flatten unzipping optimization implemented by 
> some runners. I didn't see that in the python optimizations in 
> translations.py[1]. Just curious what this optimization is?

It's done to increase the possibility of fusion. Suppose one has

... --> DoFnA \
              --> Flatten --> DoFnC --> ...
... --> DoFnB /

When determining the physical execution plan, one can rewrite this as

... --> DoFnA --> DoFnC \
                         --> Flatten --> ...
... --> DoFnB --> DoFnC /

which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).

One can apply this rewrite repeatedly, up to the point where the
consumer of the flatten already requires a materialization that
permits multiple inputs (e.g. writing to a shuffle/grouping
operation), at which point there is nothing left to gain.
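Concretely, the rewrite can be sketched as a graph transformation. The code below is an illustrative toy only, not Beam's actual translations.py implementation; the dict-based graph representation, the `unzip_flatten` name, and the `.0`/`.1` copy-naming scheme are all invented for the example:

```python
# Toy sketch of the flatten "unzip" rewrite (not Beam's real code).
# A pipeline is modeled as a dict: transform name -> list of input names.
# Before: DoFnA, DoFnB --> Flatten --> DoFnC --> Sink
# After:  DoFnA --> DoFnC.0 and DoFnB --> DoFnC.1, both --> Flatten --> Sink

def unzip_flatten(graph, flatten, consumer):
    """Push `consumer` above `flatten`, copying it onto each input branch."""
    new_graph = dict(graph)
    copies = []
    for i, branch in enumerate(graph[flatten]):
        copy = f"{consumer}.{i}"    # one copy of the consumer per branch
        new_graph[copy] = [branch]  # the copy reads directly from its branch
        copies.append(copy)
    new_graph[flatten] = copies     # flatten now merges the copies' outputs
    # Anything that consumed `consumer` now reads from the flatten instead.
    for node in new_graph:
        if node not in copies:
            new_graph[node] = [flatten if inp == consumer else inp
                               for inp in new_graph[node]]
    del new_graph[consumer]
    return new_graph

pipeline = {
    "DoFnA": ["Source1"],
    "DoFnB": ["Source2"],
    "Flatten": ["DoFnA", "DoFnB"],
    "DoFnC": ["Flatten"],
    "Sink": ["DoFnC"],
}
rewritten = unzip_flatten(pipeline, "Flatten", "DoFnC")
```

After the rewrite, DoFnA feeds its own copy of DoFnC (and likewise for DoFnB), so each branch can fuse into a single stage; the flatten then merges already-processed outputs.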

> I think I get the general gist in that you don't necessarily need to combine 
> the input pcollections to a flatten and instead you can just apply 
> non-aggregating consuming transforms to all input pcollections, but when is a 
> good time to do that? Do runners that implement this optimization always 
> apply this to all flattens?

Pretty much whenever they can, though there are limitations (e.g. if
DoFnC is stateful). Whether this rewrite makes sense also depends on
the runner's internal implementation.

> Cheers,
> Joey
>
> [1] 
> https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
