This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit 91906de7f8ba5bf4f430175dd3f172dfba9b9bef
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 14 22:51:03 2025 +0100

    [v3-1-test] Fix: TriggerDagRunOperator stuck in deferred state with 
reset_dag_run (#57756) (#57968) (#58333)
    
    When TriggerDagRunOperator is used with deferrable=True, 
wait_for_completion=True,
      reset_dag_run=True, and a fixed trigger_run_id, the operator becomes 
permanently
      stuck in deferred state after clearing and re-running.
    
      Root cause:
      When reset_dag_run=True is used with a fixed run_id, the database 
preserves the
      original logical_date from the first run. However, on subsequent runs 
after clearing,
      the operator calculates a NEW logical_date based on the current time. The 
DagStateTrigger
      was being created with this newly calculated logical_date, causing a 
mismatch when
      querying the database - the trigger looked for a DAG run with the new 
logical_date
      but the database contained the original logical_date, causing the query 
to return
      zero results indefinitely.
    
      Solution:
      - Modified _handle_trigger_dag_run() in task_runner.py to pass 
execution_dates=None
        to DagStateTrigger when run_ids is provided, since run_id alone is 
sufficient and
        globally unique
      - Added test 
test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only to
        verify the fix and prevent regression
    
      The fix ensures that both deferrable and non-deferrable modes use 
identical logic
      for determining DAG run completion - querying by run_id and state only, 
without
      filtering by logical_date which can become stale when resets are involved.
    (cherry picked from commit 4f3d0c55beab8d999c5526e36065f25809afd13f)
    
    Co-authored-by: Mykola Shyshov <[email protected]>
---
 .../src/airflow/sdk/execution_time/task_runner.py  |  6 ++-
 .../task_sdk/execution_time/test_task_runner.py    | 53 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 1 deletion(-)

diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py 
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index e6115ea915b..822ed9bb50a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1101,7 +1101,11 @@ def _handle_trigger_dag_run(
             trigger=DagStateTrigger(
                 dag_id=drte.trigger_dag_id,
                 states=drte.allowed_states + drte.failed_states,  # type: 
ignore[arg-type]
-                execution_dates=[drte.logical_date] if drte.logical_date else 
None,
+                # Don't filter by execution_dates when run_ids is provided.
+                # run_id uniquely identifies a DAG run, and when 
reset_dag_run=True,
+                # drte.logical_date might be a newly calculated value that 
doesn't match
+                # the persisted logical_date in the database, causing the 
trigger to never find the run.
+                execution_dates=None,
                 run_ids=[drte.dag_run_id],
                 poll_interval=drte.poke_interval,
             ),
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index ff240c5994b..58dda3371ef 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3309,3 +3309,56 @@ class TestTriggerDagRunOperator:
             state, msg, _ = run(ti, ti.get_template_context(), log)
 
         assert state == intermediate_state
+
+    @time_machine.travel("2025-01-01 00:00:00", tick=False)
+    def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only(
+        self, create_runtime_ti, mock_supervisor_comms
+    ):
+        """
+        Test that TriggerDagRunOperator with deferrable=True and 
reset_dag_run=True
+        creates a DagStateTrigger with execution_dates=None.
+
+        This prevents the bug where reset_dag_run preserves the original 
logical_date
+        in the database but the trigger queries with a newly calculated 
logical_date,
+        causing a mismatch that makes the trigger never find the dag run.
+        """
+        from airflow.providers.standard.operators.trigger_dagrun import 
TriggerDagRunOperator
+
+        task = TriggerDagRunOperator(
+            task_id="test_task",
+            trigger_dag_id="test_dag",
+            trigger_run_id="fixed_run_id",
+            wait_for_completion=True,
+            deferrable=True,
+            reset_dag_run=True,
+            poke_interval=5,
+        )
+        ti = create_runtime_ti(
+            dag_id="test_handle_trigger_dag_run_deferred_reset", 
run_id="test_run", task=task
+        )
+
+        log = mock.MagicMock()
+        state, msg, _ = run(ti, ti.get_template_context(), log)
+
+        # Task should be deferred
+        assert state == TaskInstanceState.DEFERRED
+        assert isinstance(msg, DeferTask)
+
+        # Verify the DeferTask message structure
+        assert msg.classpath == 
"airflow.providers.standard.triggers.external_task.DagStateTrigger"
+        assert msg.next_method == "execute_complete"
+
+        # Critical assertion: execution_dates should be None to avoid 
logical_date mismatch
+        # when reset_dag_run=True. The run_id alone is sufficient and unique.
+        trigger_kwargs = msg.trigger_kwargs
+        assert trigger_kwargs["execution_dates"] is None, (
+            "execution_dates should be None when using run_ids. "
+            "When reset_dag_run=True, the logical_date in the database may 
differ from "
+            "the newly calculated logical_date, causing the trigger to never 
find the run."
+        )
+        assert trigger_kwargs["run_ids"] == ["fixed_run_id"]
+        assert trigger_kwargs["dag_id"] == "test_dag"
+        assert trigger_kwargs["poll_interval"] == 5
+
+        # Also verify it was sent to supervisor
+        mock_supervisor_comms.send.assert_any_call(msg)

Reply via email to