[ https://issues.apache.org/jira/browse/AIRFLOW-6497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17010303#comment-17010303 ]
Qian Yu commented on AIRFLOW-6497:
----------------------------------

[~kamil.bregula] Do you want to create a PR? Or, if you can give more details on how it should be done, I can give it a try too.

> Scheduler creates DagBag in the same process with outdated info
> ---------------------------------------------------------------
>
> Key: AIRFLOW-6497
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6497
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.10.7
> Reporter: Qian Yu
> Priority: Major
>
> The following code in scheduler_job.py seems to be called in the same process
> as the scheduler. It creates a DagBag, but since the scheduler is a long-running
> process, it does not pick up the latest changes made to DAGs. For example,
> changes to the retries count, on_failure_callback, newly added tasks, etc. are
> not reflected.
>
> {code:python}
> if ti.try_number == try_number and ti.state == State.QUEUED:
>     msg = ("Executor reports task instance {} finished ({}) "
>            "although the task says its {}. Was the task "
>            "killed externally?".format(ti, state, ti.state))
>     Stats.incr('scheduler.tasks.killed_externally')
>     self.log.error(msg)
>     try:
>         simple_dag = simple_dag_bag.get_dag(dag_id)
>         dagbag = models.DagBag(simple_dag.full_filepath)
>         dag = dagbag.get_dag(dag_id)
>         ti.task = dag.get_task(task_id)
>         ti.handle_failure(msg)
>     except Exception:
>         self.log.error("Cannot load the dag bag to handle failure for %s"
>                        ". Setting task to FAILED without callbacks or "
>                        "retries. Do you have enough resources?", ti)
>         ti.state = State.FAILED
>         session.merge(ti)
>         session.commit()
> {code}
> This causes errors such as AttributeError due to stale code being hit. E.g.
> when someone added a .join attribute to CustomOperator without bouncing the
> scheduler, this is what he would get after a CeleryWorker timeout error
> causes this line to be hit:
> {code}
> [2020-01-05 22:25:45,951] {dagbag.py:207} ERROR - Failed to import: /dags/dag1.py
> Traceback (most recent call last):
>   File "/lib/python3.6/site-packages/airflow/models/dagbag.py", line 204, in process_file
>     m = imp.load_source(mod_name, filepath)
>   File "/usr/lib/python3.6/imp.py", line 172, in load_source
>     module = _load(spec)
>   File "<frozen importlib._bootstrap>", line 684, in _load
>   File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
>   File "<frozen importlib._bootstrap_external>", line 678, in exec_module
>   File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
>   File "/dags/dag1.py", line 280, in <module>
>     task1 >> task2.join
> AttributeError: 'CustomOperator' object has no attribute 'join'
> [2020-01-05 22:25:45,951] {scheduler_job.py:1314} ERROR - Cannot load the dag bag to handle failure for <TaskInstance: dag1.task1 2020-01-02 00:00:00+00:00 [queued]>. Setting task to FAILED without callbacks or retries. Do you have enough resou
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
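One plausible reading of the traceback: dag1.py itself is re-read from disk (the failing line 280 already uses `.join`), but `CustomOperator` comes from a module that the long-running scheduler process imported earlier and that Python keeps cached in `sys.modules`. The report does not say where `CustomOperator` is defined, so treating it as a separately imported module is an assumption. Below is a minimal standalone sketch (plain Python, not Airflow code) that reproduces that effect with hypothetical files `custom_ops.py` and `dag1.py`. It also shows that executing the same DAG file in a fresh interpreter, a stand-in for doing the DagBag load outside the scheduler process, picks up the change.

```python
import importlib
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path
from typing import Tuple

# Keep the demo from writing .pyc files next to the temporary sources.
sys.dont_write_bytecode = True

# Hypothetical stand-ins (not Airflow code): custom_ops.py defines
# CustomOperator, and dag1.py is a DAG file that imports it.
HELPER_V1 = "class CustomOperator:\n    pass\n"
HELPER_V2 = textwrap.dedent("""\
    class CustomOperator:
        @property
        def join(self):
            return "join"
""")
DAG_FILE = textwrap.dedent("""\
    from custom_ops import CustomOperator
    task2 = CustomOperator()
    downstream = task2.join  # only exists in HELPER_V2
""")


def load_in_current_process(dag_path: Path) -> str:
    """Re-execute the DAG file inside this long-running process."""
    try:
        exec(compile(dag_path.read_text(), str(dag_path), "exec"), {})
        return "ok"
    except AttributeError as exc:
        return f"AttributeError: {exc}"


def load_in_fresh_process(dag_path: Path) -> str:
    """Execute the DAG file in a new interpreter, which starts with an empty sys.modules."""
    script = (
        "import sys\n"
        f"sys.path.insert(0, {str(dag_path.parent)!r})\n"
        f"src = open({str(dag_path)!r}).read()\n"
        f"exec(compile(src, {str(dag_path)!r}, 'exec'), {{}})\n"
    )
    proc = subprocess.run([sys.executable, "-B", "-c", script],
                          capture_output=True, text=True)
    if proc.returncode == 0:
        return "ok"
    return proc.stderr.strip().splitlines()[-1]


def demo() -> Tuple[str, str]:
    with tempfile.TemporaryDirectory() as tmp:
        helper = Path(tmp) / "custom_ops.py"
        dag = Path(tmp) / "dag1.py"
        helper.write_text(HELPER_V1)

        # "Scheduler start-up": the helper module is imported once and cached.
        sys.path.insert(0, tmp)
        importlib.invalidate_caches()
        try:
            importlib.import_module("custom_ops")

            # Later, someone adds .join to CustomOperator and uses it in the
            # DAG file, without restarting the long-running process.
            helper.write_text(HELPER_V2)
            dag.write_text(DAG_FILE)

            return load_in_current_process(dag), load_in_fresh_process(dag)
        finally:
            sys.path.remove(tmp)
            sys.modules.pop("custom_ops", None)
```

Calling `demo()` should return `"AttributeError: 'CustomOperator' object has no attribute 'join'"` for the in-process load, matching the log above, because `from custom_ops import CustomOperator` hands back the cached class. The fresh-process load should return `"ok"`. Spawning a whole interpreter per failed task would be costly, so how the scheduler should move this work out of its own process is a design question for the PR; the sketch only isolates the caching behaviour.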