kennknowles commented on code in PR #34657:
URL: https://github.com/apache/beam/pull/34657#discussion_r2052623420


##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -1101,6 +1101,18 @@ def _load_data(
          of the load jobs would fail but not other. If any of them fails, then
          copy jobs are not triggered.
     """
+    self.reshuffle_before_load = not util.is_compat_version_prior_to(

Review Comment:
   Structural comments:
   
    - This reshuffle should be added outside of this transform. Make it the 
responsibility of the caller to ensure stable inputs.
    - It would be nicer to fully fork `expand` whenever we move to a new 
update-incompatible version: freeze the old one and leave it behind (like 
https://github.com/apache/beam/blob/32620186a0029106b1396560257d68663891021d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java#L87).
 You still get bug fixes applied to helper transforms, but `expand` itself stays 
straight-line code except for the top-level decision of which version to expand.



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py:
##########
@@ -478,6 +478,44 @@ def test_records_traverse_transform_with_mocks(self):
 
       assert_that(jobs, equal_to([job_reference]), label='CheckJobs')
 
+  @parameterized.expand([
+      param(compat_version=None),
+      param(compat_version="2.64.0"),
+  ])
+  def test_reshuffle_before_load(self, compat_version):

Review Comment:
   I don't think this really tests what you want to test. You want:
   
   - correctness of the transform with both settings of the flag
   - if possible, some way to reproduce the issue you had before: a test that is 
red before this change and green after it
   
   You also want to build an internal Dataflow test of update compatibility 
with the requested 2.64.0 version. I can show you a CL that does that if you 
haven't already seen one.



##########
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py:
##########
@@ -1101,6 +1101,18 @@ def _load_data(
          of the load jobs would fail but not other. If any of them fails, then
          copy jobs are not triggered.
     """
+    self.reshuffle_before_load = not util.is_compat_version_prior_to(
+        p.options, "2.65.0")
+    if self.reshuffle_before_load:
+      # Ensure that TriggerLoadJob retry inputs are deterministic by breaking

Review Comment:
   > Thinking about it more, does Reshuffle force determinism by grouping by 
unique id's?
   
   Reshuffle checkpoints on Dataflow but not on other runners. Without a 
checkpoint, any randomly generated ids in a fused stage will be generated again 
on retry. Hence the name `RequiresStableInput` for this behavior, which is the 
"right" way to express this (but unfortunately I don't think it is complete 
enough to use here).
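   A plain-Python model of why the checkpoint matters (not Beam API; `FlakySink`, `fused`, and `checkpointed` are hypothetical). The sink dedupes by job id and loses the worker once right after accepting a job. When id generation and the side effect share one retried unit, the retry mints fresh ids and duplicates work; when the ids are materialized first, as a checkpointing Reshuffle does on Dataflow, the retry replays the same ids.

```python
import uuid


class FlakySink:
  """Hypothetical external system: idempotent per job id, fails once."""

  def __init__(self):
    self.jobs = {}
    self._failed_once = False

  def submit(self, job_id, payload):
    self.jobs.setdefault(job_id, payload)  # same id => no duplicate job
    if not self._failed_once:
      self._failed_once = True
      raise RuntimeError('worker lost after submit')


def run_with_retries(stage, attempts=2):
  # Re-run the whole unit of work on failure, like a runner retrying a bundle.
  for attempt in range(attempts):
    try:
      return stage()
    except RuntimeError:
      if attempt == attempts - 1:
        raise


def fused(payloads, sink):
  # Id generation and the side effect are retried together: ids change.
  def stage():
    for payload in payloads:
      sink.submit(uuid.uuid4().hex, payload)
  run_with_retries(stage)


def checkpointed(payloads, sink):
  # Ids are generated once and stored before the retried unit: ids are stable.
  checkpoint = [(uuid.uuid4().hex, payload) for payload in payloads]

  def stage():
    for job_id, payload in checkpoint:
      sink.submit(job_id, payload)
  run_with_retries(stage)
```

   With two payloads, `fused` leaves three jobs in the sink (one orphan from the failed attempt), while `checkpointed` leaves exactly two.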


