manipatnam opened a new issue, #63697:
URL: https://github.com/apache/airflow/issues/63697
### Apache Airflow version
3.1.8
### If "Other Airflow 3 version" selected, which one?
_No response_
### What happened?
When a task fails, the scheduler starts marking its downstream tasks as
upstream_failed one layer at a time. If you mark the failed task as success via
the API (PATCH) while the scheduler is still cascading, some downstream tasks
get permanently stuck in upstream_failed and never run — even though the root
cause (the failed task) is now successful.
This happens because the scheduler and the API operate on separate database
connections. The scheduler takes a snapshot of task states at the start of each
loop and uses that snapshot for the entire loop. If the API changes task states
after the snapshot was taken but before the scheduler finishes evaluating, the
scheduler makes decisions based on stale data: it still sees tasks as
upstream_failed that have already been cleared, and marks the next layer of
downstream tasks as upstream_failed too.
The problem is permanent because upstream_failed is a terminal state — the
scheduler never re-evaluates tasks in that state.
**Example:**
DAG: task_a >> task_b >> task_c
1. task_a fails
2. Scheduler marks task_b as upstream_failed
3. User marks task_a as success via the API — this also clears task_b back
to a runnable state
4. But the scheduler was mid-loop with old data, still sees task_b as
upstream_failed, and marks task_c as upstream_failed
5. task_c is now stuck forever — task_a is success, task_b runs fine, but
task_c never runs
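The five steps above can also be simulated in a few lines of plain Python
(a hypothetical sketch of layer-at-a-time propagation; `propagate_one_layer`
and the dict-based "database" are inventions for illustration, not Airflow's
scheduler code):

```python
def propagate_one_layer(states, downstream):
    """One scheduler pass over a snapshot: mark the direct children of
    failed / upstream_failed tasks as upstream_failed."""
    new_states = dict(states)
    for task, state in states.items():
        if state in ("failed", "upstream_failed"):
            for child in downstream.get(task, ()):
                if new_states[child] is None:  # not yet scheduled
                    new_states[child] = "upstream_failed"
    return new_states

downstream = {"task_a": ["task_b"], "task_b": ["task_c"]}
db = {"task_a": "failed", "task_b": None, "task_c": None}

# Pass 1: the scheduler marks task_b from task_a's failure.
db = propagate_one_layer(db, downstream)

# The scheduler snapshots the states for pass 2...
snapshot = dict(db)

# ...then the API PATCH lands: task_a -> success, task_b cleared.
db["task_a"] = "success"
db["task_b"] = None

# Pass 2 decides from the stale snapshot but writes into the live database,
# so task_c is marked upstream_failed from task_b's already-cleared state.
for task, state in propagate_one_layer(snapshot, downstream).items():
    if state != snapshot[task]:  # newly marked this pass
        db[task] = state
```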
### What you think should happen instead?
After marking a task as success via the API, all downstream tasks should
eventually be re-evaluated correctly and run if their dependencies are met. A
concurrent scheduler loop should not be able to permanently corrupt task states
based on stale data.
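One pattern that would prevent this class of corruption is to re-validate the
upstream's current state in the same statement that writes the downstream
state, instead of trusting an earlier in-memory snapshot. A sketch with an
illustrative schema (not Airflow's tables, and not a proposed patch):

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "guarded.db")
conn = sqlite3.connect(path)
conn.execute("CREATE TABLE ti (task_id TEXT PRIMARY KEY, state TEXT)")
# task_b has already been cleared by the API; task_c is still unscheduled.
conn.executemany(
    "INSERT INTO ti VALUES (?, ?)", [("task_b", None), ("task_c", None)]
)
conn.commit()

# The write re-checks the upstream's *current* state in the same statement.
guarded_update = """
    UPDATE ti SET state = 'upstream_failed'
    WHERE task_id = 'task_c'
      AND (SELECT state FROM ti WHERE task_id = 'task_b')
          IN ('failed', 'upstream_failed')
"""
rejected = conn.execute(guarded_update).rowcount  # 0: task_b was cleared,
conn.commit()                                     # so task_c stays runnable

# Had task_b still been failed, the same statement would have gone through:
conn.execute("UPDATE ti SET state = 'failed' WHERE task_id = 'task_b'")
accepted = conn.execute(guarded_update).rowcount  # 1
conn.commit()
```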
### How to reproduce
**DAG**: a linear chain like fail_task >> t0 >> t1 >> t2 with default
trigger_rule=all_success.
1. Trigger the DAG — fail_task fails intentionally
2. The scheduler starts cascading upstream_failed down the chain
3. Before the cascade reaches the end, PATCH fail_task to success via the API
4. Some downstream tasks (e.g. t1 or t2) end up stuck in upstream_failed
despite all their upstreams being clear
The race is more likely with longer chains and when the scheduler is busy
processing multiple DAG runs at once.
**Deterministic unit test (no timing dependency):**
```python
"""
Uses two independent SQLAlchemy sessions to simulate the scheduler and API
running concurrently.
"""
import pytest
from sqlalchemy.orm import Session as SASession

from airflow.models.dagrun import DagRun
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.utils.state import DagRunState, State, TaskInstanceState

from tests_common.test_utils import db

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]


def _assign_serialized_tasks(tis, dag_run):
    serialized_dag = dag_run.get_dag()
    for ti in tis:
        ti.task = serialized_dag.get_task(ti.task_id)


class TestUpstreamFailedRaceCondition:
    @pytest.fixture(autouse=True)
    def setup_test_cases(self):
        db.clear_db_runs()
        yield
        db.clear_db_runs()

    def test_stale_cache_causes_stuck_upstream_failed(self, dag_maker, session):
        with dag_maker("test_upstream_failed_race", session=session):
            fail_task = EmptyOperator(task_id="fail_task")
            t0 = EmptyOperator(task_id="t0")
            t1 = EmptyOperator(task_id="t1")
            t2 = EmptyOperator(task_id="t2")
            fail_task >> t0 >> t1 >> t2

        dr = dag_maker.create_dagrun(state=DagRunState.RUNNING)
        tis = {ti.task_id: ti for ti in dr.task_instances}

        # Step 1: fail_task fails, scheduler propagates upstream_failed to t0
        tis["fail_task"].state = TaskInstanceState.FAILED
        tis["t0"].state = TaskInstanceState.UPSTREAM_FAILED
        session.flush()
        session.commit()

        # Step 2: scheduler takes its snapshot in its own session
        bind = session.get_bind()
        scheduler_session = SASession(bind=bind)
        try:
            sched_dr = scheduler_session.get(DagRun, dr.id)
            sched_dr.dag = dr.dag
            stale_finished_tis = sched_dr.get_task_instances(
                state=State.finished, session=scheduler_session
            )
            _assign_serialized_tasks(stale_finished_tis, sched_dr)

            # Step 3: API PATCH in the primary session
            session.expire_all()
            api_tis = {
                ti.task_id: ti for ti in dr.get_task_instances(session=session)
            }
            api_tis["fail_task"].state = TaskInstanceState.SUCCESS
            api_tis["t0"].state = None  # cleared by clear(only_failed=True)
            session.flush()
            session.commit()

            # Step 4: scheduler continues with its stale cache
            unfinished_tis = sched_dr.get_task_instances(
                state=State.unfinished, session=scheduler_session
            )
            _assign_serialized_tasks(unfinished_tis, sched_dr)
            sched_dr._are_premature_tis(
                unfinished_tis=unfinished_tis,
                finished_tis=stale_finished_tis,
                session=scheduler_session,
            )
            scheduler_session.flush()
            scheduler_session.commit()

            # Step 5: t1 is incorrectly stuck in upstream_failed
            session.expire_all()
            final_states = {
                ti.task_id: ti.state
                for ti in dr.get_task_instances(session=session)
            }
            assert final_states["fail_task"] == TaskInstanceState.SUCCESS
            assert final_states["t0"] is None
            # BUG: t1 is upstream_failed even though t0 is cleared
            assert final_states["t1"] == TaskInstanceState.UPSTREAM_FAILED
        finally:
            scheduler_session.close()
```
### Operating System
debian
### Versions of Apache Airflow Providers
_No response_
### Deployment
Astronomer
### Deployment details
_No response_
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]