This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 91906de7f8ba5bf4f430175dd3f172dfba9b9bef
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Nov 14 22:51:03 2025 +0100
[v3-1-test] Fix: TriggerDagRunOperator stuck in deferred state with
reset_dag_run (#57756) (#57968) (#58333)
When TriggerDagRunOperator is used with deferrable=True, wait_for_completion=True,
reset_dag_run=True, and a fixed trigger_run_id, the operator becomes permanently
stuck in the deferred state after clearing and re-running.
Root cause:
When reset_dag_run=True is used with a fixed run_id, the database preserves the
original logical_date from the first run. However, on subsequent runs after clearing,
the operator calculates a NEW logical_date based on the current time. The DagStateTrigger
was being created with this newly calculated logical_date, causing a mismatch when
querying the database: the trigger looked for a DAG run with the new logical_date,
but the database contained the original logical_date, so the query returned
zero results indefinitely.
Solution:
- Modified _handle_trigger_dag_run() in task_runner.py to pass execution_dates=None
  to DagStateTrigger when run_ids is provided, since dag_id plus run_id is sufficient
  to identify the run (run_id is unique within a DAG)
- Added test test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only to
  verify the fix and prevent regression
The fix ensures that both deferrable and non-deferrable modes use identical logic
for determining DAG run completion: querying by run_id and state only, without
filtering by logical_date, which can become stale when resets are involved.
(cherry picked from commit 4f3d0c55beab8d999c5526e36065f25809afd13f)
Co-authored-by: Mykola Shyshov <[email protected]>
---
.../src/airflow/sdk/execution_time/task_runner.py | 6 ++-
.../task_sdk/execution_time/test_task_runner.py | 53 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 1 deletion(-)
diff --git a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
index e6115ea915b..822ed9bb50a 100644
--- a/task-sdk/src/airflow/sdk/execution_time/task_runner.py
+++ b/task-sdk/src/airflow/sdk/execution_time/task_runner.py
@@ -1101,7 +1101,11 @@ def _handle_trigger_dag_run(
trigger=DagStateTrigger(
dag_id=drte.trigger_dag_id,
states=drte.allowed_states + drte.failed_states, # type:
ignore[arg-type]
- execution_dates=[drte.logical_date] if drte.logical_date else
None,
+ # Don't filter by execution_dates when run_ids is provided.
+ # run_id uniquely identifies a DAG run, and when
reset_dag_run=True,
+ # drte.logical_date might be a newly calculated value that
doesn't match
+ # the persisted logical_date in the database, causing the
trigger to never find the run.
+ execution_dates=None,
run_ids=[drte.dag_run_id],
poll_interval=drte.poke_interval,
),
diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
index ff240c5994b..58dda3371ef 100644
--- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
+++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py
@@ -3309,3 +3309,56 @@ class TestTriggerDagRunOperator:
state, msg, _ = run(ti, ti.get_template_context(), log)
assert state == intermediate_state
+
+ @time_machine.travel("2025-01-01 00:00:00", tick=False)
+ def test_handle_trigger_dag_run_deferred_with_reset_uses_run_id_only(
+ self, create_runtime_ti, mock_supervisor_comms
+ ):
+ """
+ Test that TriggerDagRunOperator with deferrable=True and
reset_dag_run=True
+ creates a DagStateTrigger with execution_dates=None.
+
+ This prevents the bug where reset_dag_run preserves the original
logical_date
+ in the database but the trigger queries with a newly calculated
logical_date,
+ causing a mismatch that makes the trigger never find the dag run.
+ """
+ from airflow.providers.standard.operators.trigger_dagrun import
TriggerDagRunOperator
+
+ task = TriggerDagRunOperator(
+ task_id="test_task",
+ trigger_dag_id="test_dag",
+ trigger_run_id="fixed_run_id",
+ wait_for_completion=True,
+ deferrable=True,
+ reset_dag_run=True,
+ poke_interval=5,
+ )
+ ti = create_runtime_ti(
+ dag_id="test_handle_trigger_dag_run_deferred_reset",
run_id="test_run", task=task
+ )
+
+ log = mock.MagicMock()
+ state, msg, _ = run(ti, ti.get_template_context(), log)
+
+ # Task should be deferred
+ assert state == TaskInstanceState.DEFERRED
+ assert isinstance(msg, DeferTask)
+
+ # Verify the DeferTask message structure
+ assert msg.classpath ==
"airflow.providers.standard.triggers.external_task.DagStateTrigger"
+ assert msg.next_method == "execute_complete"
+
+ # Critical assertion: execution_dates should be None to avoid
logical_date mismatch
+ # when reset_dag_run=True. The run_id alone is sufficient and unique.
+ trigger_kwargs = msg.trigger_kwargs
+ assert trigger_kwargs["execution_dates"] is None, (
+ "execution_dates should be None when using run_ids. "
+ "When reset_dag_run=True, the logical_date in the database may
differ from "
+ "the newly calculated logical_date, causing the trigger to never
find the run."
+ )
+ assert trigger_kwargs["run_ids"] == ["fixed_run_id"]
+ assert trigger_kwargs["dag_id"] == "test_dag"
+ assert trigger_kwargs["poll_interval"] == 5
+
+ # Also verify it was sent to supervisor
+ mock_supervisor_comms.send.assert_any_call(msg)