[
https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17009597#comment-17009597
]
Kamil Bregula commented on AIRFLOW-6497:
----------------------------------------
I saw it yesterday. We will have to pass this event to a separate process, just
like the other DAG parsing events.
> Scheduler creates DagBag in the same process with outdated info
> ---------------------------------------------------------------
>
> Key: AIRFLOW-6497
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.10.7
> Reporter: Qian Yu
> Priority: Major
>
> The following code in scheduler_job.py appears to run in the same process
> as the scheduler. It creates a DagBag there. But since the scheduler is a
> long-running process, it does not pick up the latest changes made to DAGs.
> For example, changes to the retries count, on_failure_callback, newly added
> tasks, etc. are not reflected.
>
> {code:python}
> if ti.try_number == try_number and ti.state == State.QUEUED:
>     msg = ("Executor reports task instance {} finished ({}) "
>            "although the task says its {}. Was the task "
>            "killed externally?".format(ti, state, ti.state))
>     Stats.incr('scheduler.tasks.killed_externally')
>     self.log.error(msg)
>     try:
>         simple_dag = simple_dag_bag.get_dag(dag_id)
>         dagbag = models.DagBag(simple_dag.full_filepath)
>         dag = dagbag.get_dag(dag_id)
>         ti.task = dag.get_task(task_id)
>         ti.handle_failure(msg)
>     except Exception:
>         self.log.error("Cannot load the dag bag to handle failure for %s"
>                        ". Setting task to FAILED without callbacks or "
>                        "retries. Do you have enough resources?", ti)
>         ti.state = State.FAILED
>         session.merge(ti)
>         session.commit()
> {code}
> This causes errors such as AttributeError because stale code is hit. E.g.
> when someone adds a .join attribute to CustomOperator without bouncing the
> scheduler, this is what they would get after a CeleryWorker timeout error
> causes this line to be hit:
> {code}
> [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
> Traceback (most recent call last):
>   File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
>     m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.6/imp.py", line 172, in load_source
>     module = _load(spec)
>   File "<frozen importlib._bootstrap>", line 684, in _load
>   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
>   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
>   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
>   File "/dags/dag1.py", line 280, in <module>
>     task1 >> task2.join
> AttributeError: 'CustomOperator' object has no attribute 'join'
> [2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for <TaskInstance: dag1.task1 2020-01-02 00:00:00+00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resou
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)