This is an automated email from the ASF dual-hosted git repository.

kaxilnaik pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d1794ac49e Fix downstream tasks being incorrectly skipped in HA scheduler mode (#63266)
6d1794ac49e is described below

commit 6d1794ac49ed60872068f5df254849e04a6cc954
Author: Sam Dumont <[email protected]>
AuthorDate: Thu Mar 19 05:46:30 2026 +0100

    Fix downstream tasks being incorrectly skipped in HA scheduler mode (#63266)
    
    In HA deployments, ti_skip_downstream() issues a bulk UPDATE without
    a state guard. When a BranchOperator decides to skip downstream tasks,
    it can overwrite a task already RUNNING on a worker to SKIPPED, causing
    a 409 heartbeat conflict that kills the task mid-execution.
    
    Add a skippable_state_clause to the UPDATE WHERE clause so RUNNING,
    SUCCESS, and FAILED tasks are never overwritten to SKIPPED.
    
    QUEUED tasks are intentionally allowed to be skipped: no work has been
    done yet and the BranchOperator's decision should take priority. The
    worker pod will get a benign 409 on PATCH /run and exit cleanly.
    
    closes: #59378
    closes: #57618
---
 airflow-core/newsfragments/63266.bugfix.rst        |  1 +
 .../execution_api/routes/task_instances.py         | 20 ++++-
 .../versions/head/test_task_instances.py           | 89 ++++++++++++++++++++++
 3 files changed, 109 insertions(+), 1 deletion(-)

diff --git a/airflow-core/newsfragments/63266.bugfix.rst b/airflow-core/newsfragments/63266.bugfix.rst
new file mode 100644
index 00000000000..a8e1ff44aec
--- /dev/null
+++ b/airflow-core/newsfragments/63266.bugfix.rst
@@ -0,0 +1 @@
+Fix ``ti_skip_downstream`` overwriting RUNNING tasks to SKIPPED in HA deployments.
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
index 60dd868c2e3..96d8f3a6c86 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py
@@ -596,9 +596,27 @@ def ti_skip_downstream(
     task_ids = [task if isinstance(task, tuple) else (task, -1) for task in tasks]
     log.debug("Prepared task IDs for skipping", task_ids=task_ids)
 
+    # Don't overwrite tasks that are already executing or finished.
+    # See: https://github.com/apache/airflow/issues/59378
+    # Note: SQL NULL NOT IN (...) is falsy, so we need an explicit IS NULL check.
+    skippable_state_clause = or_(
+        TI.state.is_(None),
+        TI.state.not_in(
+            [
+                TaskInstanceState.RUNNING,
+                TaskInstanceState.SUCCESS,
+                TaskInstanceState.FAILED,
+            ]
+        ),
+    )
     query = (
         update(TI)
-        .where(TI.dag_id == dag_id, TI.run_id == run_id, tuple_(TI.task_id, TI.map_index).in_(task_ids))
+        .where(
+            TI.dag_id == dag_id,
+            TI.run_id == run_id,
+            tuple_(TI.task_id, TI.map_index).in_(task_ids),
+            skippable_state_clause,
+        )
         .values(state=TaskInstanceState.SKIPPED, start_date=now, end_date=now)
         .execution_options(synchronize_session=False)
     )
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
index 835a8c46139..7cc7aa85594 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
@@ -1633,6 +1633,95 @@ class TestTISkipDownstream:
         assert ti1.state == State.SKIPPED
 
 
+class TestTISkipDownstreamRaceCondition:
+    """Regression tests for #59378: state guard in ti_skip_downstream()."""
+
+    def setup_method(self):
+        clear_db_runs()
+
+    def teardown_method(self):
+        clear_db_runs()
+
+    @pytest.mark.parametrize(
+        "initial_state",
+        [
+            State.RUNNING,
+            State.SUCCESS,
+            State.FAILED,
+        ],
+    )
+    def test_skip_downstream_does_not_overwrite_terminal_or_running_ti(
+        self, client, session, dag_maker, initial_state
+    ):
+        with dag_maker(f"skip_race_dag_{initial_state}", session=session):
+            branch = EmptyOperator(task_id="branch")
+            downstream = EmptyOperator(task_id="downstream")
+            branch >> downstream
+        dr = dag_maker.create_dagrun(run_id="run")
+
+        ti_branch = dr.get_task_instance("branch")
+        ti_branch.set_state(State.SUCCESS)
+
+        ti_downstream = dr.get_task_instance("downstream")
+        ti_downstream.set_state(initial_state)
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti_branch.id}/skip-downstream",
+            json={"tasks": ["downstream"]},
+        )
+        assert response.status_code == 204
+
+        session.expire_all()
+        ti_downstream = dr.get_task_instance("downstream")
+        assert ti_downstream.state == initial_state
+
+    def test_skip_downstream_does_skip_queued_ti(self, client, session, dag_maker):
+        with dag_maker("skip_race_dag_queued", session=session):
+            branch = EmptyOperator(task_id="branch")
+            downstream = EmptyOperator(task_id="downstream")
+            branch >> downstream
+        dr = dag_maker.create_dagrun(run_id="run")
+
+        ti_branch = dr.get_task_instance("branch")
+        ti_branch.set_state(State.SUCCESS)
+
+        ti_downstream = dr.get_task_instance("downstream")
+        ti_downstream.set_state(TaskInstanceState.QUEUED)
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti_branch.id}/skip-downstream",
+            json={"tasks": ["downstream"]},
+        )
+        assert response.status_code == 204
+
+        session.expire_all()
+        ti_downstream = dr.get_task_instance("downstream")
+        assert ti_downstream.state == State.SKIPPED
+
+    def test_skip_downstream_still_skips_none_state_ti(self, client, session, dag_maker):
+        with dag_maker("skip_race_dag_normal", session=session):
+            branch = EmptyOperator(task_id="branch")
+            downstream = EmptyOperator(task_id="downstream")
+            branch >> downstream
+        dr = dag_maker.create_dagrun(run_id="run")
+
+        ti_branch = dr.get_task_instance("branch")
+        ti_branch.set_state(State.SUCCESS)
+        session.commit()
+
+        response = client.patch(
+            f"/execution/task-instances/{ti_branch.id}/skip-downstream",
+            json={"tasks": ["downstream"]},
+        )
+        assert response.status_code == 204
+
+        session.expire_all()
+        ti_downstream = dr.get_task_instance("downstream")
+        assert ti_downstream.state == State.SKIPPED
+
+
 class TestTIHealthEndpoint:
     def setup_method(self):
         clear_db_runs()

Reply via email to