On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <joey.t...@schrodinger.com> wrote:
>
> I heard mention that there is a flatten unzipping optimization implemented by
> some runners. I didn't see that in the python optimizations in
> translations.py[1]. Just curious what this optimization is?
It's done to increase the possibility of fusion. Suppose one has

    ... --> DoFnA \
                   --> Flatten --> DoFnC --> ...
    ... --> DoFnB /

When determining the physical execution plan, one can re-write this as

    ... --> DoFnA --> DoFnC \
                             --> Flatten --> ...
    ... --> DoFnB --> DoFnC /

which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC). One can
progressively do this up to the point that the consumer of the flatten
already requires materialization that permits multiple inputs (e.g. writing
to a shuffle/grouping operation).

> I think I get the general gist in that you don't necessarily need to combine
> the input pcollections to a flatten and instead you can just apply
> non-aggregating consuming transforms to all input pcollections, but when is a
> good time to do that? Do runners that implement this optimization always
> apply this to all flattens?

Pretty much whenever they can, though there are limitations (e.g. if DoFnC
is stateful). I think it depends on the internal implementation of the
runner whether this makes sense.

> Cheers,
> Joey
>
> [1]
> https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py
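For what it's worth, the rewrite above can be sketched on a toy graph. This is
not Beam's actual pipeline representation (the real one operates on the
portable pipeline protos); `unzip_flatten`, the list-of-transform-names stage
encoding, and the branch structure are all hypothetical, just to show the
shape of the transformation:

```python
# Toy sketch of flatten unzipping, NOT Beam's real implementation.
# A "branch" is the chain of transforms feeding into the Flatten, and
# "consumer" is the single transform reading the Flatten's output.

def unzip_flatten(branches, consumer):
    """Push the Flatten's consumer up into each input branch.

    Before:  branch_i --> Flatten --> consumer
    After:   branch_i --> consumer --> Flatten

    Each returned chain can now be fused into a single stage, since the
    consumer sits directly downstream of its producer with no Flatten
    in between.
    """
    return [branch + [consumer] for branch in branches]

branches = [["DoFnA"], ["DoFnB"]]
stages = unzip_flatten(branches, "DoFnC")
# stages == [["DoFnA", "DoFnC"], ["DoFnB", "DoFnC"]]
# i.e. the fusable stages (DoFnA+DoFnC) and (DoFnB+DoFnC) from the diagram.
```

The caveat in the reply shows up naturally here: duplicating the consumer
into every branch is only safe when the consumer doesn't care that it now
runs as several independent copies, which is why a stateful DoFnC rules the
rewrite out.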