ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3396470760

   Recently, we have encountered a new issue: in addition to the task instance 
calling this code, the scheduler also invokes it frequently for the 
remove operation. After we took a large number of tasks offline, Airflow got 
stuck and could no longer schedule tasks. The scheduler was 
constantly processing the remove operation. We believe this is caused by an 
unreasonable change made in the past: the DagBag should not be loaded repeatedly 
inside the for loop. Moreover, when the scheduler calls this method, it does 
not need to load the DagBag at all. Here is our revised code:
   ```
   def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI], source='scheduler') -> Iterable[TI]:
       """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
       cached_latest_dag = None
       removed_tis = []
       for ti in tis:
           try:
               ti.task = dag.get_task(ti.task_id)
           except TaskNotFound:
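               if source == 'scheduler' and ti.state != TaskInstanceState.REMOVED:
                   # The scheduler loop fires very frequently and does not need the latest DagBag;
                   # mark the task instance as removed directly, following the upstream logic.
                   self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)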
                   ti.state = TaskInstanceState.REMOVED
                   session.flush()
               elif source == 'task_instance':
                   # task_instance is triggered at the task level, so the latest DagBag is needed.
                   # Try to refresh the DAG through DagBag and retry; load the DagBag instance only once.
                   try:
                       if cached_latest_dag is None:
                           self.log.info("Tasks not found, reloading DAG: %s", self.dag_id)
                           from airflow.models.dagbag import DagBag
                           cached_latest_dag = DagBag(read_dags_from_db=True).get_dag(self.dag_id, session=session)
                       self.log.debug("Task not found in current DAG: dag_id=%s, task_id=%s", self.dag_id, ti.task_id)
                       # Do not modify self.dag directly (it causes session conflicts); instead,
                       # fetch the latest DAG instance from the database and attach the task reference.
                       ti.task = cached_latest_dag.get_task(ti.task_id)
                       yield ti
                       self.log.debug("Task %s found in reloaded DAG.", ti)
                   except TaskNotFound:
                       if ti.state != TaskInstanceState.REMOVED:
                           self.log.error("Failed to get task for ti %s from TaskInstance. Marking it as removed.", ti)
                           ti.state = TaskInstanceState.REMOVED
                           removed_tis.append(ti)
           else:
               yield ti
       # After the loop ends, flush the session to the database once.
       if removed_tis:
           self.log.error("Marking %d tasks as REMOVED", len(removed_tis))
           session.flush()
   ```
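
The load-once idea can also be shown outside Airflow. The following is only a minimal sketch, not Airflow code: `FakeDag`, `filter_known_tasks`, and `load_latest_dag` are hypothetical stand-ins, and `TaskNotFound` is redefined locally. It uses a sentinel instead of `None` as the "not loaded yet" marker, on the assumption that the reload may itself return `None` (DAG not found); in that case the expensive load is still not repeated on every iteration.

```python
from typing import Callable, Iterable, Iterator, List, Optional


class TaskNotFound(Exception):
    """Local stand-in for Airflow's TaskNotFound exception."""


class FakeDag:
    """Hypothetical stand-in for a DAG: it only knows which task ids exist."""

    def __init__(self, task_ids: Iterable[str]) -> None:
        self._task_ids = set(task_ids)

    def get_task(self, task_id: str) -> str:
        if task_id not in self._task_ids:
            raise TaskNotFound(task_id)
        return task_id


# Sentinel: distinguishes "never tried to load" from "loaded, but got None".
_NOT_LOADED = object()


def filter_known_tasks(
    dag: FakeDag,
    task_ids: Iterable[str],
    load_latest_dag: Callable[[], Optional[FakeDag]],
    removed: List[str],
) -> Iterator[str]:
    """Yield task ids that resolve; append unresolved ones to ``removed``.

    ``load_latest_dag`` is the expensive call and runs at most once per call
    of this function, no matter how many task ids are missing.
    """
    latest = _NOT_LOADED
    for task_id in task_ids:
        try:
            dag.get_task(task_id)
        except TaskNotFound:
            if latest is _NOT_LOADED:
                latest = load_latest_dag()  # expensive reload, done once
            try:
                if latest is None:
                    raise TaskNotFound(task_id)
                latest.get_task(task_id)
            except TaskNotFound:
                removed.append(task_id)  # caller flushes once after the loop
                continue
        yield task_id


load_calls = 0


def load_latest_dag() -> FakeDag:
    """Counts invocations so the single reload can be verified."""
    global load_calls
    load_calls += 1
    return FakeDag(["a", "b", "c"])


removed: List[str] = []
kept = list(filter_known_tasks(FakeDag(["a"]), ["a", "b", "x", "y"], load_latest_dag, removed))
```

Here three task ids are missing from the first DAG, yet the reload runs a single time: `b` is recovered from the reloaded DAG, while `x` and `y` are collected in `removed` for one flush at the end.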


