This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ea982127b60 Override BQ load job location when necessary (#31986)
ea982127b60 is described below

commit ea982127b60545164e0e280eb0d4140f35ae3156
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Aug 7 22:48:03 2024 -0400

    Override BQ load job location when necessary (#31986)
---
 sdks/python/apache_beam/io/gcp/bigquery_file_loads.py  | 18 +++++++++++++++++-
 .../apache_beam/io/gcp/bigquery_file_loads_test.py     | 10 ++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_tools.py       |  8 ++++++--
 3 files changed, 33 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index e1a4af31f1c..3203c21a8e6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -765,10 +765,32 @@ class TriggerLoadJobs(beam.DoFn):
         GlobalWindows.windowed_value((destination, job_reference)))
 
   def finish_bundle(self):
+    # Cache of (projectId, datasetId) -> location so that table metadata is
+    # fetched at most once per dataset in this bundle.
+    dataset_locations = {}
+
     for windowed_value in self.pending_jobs:
       job_ref = windowed_value.value[1]
+      # In some cases (e.g. when the load job op returns a 409 ALREADY_EXISTS),
+      # the returned job reference may not include a location. In such cases,
+      # we need to override with the dataset's location.
+      job_location = job_ref.location
+      if not job_location:
+        table_ref = bigquery_tools.parse_table_reference(
+            windowed_value.value[0])
+        project_dataset = (table_ref.projectId, table_ref.datasetId)
+        if project_dataset not in dataset_locations:
+          dataset_locations[project_dataset] = (
+              self.bq_wrapper.get_table_location(
+                  table_ref.projectId, table_ref.datasetId, table_ref.tableId))
+        # Always read back from the cache so later jobs in the same dataset
+        # also get a location instead of being polled with None.
+        job_location = dataset_locations[project_dataset]
+
       self.bq_wrapper.wait_for_bq_job(
-          job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+          job_ref,
+          sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS,
+          location=job_location)
     return self.pending_jobs
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 0605206714e..f27c7899f9f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -426,6 +426,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'project1'
     job_reference.jobId = 'job_name1'
+    job_reference.location = 'US'
     result_job = bigquery_api.Job()
     result_job.jobReference = job_reference
 
@@ -481,6 +482,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'loadJobProject'
     job_reference.jobId = 'job_name1'
+    job_reference.location = 'US'
 
     result_job = bigquery_api.Job()
     result_job.jobReference = job_reference
@@ -515,6 +517,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'loadJobProject'
     job_reference.jobId = 'job_name1'
+    job_reference.location = 'US'
     result_job = mock.Mock()
     result_job.jobReference = job_reference
 
@@ -567,10 +570,12 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_1.jobReference = bigquery_api.JobReference()
     job_1.jobReference.projectId = 'project1'
     job_1.jobReference.jobId = 'jobId1'
+    job_1.jobReference.location = 'US'
     job_2 = bigquery_api.Job()
     job_2.jobReference = bigquery_api.JobReference()
     job_2.jobReference.projectId = 'project1'
     job_2.jobReference.jobId = 'jobId2'
+    job_2.jobReference.location = 'US'
 
     job_1_waiting = mock.Mock()
     job_1_waiting.status.state = 'RUNNING'
@@ -610,10 +615,12 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_1.jobReference = bigquery_api.JobReference()
     job_1.jobReference.projectId = 'project1'
     job_1.jobReference.jobId = 'jobId1'
+    job_1.jobReference.location = 'US'
     job_2 = bigquery_api.Job()
     job_2.jobReference = bigquery_api.JobReference()
     job_2.jobReference.projectId = 'project1'
     job_2.jobReference.jobId = 'jobId2'
+    job_2.jobReference.location = 'US'
 
     job_1_waiting = mock.Mock()
     job_1_waiting.status.state = 'RUNNING'
@@ -650,6 +657,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'project1'
     job_reference.jobId = 'job_name1'
+    job_reference.location = 'US'
     result_job = mock.Mock()
     result_job.jobReference = job_reference
 
@@ -732,6 +740,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'project1'
     job_reference.jobId = 'job_name1'
+    job_reference.location = 'US'
     result_job = mock.Mock()
     result_job.jobReference = job_reference
 
@@ -774,6 +783,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
     job_reference = bigquery_api.JobReference()
     job_reference.projectId = 'project1'
     job_reference.jobId = 'job_name1'
+    job_reference.location = 'US'
     result_job = bigquery_api.Job()
     result_job.jobReference = job_reference
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index a92f30ec35c..c7128e7899e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -631,7 +631,8 @@ class BigQueryWrapper(object):
 
     return self._start_job(request)
 
-  def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0):
+  def wait_for_bq_job(
+      self, job_reference, sleep_duration_sec=5, max_retries=0, location=None):
     """Poll job until it is DONE.
 
     Args:
@@ -639,6 +640,7 @@ class BigQueryWrapper(object):
       sleep_duration_sec: Specifies the delay in seconds between retries.
       max_retries: The total number of times to retry. If equals to 0,
         the function waits forever.
+      location: Fall back on this location if job_reference doesn't have one.
 
     Raises:
       `RuntimeError`: If the job is FAILED or the number of retries has been
@@ -648,7 +650,9 @@ class BigQueryWrapper(object):
     while True:
       retry += 1
       job = self.get_job(
-          job_reference.projectId, job_reference.jobId, job_reference.location)
+          job_reference.projectId,
+          job_reference.jobId,
+          job_reference.location or location)
       _LOGGER.info('Job %s status: %s', job.id, job.status.state)
       if job.status.state == 'DONE' and job.status.errorResult:
         raise RuntimeError(

Reply via email to