SameerMesiah97 opened a new issue, #61467:
URL: https://github.com/apache/airflow/issues/61467

   ### Apache Airflow Provider(s)
   
   dbt-cloud
   
   ### Versions of Apache Airflow Providers
   
   `apache-airflow-providers-dbt-cloud==4.6.4`
   
   ### Apache Airflow version
   
   main
   
   ### Operating System
   
   Debian GNU/Linux 12 (bookworm)
   
   ### Deployment
   
   Other
   
   ### Deployment details
   
   _No response_
   
   ### What happened
   
   When using `DbtCloudRunJobOperator` in **deferrable mode** with 
`execution_timeout` set, the Airflow task times out but the underlying dbt 
Cloud job continues running.
   
   In non-deferrable mode, exceeding `execution_timeout` causes both the task 
to fail and the dbt Cloud job to be cancelled. In deferrable mode, the task 
fails due to timeout, but the external job is not stopped, which can result in 
long-running or orphaned dbt Cloud jobs.
   
   This creates inconsistent behavior between deferrable and non-deferrable 
execution modes.
   
   ### What you think should happen instead
   
   `execution_timeout` should be enforced consistently regardless of execution 
mode.
   
   When a deferrable `DbtCloudRunJobOperator` exceeds `execution_timeout`:
   
   * The Airflow task should fail due to execution timeout
   * The associated dbt Cloud job should be cancelled
   
   This ensures predictable timeout behavior and prevents leaked dbt Cloud jobs.
   
   ### How to reproduce
   
   1. Configure a dbt Cloud connection in Airflow.
   2. Create a dbt Cloud job that runs longer than 30 seconds (the 
`execution_timeout` used in this reproduction).
   3. Create the following DAG (replace <JOB_ID> wiht the ID of your DBT Cloud 
Job). 
   
   ```python
   from airflow import DAG
   from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator
   from datetime import datetime, timedelta
   
   with DAG(
       dag_id="dbt_deferrable_execution_timeout_repro",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
   
       run_dbt = DbtCloudRunJobOperator(
           task_id="run_dbt",
           job_id=<JOB_ID>,
           wait_for_termination=True,
           deferrable=True,
           execution_timeout=timedelta(seconds=30),
       )
   ```
   
   4. Trigger the DAG and wait for the task to exceed `execution_timeout`.
   
   **Observed Behavior**
   
   * The Airflow task fails due to execution timeout.
   * The dbt Cloud job continues running and is not cancelled.
   
   ### Anything else
   
   This inconsistency makes `execution_timeout` unreliable for deferrable dbt 
Cloud jobs and can lead to unintended dbt Cloud resource usage. While 
deferrable operators execute via the triggerer rather than a worker process, 
this does not preclude enforcing `execution_timeout ` semantics. The 
expectation is that task-level timeout behavior remains consistent across 
execution modes, even if the underlying enforcement mechanism differs..
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to