This is an automated email from the ASF dual-hosted git repository.
ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ea982127b60 Override BQ load job location when necessary (#31986)
ea982127b60 is described below
commit ea982127b60545164e0e280eb0d4140f35ae3156
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Wed Aug 7 22:48:03 2024 -0400
Override BQ load job location when necessary (#31986)
---
sdks/python/apache_beam/io/gcp/bigquery_file_loads.py | 18 +++++++++++++++++-
.../apache_beam/io/gcp/bigquery_file_loads_test.py | 10 ++++++++++
sdks/python/apache_beam/io/gcp/bigquery_tools.py | 8 ++++++--
3 files changed, 33 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index e1a4af31f1c..3203c21a8e6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -765,10 +765,26 @@ class TriggerLoadJobs(beam.DoFn):
GlobalWindows.windowed_value((destination, job_reference)))
def finish_bundle(self):
+ dataset_locations = {}
+
for windowed_value in self.pending_jobs:
+ table_ref = bigquery_tools.parse_table_reference(windowed_value.value[0])
+ project_dataset = (table_ref.projectId, table_ref.datasetId)
+
job_ref = windowed_value.value[1]
+ # In some cases (e.g. when the load job op returns a 409 ALREADY_EXISTS),
+ # the returned job reference may not include a location. In such cases,
+ # we need to override with the dataset's location.
+ job_location = job_ref.location
+ if not job_location and project_dataset not in dataset_locations:
+ job_location = self.bq_wrapper.get_table_location(
+ table_ref.projectId, table_ref.datasetId, table_ref.tableId)
+ dataset_locations[project_dataset] = job_location
+
self.bq_wrapper.wait_for_bq_job(
- job_ref, sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS)
+ job_ref,
+ sleep_duration_sec=_SLEEP_DURATION_BETWEEN_POLLS,
+ location=job_location)
return self.pending_jobs
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index 0605206714e..f27c7899f9f 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -426,6 +426,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
+ job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference
@@ -481,6 +482,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'loadJobProject'
job_reference.jobId = 'job_name1'
+ job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference
@@ -515,6 +517,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'loadJobProject'
job_reference.jobId = 'job_name1'
+ job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference
@@ -567,10 +570,12 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_1.jobReference = bigquery_api.JobReference()
job_1.jobReference.projectId = 'project1'
job_1.jobReference.jobId = 'jobId1'
+ job_1.jobReference.location = 'US'
job_2 = bigquery_api.Job()
job_2.jobReference = bigquery_api.JobReference()
job_2.jobReference.projectId = 'project1'
job_2.jobReference.jobId = 'jobId2'
+ job_2.jobReference.location = 'US'
job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
@@ -610,10 +615,12 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_1.jobReference = bigquery_api.JobReference()
job_1.jobReference.projectId = 'project1'
job_1.jobReference.jobId = 'jobId1'
+ job_1.jobReference.location = 'US'
job_2 = bigquery_api.Job()
job_2.jobReference = bigquery_api.JobReference()
job_2.jobReference.projectId = 'project1'
job_2.jobReference.jobId = 'jobId2'
+ job_2.jobReference.location = 'US'
job_1_waiting = mock.Mock()
job_1_waiting.status.state = 'RUNNING'
@@ -650,6 +657,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
+ job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference
@@ -732,6 +740,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
+ job_reference.location = 'US'
result_job = mock.Mock()
result_job.jobReference = job_reference
@@ -774,6 +783,7 @@ class TestBigQueryFileLoads(_TestCaseWithTempDirCleanUp):
job_reference = bigquery_api.JobReference()
job_reference.projectId = 'project1'
job_reference.jobId = 'job_name1'
+ job_reference.location = 'US'
result_job = bigquery_api.Job()
result_job.jobReference = job_reference
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
index a92f30ec35c..c7128e7899e 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_tools.py
@@ -631,7 +631,8 @@ class BigQueryWrapper(object):
return self._start_job(request)
- def wait_for_bq_job(self, job_reference, sleep_duration_sec=5, max_retries=0):
+ def wait_for_bq_job(
+ self, job_reference, sleep_duration_sec=5, max_retries=0, location=None):
"""Poll job until it is DONE.
Args:
@@ -639,6 +640,7 @@ class BigQueryWrapper(object):
sleep_duration_sec: Specifies the delay in seconds between retries.
max_retries: The total number of times to retry. If equals to 0,
the function waits forever.
+ location: Fall back on this location if job_reference doesn't have one.
Raises:
`RuntimeError`: If the job is FAILED or the number of retries has been
@@ -648,7 +650,9 @@ class BigQueryWrapper(object):
while True:
retry += 1
job = self.get_job(
- job_reference.projectId, job_reference.jobId, job_reference.location)
+ job_reference.projectId,
+ job_reference.jobId,
+ job_reference.location or location)
_LOGGER.info('Job %s status: %s', job.id, job.status.state)
if job.status.state == 'DONE' and job.status.errorResult:
raise RuntimeError(