[
https://issues.apache.org/jira/browse/AIRFLOW-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kaxil Naik reassigned AIRFLOW-6085:
-----------------------------------
Assignee: Kaxil Naik
> Scheduler fails to reload dags on error if imports have changed
> ---------------------------------------------------------------
>
> Key: AIRFLOW-6085
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6085
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.10.6
> Reporter: Charlie Briggs
> Assignee: Kaxil Naik
> Priority: Minor
>
> We have observed an issue when the Airflow scheduler attempts to reload a DAG
> in order to handle a task failure and the DAG's imported dependencies have
> changed on disk since the scheduler started.
>
> It seems that this block of code:
> [https://github.com/apache/airflow/blob/1.10.6/airflow/jobs/scheduler_job.py#L1266-L1271|https://github.com/apache/airflow/blob/3ac5270550eb0453c5e3f9477ad607f79828bc78/airflow/jobs/scheduler_job.py#L1266-L1271]
> attempts to reload the DAG file from disk. The loading code clears the module
> cache entry for the DAG file itself
> ([https://github.com/apache/airflow/blob/1.10.6/airflow/models/dagbag.py#L199-L200])
> and nothing else. This means that if the DAG file now imports a name from an
> existing module, and that name did not exist in the cached module at the time
> of the initial load, loading fails and the DAG is never retried or
> rescheduled.
> The expectation would be that the DAG would fully reload from disk, including
> dependencies.
> Note that in this issue I'm not sure why the task instance failed initially,
> but this seems inconsequential to the error handling failure.
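> The stale-module behaviour can be reproduced outside Airflow with plain
> imports (all file and module names below are hypothetical; dagbag.py uses
> {{imp.load_source}}, but the {{sys.modules}} caching involved is the same):

```python
import os
import sys
import tempfile

# Create a throwaway directory with a helper module and a "dag" module.
tmp = tempfile.mkdtemp()
sys.path.insert(0, tmp)

with open(os.path.join(tmp, "cron_helper.py"), "w") as f:
    f.write('DAILY = "@daily"\n')
with open(os.path.join(tmp, "my_dag.py"), "w") as f:
    f.write("from cron_helper import DAILY\n")

import my_dag  # noqa: F401 - initial load caches both modules in sys.modules

# Edit both files on disk, as would happen after a deploy.
with open(os.path.join(tmp, "cron_helper.py"), "w") as f:
    f.write('DAILY = "@daily"\nHOURLY = "@hourly"\n')
with open(os.path.join(tmp, "my_dag.py"), "w") as f:
    f.write("from cron_helper import DAILY, HOURLY\n")

# Evict only the DAG module, mirroring dagbag.py, then re-import it.
del sys.modules["my_dag"]
try:
    import my_dag  # noqa: F401,F811
    reload_failed = False
except ImportError:
    # cron_helper is still the cached, pre-edit version without HOURLY.
    reload_failed = True

print(reload_failed)  # True: the dependency was never re-read from disk
```

> The re-import of the DAG module re-reads its own source, but the
> {{from cron_helper import ...}} statement resolves against the cached
> module object, so the new name is not found.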
> h3. Imports Example
> BEFORE
> /dags/dag.py
> {code:python}
> from library.util.cron import DAILY
> ...
> {code}
> /dags/library/util/cron.py
>
> {code:python}
> DAILY = "@daily"
> {code}
>
> AFTER
>
> /dags/dag.py
> {code:python}
> from library.util.cron import DAILY, HOURLY
> ...
> {code}
> /dags/library/util/cron.py
>
> {code:python}
> DAILY = "@daily"
> HOURLY = "@hourly"
> {code}
> This will fail in the above scenario because /dags/library/util/cron.py is not
> reloaded, so the name {{HOURLY}} does not exist in the cached module.
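> One way to achieve the expected full reload would be to evict every cached
> module whose source file lives under the DAGs folder before re-importing the
> DAG file. A hypothetical helper (not part of the Airflow codebase) might look
> like:

```python
import sys
import types

def evict_dag_tree(dags_folder):
    # Drop every cached module whose source file lives under the DAGs
    # folder, so the next import re-reads all of them from disk.
    for name, mod in list(sys.modules.items()):
        path = getattr(mod, "__file__", None) or ""
        if path.startswith(dags_folder):
            del sys.modules[name]

# Demo with a fake cached dependency module:
fake = types.ModuleType("library.util.cron")
fake.__file__ = "/usr/local/airflow/dags/library/util/cron.py"
sys.modules["library.util.cron"] = fake
evict_dag_tree("/usr/local/airflow/dags/")
print("library.util.cron" in sys.modules)  # False: the stale entry is gone
```

> Matching on {{\_\_file\_\_}} rather than module name avoids evicting
> site-packages modules that happen to share a name prefix with DAG code.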
>
> h3. Stack trace
>
> {noformat}
> [2019-11-27 00:02:21,801] {{scheduler_job.py:1288}} INFO - Executor reports
> execution of
> dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger
> execution_date=2019-11-26 23:00:00+00:00 exited with status success for
> try_number 1
> [2019-11-27 00:02:21,806] {{scheduler_job.py:1304}} ERROR - Executor reports
> task instance <TaskInstance:
> dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger
> 2019-11-26 23:00:00+00:00 [queued]> finished (success) although the task says
> its queued. Was the task killed externally?
> [2019-11-27 00:02:21,806] {{dagbag.py:92}} INFO - Filling up the DagBag from
> /usr/local/airflow/dags/dbt/dbt_analytics.py
> [2019-11-27 00:02:21,808] {{dagbag.py:207}} ERROR - Failed to import:
> /usr/local/airflow/dags/dbt/dbt_analytics.py
> Traceback (most recent call last):
> File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py",
> line 204, in process_file
> m = imp.load_source(mod_name, filepath)
> File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
> module = _load(spec)
> File "<frozen importlib._bootstrap>", line 696, in _load
> File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
> File "<frozen importlib._bootstrap_external>", line 728, in exec_module
> File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
> File "/usr/local/airflow/dags/dbt/dbt_analytics.py", line 30, in <module>
> from library.util.cron import HOURLY, DAILY
> ImportError: cannot import name 'HOURLY' from 'library.util.cron'
> (/usr/local/airflow/dags/library/util/cron.py)
> [2019-11-27 00:02:21,814] {{dagbag.py:207}} ERROR - Failed to import:
> /usr/local/airflow/dags/dbt/dbt_analytics.py{noformat}
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)