YoannAbriel commented on code in PR #62878:
URL: https://github.com/apache/airflow/pull/62878#discussion_r3035201301
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -770,19 +770,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
dag_run=task_instance.dag_run, session=session
)
- # If the dag is missing, fail the task and continue to the next task.
+ # If the dag is transiently missing, skip scheduling it this iteration
+ # and try again next time instead of bulk-failing all scheduled tasks.
+ # See: https://github.com/apache/airflow/issues/62050
if not serialized_dag:
- self.log.error(
- "DAG '%s' for task instance %s not found in serialized_dag table",
+ self.log.warning(
Review Comment:
Done — added early `starved_dags` check before the
`has_task_concurrency_limits` block.
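For readers following along, a minimal sketch of the ordering described in the reply above, assuming a heavily simplified scheduling loop. `TaskInstanceStub`, `select_queueable`, and `max_active_tis_per_task` are hypothetical stand-ins, not Airflow's scheduler API. Once a DAG's serialized form misses, the DAG goes into `starved_dags`, and its remaining task instances are skipped by the early check before any per-task concurrency work runs; nothing is marked FAILED.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a SCHEDULED task instance."""
    dag_id: str
    task_id: str
    max_active_tis_per_task: Optional[int] = None  # hypothetical limit field


def select_queueable(task_instances, serialized_dags):
    """Return (queueable task instances, starved dag_ids) for one iteration.

    ``serialized_dags`` maps dag_id -> serialized DAG; a missing key or a
    None value is treated as a transient lookup miss.
    """
    starved_dags = set()
    queued_per_task = Counter()
    queueable = []
    for ti in task_instances:
        # Early check: skip DAGs already marked starved this iteration,
        # before any (potentially expensive) concurrency-limit checks.
        if ti.dag_id in starved_dags:
            continue
        if not serialized_dags.get(ti.dag_id):
            # Transient miss: leave the TI in SCHEDULED and retry next loop.
            starved_dags.add(ti.dag_id)
            continue
        # Hypothetical stand-in for the has_task_concurrency_limits block.
        key = (ti.dag_id, ti.task_id)
        limit = ti.max_active_tis_per_task
        if limit is not None and queued_per_task[key] >= limit:
            continue
        queued_per_task[key] += 1
        queueable.append(ti)
    return queueable, starved_dags
```

In this sketch, a DAG that misses once is skipped for the rest of the loop rather than looked up again for each of its task instances.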
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -770,19 +770,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
dag_run=task_instance.dag_run, session=session
)
- # If the dag is missing, fail the task and continue to the next task.
+ # If the dag is transiently missing, skip scheduling it this iteration
+ # and try again next time instead of bulk-failing all scheduled tasks.
+ # See: https://github.com/apache/airflow/issues/62050
if not serialized_dag:
- self.log.error(
- "DAG '%s' for task instance %s not found in serialized_dag table",
+ self.log.warning(
+ "DAG '%s' for task instance %s not found in serialized_dag table, "
+ "skipping scheduling for this iteration and will retry next time",
dag_id,
task_instance,
)
- session.execute(
- update(TI)
- .where(TI.dag_id == dag_id, TI.state == TaskInstanceState.SCHEDULED)
- .values(state=TaskInstanceState.FAILED)
- .execution_options(synchronize_session="fetch")
- )
+ starved_dags.add(dag_id)
Review Comment:
Added a code comment documenting this as a known limitation. Tracking
consecutive misses can be a follow-up.
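The reply above leaves consecutive-miss tracking as a possible follow-up. One way it could look, as a hypothetical sketch only (`DagMissTracker` and `max_consecutive_misses` are illustrative names, not part of this PR or of Airflow): count the scheduler loops in a row in which a DAG's serialized form is missing, reset the count as soon as the DAG is found again, and report when a threshold is reached so the caller could decide to fail the task instances at that point.

```python
from collections import defaultdict


class DagMissTracker:
    """Hypothetical helper counting consecutive serialized-DAG lookup misses."""

    def __init__(self, max_consecutive_misses: int = 3):
        self.max_consecutive_misses = max_consecutive_misses
        self._misses = defaultdict(int)  # dag_id -> current miss streak

    def record_miss(self, dag_id: str) -> bool:
        """Record a miss; return True once the streak reaches the threshold."""
        self._misses[dag_id] += 1
        return self._misses[dag_id] >= self.max_consecutive_misses

    def record_hit(self, dag_id: str) -> None:
        """Reset the streak because the DAG was found again."""
        self._misses.pop(dag_id, None)
```

The threshold trades tolerance for transient gaps against how long a genuinely deleted DAG would keep its task instances in SCHEDULED.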
--
This is an automated message from the Apache Git Service.