Hi Jozef,
I agree that this issue is most likely related to Spark, because of the
functional style in which Spark implements flatMap().
It could be fixed with the following two options:
a) SparkRunner's SDF implementation does not use splitting - it could
be changed so that the SDF is stopped via trySplit after N elements have
been buffered, the buffer is flushed, and the restriction is resumed
b) alternatively use two threads and a BlockingQueue between them,
which is what you propose
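To make option (a) concrete, here is a minimal sketch in plain Java, with no Beam dependency. The `Restriction` class, `processUntilFull` and `run` are hypothetical names chosen for illustration; `processUntilFull` only stands in for stopping an SDF via trySplit and is not how SparkRunner is implemented today.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongFunction;

/** Hypothetical sketch of option (a): stop after N buffered outputs, flush, resume. */
public class BoundedBufferProcessing {

  /** Hypothetical offset restriction covering positions [from, to). */
  public static final class Restriction {
    final long from;
    final long to;

    public Restriction(long from, long to) {
      this.from = from;
      this.to = to;
    }

    boolean isEmpty() {
      return from >= to;
    }
  }

  /**
   * Emits one output per position into the buffer until the buffer holds
   * maxBuffered elements or the restriction is exhausted. Returns the
   * residual restriction (a stand-in for what trySplit would hand back).
   */
  static Restriction processUntilFull(
      Restriction r, LongFunction<String> expand, int maxBuffered, List<String> buffer) {
    long pos = r.from;
    while (pos < r.to && buffer.size() < maxBuffered) {
      buffer.add(expand.apply(pos));
      pos++;
    }
    return new Restriction(pos, r.to);
  }

  /** Alternates between filling the buffer and flushing it downstream. */
  public static void run(
      Restriction r, LongFunction<String> expand, int maxBuffered,
      Consumer<List<String>> downstream) {
    while (!r.isEmpty()) {
      List<String> buffer = new ArrayList<>();
      r = processUntilFull(r, expand, maxBuffered, buffer);
      downstream.accept(buffer); // flush, then resume the residual restriction
    }
  }
}
```

With this shape, at most `maxBuffered` outputs are held in memory at any time, regardless of how large the whole restriction is.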
The number of output elements per input element is bounded (we are
talking about the batch case anyway), but bounded does not mean it has to
fit in memory. Furthermore, unnecessary buffering of a large number of
elements is memory-inefficient, which is why I think that the two-thread
approach (b) should be the most efficient. Option (a) seems
orthogonal and might be implemented as well.
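The two-thread approach (b) can be sketched roughly as follows: a producer thread runs the push-style element processing and writes into a bounded BlockingQueue, while the consuming side exposes the usual pull-style `Iterator`. The class name `AsyncOutputIterator` and its constructor parameters are assumptions made for this sketch and are not taken from the WIP commit.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical sketch: exposes a push-style producer as a pull-style Iterator. */
public class AsyncOutputIterator<T> implements Iterator<T> {
  private static final Object END = new Object(); // end-of-output marker

  private final BlockingQueue<Object> queue;
  private volatile Throwable failure; // set by the producer thread on error
  private Object next;                // element taken from the queue but not yet returned

  /**
   * @param processElement push-style processing; it calls the given receiver per output
   * @param capacity maximum number of outputs buffered between the two threads
   */
  public AsyncOutputIterator(Consumer<Consumer<T>> processElement, int capacity) {
    this.queue = new ArrayBlockingQueue<>(capacity);
    Thread producer = new Thread(() -> {
      try {
        // The receiver blocks once `capacity` outputs are waiting to be consumed.
        processElement.accept(this::put);
      } catch (Throwable t) {
        failure = t;
      } finally {
        put(END);
      }
    });
    producer.setDaemon(true);
    producer.start();
  }

  private void put(Object o) {
    try {
      queue.put(o); // note: ArrayBlockingQueue rejects null elements
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  @Override
  public boolean hasNext() {
    if (next == null) {
      try {
        next = queue.take(); // blocks until the producer emits something
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }
    if (next == END) {
      if (failure != null) {
        throw new RuntimeException(failure);
      }
      return false;
    }
    return true;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    @SuppressWarnings("unchecked")
    T result = (T) next;
    next = null;
    return result;
  }
}
```

A real implementation would also need to handle null outputs and to stop the producer thread if the consumer abandons the iterator early; both are left out of this sketch.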
This raises the question of how to determine whether the runner should do
some special translation of SDF in this case. There are probably only these
options:
1) translate all SDFs to two-thread execution
2) add a runtime flag that will turn the translation on (once turned
on, it will translate all SDFs) - this is the current proposal
3) extend @DoFn.BoundedPerElement annotation with some kind of
(optional) hint - e.g. @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE),
the default would be Bounded.FITS_IN_MEMORY (which is the current approach)
The approach (3) seems to give more information to all runners and might
result in the ability to apply various optimizations for multiple
runners, so I'd say that this might be the ideal variant.
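For illustration only, variant (3) could look roughly like the sketch below. The `Bounded` enum and the annotation parameter do not exist in Beam today; the nested `BoundedPerElement` here is a hypothetical stand-in for an extended @DoFn.BoundedPerElement, and `needsStreamingOutput` is an invented name for the check a runner's translation might perform.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical sketch of an output-size hint; none of this is existing Beam API. */
public class OutputSizeHints {

  /** Hypothetical hint values. */
  public enum Bounded { FITS_IN_MEMORY, POSSIBLY_HUGE }

  /** Hypothetical stand-in for an extended @DoFn.BoundedPerElement. */
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.TYPE)
  public @interface BoundedPerElement {
    Bounded value() default Bounded.FITS_IN_MEMORY; // default matches the current approach
  }

  /** Example: a DoFn-like class whose per-element output may not fit in memory. */
  @BoundedPerElement(Bounded.POSSIBLY_HUGE)
  public static class ReadFileSplitFn {}

  /** Example: a DoFn-like class relying on the default hint. */
  @BoundedPerElement
  public static class SmallExpansionFn {}

  /** What a runner's translation could check before choosing two-thread execution. */
  public static boolean needsStreamingOutput(Class<?> fnClass) {
    BoundedPerElement hint = fnClass.getAnnotation(BoundedPerElement.class);
    return hint != null && hint.value() == Bounded.POSSIBLY_HUGE;
  }
}
```

Because the hint has a default, existing DoFns would keep their current behavior, and each runner could decide independently what to do with POSSIBLY_HUGE.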
Jan
On 12/29/22 13:07, Jozef Vilcek wrote:
I am surprised to hear that the Dataflow runner (which I have never used)
would have this kind of limitation. I see that the `OutputManager`
interface is implemented to write to a `Receiver` [1], which follows the
push model. Do you have a reference I can look at to review the
must-fit-in-memory limitation?
In Spark, the problem is that the leaf operator pulls data from
previous ones by consuming an `Iterator` of values. As per your
suggestion, this is not a problem with `sources` because they hold
e.g. source file and can pull data as they are being requested. This
gets problematic exactly with SDF and flatMaps and not sources. It
could be one of the reasons why SDF performed badly on Spark, where the
community reported performance degradation [2] and increased memory
use [3].
My proposed solution is to use, similarly to Dataflow, a `Receiver`-like
implementation for DoFns which can output a large number of elements.
For now, this WIP targets SDFs only.
[1]
https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
[2] https://github.com/apache/beam/pull/14755
[3]
https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005
On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev
<[email protected]> wrote:
I believe that for dataflow runner, the result of processElement
must also fit in memory, so this is not just a constraint for the
spark runner.
The best approach at present might be to convert the source from a
flatMap to an SDF that reads out chunks of the file at a time, and
supports runner checkpointing (i.e. with a file seek point to
resume from) to chunk your data in a way that doesn't require the
runner to support unbounded outputs from any individual
@ProcessElement downcall.
-Daniel
On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek
<[email protected]> wrote:
Hello,
I am working on an issue which currently limits the Spark runner
by requiring the result of processElement to fit in memory
[1]. This is problematic e.g. for flatMap where the input
element is a file split and generates possibly large output.
The intended fix is to add an option to run DoFn processing
of the input in one thread and consumption of outputs and
forwarding them to downstream operators in another thread. One
challenge for me is to identify which DoFns should be using
this async approach.
Here [2] is a WIP commit which uses async processing
only for SDF naive expansion. I would like to get feedback on:
1) does the approach make sense overall
2) to target DoFns which need async processing (i.e. those that
generate possibly large output), I am currently just checking if it is
a DoFn of the SDF naive expansion type [3]. I failed to find a
better / more systematic approach for identifying which DoFns
should benefit from that. I would appreciate any thoughts on how
to make this better.
3) Config option and validatesRunner tests - do we want to
make it possible to turn async DoFn off? If yes, do we want to
run validatesRunner tests for both options? How do I make
sure of that?
Looking forward to the feedback.
Best,
Jozef
[1] https://github.com/apache/beam/issues/23852
[2]
https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
[3]
https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362