This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 1f7b078c7037144989759a46e278641ff11d7ba5 Author: Nathan Hadfield <[email protected]> AuthorDate: Wed Jan 7 16:59:21 2026 +0000 Fix `TriggerDagRunOperator` deferring when `wait_for_completion=False` (#60052) The bug was in the task runner's DagRunTriggerException handler, which checked the deferrable flag before checking wait_for_completion. This caused tasks with deferrable=True to incorrectly enter DEFERRED state even in fire-and-forget mode (wait_for_completion=False). Fixed by nesting the deferrable check inside the wait_for_completion check, ensuring tasks only defer when BOTH conditions are true. Also updated test expectations to validate the correct behaviour. (cherry picked from commit 2daadf4abfa7c93c128229ca8028a818c0ddfad5) --- .../src/airflow/sdk/execution_time/task_runner.py | 46 +++++++++++++--------- .../task_sdk/execution_time/test_task_runner.py | 4 +- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py b/task-sdk/src/airflow/sdk/execution_time/task_runner.py index a08f842acba..83002aa2ad0 100644 --- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py +++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py @@ -1210,26 +1210,26 @@ def _handle_trigger_dag_run( # be used when creating the extra link on the webserver. ti.xcom_push(key="trigger_run_id", value=drte.dag_run_id) - if drte.deferrable: - from airflow.exceptions import TaskDeferred - from airflow.providers.standard.triggers.external_task import DagStateTrigger - - defer = TaskDeferred( - trigger=DagStateTrigger( - dag_id=drte.trigger_dag_id, - states=drte.allowed_states + drte.failed_states, # type: ignore[arg-type] - # Don't filter by execution_dates when run_ids is provided. - # run_id uniquely identifies a DAG run, and when reset_dag_run=True, - # drte.logical_date might be a newly calculated value that doesn't match - # the persisted logical_date in the database, causing the trigger to never find the run. 
- execution_dates=None, - run_ids=[drte.dag_run_id], - poll_interval=drte.poke_interval, - ), - method_name="execute_complete", - ) - return _defer_task(defer, ti, log) if drte.wait_for_completion: + if drte.deferrable: + from airflow.exceptions import TaskDeferred + from airflow.providers.standard.triggers.external_task import DagStateTrigger + + defer = TaskDeferred( + trigger=DagStateTrigger( + dag_id=drte.trigger_dag_id, + states=drte.allowed_states + drte.failed_states, # type: ignore[arg-type] + # Don't filter by execution_dates when run_ids is provided. + # run_id uniquely identifies a DAG run, and when reset_dag_run=True, + # drte.logical_date might be a newly calculated value that doesn't match + # the persisted logical_date in the database, causing the trigger to never find the run. + execution_dates=None, + run_ids=[drte.dag_run_id], + poll_interval=drte.poke_interval, + ), + method_name="execute_complete", + ) + return _defer_task(defer, ti, log) while True: log.info( "Waiting for dag run to complete execution in allowed state.", @@ -1265,6 +1265,14 @@ def _handle_trigger_dag_run( dag_id=drte.trigger_dag_id, state=comms_msg.state, ) + else: + # Fire-and-forget mode: wait_for_completion=False + if drte.deferrable: + log.info( + "Ignoring deferrable=True because wait_for_completion=False. 
" + "Task will complete immediately without waiting for the triggered DAG run.", + trigger_dag_id=drte.trigger_dag_id, + ) return _handle_current_task_success(context, ti) diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index fe8f1ad667d..d5f5b5109e0 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -3670,7 +3670,7 @@ class TestTriggerDagRunOperator: @pytest.mark.parametrize( ["allowed_states", "failed_states", "intermediate_state"], [ - ([DagRunState.SUCCESS], None, TaskInstanceState.DEFERRED), + ([DagRunState.SUCCESS], None, TaskInstanceState.SUCCESS), ], ) def test_handle_trigger_dag_run_deferred( @@ -3682,7 +3682,7 @@ class TestTriggerDagRunOperator: mock_supervisor_comms, ): """ - Test that TriggerDagRunOperator defers when the deferrable flag is set to True + Test that TriggerDagRunOperator does not defer when wait_for_completion=False """ from airflow.providers.standard.operators.trigger_dagrun import TriggerDagRunOperator
