manipatnam opened a new issue, #63697:
URL: https://github.com/apache/airflow/issues/63697

   ### Apache Airflow version
   
   3.1.8
   
   ### If "Other Airflow 3 version" selected, which one?
   
   _No response_
   
   ### What happened?
   
   When a task fails, the scheduler starts marking its downstream tasks as 
upstream_failed one layer at a time. If you mark the failed task as success via 
the API (PATCH) while the scheduler is still cascading, some downstream tasks 
get permanently stuck in upstream_failed and never run — even though the root 
cause (the failed task) is now successful.
   
   This happens because the scheduler and the API operate on separate database connections. The scheduler takes a snapshot of task states at the start of each loop and uses that snapshot for the entire loop. If the API changes task states after the snapshot was taken but before the scheduler finishes evaluating, the scheduler makes decisions on stale data: it still sees tasks as upstream_failed that have already been cleared, and marks the next layer of downstream tasks as upstream_failed too.
   
   The problem is permanent because upstream_failed is a terminal state: the scheduler never re-evaluates tasks in that state.
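   
   A quick way to see the terminal-state claim (a sketch using the same `State` sets the test below relies on; worth re-checking on 3.1.x):
   
   ```python
   # upstream_failed counts as "finished": the scheduler's snapshot of finished
   # TIs includes it, and it never appears in the "unfinished" set that gets
   # re-evaluated, so the TI is never re-queued.
   from airflow.utils.state import State, TaskInstanceState
   
   assert TaskInstanceState.UPSTREAM_FAILED in State.finished
   assert TaskInstanceState.UPSTREAM_FAILED not in State.unfinished
   ```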
   
   **Example:**
   DAG: task_a >> task_b >> task_c
   
   1. task_a fails
   2. Scheduler marks task_b as upstream_failed
   3. User marks task_a as success via the API — this also clears task_b back 
to a runnable state
   4. But the scheduler was mid-loop with old data, still sees task_b as 
upstream_failed, and marks task_c as upstream_failed
   5. task_c is now stuck forever — task_a is success, task_b runs fine, but 
task_c never runs
   
   ### What you think should happen instead?
   
   After marking a task as success via the API, all downstream tasks should 
eventually be re-evaluated correctly and run if their dependencies are met. A 
concurrent scheduler loop should not be able to permanently corrupt task states 
based on stale data.
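   
   One possible direction, purely illustrative (this is not how the scheduler currently guards the transition, and `upstream_still_failed` is a hypothetical helper, not an existing hook): re-read the upstream states from the database immediately before persisting upstream_failed, instead of trusting the loop-start snapshot.
   
   ```python
   # Illustrative only, not a proposed patch: requery the upstream TIs so a
   # concurrent API PATCH is observed before committing a terminal state.
   from airflow.models.taskinstance import TaskInstance
   from airflow.utils.state import TaskInstanceState
   
   
   def upstream_still_failed(ti, session):
       """Re-read upstream TI states with fresh data.
   
       Under READ COMMITTED isolation a new statement sees other sessions'
       committed writes, so this catches a PATCH the cached snapshot missed.
       """
       session.expire_all()  # drop rows the ORM cached earlier in this loop
       fresh = (
           session.query(TaskInstance)
           .filter(
               TaskInstance.dag_id == ti.dag_id,
               TaskInstance.run_id == ti.run_id,
               TaskInstance.task_id.in_(ti.task.upstream_task_ids),
           )
           .all()
       )
       bad = {TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED}
       return any(u.state in bad for u in fresh)
   ```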
   
   
   ### How to reproduce
   
   **DAG**: a linear chain like fail_task >> t0 >> t1 >> t2 with default 
trigger_rule=all_success.
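   
   For reference, a minimal sketch of such a DAG (the BashOperator `exit 1` failure is an assumption; the import paths mirror the standard-provider imports in the test below):
   
   ```python
   from datetime import datetime
   
   from airflow import DAG
   from airflow.providers.standard.operators.bash import BashOperator
   from airflow.providers.standard.operators.empty import EmptyOperator
   
   with DAG(
       dag_id="upstream_failed_race_repro",
       start_date=datetime(2024, 1, 1),
       schedule=None,
       catchup=False,
   ):
       # exit 1 makes the root task fail deterministically
       fail_task = BashOperator(task_id="fail_task", bash_command="exit 1")
       t0 = EmptyOperator(task_id="t0")
       t1 = EmptyOperator(task_id="t1")
       t2 = EmptyOperator(task_id="t2")
   
       fail_task >> t0 >> t1 >> t2
   ```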
   
   1. Trigger the DAG — fail_task fails intentionally
   2. The scheduler starts cascading upstream_failed down the chain
   3. Before the cascade reaches the end, PATCH fail_task to success via the API (a request sketch follows this list)
   4. Some downstream tasks (e.g. t1 or t2) end up stuck in upstream_failed 
despite all their upstreams being clear
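   
   A hypothetical sketch of step 3 (the endpoint prefix, run id, and auth scheme vary by Airflow version and deployment; Airflow 2.x exposes this under `/api/v1`, where the PATCH body also accepts a `dry_run` flag):
   
   ```python
   import requests
   
   BASE = "http://localhost:8080/api/v2"  # assumed local API server
   resp = requests.patch(
       f"{BASE}/dags/upstream_failed_race_repro/dagRuns/<run_id>/taskInstances/fail_task",
       json={"new_state": "success"},
       headers={"Authorization": "Bearer <token>"},  # per-deployment auth
       timeout=10,
   )
   resp.raise_for_status()
   ```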
   
   The race is more likely with longer chains and when the scheduler is busy 
processing multiple DAG runs at once.
   
   **Deterministic unit test (no timing dependency):**
   ```python
   """
   Uses two independent SQLAlchemy sessions to simulate the scheduler and API
   running concurrently.
   """
   import pytest
   from sqlalchemy.orm import Session as SASession
   
   from airflow.models.dagrun import DagRun
   from airflow.providers.standard.operators.empty import EmptyOperator
   from airflow.utils.state import DagRunState, State, TaskInstanceState
   from tests_common.test_utils import db
   
   pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
   
   
   def _assign_serialized_tasks(tis, dag_run):
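       """Reattach serialized task objects so dependency evaluation can walk the DAG."""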
       serialized_dag = dag_run.get_dag()
       for ti in tis:
           ti.task = serialized_dag.get_task(ti.task_id)
   
   
   class TestUpstreamFailedRaceCondition:
       @pytest.fixture(autouse=True)
       def setup_test_cases(self):
           db.clear_db_runs()
           yield
           db.clear_db_runs()
   
       def test_stale_cache_causes_stuck_upstream_failed(self, dag_maker, session):
           with dag_maker("test_upstream_failed_race", session=session):
               fail_task = EmptyOperator(task_id="fail_task")
               t0 = EmptyOperator(task_id="t0")
               t1 = EmptyOperator(task_id="t1")
               t2 = EmptyOperator(task_id="t2")
               fail_task >> t0 >> t1 >> t2
   
           dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
           tis = {ti.task_id: ti for ti in dr.task_instances}
   
           # Step 1: fail_task fails, scheduler propagates upstream_failed to t0
           tis["fail_task"].state = TaskInstanceState.FAILED
           tis["t0"].state = TaskInstanceState.UPSTREAM_FAILED
           session.flush()
           session.commit()
   
           # Step 2: Scheduler takes snapshot in its own session
           bind = session.get_bind()
           scheduler_session = SASession(bind=bind)
           try:
               sched_dr = scheduler_session.get(DagRun, dr.id)
               sched_dr.dag = dr.dag
   
               stale_finished_tis = sched_dr.get_task_instances(
                   state=State.finished, session=scheduler_session
               )
               _assign_serialized_tasks(stale_finished_tis, sched_dr)
   
               # Step 3: API PATCH in the primary session
               session.expire_all()
               api_tis = {ti.task_id: ti for ti in dr.get_task_instances(session=session)}
               api_tis["fail_task"].state = TaskInstanceState.SUCCESS
               api_tis["t0"].state = None  # cleared by clear(only_failed=True)
               session.flush()
               session.commit()
   
               # Step 4: Scheduler continues with stale cache
               unfinished_tis = sched_dr.get_task_instances(
                   state=State.unfinished, session=scheduler_session
               )
               _assign_serialized_tasks(unfinished_tis, sched_dr)
   
               sched_dr._are_premature_tis(
                   unfinished_tis=unfinished_tis,
                   finished_tis=stale_finished_tis,
                   session=scheduler_session,
               )
               scheduler_session.flush()
               scheduler_session.commit()
   
               # Step 5: t1 is incorrectly stuck in upstream_failed
               session.expire_all()
               final_states = {
                   ti.task_id: ti.state
                   for ti in dr.get_task_instances(session=session)
               }
               assert final_states["fail_task"] == TaskInstanceState.SUCCESS
               assert final_states["t0"] is None
               # BUG: t1 is upstream_failed even though t0 is cleared
               assert final_states["t1"] == TaskInstanceState.UPSTREAM_FAILED
           finally:
               scheduler_session.close()
   ```
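   
   Note that the final assertion intentionally encodes the buggy outcome (t1 left in upstream_failed), so the test passes while the bug exists; a fix would flip that expectation to t1 being runnable.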
   
   
   ### Operating System
   
   Debian
   
   ### Versions of Apache Airflow Providers
   
   _No response_
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   _No response_
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

