kaxil commented on code in PR #62878:
URL: https://github.com/apache/airflow/pull/62878#discussion_r2934018869


##########
airflow-core/src/airflow/jobs/scheduler_job_runner.py:
##########
@@ -770,19 +770,17 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
                     serialized_dag = self.scheduler_dag_bag.get_dag_for_run(
                         dag_run=task_instance.dag_run, session=session
                     )
-                    # If the dag is missing, fail the task and continue to the next task.
+                    # If the dag is transiently missing, skip scheduling it this iteration
+                    # and try again next time instead of bulk-failing all scheduled tasks.
+                    # See: https://github.com/apache/airflow/issues/62050
                     if not serialized_dag:
-                        self.log.error(
-                            "DAG '%s' for task instance %s not found in serialized_dag table",
+                        self.log.warning(

Review Comment:
   When a batch has multiple TIs for the same DAG, each one will hit 
`get_dag_for_run` and log this warning individually before the `starved_dags` 
filter kicks in on the next query iteration. Consider checking `if dag_id in 
starved_dags: continue` before entering the `has_task_concurrency_limits` 
block, both to avoid redundant DB lookups and to reduce log noise.
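A minimal sketch of the suggested early `continue`, assuming a heavily simplified loop. `TaskInstanceStub`, `select_schedulable`, and `lookup_dag` are hypothetical stand-ins, not Airflow APIs: `lookup_dag` plays the role of `scheduler_dag_bag.get_dag_for_run`, and the pool, concurrency, and `has_task_concurrency_limits` handling of the real `_executable_task_instances_to_queued` is omitted.

```python
import logging
from dataclasses import dataclass

log = logging.getLogger("scheduler_sketch")


@dataclass
class TaskInstanceStub:
    """Hypothetical stand-in for a task instance; only the fields this sketch needs."""

    dag_id: str
    task_id: str


def select_schedulable(batch, lookup_dag):
    """Return (schedulable TIs, starved dag_ids), looking up each missing DAG once.

    ``lookup_dag`` is a hypothetical callable standing in for the serialized-DAG
    lookup; it returns None when the DAG cannot be found.
    """
    starved_dags = set()
    schedulable = []
    for ti in batch:
        # Early exit: a DAG already found missing in this batch is skipped
        # without a second lookup and without a second warning.
        if ti.dag_id in starved_dags:
            continue
        if lookup_dag(ti.dag_id) is None:
            log.warning("DAG '%s' not found; skipping it this iteration", ti.dag_id)
            starved_dags.add(ti.dag_id)
            continue
        schedulable.append(ti)
    return schedulable, starved_dags
```

With three TIs for a missing DAG in one batch, this sketch performs the lookup and logs the warning once for that DAG instead of three times.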


