This is an automated email from the ASF dual-hosted git repository. pankaj pushed a commit to branch state_check_bq in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 8380b367bd9a9e5b7297bb1f638b18f890e8a85f Author: pankajastro <[email protected]> AuthorDate: Fri Nov 22 13:03:56 2024 +0530 BigQueryInsertJobOperator: log transient error and reload job property before marking task as failed or success --- .../src/airflow/providers/google/cloud/operators/bigquery.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/providers/src/airflow/providers/google/cloud/operators/bigquery.py b/providers/src/airflow/providers/google/cloud/operators/bigquery.py index db5566f769b..156df2c0586 100644 --- a/providers/src/airflow/providers/google/cloud/operators/bigquery.py +++ b/providers/src/airflow/providers/google/cloud/operators/bigquery.py @@ -2592,8 +2592,15 @@ class BigQueryInsertJobOperator(GoogleCloudBaseOperator, _BigQueryOpenLineageMix nowait=True, ) - @staticmethod - def _handle_job_error(job: BigQueryJob | UnknownJob) -> None: + def _handle_job_error(self, job: BigQueryJob | UnknownJob) -> None: + self.log.debug("Job %s is completed. checking the job status", self.job_id) + # BigQuery jobs can transiently report the wrong status, which may cause the Airflow task to be incorrectly marked as successful. + # To avoid this, refresh the job properties before checking the final state and handling any errors. + while job.state != 'DONE': + job.reload(timeout=self.result_timeout, retry=self.result_retry) + # Log any transient errors encountered during the job execution + for error in job.errors: + self.log.error("BigQuery Job Error:", error) if job.error_result: raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")
