jscheffl commented on code in PR #62878:
URL: https://github.com/apache/airflow/pull/62878#discussion_r3041764476
##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -763,25 +763,29 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                 starved_dags.add(dag_id)
                 continue
+            if dag_id in starved_dags:
+                continue
+
             if task_instance.dag_model.has_task_concurrency_limits:
                 # Many dags don't have a task_concurrency, so where we can avoid loading the full
                 # serialized DAG the better.
                 serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
                     dag_run=task_instance.dag_run, session=session
                 )
-                # If the dag is missing, fail the task and continue to the next task.
+                # If the dag is transiently missing, skip scheduling it this iteration
+                # and try again next time instead of bulk-failing all scheduled tasks.
+                # Known limitation: permanently deleted DAGs will stay SCHEDULED and
+                # trigger this warning every iteration. A follow-up can track consecutive
+                # misses and escalate to failure after N iterations.
+                # See: https://github.com/apache/airflow/issues/62050
                 if not serialized_dag:
-                    self.log.error(
-                        "DAG '%s' for task instance %s not found in serialized_dag table",
+                    self.log.warning(
Review Comment:
   Replicating the comment here so that it is not overlooked:
   Reading the code, I had exactly the same concern as kaxil raised earlier.
   I don't think we can leave it like this: it may improve some cases, but it
   leaves permanently missing DAGs looping forever in the SCHEDULED state. I'd
   therefore suggest adding a check so that these tasks are not retried endlessly.
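   The escalation the diff comment describes (track consecutive misses, fail
   after N iterations) could be sketched roughly as below. This is only an
   illustrative standalone sketch, not Airflow code: `MissTracker`,
   `record_miss`, `record_found`, and `MAX_MISSES` are hypothetical names,
   and the real implementation would live in the scheduler loop and likely
   persist state differently.

   ```python
   from collections import defaultdict

   MAX_MISSES = 5  # hypothetical threshold: fail after this many consecutive misses


   class MissTracker:
       """Count consecutive iterations in which a DAG's serialized form is missing.

       While under the threshold the caller should skip the task instance and
       retry next iteration; once the threshold is reached, escalate to failure
       so permanently deleted DAGs do not loop in SCHEDULED forever.
       """

       def __init__(self, max_misses: int = MAX_MISSES):
           self.max_misses = max_misses
           self._misses = defaultdict(int)

       def record_miss(self, dag_id: str) -> str:
           """Return 'skip' while under the threshold, 'fail' once it is reached."""
           self._misses[dag_id] += 1
           return "fail" if self._misses[dag_id] >= self.max_misses else "skip"

       def record_found(self, dag_id: str) -> None:
           """The serialized DAG reappeared (transient miss); reset its counter."""
           self._misses.pop(dag_id, None)


   tracker = MissTracker(max_misses=3)
   # Two transient misses are tolerated, the third escalates to failure.
   decisions = [tracker.record_miss("example_dag") for _ in range(3)]
   # A DAG that reappears has its counter reset and starts over.
   tracker.record_miss("other_dag")
   tracker.record_found("other_dag")
   ```

   The key design point is that `record_found` resets the counter, so only
   DAGs that stay missing across N consecutive scheduler iterations get
   failed, matching the "transient vs. permanent" distinction in the diff.
   
   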
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]