I believe that for the Dataflow runner, the result of processElement must also fit in memory, so this is not just a constraint of the Spark runner.
The best approach at present might be to convert the source from a flatMap to an SDF that reads the file one chunk at a time and supports runner checkpointing (i.e. with a file seek point to resume from). That chunks your data in a way that doesn't require the runner to support unbounded outputs from any individual @ProcessElement downcall.

-Daniel

On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek <[email protected]> wrote:

> Hello,
>
> I am working on an issue which currently limits the Spark runner by requiring
> the result of processElement to fit in memory [1]. This is problematic e.g.
> for flatMap where the input element is a file split and generates possibly
> large output.
>
> The intended fix is to add an option to have DoFn processing over input in
> one thread and consumption of outputs and forwarding them to downstream
> operators in another thread. One challenge for me is to identify which DoFn
> should be using this async approach.
>
> Here [2] is a commit which is WIP and uses async processing only for SDF
> naive expansion. I would like to get feedback on:
>
> 1) does the approach make sense overall
>
> 2) to target DoFn which needs async processing (one that generates possibly
> large output), I am currently just checking if it is a DoFn of SDF naive
> expansion type [3]. I failed to find a better / more systematic approach
> for identifying which DoFn should benefit from that. I would appreciate any
> thoughts on how to make this better.
>
> 3) Config option and validatesRunner tests - do we want to make it
> possible to turn async DoFn off? If yes, do we want to run validatesRunner
> tests for both options? How do I make sure of that?
>
> Looking forward to the feedback.
> Best,
> Jozef
>
> [1] https://github.com/apache/beam/issues/23852
> [2]
> https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
> [3]
> https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362
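
To make the chunk-and-resume idea above a bit more concrete, here is a rough plain-Java sketch. It deliberately does not use the Beam API: the class and method names (ChunkedReadSketch, readChunk, readAll) and the line-per-record file layout are assumptions made only for illustration. The point is that each call emits a bounded number of records and hands back a seek offset, which is roughly the role a restriction and its checkpoint would play in a real SDF.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; this is not Beam code.
final class ChunkedReadSketch {

    /** Output of one bounded call: the records read and where to resume. */
    static final class Chunk {
        final List<String> records;
        final long resumeOffset; // -1 once the file is exhausted

        Chunk(List<String> records, long resumeOffset) {
            this.records = records;
            this.resumeOffset = resumeOffset;
        }
    }

    /**
     * Reads at most maxRecords lines starting at startOffset, so the amount
     * held in memory per call is bounded regardless of the file size.
     * RandomAccessFile.readLine maps each byte to one char, so this sketch
     * assumes ASCII content.
     */
    static Chunk readChunk(Path file, long startOffset, int maxRecords) throws IOException {
        List<String> records = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(startOffset); // resume from the previous checkpoint
            String line;
            while (records.size() < maxRecords && (line = raf.readLine()) != null) {
                records.add(line);
            }
            long position = raf.getFilePointer();
            long resume = position >= raf.length() ? -1L : position;
            return new Chunk(records, resume);
        }
    }

    /** Stand-in for the runner: keeps resuming until the file is exhausted. */
    static List<String> readAll(Path file, int maxRecords) throws IOException {
        List<String> all = new ArrayList<>();
        long offset = 0L;
        while (offset >= 0) {
            Chunk chunk = readChunk(file, offset, maxRecords);
            all.addAll(chunk.records); // in a pipeline these would go downstream
            offset = chunk.resumeOffset;
        }
        return all;
    }
}
```

In an actual SDF the resume offset would not be returned by hand like this; it would live in the restriction that the runner splits and checkpoints. The loop in readAll only stands in for the runner rescheduling the remaining work.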
