This is an automated email from the ASF dual-hosted git repository.
pankaj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d2eed7a0d60 BigQueryInsertJobOperator: log transient error and check
job state before marking task as success (#44279)
d2eed7a0d60 is described below
commit d2eed7a0d6008890135c747eda8656581030f8ba
Author: Pankaj Singh <[email protected]>
AuthorDate: Sun Dec 1 15:19:38 2024 +0530
BigQueryInsertJobOperator: log transient error and check job state before
marking task as success (#44279)
* BigQueryInsertJobOperator: log transient error and check job state before
marking task as success
---
.../providers/google/cloud/operators/bigquery.py | 10 ++-
.../tests/google/cloud/operators/test_bigquery.py | 91 +++++++++++++++++++---
2 files changed, 88 insertions(+), 13 deletions(-)
diff --git a/providers/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/src/airflow/providers/google/cloud/operators/bigquery.py
index a46a33a9108..ee4d8fbc5b1 100644
--- a/providers/src/airflow/providers/google/cloud/operators/bigquery.py
+++ b/providers/src/airflow/providers/google/cloud/operators/bigquery.py
@@ -2593,10 +2593,16 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix
nowait=True,
)
-    @staticmethod
-    def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
+    def _handle_job_error(self, job: BigQueryJob | UnknownJob) -> None:
+        self.log.info("Job %s is completed. Checking the job status", self.job_id)
+        # Log any transient errors encountered during the job execution
+        for error in job.errors or []:
+            self.log.error("BigQuery Job Error: %s", error)
         if job.error_result:
             raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
+        # Check the final state.
+        if job.state != "DONE":
+            raise AirflowException(f"Job failed with state: {job.state}")
def execute(self, context: Any):
hook = BigQueryHook(
diff --git a/providers/tests/google/cloud/operators/test_bigquery.py b/providers/tests/google/cloud/operators/test_bigquery.py
index 1088f8489b4..26aa18ae52f 100644
--- a/providers/tests/google/cloud/operators/test_bigquery.py
+++ b/providers/tests/google/cloud/operators/test_bigquery.py
@@ -18,6 +18,7 @@
from __future__ import annotations
import json
+import logging
import os
from contextlib import suppress
from unittest import mock
@@ -105,6 +106,15 @@ TEST_FULL_JOB_ID = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_
 TEST_FULL_JOB_ID_2 = f"{TEST_GCP_PROJECT_ID}:{TEST_DATASET_LOCATION}:{TEST_JOB_ID_2}"
+def create_bigquery_job(errors=None, error_result=None, state="DONE"):
+ mock_job = MagicMock()
+ mock_job.errors = errors or []
+ mock_job.error_result = error_result
+ mock_job.state = state
+ mock_job.job_id = "mock-job-id"
+ return mock_job
+
+
class TestBigQueryCreateEmptyTableOperator:
@mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
def test_execute(self, mock_hook):
@@ -823,7 +833,9 @@ class TestBigQueryInsertJobOperator:
"useLegacySql": False,
}
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
mock_hook.return_value.generate_job_id.return_value = real_job_id
op = BigQueryInsertJobOperator(
@@ -864,7 +876,9 @@ class TestBigQueryInsertJobOperator:
"configuration": configuration,
"jobReference": "a",
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.generate_job_id.return_value = real_job_id
         mock_hook.return_value.insert_job.return_value.to_api_repr.return_value = mock_configuration
@@ -902,7 +916,9 @@ class TestBigQueryInsertJobOperator:
"useLegacySql": False,
}
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
mock_hook.return_value.generate_job_id.return_value = real_job_id
op = BigQueryInsertJobOperator(
@@ -942,6 +958,7 @@ class TestBigQueryInsertJobOperator:
mock_job.job_id = real_job_id
mock_job.error_result = False
+ mock_job.state = "DONE"
mock_job.result.side_effect = AirflowTaskTimeout()
mock_hook.return_value.insert_job.return_value = mock_job
@@ -977,7 +994,9 @@ class TestBigQueryInsertJobOperator:
"useLegacySql": False,
}
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=True
+        )
mock_hook.return_value.generate_job_id.return_value = real_job_id
op = BigQueryInsertJobOperator(
@@ -990,8 +1009,11 @@ class TestBigQueryInsertJobOperator:
with pytest.raises(AirflowException):
op.execute(context=MagicMock())
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator._handle_job_error"
+    )
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
-    def test_execute_reattach(self, mock_hook):
+    def test_execute_reattach(self, mock_hook, _handle_job_error):
job_id = "123456"
hash_ = "hash"
real_job_id = f"{job_id}_{hash_}"
@@ -1085,6 +1107,7 @@ class TestBigQueryInsertJobOperator:
}
job = MagicMock(
+ state="DONE",
job_id=real_job_id,
error_result=False,
)
@@ -1161,7 +1184,9 @@ class TestBigQueryInsertJobOperator:
"useLegacySql": False,
}
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
         mock_hook.return_value.insert_job.return_value.running.return_value = False
