I believe that on the Dataflow runner the result of processElement must
also fit in memory, so this constraint is not specific to the Spark runner.

The best approach at present might be to convert the source from a flatMap
to an SDF that reads the file one chunk at a time and supports runner
checkpointing (i.e. with a file seek point to resume from). That chunks
your data in a way that doesn't require the runner to support unbounded
outputs from any individual @ProcessElement call.
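
To make the chunk-and-resume idea concrete, here is a minimal plain-Java
sketch with no Beam dependency. ChunkedReader, Chunk, readChunk and
countRecords are hypothetical names made up for illustration; they are not
Beam APIs. It only shows the shape of the work: each call emits a bounded
number of records and hands back a byte offset to resume from. It also
assumes newline-delimited ASCII records.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: reads a file in bounded chunks.
final class ChunkedReader {

  // Output of one bounded call: the records read plus the seek point.
  static final class Chunk {
    final List<String> records;
    final long resumeOffset; // -1 means the file is exhausted

    Chunk(List<String> records, long resumeOffset) {
      this.records = records;
      this.resumeOffset = resumeOffset;
    }
  }

  // Reads at most maxRecords lines starting at byte offset `from`.
  // Assumes newline-delimited ASCII records (RandomAccessFile.readLine
  // maps each byte to one char).
  static Chunk readChunk(Path file, long from, int maxRecords) throws IOException {
    if (maxRecords < 1) {
      throw new IllegalArgumentException("maxRecords must be >= 1");
    }
    List<String> out = new ArrayList<>();
    try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
      raf.seek(from);
      String line;
      // Stop as soon as the chunk is full, so memory use stays bounded.
      while (out.size() < maxRecords && (line = raf.readLine()) != null) {
        out.add(line);
      }
      long pos = raf.getFilePointer();
      return new Chunk(out, pos >= raf.length() ? -1 : pos);
    }
  }

  // Stand-in for the runner: keeps resuming from the last seek point.
  static long countRecords(Path file, int maxRecords) throws IOException {
    long total = 0;
    long offset = 0;
    while (offset >= 0) {
      Chunk chunk = readChunk(file, offset, maxRecords);
      total += chunk.records.size();
      offset = chunk.resumeOffset;
    }
    return total;
  }
}
```

In a real SDF I would expect the resume offset to live in the restriction
(for example an offset range) and the runner, not a loop like countRecords,
to decide when to checkpoint and resume. That mapping is my assumption and
is not taken from the commit under discussion.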

-Daniel

On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek <[email protected]> wrote:

> Hello,
>
> I am working on an issue which currently limits the Spark runner by
> requiring the result of processElement to fit in memory [1]. This is
> problematic e.g. for a flatMap where the input element is a file split and
> generates a possibly large output.
>
> The intended fix is to add an option to run DoFn processing of the input
> in one thread, and the consumption of outputs and their forwarding to
> downstream operators in another thread. One challenge for me is to identify
> which DoFns should be using this async approach.
>
> Here [2] is a WIP commit which uses async processing only for the SDF
> naive expansion. I would like to get feedback on:
>
> 1) does the approach make sense overall
>
> 2) to target DoFns which need async processing (those which generate
> possibly large output), I am currently just checking if it is a DoFn of
> the SDF naive expansion type [3]. I failed to find a better / more
> systematic approach for identifying which DoFns should benefit from that.
> I would appreciate any thoughts on how to make this better.
>
> 3) Config option and validatesRunner tests - do we want to make it
> possible to turn async DoFn off? If yes, do we want to run validatesRunner
> tests for both options? How do I make sure of that?
>
> Looking forward to the feedback.
> Best,
> Jozef
>
> [1] https://github.com/apache/beam/issues/23852
> [2]
> https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
> [3]
> https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362
>
>
