SameerMesiah97 opened a new issue, #61297:
URL: https://github.com/apache/airflow/issues/61297
### Apache Airflow Provider(s)
dbt-cloud
### Versions of Apache Airflow Providers
`apache-airflow-providers-dbt-cloud>=4.6.4rc1`
### Apache Airflow version
main
### Operating System
Debian GNU/Linux 12 (bookworm)
### Deployment
Other
### Deployment details
_No response_
### What happened
When calling `DbtCloudHook.wait_for_job_run_status` directly from a task, a
dbt Cloud job that reaches a terminal failure state (`ERROR` or `CANCELLED`)
can cause the Airflow task to **succeed silently** instead of failing.
`wait_for_job_run_status` blocks until the job run completes. However, if
the job run ends in a terminal failure state rather than the expected
status (default: `SUCCESS`), the method returns `False` instead of raising an
exception. In an Airflow context, where task success and failure are
exception-driven, this allows the task to complete successfully even
though the external dbt Cloud job failed.
This behavior is surprising given the blocking nature of the method and its
default expectation of `SUCCESS`.
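For context, the current semantics can be approximated by the simplified
polling loop below. This is an illustrative sketch, not the provider's actual
source; the real method also accepts a tuple of expected statuses, an account
ID, and performs timeout handling.
```python
import time

# Illustrative model of the current behavior (NOT the provider's source).
# Status codes follow the dbt Cloud API: 10 = SUCCESS, 20 = ERROR, 30 = CANCELLED.
TERMINAL_STATUSES = {10, 20, 30}
SUCCESS = 10

def wait_for_job_run_status_sketch(get_status, run_id, expected_status=SUCCESS, check_interval=10):
    """Poll until the run is terminal, then report the outcome as a boolean."""
    status = get_status(run_id)
    while status not in TERMINAL_STATUSES:
        time.sleep(check_interval)
        status = get_status(run_id)
    # ERROR/CANCELLED fall through to `return False` rather than raising,
    # which a task body that ignores the return value will never notice.
    return status == expected_status
```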
### What you think should happen instead
If the job run reaches an unexpected terminal state (`ERROR` or `CANCELLED`)
instead of the expected status, `wait_for_job_run_status` should raise
`DbtCloudJobRunException` rather than return `False`; a sketch of the
proposed change follows the list below.
This ensures:
* Airflow task state correctly reflects dbt Cloud job outcome
* Terminal job failures are not silently suppressed
* Callers can safely use `wait_for_job_run_status` as a synchronization
primitive
* Behavior is consistent with Airflow’s exception-driven execution model
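A minimal sketch of the proposed change, under the assumption that the
polling loop ends with the final observed status in hand
(`_final_status_check` and its variable names are hypothetical; only
`DbtCloudJobRunException` is the provider's real exception class):
```python
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudJobRunException

def _final_status_check(run_id, job_run_status, expected_statuses):
    # Hypothetical tail of wait_for_job_run_status: raise on unexpected
    # terminal states instead of returning False.
    if job_run_status not in expected_statuses:
        raise DbtCloudJobRunException(
            f"Job run {run_id} reached terminal status {job_run_status} "
            f"instead of the expected status(es) {expected_statuses}."
        )
    return True
```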
### How to reproduce
1. Create a dbt Cloud job that fails (for example, a job with invalid SQL).
2. Configure a dbt Cloud connection in Airflow.
(The connection ID `dbt_cloud_default` is used for this reproduction.)
3. Create the following DAG:
```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.providers.dbt.cloud.hooks.dbt import DbtCloudHook


@task
def wait_for_failed_dbt_job():
    hook = DbtCloudHook(dbt_cloud_conn_id="dbt_cloud_default")
    response = hook.trigger_job_run(
        job_id=<FAILED_JOB_ID>,  # ID of the intentionally failing job
        cause="airflow repro",
    )
    run_id = response.json()["data"]["id"]
    # The return value (False on terminal failure) is discarded, so the
    # task has no way to fail when the job ends in ERROR/CANCELLED.
    hook.wait_for_job_run_status(
        run_id=run_id,
        check_interval=10,
        timeout=600,
    )


with DAG(
    dag_id="dbt_cloud_wait_for_job_run_status_silent_failure",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    wait_for_failed_dbt_job()
```
4. Trigger the DAG and wait for the dbt Cloud job to complete and reach a
terminal failure state (`ERROR` or `CANCELLED`).
**Observed Behavior**
The Airflow task completes successfully without raising an exception.
### Anything else
`wait_for_job_run_status` is part of the public dbt Cloud hook API and is
callable directly from user code. Given its blocking behavior and default
expectation of `SUCCESS`, it is reasonable for users to rely on this method as
a synchronization primitive and expect terminal job failures to raise
exceptions.
Requiring callers to explicitly inspect a boolean return value to detect
terminal failure states is error-prone in an Airflow context, where task
success and failure are exception-driven. Supporting safe direct usage of this
method helps prevent silent failures and aligns its behavior with common
Airflow execution patterns.
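Until then, a caller-side workaround is to inspect the boolean explicitly
inside the task body from the reproduction above (`hook` and `run_id` as
defined there):
```python
from airflow.exceptions import AirflowException

succeeded = hook.wait_for_job_run_status(
    run_id=run_id,
    check_interval=10,
    timeout=600,
)
if not succeeded:
    # Translate the silent `False` into a task failure.
    raise AirflowException(f"dbt Cloud job run {run_id} did not reach SUCCESS.")
```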
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)