ziyangRen commented on issue #52276:
URL: https://github.com/apache/airflow/issues/52276#issuecomment-3396470760
Recently we ran into a new problem. Besides the task instance path, the
scheduler also calls this code very frequently to mark task instances as
REMOVED. When we take a large number of tasks offline, Airflow gets stuck and
cannot schedule any tasks, because the scheduler spends all of its time on this
removal handling. We believe this comes from an unreasonable earlier change:
the DagBag should not be reloaded repeatedly inside the for loop. Moreover,
when the scheduler calls this method, it does not need to load the DagBag at
all. Here is our revised code:
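```python
# Assumes this stays nested inside a DagRun method, so `self` (the DagRun)
# and `session` come from the enclosing scope.
from __future__ import annotations

from collections.abc import Iterable

from airflow.exceptions import TaskNotFound
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance as TI
from airflow.utils.state import TaskInstanceState


def _filter_tis_and_exclude_removed(
    dag: DAG, tis: list[TI], source: str = "scheduler"
) -> Iterable[TI]:
    """Populate ``ti.task`` while excluding those missing one, marking them as REMOVED."""
    cached_latest_dag = None
    dag_reloaded = False  # load the DagBag at most once per call, even if it returns None
    removed_tis = []
    for ti in tis:
        try:
            ti.task = dag.get_task(ti.task_id)
        except TaskNotFound:
            if source == "scheduler":
                # The scheduler loop fires very frequently, so there is no need to refresh
                # the DagBag; as in the original code, just mark the TI as removed.
                if ti.state != TaskInstanceState.REMOVED:
                    self.log.error("Failed to get task for ti %s. Marking it as removed.", ti)
                    ti.state = TaskInstanceState.REMOVED
                    removed_tis.append(ti)
            elif source == "task_instance":
                # task_instance is triggered per task, so it needs the latest DAG:
                # reload it via DagBag (only once per call) and retry the lookup.
                if not dag_reloaded:
                    self.log.info("Tasks not found, reloading DAG: %s", self.dag_id)
                    from airflow.models.dagbag import DagBag

                    cached_latest_dag = DagBag(read_dags_from_db=True).get_dag(
                        self.dag_id, session=session
                    )
                    dag_reloaded = True
                self.log.debug("Task not found: dag_id=%s, task_id=%s", self.dag_id, ti.task_id)
                try:
                    if cached_latest_dag is None:
                        raise TaskNotFound(f"DAG {self.dag_id} could not be reloaded")
                    # Do not modify self.dag directly (it causes session conflicts); take the
                    # task reference from the latest DAG instance loaded from the database.
                    ti.task = cached_latest_dag.get_task(ti.task_id)
                except TaskNotFound:
                    if ti.state != TaskInstanceState.REMOVED:
                        self.log.error(
                            "Failed to get task for ti %s from TaskInstance. Marking it as removed.",
                            ti,
                        )
                        ti.state = TaskInstanceState.REMOVED
                        removed_tis.append(ti)
                else:
                    self.log.debug("Task %s found in reloaded DAG.", ti)
                    yield ti
        else:
            yield ti
    # After the loop, flush all REMOVED state changes to the database in one go.
    if removed_tis:
        self.log.error("Marking %d tasks as REMOVED", len(removed_tis))
        session.flush()
```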
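The key idea is to resolve missing tasks against a freshly loaded DAG at most once per call, and never on the scheduler path. Below is a minimal, framework-free sketch of that pattern, assuming a simplified model: `FakeDag`, `FakeTI`, `filter_tis`, and the `load_latest_dag` callable are hypothetical stand-ins (the callable plays the role of `DagBag(read_dags_from_db=True).get_dag(...)`), and the session flush is omitted.

```python
from dataclasses import dataclass
from typing import Callable, Iterator, List, Optional

# Hypothetical stand-ins for illustration only, not Airflow classes.


class TaskNotFound(Exception):
    """Stand-in for airflow.exceptions.TaskNotFound."""


@dataclass
class FakeDag:
    """Hypothetical minimal DAG: maps task_id -> task object."""

    tasks: dict

    def get_task(self, task_id: str):
        if task_id not in self.tasks:
            raise TaskNotFound(task_id)
        return self.tasks[task_id]


@dataclass
class FakeTI:
    """Hypothetical minimal task instance."""

    task_id: str
    state: Optional[str] = None
    task: object = None


REMOVED = "removed"


def filter_tis(
    dag: FakeDag,
    tis: List[FakeTI],
    load_latest_dag: Callable[[], Optional[FakeDag]],
    source: str = "scheduler",
) -> Iterator[FakeTI]:
    """Yield TIs whose task resolves and mark the rest REMOVED.

    For source="task_instance", the latest DAG is loaded lazily and at most
    once per call; for source="scheduler" it is never loaded.
    """
    latest_dag: Optional[FakeDag] = None
    reloaded = False
    for ti in tis:
        try:
            ti.task = dag.get_task(ti.task_id)
        except TaskNotFound:
            if source == "task_instance":
                if not reloaded:
                    latest_dag = load_latest_dag()  # at most one load per call
                    reloaded = True
                if latest_dag is not None:
                    try:
                        ti.task = latest_dag.get_task(ti.task_id)
                    except TaskNotFound:
                        pass
                    else:
                        yield ti
                        continue
            # Scheduler path, or task still missing after the reload.
            ti.state = REMOVED
        else:
            yield ti
```

With four task instances of which three are missing from the stale DAG, the `task_instance` path calls the loader exactly once, while the `scheduler` path never calls it; that is the property that keeps the scheduler loop cheap when many tasks are taken offline.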