op = BigQueryInsertJobOperator(
@@ -1190,7 +1215,9 @@ class TestBigQueryInsertJobOperator:
"useLegacySql": False,
}
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=True
+        )
         mock_hook.return_value.insert_job.return_value.running.return_value = False
op = BigQueryInsertJobOperator(
@@ -1365,9 +1392,12 @@ class TestBigQueryInsertJobOperator:
assert operator.job_id == job_id
@pytest.mark.db_test
+    @mock.patch(
+        "airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator._handle_job_error"
+    )
     @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
     def test_bigquery_insert_job_operator_with_job_id_generate(
-        self, mock_hook, create_task_instance_of_operator
+        self, mock_hook, _handle_job_error, create_task_instance_of_operator
     ):
job_id = "123456"
hash_ = "hash"
@@ -1425,7 +1455,9 @@ class TestBigQueryInsertJobOperator:
"useLegacySql": False,
}
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
mock_hook.return_value.generate_job_id.return_value = real_job_id
op = BigQueryInsertJobOperator(
@@ -1559,7 +1591,9 @@ class TestBigQueryInsertJobOperator:
},
"labels": {"foo": "bar"},
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
mock_hook.return_value.generate_job_id.return_value = real_job_id
op = BigQueryInsertJobOperator(
@@ -1589,7 +1623,9 @@ class TestBigQueryInsertJobOperator:
},
"labels": None,
}
-        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=False)
+        mock_hook.return_value.insert_job.return_value = MagicMock(
+            state="DONE", job_id=real_job_id, error_result=False
+        )
mock_hook.return_value.generate_job_id.return_value = real_job_id
op = BigQueryInsertJobOperator(
@@ -1734,6 +1770,39 @@ class TestBigQueryInsertJobOperator:
op._add_job_labels()
assert "labels" not in configuration
+ def test_handle_job_error_raises_on_error_result_or_error(self, caplog):
+ caplog.set_level(logging.ERROR)
+ configuration = {
+ "query": {
+ "query": "SELECT * FROM any",
+ "useLegacySql": False,
+ },
+ }
+ op = BigQueryInsertJobOperator(
+ task_id="task.with.dots.is.allowed",
+ configuration=configuration,
+ location=TEST_DATASET_LOCATION,
+ project_id=TEST_GCP_PROJECT_ID,
+ job_id="12345",
+ )
+        # Test error_result
+        job_with_error_result = create_bigquery_job(error_result="Job failed due to some issue")
+        with pytest.raises(
+            AirflowException, match="BigQuery job mock-job-id failed: Job failed due to some issue"
+        ):
+            op._handle_job_error(job_with_error_result)
+
+        # Test errors
+        job_with_error = create_bigquery_job(errors=["Some transient error"])
+        op._handle_job_error(job_with_error)
+
+        assert "Some transient error" in caplog.text
+
+        # Test empty error object
+        job_empty_error = create_bigquery_job(state="RUNNING")
+        with pytest.raises(AirflowException, match="Job failed with state: RUNNING"):
+            op._handle_job_error(job_empty_error)
+
class TestBigQueryIntervalCheckOperator:
def test_bigquery_interval_check_operator_execute_complete(self):