This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c29eeb18ec Fix behavior for reattach_state parameter in BigQueryInsertJobOperator (#40664)
c29eeb18ec is described below

commit c29eeb18ec0f4ba260775f71968e5d75291b208f
Author: VladaZakharova <[email protected]>
AuthorDate: Wed Jul 10 12:59:58 2024 +0200

    Fix behavior for reattach_state parameter in BigQueryInsertJobOperator (#40664)
---
 .../providers/google/cloud/operators/bigquery.py   | 19 +++++++----
 .../google/cloud/operators/test_bigquery.py        | 39 ++++++++++++++++++++--
 2 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/airflow/providers/google/cloud/operators/bigquery.py b/airflow/providers/google/cloud/operators/bigquery.py
index 43131c549a..d55651d06b 100644
--- a/airflow/providers/google/cloud/operators/bigquery.py
+++ b/airflow/providers/google/cloud/operators/bigquery.py
@@ -2955,6 +2955,7 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
 
         try:
             self.log.info("Executing: %s'", self.configuration)
+            # Create a job
             job: BigQueryJob | UnknownJob = self._submit_job(hook, self.job_id)
         except Conflict:
             # If the job already exists retrieve it
@@ -2963,18 +2964,24 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
                 location=self.location,
                 job_id=self.job_id,
             )
-            if job.state in self.reattach_states:
-                # We are reattaching to a job
-                job._begin()
-                self._handle_job_error(job)
-            else:
-                # Same job configuration so we need force_rerun
+
+            if job.state not in self.reattach_states:
+                # Same job configuration, so we need force_rerun
                 raise AirflowException(
                     f"Job with id: {self.job_id} already exists and is in 
{job.state} state. If you "
                     f"want to force rerun it consider setting 
`force_rerun=True`."
                     f"Or, if you want to reattach in this scenario add 
{job.state} to `reattach_states`"
                 )
 
+            else:
+                # Job already reached state DONE
+                if job.state == "DONE":
+                    raise AirflowException("Job is already in state DONE. Can 
not reattach to this job.")
+
+                # We are reattaching to a job
+                self.log.info("Reattaching to existing Job in state %s", 
job.state)
+                self._handle_job_error(job)
+
         job_types = {
             LoadJob._JOB_TYPE: ["sourceTable", "destinationTable"],
             CopyJob._JOB_TYPE: ["sourceTable", "destinationTable"],
diff --git a/tests/providers/google/cloud/operators/test_bigquery.py b/tests/providers/google/cloud/operators/test_bigquery.py
index d49e75c950..346c50382d 100644
--- a/tests/providers/google/cloud/operators/test_bigquery.py
+++ b/tests/providers/google/cloud/operators/test_bigquery.py
@@ -1395,7 +1395,7 @@ class TestBigQueryInsertJobOperator:
         job = MagicMock(
             job_id=real_job_id,
             error_result=False,
-            state="PENDING",
+            state="RUNNING",
             done=lambda: False,
         )
         mock_hook.return_value.get_job.return_value = job
@@ -1407,7 +1407,7 @@ class TestBigQueryInsertJobOperator:
             location=TEST_DATASET_LOCATION,
             job_id=job_id,
             project_id=TEST_GCP_PROJECT_ID,
-            reattach_states={"PENDING"},
+            reattach_states={"PENDING", "RUNNING"},
         )
         result = op.execute(context=MagicMock())
 
@@ -1424,6 +1424,41 @@ class TestBigQueryInsertJobOperator:
 
         assert result == real_job_id
 
+    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
+    def test_execute_reattach_to_done_state(self, mock_hook):
+        """A DONE job must not be reattached to, even when DONE is in ``reattach_states``."""
+        job_id = "123456"
+        real_job_id = f"{job_id}_hash"
+
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            }
+        }
+
+        mock_hook.return_value.insert_job.side_effect = Conflict("any")
+        job = MagicMock(
+            job_id=real_job_id,
+            error_result=False,
+            state="DONE",
+            done=lambda: True,
+        )
+        mock_hook.return_value.get_job.return_value = job
+        mock_hook.return_value.generate_job_id.return_value = real_job_id
+
+        op = BigQueryInsertJobOperator(
+            task_id="insert_query_job",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            job_id=job_id,
+            project_id=TEST_GCP_PROJECT_ID,
+            # Include DONE so the dedicated DONE guard, not the "not in reattach_states" branch, raises
+            reattach_states={"PENDING", "DONE"},
+        )
+        with pytest.raises(AirflowException, match="already in state DONE"):
+            op.execute(context=MagicMock())
+
     
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute_force_rerun(self, mock_hook):
         job_id = "123456"

Reply via email to