kennknowles commented on code in PR #34657: URL: https://github.com/apache/beam/pull/34657#discussion_r2052623420
##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -1101,6 +1101,18 @@ def _load_data(
     of the load jobs would fail but not other. If any of them fails, then copy jobs are not triggered.
     """
+    self.reshuffle_before_load = not util.is_compat_version_prior_to(

Review Comment:
   Structural comments:

    - This reshuffle should be added outside of this transform. Make it the responsibility of the caller to ensure stable inputs.
    - It would be nicer to fully fork `expand` whenever we move to a new update-incompatible version. Basically freeze the old one and leave it behind (like https://github.com/apache/beam/blob/32620186a0029106b1396560257d68663891021d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L87). You still get bugfixes applied to helper transforms, but keep `expand` itself straight-line code except for the top-level decision of which version to expand.

##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py:
##########
@@ -478,6 +478,44 @@ def test_records_traverse_transform_with_mocks(self):

       assert_that(jobs, equal_to([job_reference]), label='CheckJobs')

+  @parameterized.expand([
+      param(compat_version=None),
+      param(compat_version="2.64.0"),
+  ])
+  def test_reshuffle_before_load(self, compat_version):

Review Comment:
   I don't think this really tests what you want to test. You want:

    - correctness of the transform with both settings of the flag
    - if possible, some way to reproduce the issue you had before that is red before, green after

   You also want to build an internal Dataflow test of update compatibility with the requested 2.64.0 version. I can show you a CL that does that if you haven't already seen one.

##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -1101,6 +1101,18 @@ def _load_data(
     of the load jobs would fail but not other. If any of them fails, then copy jobs are not triggered.
     """
+    self.reshuffle_before_load = not util.is_compat_version_prior_to(
+        p.options, "2.65.0")
+    if self.reshuffle_before_load:
+      # Ensure that TriggerLoadJob retry inputs are deterministic by breaking

Review Comment:
   > Thinking about it more, does Reshuffle force determinism by grouping by unique id's?

   Reshuffle checkpoints on Dataflow but not on other runners. Without a checkpoint, any randomly generated ids in a fused stage will be generated again on retry. Hence this behavior getting the name `RequiresStableInput`, which is the "right" way to express this (but unfortunately I don't think it is complete enough to use here).