This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new c29eeb18ec Fix behavior for reattach_state parameter in BigQueryInsertJobOperator (#40664)
c29eeb18ec is described below
commit c29eeb18ec0f4ba260775f71968e5d75291b208f
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Jul 10 12:59:58 2024 +0200
Fix behavior for reattach_state parameter in BigQueryInsertJobOperator (#40664)
---
.../providers/google/cloud/operators/bigquery.py | 19 +++++++----
.../google/cloud/operators/test_bigquery.py | 39 ++++++++++++++++++++--
2 files changed, 50 insertions(+), 8 deletions(-)
diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 43131c549a..d55651d06b 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2955,6 +2955,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
try:
self.log.info("Executing: %s'", self.configuration)
+ # Create a job
job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
except Conflict:
# If the job already exists retrieve it
@@ -2963,18 +2964,24 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
location=self.location,
job_id=self.job_id,
)
- if job.state in self.reattach_states:
- # We are reattaching to a job
- job._begin()
- self._handle_job_error(job)
- else:
- # Same job configuration so we need force_rerun
+
+ if job.state not in self.reattach_states:
+ # Same job configuration, so we need force_rerun
raise AirflowException(
f"Job with id: {self.job_id} already exists and is in {job.state} state. If you "
f"want to force rerun it consider setting `force_rerun=True`."
f"Or, if you want to reattach in this scenario add {job.state} to `reattach_states`"
)
+ else:
+ # Job already reached state DONE
+ if job.state == "DONE":
+ raise AirflowException("Job is already in state DONE. Can not reattach to this job.")
+
+ # We are reattaching to a job
+ self.log.info("Reattaching to existing Job in state %s", job.state)
+ self._handle_job_error(job)
+
job_types = {
LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index d49e75c950..346c50382d 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1395,7 +1395,7 @@ class TestBigQueryInsertJobOperator:
job = MagicMock(
job_id=real_job_id,
error_result=False,
- state="PENDING",
+ state="RUNNING",
done=lambda: False,
)
mock_hook.return_value.get_job.return_value = job
@@ -1407,7 +1407,7 @@ class TestBigQueryInsertJobOperator:
location=TEST_DATASET_LOCATION,
job_id=job_id,
project_id=TEST_GCP_PROJECT_ID,
- reattach_states={"PENDING"},
+ reattach_states={"PENDING", "RUNNING"},
)
result = op.execute(context=MagicMock())
@@ -1424,6 +1424,41 @@ class TestBigQueryInsertJobOperator:
assert result == real_job_id
+ @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+ def test_execute_reattach_to_done_state(self, mock_hook):
+ job_id = "123456"
+ hash_ = "hash"
+ real_job_id = f"{job_id}_{hash_}"
+
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ }
+ }
+
+ mock_hook.return_value.insert_job.side_effect = Conflict("any")
+ job = MagicMock(
+ job_id=real_job_id,
+ error_result=False,
+ state="DONE",
+ done=lambda: False,
+ )
+ mock_hook.return_value.get_job.return_value = job
+ mock_hook.return_value.generate_job_id.return_value = real_job_id
+
+ op = BigQueryInsertJobOperator(
+ task_id="insert_query_job",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ job_id=job_id,
+ project_id=TEST_GCP_PROJECT_ID,
+ reattach_states={"PENDING"},
+ )
+ with pytest.raises(AirflowException):
+ # Not possible to reattach to any state if job is already DONE
+ op.execute(context=MagicMock())
+
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute_force_rerun(self, mock_hook):
job_id = "123456"