This is an automated email from the ASF dual-hosted git repository.

pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d2eed7a0d60 BigQueryInsertJobOperator: log transient error and check job state before marking task as success (#44279)
d2eed7a0d60 is described below

commit d2eed7a0d6008890135c747eda8656581030f8ba
Author: Pankaj Singh <[email protected]>
AuthorDate: Sun Dec 1 15:19:38 2024 +0530

    BigQueryInsertJobOperator: log transient error and check job state before marking task as success (#44279)
    
    * BigQueryInsertJobOperator: log transient error and check job state before marking task as success
---
 .../providers/google/cloud/operators/bigquery.py   | 10 ++-
 .../tests/google/cloud/operators/test_bigquery.py  | 91 +++++++++++++++++++---
 2 files changed, 88 insertions(+), 13 deletions(-)

diff --git a/providers/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/src/airflow/providers/google/cloud/operators/bigquery.py
index a46a33a9108..ee4d8fbc5b1 100644
--- a/providers/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2593,10 +2593,16 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
             nowait=True,
         )
 
-    @staticmethod
-    def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
+    def _handle_job_error(self, job: BigQueryJob | UnknownJob) -> None:
+        self.log.info("Job %s is completed. Checking the job status", self.job_id)
+        # Log any transient errors encountered during the job execution
+        for error in job.errors or []:
+            self.log.error("BigQuery Job Error: %s", error)
         if job.error_result:
             raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
+        # Check the final state.
+        if job.state != "DONE":
+            raise AirflowException(f"Job failed with state: {job.state}")
 
     def execute(self, context: Any):
         hook = BigQueryHook(
diff --git a/providers/tests/google/cloud/operators/test_bigquery.py b/providers/tests/google/cloud/operators/test_bigquery.py
index 1088f8489b4..26aa18ae52f 100644
--- a/providers/tests/google/cloud/operators/test_bigquery.py
+++ b/providers/tests/google/cloud/operators/test_bigquery.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import json
+import logging
 import os
 from contextlib import suppress
 from unittest import mock
@@ -105,6 +106,15 @@ TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_
 TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
 
 
+def create_bigquery_job(errors=None, error_result=None, state="DONE"):
+    mock_job = MagicMock()
+    mock_job.errors = errors or []
+    mock_job.error_result = error_result
+    mock_job.state = state
+    mock_job.job_id = "mock-job-id"
+    return mock_job
+
+
 class TestBigQueryCreateEmptyTableOperator:
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_execute(self, mock_hook):
@@ -823,7 +833,9 @@ class TestBigQueryInsertJobOperator:
                 "useLegacySql": False,
             }
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
 
         op = BigQueryInsertJobOperator(
@@ -864,7 +876,9 @@ class TestBigQueryInsertJobOperator:
             "configuration": configuration,
             "jobReference": "a",
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
         mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = mock_configuration
 
@@ -902,7 +916,9 @@ class TestBigQueryInsertJobOperator:
                 "useLegacySql": False,
             }
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
 
         op = BigQueryInsertJobOperator(
@@ -942,6 +958,7 @@ class TestBigQueryInsertJobOperator:
 
         mock_job.job_id = real_job_id
         mock_job.error_result = False
+        mock_job.state = "DONE"
         mock_job.result.side_effect = AirflowTaskTimeout()
 
         mock_hook.return_value.insert_job.return_value = mock_job
@@ -977,7 +994,9 @@ class TestBigQueryInsertJobOperator:
                 "useLegacySql": False,
             }
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=True
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
 
         op = BigQueryInsertJobOperator(
@@ -990,8 +1009,11 @@ class TestBigQueryInsertJobOperator:
         with pytest.raises(AirflowException):
             op.execute(context=MagicMock())
 
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator._handle_job_error"
+    )
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-    def test_execute_reattach(self, mock_hook):
+    def test_execute_reattach(self, mock_hook, _handle_job_error):
         job_id = "123456"
         hash_ = "hash"
         real_job_id = f"{job_id}_{hash_}"
@@ -1085,6 +1107,7 @@ class TestBigQueryInsertJobOperator:
         }
 
         job = MagicMock(
+            state="DONE",
             job_id=real_job_id,
             error_result=False,
         )
@@ -1161,7 +1184,9 @@ class TestBigQueryInsertJobOperator:
                 "useLegacySql": False,
             }
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.insert_job.return_value.running.return_value = False
 
         op = BigQueryInsertJobOperator(
@@ -1190,7 +1215,9 @@ class TestBigQueryInsertJobOperator:
                 "useLegacySql": False,
             }
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=True
+        )
         mock_hook.return_value.insert_job.return_value.running.return_value = False
 
         op = BigQueryInsertJobOperator(
@@ -1365,9 +1392,12 @@ class TestBigQueryInsertJobOperator:
         assert operator.job_id == job_id
 
     @pytest.mark.db_test
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator._handle_job_error"
+    )
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_insert_job_operator_with_job_id_generate(
-        self, mock_hook, create_task_instance_of_operator
+        self, mock_hook, _handle_job_error, create_task_instance_of_operator
     ):
         job_id = "123456"
         hash_ = "hash"
@@ -1425,7 +1455,9 @@ class TestBigQueryInsertJobOperator:
                 "useLegacySql": False,
             }
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
 
         op = BigQueryInsertJobOperator(
@@ -1559,7 +1591,9 @@ class TestBigQueryInsertJobOperator:
             },
             "labels": {"foo": "bar"},
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
 
         op = BigQueryInsertJobOperator(
@@ -1589,7 +1623,9 @@ class TestBigQueryInsertJobOperator:
             },
             "labels": None,
         }
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
 
         op = BigQueryInsertJobOperator(
@@ -1734,6 +1770,39 @@ class TestBigQueryInsertJobOperator:
         op._add_job_labels()
         assert "labels" not in configuration
 
+    def test_handle_job_error_raises_on_error_result_or_error(self, caplog):
+        caplog.set_level(logging.ERROR)
+        configuration = {
+            "query": {
+                "query": "SELECT * FROM any",
+                "useLegacySql": False,
+            },
+        }
+        op = BigQueryInsertJobOperator(
+            task_id="task.with.dots.is.allowed",
+            configuration=configuration,
+            location=TEST_DATASET_LOCATION,
+            project_id=TEST_GCP_PROJECT_ID,
+            job_id="12345",
+        )
+        # Test error_result
+        job_with_error_result = create_bigquery_job(error_result="Job failed due to some issue")
+        with pytest.raises(
+            AirflowException, match="BigQuery job mock-job-id failed: Job failed due to some issue"
+        ):
+            op._handle_job_error(job_with_error_result)
+
+        # Test errors
+        job_with_error = create_bigquery_job(errors=["Some transient error"])
+        op._handle_job_error(job_with_error)
+
+        assert "Some transient error" in caplog.text
+
+        # Test empty error object
+        job_empty_error = create_bigquery_job(state="RUNNING")
+        with pytest.raises(AirflowException, match="Job failed with state: RUNNING"):
+            op._handle_job_error(job_empty_error)
+
 
 class TestBigQueryIntervalCheckOperator:
     def test_bigquery_interval_check_operator_execute_complete(self):

Reply via email to