vincent-heng opened a new issue, #62050:
URL: https://github.com/apache/airflow/issues/62050

   ### Apache Airflow version
   
   Other Airflow 3 version (please specify below)
   
   ### If "Other Airflow 3 version" selected, which one?
   
   3.0.6 (the code path in question is still present on main)
   
   ### What happened?
   
   My instance has been hitting intermittent task failures on MWAA (Airflow 3.0.6, ~150 DAGs, 4 schedulers). Tasks failed in bulk with no obvious cause but succeeded on manual retry. While investigating, I found the cause in `scheduler_job_runner.py`.
   
   When the scheduler can't find a DAG in the `serialized_dag` table, it does 
this:
   
   ```python
   session.execute(
       update(TI)
       .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
       .values(state=TaskInstanceState.FAILED)
       .execution_options(synchronize_session="fetch")
   )
   ```
   
   It sets every SCHEDULED task instance for that DAG to FAILED.
   
   With PRs #58259 and #56422 this probably happens less often, but the bulk-failure behavior itself has never been addressed.
   
   ### What you think should happen instead?
   
   The scheduler could skip scheduling that DAG for the current iteration and 
try again next time, instead of immediately failing everything.
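
   As a rough sketch of that option (the surrounding loop shape is an assumption on my part; `dag_runs`, `scheduler_dag_bag`, and `log` stand in for the real scheduler state):

   ```python
   for dag_run in dag_runs:
       dag = scheduler_dag_bag.get_dag_for_run(dag_run, session=session)
       if dag is None:
           # Leave the task instances in SCHEDULED and let the next
           # scheduler loop pick this DAG run up again, instead of
           # bulk-failing them.
           log.warning(
               "Serialized DAG %s not found; skipping this iteration",
               dag_run.dag_id,
           )
           continue
       ...  # normal scheduling path
   ```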
   
   I thought of logging a warning instead of an error, keeping a per-DAG counter, and only failing tasks after several consecutive misses, to distinguish transient gaps from genuinely missing DAGs. What do you think of this approach?
   
   PR #55126 already does something similar for stale DAGs (skipping and continuing).
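
   A self-contained sketch of the counter idea (the threshold, class, and method names here are all made up for illustration; the real change would live inside `SchedulerJobRunner`):

   ```python
   import logging
   from collections import Counter

   log = logging.getLogger(__name__)

   MAX_CONSECUTIVE_MISSES = 3  # hypothetical threshold before giving up


   class SerDagMissTracker:
       """Track consecutive serialized-DAG lookup misses per dag_id."""

       def __init__(self) -> None:
           self._misses: Counter[str] = Counter()

       def should_fail(self, dag_id: str) -> bool:
           """Return True only once a DAG has been missing on several
           consecutive scheduler loops."""
           self._misses[dag_id] += 1
           if self._misses[dag_id] < MAX_CONSECUTIVE_MISSES:
               log.warning(
                   "Serialized DAG %s not found (miss %d/%d); skipping for now",
                   dag_id, self._misses[dag_id], MAX_CONSECUTIVE_MISSES,
               )
               return False
           return True

       def reset(self, dag_id: str) -> None:
           """Call when the DAG loads again, so transient gaps don't accumulate."""
           self._misses.pop(dag_id, None)
   ```

   The scheduler would call `should_fail()` at the point where it currently bulk-fails, and `reset()` whenever the DAG loads successfully, so only DAGs missing on several consecutive loops would have their task instances failed.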
   
   ### How to reproduce
   
   ```python
   # Meant to drop into Airflow's scheduler job test suite, which provides
   # the fixtures and imports used below (dag_maker, session, EmptyOperator,
   # Job, SchedulerJobRunner, mock, DagRunState, DagRunType, State).
   def test_missing_serialized_dag_bulk_fails(self, dag_maker, session):
       dag_id = "SchedulerJobTest.test_missing_serialized_dag_bulk_fails"

       with dag_maker(dag_id=dag_id, session=session):
           EmptyOperator(task_id="task_a")
           EmptyOperator(task_id="task_b")

       scheduler_job = Job()
       self.job_runner = SchedulerJobRunner(job=scheduler_job)

       # Simulate the serialized DAG being transiently missing
       self.job_runner.scheduler_dag_bag = mock.MagicMock()
       self.job_runner.scheduler_dag_bag.get_dag_for_run.return_value = None

       dr = dag_maker.create_dagrun(state=DagRunState.RUNNING, run_type=DagRunType.SCHEDULED)
       for ti in dr.task_instances:
           ti.state = State.SCHEDULED
           session.merge(ti)
       session.flush()

       res = self.job_runner._executable_task_instances_to_queued(max_tis=32, session=session)
       session.flush()

       assert len(res) == 0
       # Bug: both tasks are now FAILED instead of remaining SCHEDULED
       tis = dr.get_task_instances(session=session)
       for ti in tis:
           print(f"{ti.task_id}: {ti.state}")
   ```
   
   
   ### Operating System
   
   AWS MWAA
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   - MWAA environment running 3.0.6 (latest available on MWAA as of Feb 2026)
   - mw1.2xlarge instance class
   - 4 schedulers
   - 5-20 workers (auto-scaling)
   - ~150 DAGs
   - min_file_process_interval=300
   
   ### Anything else?
   
   I would appreciate guidance on the preferred approach (retry counter vs. just skipping).
   
   ### Are you willing to submit PR?
   
   - [x] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

