Yes, you’re right, I forgot to mention that important piece of information 😅 thanks for catching it. The GBK keeps the DoFns in separate stages at pipeline execution.
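For anyone following along, here is a rough sketch of what that looks like in a pipeline. The class and step names (FusionBreakSketch, StepA, StepB) are made up for illustration and are not code from the Beam repo; the point is only the shape: a GroupByKey sits between the two DoFns, so the second DoFn starts only after the first stage's output has been committed, and a retry downstream of the GBK does not re-run the upstream DoFn.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class FusionBreakSketch {

  // First step: does some work and emits a keyed record of what it did.
  static class StepA extends DoFn<String, KV<String, String>> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<KV<String, String>> out) {
      out.output(KV.of("done", element));
    }
  }

  // Second step: consumes the grouped output, i.e. it only runs once StepA's
  // stage has committed, and a retry here does not re-run StepA.
  static class StepB extends DoFn<KV<String, Iterable<String>>, Void> {
    @ProcessElement
    public void processElement(@Element KV<String, Iterable<String>> element) {
      // clean-up / follow-up work goes here
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, String>> stepAOutput =
        p.apply(Create.of("a", "b", "c"))
         .apply("StepA", ParDo.of(new StepA()));

    stepAOutput
        .apply(GroupByKey.<String, String>create()) // fusion break between the two DoFns
        .apply("StepB", ParDo.of(new StepB()));

    p.run().waitUntilFinish();
  }
}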
From what I’ve learned, fusion is a Dataflow thing. Do other runners do it too?

On Thu, Sep 1, 2022 at 6:08 PM Brian Hulette <bhule...@google.com> wrote:

> Thanks for sharing the learnings Ahmed!
>
>> The solution lies in keeping the retry of each step separate. A good example of this is in how steps 2 and 3 are implemented [3]. They are separated into different DoFns and step 3 can start only after step 2 completes successfully. This way, any failure in step 3 does not go back to affect step 2.
>
> Is it enough just that they're in different DoFns? I thought the key was that the DoFns are separated by a GroupByKey, so they will be in different fused stages, which are retried independently.
>
> Brian
>
> On Thu, Sep 1, 2022 at 1:43 PM Ahmed Abualsaud via dev <dev@beam.apache.org> wrote:
>
>> Hi all,
>>
>> TLDR: When writing IO connectors, be wary of how bundle retries can affect the workflow.
>>
>> A faulty implementation of a step in BigQuery batch loads was discovered recently. I raised an issue [1] but also wanted to mention it here as a potentially helpful lesson for those developing new/existing IO connectors.
>>
>> For those unfamiliar with BigQueryIO file loads, a write that is too large for a single load job [2] looks roughly like this:
>>
>> 1. Take input rows and write them to temporary files.
>> 2. Load temporary files to temporary BQ tables.
>> 3. Delete temporary files.
>> 4. Copy the contents of temporary tables over to the final table.
>> 5. Delete temporary tables.
>>
>> The faulty part here is that steps 4 and 5 are done in the same DoFn (4 in processElement and 5 in finishBundle). If a bundle fails in the middle of table deletion, say an error occurs when deleting the nth table, the whole bundle will retry and we will perform the copy again. But tables 1~n have already been deleted, so we get stuck trying to copy from non-existent sources.
>>
>> The solution lies in keeping the retry of each step separate. A good example of this is in how steps 2 and 3 are implemented [3]. They are separated into different DoFns and step 3 can start only after step 2 completes successfully. This way, any failure in step 3 does not go back to affect step 2.
>>
>> That's all, thanks for your attention :)
>>
>> Ahmed
>>
>> [1] https://github.com/apache/beam/issues/22920
>>
>> [2] https://github.com/apache/beam/blob/f921a2f1996cf906d994a9d62aeb6978bab09dd5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoads.java#L100-L105
>>
>> [3] https://github.com/apache/beam/blob/149ed074428ff9b5344169da7d54e8ee271aaba1/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java#L437-L454
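To make the bundle-retry issue and the fix concrete, below is a rough sketch in the spirit of the thread. The class and method names (CopyAndDeleteInOneDoFn, CopyTempTables, DeleteTempTables, copyToFinalTable, deleteTempTable) are hypothetical and are not the actual BigQueryIO implementation; the first DoFn shows the anti-pattern (copy in processElement, delete in finishBundle), and the split version shows the retry-isolated structure described in the thread.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Anti-pattern: copy (step 4) in processElement and delete (step 5) in
// finishBundle of the same DoFn. If a delete fails part-way through, the
// whole bundle retries and the copies re-run against tables that are gone.
class CopyAndDeleteInOneDoFn extends DoFn<String, Void> {
  private final List<String> copiedTables = new ArrayList<>();

  @StartBundle
  public void startBundle() {
    copiedTables.clear();
  }

  @ProcessElement
  public void processElement(@Element String tempTable) {
    copyToFinalTable(tempTable);          // step 4
    copiedTables.add(tempTable);
  }

  @FinishBundle
  public void finishBundle() {
    for (String tempTable : copiedTables) {
      deleteTempTable(tempTable);         // step 5: a failure here retries step 4 too
    }
  }

  private void copyToFinalTable(String tempTable) { /* hypothetical copy job */ }
  private void deleteTempTable(String tempTable) { /* hypothetical delete call */ }
}

// Fix: split the steps into two DoFns with a GroupByKey between them, so the
// delete DoFn only ever sees tables whose copy stage has already committed,
// and a failed delete retries only the delete stage.
class CopyTempTables extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(@Element String tempTable, OutputReceiver<KV<String, String>> out) {
    copyToFinalTable(tempTable);          // step 4
    out.output(KV.of("copied", tempTable));
  }

  private void copyToFinalTable(String tempTable) { /* hypothetical copy job */ }
}

class DeleteTempTables extends DoFn<KV<String, Iterable<String>>, Void> {
  @ProcessElement
  public void processElement(@Element KV<String, Iterable<String>> copied) {
    for (String tempTable : copied.getValue()) {
      deleteTempTable(tempTable);         // step 5, isolated from step 4's retries
    }
  }

  private void deleteTempTable(String tempTable) { /* hypothetical delete call */ }
}

// Wiring, assuming tempTables is a PCollection<String> of temp table names:
// tempTables
//     .apply("CopyTempTables", ParDo.of(new CopyTempTables()))
//     .apply(GroupByKey.<String, String>create())
//     .apply("DeleteTempTables", ParDo.of(new DeleteTempTables()));

The GroupByKey in the wiring is the piece that keeps the two steps' retries separate, which is the same structure the thread points to for steps 2 and 3 in [3].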