Yes, I think we can definitely start with a Spark-specific config option, but there could be value for other runners in knowing whether the output of @ProcessElement is somewhat limited in size (e.g. can be included in a single bundle), or needs to be actively split. This could then be incorporated into the naive bounded implementation that is reused by multiple runners [1], which currently does not do any (active) splits of the running restriction. But this might be a different discussion.

 Jan

[1]https://github.com/apache/beam/blob/master/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java#L111

On 1/3/23 11:38, Jozef Vilcek wrote:
Regarding splitting, I think SDF is being split on the Spark runner, but I agree with Jan's comments about the split's contract. A specific SDF is also free to decide how big the minimal split will be, and the runner should be able to process that with reasonable resources. E.g. ParquetIO splits on the format's row groups. If a row group is large and contains a lot of well-compressed column data, it will strain memory resources.

Jan, as for the suggested options to implement it, I have an MR with approach 1), translating all SDFs to two-threaded execution. I did consider something like option 3), but I was not sure if it makes sense in general for other runners as well as for Spark. It raises the question for me whether it ever makes sense to create an SDF and want it, on Spark, not to use two-thread execution and thereby possibly apply memory pressure.

On Mon, Jan 2, 2023 at 4:49 PM Jan Lukavský <[email protected]> wrote:

    There are different translations of streaming and batch Pipelines
    in SparkRunner; this thread is focused on the batch part, if I
    understand it correctly. Unbounded PCollections are not supported
    in batch Spark (by definition). I agree that fixing the splitting
    is a valid option, though it still requires an unnecessarily big
    heap for buffering and/or might introduce some overhead from
    splitting the restriction. Not to mention that splitting is
    somewhat optional in the contract of SDF (the DoFn might not
    support it if it is bounded), so it might not solve the issue for
    all SDFs. The source might not even be splittable at all (e.g. a
    completely compressed blob, without any blocks).

     Jan

    On 1/2/23 16:22, Daniel Collins via dev wrote:
    If Spark's SDF solution doesn't support splitting, fixing that
    seems like the best solution to me. Splitting is the mechanism
    exposed by the model to actually limit the amount of data
    produced in a bundle. If unsupported, then unbounded-per-element
    SDFs wouldn't be supported at all.

    -Daniel

    On Mon, Jan 2, 2023 at 7:46 AM Jan Lukavský <[email protected]> wrote:

        Hi Jozef,

        I agree that this issue is most likely related to Spark,
        because of the way Spark uses a functional style for flatMap().

        It could be fixed with the following two options:

         a) SparkRunner's SDF implementation does not use splitting -
        it could be fixed so that the SDF is stopped via trySplit
        after N elements are buffered, the buffer gets flushed, and
        the restriction is resumed

         b) alternatively use two threads and a BlockingQueue between
        them, which is what you propose

        The number of output elements per input element is bounded
        (we are talking about the batch case anyway), but bounded
        does not mean it has to fit in memory. Furthermore,
        unnecessarily buffering a large number of elements is
        memory-inefficient, which is why I think the two-thread
        approach (b) should be the most efficient. Option (a) seems
        orthogonal and might be implemented as well.
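
        A minimal sketch of the two-thread idea in (b), using only the
        plain JDK (TwoThreadSketch, invokeDoFn, downstream and
        putQuietly are illustrative placeholders, not existing runner
        code): the DoFn runs on a background thread and pushes its
        outputs through a bounded BlockingQueue, and the bounded
        capacity is what provides back-pressure instead of buffering
        the whole per-element output.

        import java.util.concurrent.ArrayBlockingQueue;
        import java.util.concurrent.BlockingQueue;
        import java.util.concurrent.CompletableFuture;
        import java.util.function.Consumer;

        class TwoThreadSketch<OutT> {
          private static final Object EOS = new Object(); // end-of-stream marker

          // Producer thread runs the user DoFn; the calling (Spark task) thread
          // drains the queue and forwards elements downstream one by one.
          void run(Consumer<Consumer<OutT>> invokeDoFn, Consumer<OutT> downstream)
              throws Exception {
            BlockingQueue<Object> queue = new ArrayBlockingQueue<>(1024);
            CompletableFuture<Void> producer = CompletableFuture.runAsync(() -> {
              try {
                invokeDoFn.accept(out -> putQuietly(queue, out)); // DoFn emits here
              } finally {
                putQuietly(queue, EOS);                           // always signal end
              }
            });
            Object next;
            while ((next = queue.take()) != EOS) {
              @SuppressWarnings("unchecked")
              OutT out = (OutT) next;
              downstream.accept(out); // forward without holding everything in memory
            }
            producer.join();          // surfaces any exception thrown by the DoFn
          }

          private static void putQuietly(BlockingQueue<Object> queue, Object value) {
            try {
              queue.put(value);       // blocks when the queue is full (back-pressure)
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              throw new RuntimeException(e);
            }
          }
        }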

        This raises the question of how to determine whether the
        runner should do some special translation of an SDF in this
        case. There are probably only these options:

         1) translate all SDFs to two-thread execution

         2) add a runtime flag that will turn the translation on
        (once turned on, it will translate all SDFs) - this is the
        current proposal

         3) extend the @DoFn.BoundedPerElement annotation with some
        kind of (optional) hint - e.g.
        @DoFn.BoundedPerElement(Bounded.POSSIBLY_HUGE); the default
        would be Bounded.FITS_IN_MEMORY (which is the current approach)

        Approach (3) seems to give more information to all runners
        and might enable various optimizations across multiple
        runners, so I'd say this might be the ideal variant.
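
        Just to illustrate option (3), a hypothetical sketch of what
        the extended annotation could look like - the Bounded enum and
        its values do not exist in Beam today, this only shows the
        shape of the proposed hint and how a DoFn author would use it:

        import java.lang.annotation.ElementType;
        import java.lang.annotation.Retention;
        import java.lang.annotation.RetentionPolicy;
        import java.lang.annotation.Target;
        import org.apache.beam.sdk.transforms.DoFn;

        // Hypothetical extension of @DoFn.BoundedPerElement; not real Beam code.
        @Retention(RetentionPolicy.RUNTIME)
        @Target(ElementType.TYPE)
        @interface BoundedPerElement {
          enum Bounded {
            FITS_IN_MEMORY,  // per-element output is safe to buffer (current assumption)
            POSSIBLY_HUGE    // per-element output may be too large to buffer
          }
          Bounded value() default Bounded.FITS_IN_MEMORY;
        }

        // A DoFn would declare its expected output size, and a runner such as
        // Spark could pick the two-thread translation only when it is needed.
        @BoundedPerElement(BoundedPerElement.Bounded.POSSIBLY_HUGE)
        class ReadWholeFileFn extends DoFn<String, byte[]> {
          @ProcessElement
          public void process(@Element String path, OutputReceiver<byte[]> out) {
            // ... emits possibly huge output per input element ...
          }
        }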

          Jan

        On 12/29/22 13:07, Jozef Vilcek wrote:
        I am surprised to hear that the Dataflow runner (which I have
        never used) would have this kind of limitation. I see that the
        `OutputManager` interface is implemented to write to a
        `Receiver` [1], which follows the push model. Do you have a
        reference I can look at to review the must-fit-in-memory
        limitation?

        In Spark, the problem is that the leaf operator pulls data
        from the previous ones by consuming an `Iterator` of values.
        As per your suggestion, this is not a problem with `sources`,
        because they hold e.g. a source file and can pull data as it
        is requested. This gets problematic exactly with SDFs and
        flatMaps, not sources. It could be one of the reasons why SDF
        performed badly on Spark, where the community reported
        performance degradation [2] and increased memory use [3].
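
        Schematically (simplified pseudo-Java, not the actual
        SparkRunner code; outputsOf and userFn are made-up names), a
        pull-based translation has to finish the whole user call and
        buffer its outputs before downstream can pull anything:

        import java.util.ArrayList;
        import java.util.Iterator;
        import java.util.List;
        import java.util.function.BiConsumer;
        import java.util.function.Consumer;

        // Schematic only: with a pull model, the whole per-element output is
        // materialized before an Iterator can be handed to the next operator.
        class PullModelSketch {
          static <InT, OutT> Iterator<OutT> outputsOf(
              InT element, BiConsumer<InT, Consumer<OutT>> userFn) {
            List<OutT> buffer = new ArrayList<>();
            userFn.accept(element, buffer::add); // all outputs land in memory first
            return buffer.iterator();            // only now can downstream pull
          }
        }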

        My proposed solution is, similar to Dataflow, to use a
        `Receiver`-like implementation for DoFns which can output a
        large number of elements. For now, this WIP targets SDFs only.

        [1] https://github.com/apache/beam/blob/v2.43.0/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java#L285
        [2] https://github.com/apache/beam/pull/14755
        [3] https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17332005&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17332005

        On Wed, Dec 28, 2022 at 8:52 PM Daniel Collins via dev
        <[email protected]> wrote:

            I believe that for the Dataflow runner, the result of
            processElement must also fit in memory, so this is not
            just a constraint for the Spark runner.

            The best approach at present might be to convert the
            source from a flatMap to an SDF that reads out chunks of
            the file at a time, and supports runner checkpointing
            (i.e. with a file seek point to resume from) to chunk
            your data in a way that doesn't require the runner to
            support unbounded outputs from any individual
            @ProcessElements downcall.
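
            As an illustration only (a rough sketch, not code from this
            thread), such an SDF over an OffsetRange could look roughly
            like the following; CHUNK_SIZE and readChunk(...) are made-up
            placeholders for the real file-format-specific logic:

            import java.io.IOException;
            import java.nio.ByteBuffer;
            import java.nio.channels.SeekableByteChannel;
            import java.nio.file.Files;
            import java.nio.file.Paths;
            import org.apache.beam.sdk.io.range.OffsetRange;
            import org.apache.beam.sdk.transforms.DoFn;
            import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

            // Reads a file in fixed-size chunks; the OffsetRange restriction lets
            // the runner split/checkpoint between chunks instead of requiring the
            // whole file's output from a single @ProcessElement call.
            class ReadFileInChunksFn extends DoFn<String, byte[]> {
              private static final long CHUNK_SIZE = 64L * 1024 * 1024; // assumed

              @GetInitialRestriction
              public OffsetRange initialRestriction(@Element String path)
                  throws IOException {
                return new OffsetRange(0, Files.size(Paths.get(path)));
              }

              @ProcessElement
              public void process(@Element String path,
                                  RestrictionTracker<OffsetRange, Long> tracker,
                                  OutputReceiver<byte[]> out) throws IOException {
                long end = tracker.currentRestriction().getTo();
                for (long offset = tracker.currentRestriction().getFrom();
                     tracker.tryClaim(offset); offset += CHUNK_SIZE) {
                  out.output(readChunk(path, offset, Math.min(CHUNK_SIZE, end - offset)));
                }
              }

              // Placeholder helper: reads `length` bytes of `path` starting at `offset`.
              private static byte[] readChunk(String path, long offset, long length)
                  throws IOException {
                try (SeekableByteChannel ch = Files.newByteChannel(Paths.get(path))) {
                  ch.position(offset);
                  ByteBuffer buf = ByteBuffer.allocate((int) length);
                  ch.read(buf);
                  return buf.array();
                }
              }
            }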

            -Daniel

            On Wed, Dec 28, 2022 at 1:36 PM Jozef Vilcek
            <[email protected]> wrote:

                Hello,

                I am working on an issue which currently limits the
                Spark runner by requiring the result of
                processElement to fit in memory [1]. This is
                problematic e.g. for flatMap, where the input element
                is a file split and generates possibly large output.

                The intended fix is to add an option to have DoFn
                processing over the input in one thread, and
                consumption of outputs and forwarding them to
                downstream operators in another thread. One challenge
                for me is to identify which DoFns should use this
                async approach.

                Here [2] is a WIP commit which uses async processing
                only for the SDF naive expansion. I would like to get
                feedback on:

                1) does the approach make sense overall

                2) to target DoFns which need async processing (i.e.
                generate possibly large output), I am currently just
                checking if the DoFn is of the SDF naive expansion
                type [3]. I failed to find a better / more systematic
                approach for identifying which DoFns should benefit
                from this. I would appreciate any thoughts on how to
                make this better.

                3) Config option and validatesRunner tests - do we
                want to make it possible to turn async DoFns off? If
                yes, do we want to run validatesRunner tests for
                both options? How do I make sure of that?
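
                For illustration, if we keep a config option, it could
                be a plain pipeline option along these lines (the name
                getUseAsyncSdfProcessing is hypothetical, nothing like
                it exists yet; it just shows the wiring, defaulting to
                today's behaviour):

                import org.apache.beam.sdk.options.Default;
                import org.apache.beam.sdk.options.Description;
                import org.apache.beam.sdk.options.PipelineOptions;

                // Hypothetical sketch of a Spark-specific flag for the
                // async/two-thread DoFn translation.
                public interface AsyncDoFnOptions extends PipelineOptions {
                  @Description("If true, run SDF output consumption on a separate thread.")
                  @Default.Boolean(false)
                  boolean getUseAsyncSdfProcessing();

                  void setUseAsyncSdfProcessing(boolean value);
                }
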
                Looking forward to the feedback.
                Best,
                Jozef

                [1] https://github.com/apache/beam/issues/23852
                [2] https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff
                [3] https://github.com/JozoVilcek/beam/commit/895c4973fe9adc6225fcf35d039e3eb1a81ffcff#diff-bd72087119a098aa8c947d0989083ec9a6f2b54ef18da57d50e0978799c79191R362
