This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 1f7b078c7037144989759a46e278641ff11d7ba5
Author: Nathan Hadfield <[email protected]>
AuthorDate: Wed Jan 7 16:59:21 2026 +0000

    Fix `TriggerDagRunOperator` deferring when `wait_for_completion=False` 
(#60052)
    
    The bug was in the task runner's DagRunTriggerException handler, which
    checked the deferrable flag before checking wait_for_completion. This
    caused tasks with deferrable=True to incorrectly enter DEFERRED state
    even in fire-and-forget mode (wait_for_completion=False).
    
    Fixed by nesting the deferrable check inside the wait_for_completion
    check, ensuring tasks only defer when BOTH conditions are true.
    
    Also updated test expectations to validate the correct behaviour.
    
    (cherry picked from commit 2daadf4abfa7c93c128229ca8028a818c0ddfad5)
---
 .../src/airflow/sdk/execution_time/task_runner.py  | 46 +++++++++++++---------
 .../task_sdk/execution_time/test_task_runner.py    |  4 +-
 2 files changed, 29 insertions(+), 21 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index a08f842acba..83002aa2ad0 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1210,26 +1210,26 @@ def _handle_trigger_dag_run(
     # be used when creating the extra link on the webserver.
     ti.xcom_push(key="trigger_run_id", value=drte.dag_run_id)
 
-    if drte.deferrable:
-        from airflow.exceptions import TaskDeferred
-        from airflow.providers.standard.triggers.external_task import 
DagStateTrigger
-
-        defer = TaskDeferred(
-            trigger=DagStateTrigger(
-                dag_id=drte.trigger_dag_id,
-                states=drte.allowed_states + drte.failed_states,  # type: 
ignore[arg-type]
-                # Don't filter by execution_dates when run_ids is provided.
-                # run_id uniquely identifies a DAG run, and when 
reset_dag_run=True,
-                # drte.logical_date might be a newly calculated value that 
doesn't match
-                # the persisted logical_date in the database, causing the 
trigger to never find the run.
-                execution_dates=None,
-                run_ids=[drte.dag_run_id],
-                poll_interval=drte.poke_interval,
-            ),
-            method_name="execute_complete",
-        )
-        return _defer_task(defer, ti, log)
     if drte.wait_for_completion:
+        if drte.deferrable:
+            from airflow.exceptions import TaskDeferred
+            from airflow.providers.standard.triggers.external_task import 
DagStateTrigger
+
+            defer = TaskDeferred(
+                trigger=DagStateTrigger(
+                    dag_id=drte.trigger_dag_id,
+                    states=drte.allowed_states + drte.failed_states,  # type: 
ignore[arg-type]
+                    # Don't filter by execution_dates when run_ids is provided.
+                    # run_id uniquely identifies a DAG run, and when 
reset_dag_run=True,
+                    # drte.logical_date might be a newly calculated value that 
doesn't match
+                    # the persisted logical_date in the database, causing the 
trigger to never find the run.
+                    execution_dates=None,
+                    run_ids=[drte.dag_run_id],
+                    poll_interval=drte.poke_interval,
+                ),
+                method_name="execute_complete",
+            )
+            return _defer_task(defer, ti, log)
         while True:
             log.info(
                 "Waiting for dag run to complete execution in allowed state.",
@@ -1265,6 +1265,14 @@ def _handle_trigger_dag_run(
                 dag_id=drte.trigger_dag_id,
                 state=comms_msg.state,
             )
+    else:
+        # Fire-and-forget mode: wait_for_completion=False
+        if drte.deferrable:
+            log.info(
+                "Ignoring deferrable=True because wait_for_completion=False. "
+                "Task will complete immediately without waiting for the 
triggered DAG run.",
+                trigger_dag_id=drte.trigger_dag_id,
+            )
 
     return _handle_current_task_success(context, ti)
 
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index fe8f1ad667d..d5f5b5109e0 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3670,7 +3670,7 @@ class TestTriggerDagRunOperator:
     @pytest.mark.parametrize(
         ["allowed_states", "failed_states", "intermediate_state"],
         [
-            ([DagRunState.SUCCESS], None, TaskInstanceState.DEFERRED),
+            ([DagRunState.SUCCESS], None, TaskInstanceState.SUCCESS),
         ],
     )
     def test_handle_trigger_dag_run_deferred(
@@ -3682,7 +3682,7 @@ class TestTriggerDagRunOperator:
         mock_supervisor_comms,
     ):
         """
-        Test that TriggerDagRunOperator defers when the deferrable flag is set 
to True
+        Test that TriggerDagRunOperator does not defer when 
wait_for_completion=False
         """
         from airflow.providers.standard.operators.trigger_dagrun import 
TriggerDagRunOperator
 

Reply via email to