pedeveaux opened a new issue, #51906:
URL: https://github.com/apache/airflow/issues/51906

   ### Apache Airflow Provider(s)
   
   standard
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-standard==1.2.0
   
   ### Apache Airflow version
   
   3.0.2
   
   ### Operating System
   
   PRETTY_NAME="Debian GNU/Linux 12 (bookworm)" NAME="Debian GNU/Linux" VERSION_ID="12" VERSION="12 (bookworm)" VERSION_CODENAME=bookworm
   
   ### Deployment
   
   Docker-Compose
   
   ### Deployment details
   
   Using the LocalExecutor in a docker-compose deployment. 
   There is an airflow-triggerer container running in the deployment. 
   
   ### What happened
   
   When running a DAG that triggers a dependent DAG via the 
`TriggerDagRunOperator` with `deferrable=True`, the triggering task never 
returns even though the dependent DAG has completed.
   
   ### What you think should happen instead
   
   The triggering task should complete and the downstream tasks in the parent 
DAG should execute. 
   
   ### How to reproduce
   
   1. Create these two DAGs:
   Parent DAG:
   ```python
   import pendulum
   
   from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
   from airflow.sdk import dag, task
   
   
   @dag(
       schedule=None,
       start_date=pendulum.datetime(2025, 1, 1),
       catchup=False,
       tags=["test"],
   )
   def trigger_dagrun_dag():
       @task
       def start_task(task_type):
           return f"The {task_type} task has completed."
   
       @task
       def end_task(task_type):
           return f"The {task_type} task has completed."
   
       trigger_dependent_dag = TriggerDagRunOperator(
           task_id="trigger_dependent_dag",
           trigger_dag_id="dependent_dag",
           wait_for_completion=True,
           deferrable=True,
       )
   
       start_task("starting") >> trigger_dependent_dag >> end_task("ending")
   
   
   trigger_dagrun_dag()
   ```
   
   Dependent DAG:
   ```python
   from airflow.sdk import dag, task
   import pendulum
   
   @dag(
       dag_id="dependent_dag",
       schedule=None,
       start_date=pendulum.datetime(2025, 6, 1),
       catchup=False,
       tags=["test"],
   )
   def dependent_dag():
       @task
       def dependent_task():
           return "This is a dependent task."
   
       dependent_task()
   
   dependent_dag()
   ```
   2. Run the Parent DAG and see that the `trigger_dependent_dag` task never 
completes even though the dependent DAG has completed. 
   
   ### Anything else
   
   This problem can be mitigated by setting `deferrable=False` in the 
`TriggerDagRunOperator`. However, this consumes an execution slot for the 
duration of the triggered DAG run. The triggerer service is supposed to let 
tasks defer without consuming an execution slot. 
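
For reference, a minimal sketch of the workaround described above. It keeps the operator arguments in one helper so that `deferrable` can be switched back on once the bug is fixed. The environment variable `TRIGGER_DAGRUN_DEFERRABLE`, the helper names, and the `poke_interval` value are assumptions made for illustration; none of them come from Airflow or from the DAGs above.

```python
import os

# Hypothetical environment variable used only in this sketch; it is not an Airflow setting.
DEFERRABLE_ENV_VAR = "TRIGGER_DAGRUN_DEFERRABLE"


def trigger_dagrun_kwargs(env=None):
    """Return keyword arguments for the TriggerDagRunOperator in the parent DAG.

    Defaults to deferrable=False (the workaround) unless the hypothetical
    environment variable is set to "true".
    """
    env = os.environ if env is None else env
    deferrable = env.get(DEFERRABLE_ENV_VAR, "false").strip().lower() == "true"
    return {
        "task_id": "trigger_dependent_dag",
        "trigger_dag_id": "dependent_dag",
        "wait_for_completion": True,
        "poke_interval": 30,  # seconds between state checks; value chosen arbitrarily
        "deferrable": deferrable,
    }


def build_trigger_task():
    """Create the operator; call this inside the @dag-decorated function."""
    # Imported lazily so the helper above can be exercised without Airflow installed.
    from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator

    return TriggerDagRunOperator(**trigger_dagrun_kwargs())
```

In the parent DAG, `trigger_dependent_dag = build_trigger_task()` would replace the inline operator definition. With the variable unset, the task runs in non-deferrable mode and holds its slot while waiting.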
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.