Yes, all portable runners use fusion; it is built into the machinery that translates the Pipeline into its protobuf representation. It is needed to run the pipeline efficiently, since otherwise there would be too many calls between the runner and the SDK harness, which is why the translation creates fused "executable stages".
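
For illustration, here is a minimal (made-up) pipeline sketch in the Java SDK: the two chained ParDos would typically be fused into one executable stage, while the GroupByKey forces a stage boundary, so the downstream DoFn lands in a separate stage that is retried independently. Transform names here are invented and where the boundaries actually fall is up to the runner's fusion logic:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class FusionSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Two chained ParDos with no shuffle in between: a portable runner will
    // typically fuse them into one executable stage, so an element passes
    // through both DoFns in a single call into the SDK harness.
    PCollection<String> fused =
        p.apply(Create.of("a", "b", "c"))
            .apply("StepA", ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void process(@Element String e, OutputReceiver<String> out) {
                out.output(e + "-a");
              }
            }))
            .apply("StepB", ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void process(@Element String e, OutputReceiver<String> out) {
                out.output(e + "-b");
              }
            }));

    // The GroupByKey introduces a stage boundary, so "StepC" ends up in a
    // separate executable stage that is retried independently of StepA/StepB.
    fused
        .apply(WithKeys.of("key"))
        .apply(GroupByKey.create())
        .apply("StepC", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void process(@Element KV<String, Iterable<String>> kv) {
            // downstream work would go here
          }
        }));

    p.run().waitUntilFinish();
  }
}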

On 9/3/22 04:34, Ahmed Abualsaud via dev wrote:
Yes you’re right, I forgot to mention that important piece of information 😅 thanks for catching it.  The GBK keeps the DoFns separate at pipeline execution.

From what I’ve learned, fusion is a Dataflow thing; do other runners do it too?

On Thu, Sep 1, 2022 at 6:08 PM Brian Hulette <bhule...@google.com> wrote:

    Thanks for sharing the learnings Ahmed!

    > The solution lies in keeping the retry of each step separate. A
    > good example of this is in how steps 2 and 3 are implemented [3].
    > They are separated into different DoFns and step 3 can start only
    > after step 2 completes successfully. This way, any failure in step
    > 3 does not go back to affect step 2.

    Is it enough just that they're in different DoFns? I thought the key
    was that the DoFns are separated by a GroupByKey, so they will be in
    different fused stages, which are retried independently.
    Brian

    On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev
    <dev@beam.apache.org> wrote:

        Hi all,


        TLDR: When writing IO connectors, be wary of how bundle retries can affect the workflow.


        A faulty implementation of a step in BigQuery batch loads was discovered recently. I raised an issue [1], but also wanted to mention it here as a potentially helpful lesson for those developing new or existing IO connectors.


        For those unfamiliar with BigQueryIO file loads, a write that is too large for a single load job [2] looks roughly like this:


        1. Take input rows and write them to temporary files.

        2. Load temporary files to temporary BQ tables.

        3. Delete temporary files.

        4. Copy the contents of temporary tables over to the final table.

        5. Delete temporary tables.


        The faulty part here is that steps 4 and 5 are done in the same DoFn (step 4 in processElement and step 5 in finishBundle). If a bundle fails in the middle of table deletion, say an error occurs when deleting the nth table, the whole bundle will retry and we will perform the copy again. But tables 1 through n have already been deleted, so we get stuck trying to copy from non-existent sources.
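
        To make the shape concrete, here is a simplified sketch of the problematic pattern (the class name and helper methods are made up for illustration; this is not the actual BigQueryIO code):

        import java.util.ArrayList;
        import java.util.List;
        import com.google.api.services.bigquery.model.TableReference;
        import org.apache.beam.sdk.transforms.DoFn;

        // Illustrative only: steps 4 and 5 share one DoFn, so a bundle retry
        // re-runs the copies even though some temp tables were already deleted.
        class CopyAndDeleteTablesFn extends DoFn<TableReference, Void> {
          private final List<TableReference> copied = new ArrayList<>();

          @StartBundle
          public void startBundle() {
            copied.clear();
          }

          @ProcessElement
          public void processElement(@Element TableReference tempTable) {
            copyToFinalTable(tempTable); // step 4: copy temp table into final table
            copied.add(tempTable);
          }

          @FinishBundle
          public void finishBundle() {
            for (TableReference tempTable : copied) {
              deleteTable(tempTable); // step 5: delete the temp table
              // If this throws on the nth table, the whole bundle retries:
              // processElement runs again, but tables 1 through n are gone,
              // so the copy now reads from non-existent sources.
            }
          }

          // made-up stand-ins for the real BigQuery service calls
          private void copyToFinalTable(TableReference t) { /* ... */ }
          private void deleteTable(TableReference t) { /* ... */ }
        }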


        The solution lies in keeping the retry of each step separate.
        A good example of this is in how steps 2 and 3 are implemented
        [3]. They are separated into different DoFns and step 3 can
        start only after step 2 completes successfully. This way, any
        failure in step 3 does not go back to affect step 2.
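
        And a rough sketch of that separated shape (again the names are made up, and Wait.on is just one way to sequence the two steps; see WriteTables [3] for the real implementation):

        import org.apache.beam.sdk.transforms.DoFn;
        import org.apache.beam.sdk.transforms.ParDo;
        import org.apache.beam.sdk.transforms.Wait;
        import org.apache.beam.sdk.values.PCollection;

        // Illustrative only: loading and deleting live in separate DoFns, and
        // the delete is applied only after the load stage's output is complete,
        // so a failure while deleting retries only the delete stage.
        class SeparatedStepsSketch {

          // Step 2: load each temp file into a temp table, emitting loaded files.
          static class LoadTempTablesFn extends DoFn<String, String> {
            @ProcessElement
            public void process(@Element String file, OutputReceiver<String> out) {
              // stub for the real load-job call
              out.output(file);
            }
          }

          // Step 3: delete temp files; a failure here does not re-run the loads.
          static class DeleteTempFilesFn extends DoFn<String, Void> {
            @ProcessElement
            public void process(@Element String file) {
              // stub for the real delete call
            }
          }

          static void expand(PCollection<String> tempFiles) {
            PCollection<String> loaded =
                tempFiles.apply("LoadTempTables", ParDo.of(new LoadTempTablesFn()));

            // Wait.on holds the delete step back until the loads have succeeded.
            tempFiles
                .apply(Wait.on(loaded))
                .apply("DeleteTempFiles", ParDo.of(new DeleteTempFilesFn()));
          }
        }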


        That's all, thanks for your attention :)

        Ahmed


        [1] https://github.com/apache/beam/issues/22920

        [2] https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105

        [3] https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
