kacpermuda opened a new issue, #37885: URL: https://github.com/apache/airflow/issues/37885
### Apache Airflow Provider(s)

google

### Versions of Apache Airflow Providers

>=10.2.0

### Apache Airflow version

main branch

### Operating System

MacOS

### Deployment

Other

### Deployment details

BREEZE

### What happened

[This PR](https://github.com/apache/airflow/pull/31758) optimized the deferrable mode in `BigQueryCheckOperator` and introduced this bug. When the operator is in deferrable mode but the job finishes quickly enough that it is never deferred (it does not enter the `if job.running()` branch), no error is raised when the job fails and no value check is performed. A similar optimization was made [here](https://github.com/apache/airflow/pull/31872/files) and was later fixed [here](https://github.com/apache/airflow/pull/34018).

### What you think should happen instead

The operator should raise an error when the job fails and check the value returned by the job, even when it is not deferred. I think something like this, similar to what was done in `BigQueryValueCheckOperator`, should be enough:

```
        if job.running():
            self.defer(
                timeout=self.execution_timeout,
                trigger=BigQueryCheckTrigger(
                    conn_id=self.gcp_conn_id,
                    job_id=job.job_id,
                    project_id=hook.project_id,
                    location=self.location or hook.location,
                    poll_interval=self.poll_interval,
                    impersonation_chain=self.impersonation_chain,
                ),
                method_name="execute_complete",
            )
        self._handle_job_error(job)
        # job.result() returns a RowIterator. Mypy expects an instance of SupportsNext[Any] for
        # the next() call which the RowIterator does not resemble to. Hence, ignore the arg-type error.
        records = next(job.result())  # type: ignore[arg-type]
        self._validate_records(records)  # type: ignore[attr-defined]
        self.log.info("Current state of job %s is %s", job.job_id, job.state)

    @staticmethod
    def _handle_job_error(job: BigQueryJob | UnknownJob) -> None:
        if job.error_result:
            raise AirflowException(f"BigQuery job {job.job_id} failed: {job.error_result}")

    def _validate_records(self, records) -> None:
        if not records:
            raise AirflowException("The query returned empty results")
        elif not all(records):
            self._raise_exception(  # type: ignore[attr-defined]
                f"Test failed.\nQuery:\n{self.sql}\nResults:\n{records!s}"
            )
```

### How to reproduce

Run a very quick query in deferrable mode. If it fails, no error is raised; if it succeeds, the returned values are not checked.

### Anything else

I'll make a PR in my free time, but maybe somebody will pick it up before then.

Also, there are no tests checking that the operators (both `BigQueryCheckOperator` and `BigQueryValueCheckOperator`) raise an error when they are in deferrable mode but not deferred, so something like this should be added:

```
    @pytest.mark.db_test
    @mock.patch("airflow.providers.google.cloud.operators.bigquery.BigQueryHook")
    def test_bigquery_value_check_operator_async_finish_with_error_before_deferred(
        self, mock_hook, create_task_instance_of_operator
    ):
        job_id = "123456"
        hash_ = "hash"
        real_job_id = f"{job_id}_{hash_}"
        mock_hook.return_value.insert_job.return_value = MagicMock(job_id=real_job_id, error_result=True)
        mock_hook.return_value.insert_job.return_value.running.return_value = False

        ti = create_task_instance_of_operator(
            BigQueryValueCheckOperator,
            dag_id="dag_id",
            task_id="check_value",
            sql="SELECT COUNT(*) FROM Any",
            pass_value=2,
            use_legacy_sql=True,
            deferrable=True,
        )

        with pytest.raises(AirflowException) as exc:
            ti.task.execute(MagicMock())

        assert str(exc.value) == f"BigQuery job {real_job_id} failed: True"
```

### Are you willing to submit PR?

- [X] Yes I am willing to submit a PR!
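
For illustration only, the expected behaviour of the not-deferred path can be sketched without Airflow or BigQuery installed. Everything here is a hypothetical stand-in rather than provider code: `FakeJob` imitates the three job attributes used above (`running()`, `error_result`, `result()`), `CheckFailed` replaces `AirflowException`, and `execute_check` condenses the operator's `execute` logic into a plain function. Unlike the snippet above, it uses `next(..., None)` so that an empty result is reported as a check failure instead of raising `StopIteration`.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


class CheckFailed(Exception):
    """Stand-in for AirflowException so the sketch runs without Airflow."""


@dataclass
class FakeJob:
    """Hypothetical stand-in for a BigQuery job object."""

    job_id: str
    is_running: bool = False
    error_result: Any = None
    rows: list = field(default_factory=list)

    def running(self) -> bool:
        return self.is_running

    def result(self):
        # The real job returns a RowIterator; a plain iterator is enough here.
        return iter(self.rows)


def execute_check(job: FakeJob) -> str:
    """Condensed version of the control flow proposed in this issue."""
    if job.running():
        # The real operator calls self.defer(...) here and resumes in a trigger.
        return "deferred"

    # The job finished before it could be deferred: the failure check and the
    # value check must still run, which is exactly what is currently skipped.
    if job.error_result:
        raise CheckFailed(f"BigQuery job {job.job_id} failed: {job.error_result}")

    records = next(job.result(), None)  # first row, or None if there are no rows
    if not records:
        raise CheckFailed("The query returned empty results")
    if not all(records):
        raise CheckFailed(f"Test failed.\nResults:\n{records!s}")
    return "checked"
```

With this sketch, `execute_check(FakeJob("c", error_result=True))` raises `CheckFailed`, and a finished job whose first row contains a falsy value (for example `rows=[(0,)]`) also raises, while a still-running job is reported as deferred.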

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
