This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new eca91dc5c2c Restore fail_fast handling when reschedule exceeds MySQL TIMESTAMP limit (#67353)
eca91dc5c2c is described below
commit eca91dc5c2cab36b3a8e6d53772cf13716d98a8d
Author: Kaxil Naik <[email protected]>
AuthorDate: Fri May 22 20:20:02 2026 +0100
Restore fail_fast handling when reschedule exceeds MySQL TIMESTAMP limit (#67353)
PR #59686 dropped the _handle_fail_fast_for_dag call in the
MySQL-TIMESTAMP-limit branch of the reschedule path based on an incorrect
SQLA2 deadlock concern. As a result, DAGs with fail_fast=True silently fail
to stop sibling tasks when a reschedule date exceeds 2038-01-19 on MySQL.
The actual deadlock that motivated #59686 came from a different path (FOR
UPDATE expanding to the lazy-joined dag_run row), fixed in #67246 by scoping
the lock with with_for_update={"of": TI}. With that scope in place, the
fail-fast call is safe and matches the file's two existing fail-fast sites.
Also drops a second misleading comment in the same function claiming
session.get was avoided to "avoid SQLA2 lock contention issues" -- the code
itself is fine; the rationale was wrong.
---
.../execution_api/routes/task_instances.py | 8 ++--
.../versions/head/test_task_instances.py | 55 ++++++++++++++++++++++
2 files changed, 58 insertions(+), 5 deletions(-)
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 18b5842f77b..c2d67926a21 100644
---
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -644,13 +644,11 @@ def _create_ti_state_update_query_and_update_state(
if session.bind is not None:
query = TI.duration_expression_update(timezone.utcnow(),
query, session.bind)
query = query.values(state=TaskInstanceState.FAILED)
- # We skip fail_fast handling in this error case to avoid
fetching the TI object while the row
- # is still locked from the earlier with_for_update() query,
which might cause deadlock issues
- # in SQLA2. The task is marked as FAILED regardless.
+ ti = session.get(TI, task_instance_id, with_for_update={"of":
TI})
+ if ti is not None:
+ _handle_fail_fast_for_dag(ti=ti, dag_id=dag_id,
session=session, dag_bag=dag_bag)
return query, TaskInstanceState.FAILED
- # We can directly use task_instance_id instead of fetching the
TaskInstance object to avoid SQLA2
- # lock contention issues when the TaskInstance row is already locked
from before.
actual_start_date = timezone.utcnow()
session.add(
TaskReschedule(
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 6f19cff9389..2d15e548a13 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1894,6 +1894,61 @@ class TestTIUpdateState:
ti1 = session.get(TaskInstance, ti1.id)
assert ti1.state == State.FAILED
+ def test_ti_update_state_reschedule_mysql_limit_triggers_fail_fast(
+ self, client, session, dag_maker, time_machine
+ ):
+ """
+ When a reschedule date exceeds MySQL's TIMESTAMP limit and the DAG has
fail_fast=True,
+ sibling tasks must still be stopped. The MySQL-limit branch routes
through a different
+ FAILED transition than the regular fail path -- both must honor
fail_fast.
+ """
+ instant = timezone.datetime(2024, 10, 30)
+ time_machine.move_to(instant, tick=False)
+
+ with dag_maker(dag_id="test_dag_with_fail_fast_mysql_reschedule",
fail_fast=True, serialized=True):
+ EmptyOperator(task_id="task1")
+ EmptyOperator(task_id="task2")
+
+ dr = dag_maker.create_dagrun()
+ ti1 = dr.get_task_instance(task_id="task1", session=session)
+ ti1.state = State.RUNNING
+ ti1.start_date = instant
+
+ ti2 = dr.get_task_instance(task_id="task2", session=session)
+ ti2.state = State.QUEUED
+ session.commit()
+ session.refresh(ti1)
+ session.refresh(ti2)
+
+ # Date beyond MySQL's TIMESTAMP limit (2038-01-19 03:14:07).
+ future_date = timezone.datetime(2038, 1, 19, 3, 14, 8)
+
+ with (
+ mock.patch(
+
"airflow.api_fastapi.execution_api.routes.task_instances.get_dialect_name",
+ return_value="mysql",
+ ),
+ mock.patch(
+
"airflow.api_fastapi.execution_api.routes.task_instances._stop_remaining_tasks",
+ autospec=True,
+ ) as mock_stop,
+ ):
+ response = client.patch(
+ f"/execution/task-instances/{ti1.id}/state",
+ json={
+ "state": TaskInstanceState.UP_FOR_RESCHEDULE,
+ "reschedule_date": future_date.isoformat(),
+ "end_date": DEFAULT_END_DATE.isoformat(),
+ },
+ )
+
+ assert response.status_code == 204
+ mock_stop.assert_called_once()
+
+ session.expire_all()
+ ti1 = session.get(TaskInstance, ti1.id)
+ assert ti1.state == State.FAILED
+
@pytest.mark.db_test
@conf_vars({("state_store", "clear_on_success"): "True"})
def test_ti_update_state_to_success_clears_task_state(self, client,
session, create_task_instance):