vincent-heng opened a new issue, #62050:
URL: https://github.com/apache/airflow/issues/62050
### Apache Airflow version
Other Airflow 3 version (please specify below)
### If "Other Airflow 3 version" selected, which one?
3.0.6 (the behavior described below is still present on main)
### What happened?
My instance has been hitting intermittent task failures on MWAA (Airflow
3.0.6, ~150 DAGs, 4 schedulers). Tasks failed in bulk with no obvious cause but
succeeded on manual retry. I noticed this in `scheduler_job_runner.py`:
when the scheduler can't find a DAG in the `serialized_dag` table, it does
this:
```python
session.execute(
update(TI)
.where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
.values(state=TaskInstanceState.FAILED)
.execution_options(synchronize_session="fetch")
)
```
This sets every SCHEDULED task instance for that DAG to FAILED.
With PRs #58259 and #56422 this probably happens less often, but the
bulk-failure behavior itself has never been addressed.
### What you think should happen instead?
The scheduler could skip scheduling that DAG for the current iteration and
try again on the next one, instead of immediately failing everything.
My idea: log a warning instead of an error, keep a per-DAG counter, and only
fail the task instances after several consecutive misses, to distinguish
transient gaps from genuinely missing DAGs. What do you think about this
solution? A rough sketch is below.
PR #55126 already does something similar for stale DAGs (skip and continue).
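Minimal sketch of the counter idea, assuming a hypothetical
`_missing_dag_counts` dict on the runner and a `MAX_CONSECUTIVE_MISSES`
threshold (both names are illustrative, not existing Airflow attributes):
```python
# Illustrative sketch only: _missing_dag_counts and MAX_CONSECUTIVE_MISSES are
# hypothetical names, not existing SchedulerJobRunner attributes.
MAX_CONSECUTIVE_MISSES = 3

def _handle_missing_serialized_dag(self, dag_id: str, session) -> None:
    misses = self._missing_dag_counts.get(dag_id, 0) + 1
    self._missing_dag_counts[dag_id] = misses

    if misses < MAX_CONSECUTIVE_MISSES:
        # Probably a transient gap (e.g. a serialization race): skip this DAG
        # for the current scheduling iteration only.
        self.log.warning(
            "DAG %s not found in serialized_dag (miss %d/%d); skipping this iteration",
            dag_id, misses, MAX_CONSECUTIVE_MISSES,
        )
        return

    # Missing for several consecutive iterations: treat it as genuinely gone
    # and fail its SCHEDULED task instances, as the scheduler does today.
    self.log.error(
        "DAG %s still missing after %d consecutive checks; failing SCHEDULED TIs",
        dag_id, misses,
    )
    session.execute(
        update(TI)
        .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
        .values(state=TaskInstanceState.FAILED)
        .execution_options(synchronize_session="fetch")
    )
    self._missing_dag_counts.pop(dag_id, None)

# Wherever the DAG *is* found again, the counter would be reset:
#     self._missing_dag_counts.pop(dag_id, None)
```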
### How to reproduce
```python
def test_missing_serialized_dag_bulk_fails(self, dag_maker, session):
    dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"
    with dag_maker(dag_id=dag_id, session=session):
        EmptyOperator(task_id="task_a")
        EmptyOperator(task_id="task_b")

    scheduler_job = Job()
    self.job_runner = SchedulerJobRunner(job=scheduler_job)

    # Simulate the serialized DAG being transiently missing
    self.job_runner.scheduler_dag_bag = mock.MagicMock()
    self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None

    dr = dag_maker.create_dagrun(state=DagRunState.RUNNING, run_type=DagRunType.SCHEDULED)
    for ti in dr.task_instances:
        ti.state = State.SCHEDULED
        session.merge(ti)
    session.flush()

    res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
    session.flush()

    assert len(res) == 0
    # Both tasks end up FAILED instead of staying SCHEDULED
    tis = dr.get_task_instances(session=session)
    for ti in tis:
        print(f"{ti.task_id}: {ti.state}")
```
### Operating System
AWS MWAA
### Versions of Apache Airflow Providers
n/a
### Deployment
Amazon (AWS) MWAA
### Deployment details
- MWAA environment running 3.0.6 (latest available on MWAA as of Feb 2026)
- mw1.2xlarge instance class
- 4 schedulers
- 5-20 workers (auto-scaling)
- ~150 DAGs
- min_file_process_interval=300
### Anything else?
Would appreciate guidance on the preferred approach (retry counter, or just
skipping the DAG and ignoring the miss).
### Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
### Code of Conduct
- [x] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)