Charlie Briggs created AIRFLOW-6085:
---------------------------------------
Summary: Scheduler fails to reload dags on error if imports have
changed
Key: AIRFLOW-6085
URL: https://issues.apache.org/jira/browse/AIRFLOW-6085
Project: Apache Airflow
Issue Type: Bug
Components: scheduler
Affects Versions: 1.10.6
Reporter: Charlie Briggs
We have observed an issue when the Airflow scheduler attempts to reload a DAG in
order to handle a task failure, and the DAG's imported dependencies have changed
on disk since the scheduler started.
It seems that this block of code:
[https://github.com/apache/airflow/blob/3ac5270550eb0453c5e3f9477ad607f79828bc78/airflow/jobs/scheduler_job.py#L1266-L1271]
attempts to reload the dag file from disk. The loading code clears the module
cache for the filename of the DAG, and only the filename of the DAG. This means
that if the dag file now imports names which were added to an existing
dependency module after the initial load, the stale cached copy of that module
is used, the reload fails, and the dag is never retried or rescheduled.
The expectation would be that the DAG would fully reload from disk, including
dependencies.
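A full reload would mean evicting not just the dag module but every cached module whose source lives under the dags folder. A minimal sketch of that idea (a hypothetical helper, not existing Airflow code; the function name and signature are my own):

```python
import sys


def evict_dag_modules(dags_folder: str) -> None:
    """Drop every cached module whose source file lives under the dags
    folder, so the next import re-executes it from disk."""
    stale = [
        name
        for name, mod in list(sys.modules.items())
        if getattr(mod, "__file__", None) is not None
        and mod.__file__.startswith(dags_folder)
    ]
    for name in stale:
        del sys.modules[name]
```

Called before re-importing the dag file, this would also evict modules such as {{library.util.cron}}, so the dag and its dependencies reload together.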
Note that I'm not sure why the task instance failed initially, but that seems
inconsequential to the error-handling failure described here.
h3. Imports Example
BEFORE
/dags/dag.py
{code:java}
from library.util.cron import DAILY
...
{code}
/dags/library/util/cron.py
{code:java}
DAILY = "@daily"
{code}
AFTER
/dags/dag.py
{code:java}
from library.util.cron import DAILY, HOURLY
...
{code}
/dags/library/util/cron.py
{code:java}
DAILY = "@daily"
HOURLY = "@hourly"
{code}
This will fail in the above scenario because /dags/library/util/cron.py is not
reloaded from disk, so the cached module has no {{HOURLY}} to import.
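The BEFORE/AFTER example can be reproduced outside Airflow with a short script. This is a hypothetical standalone reproduction (the file layout mirrors the example above, not the Airflow codebase): deleting only the dag module from {{sys.modules}} leaves the dependency module cached, so the re-import sees a stale {{library.util.cron}}.

```python
import importlib
import os
import sys
import tempfile

# Build the BEFORE state on disk: a dag file and its dependency module.
base = tempfile.mkdtemp()
pkg = os.path.join(base, "library", "util")
os.makedirs(pkg)
open(os.path.join(base, "library", "__init__.py"), "w").close()
open(os.path.join(pkg, "__init__.py"), "w").close()

cron_py = os.path.join(pkg, "cron.py")
dag_py = os.path.join(base, "dag.py")
with open(cron_py, "w") as f:
    f.write('DAILY = "@daily"\n')
with open(dag_py, "w") as f:
    f.write("from library.util.cron import DAILY\n")

sys.path.insert(0, base)
importlib.invalidate_caches()
import dag  # caches both 'dag' and 'library.util.cron'

# Edit both files on disk to the AFTER state: the dependency gains
# HOURLY and the dag file now imports it.
with open(cron_py, "w") as f:
    f.write('DAILY = "@daily"\nHOURLY = "@hourly"\n')
with open(dag_py, "w") as f:
    f.write("from library.util.cron import DAILY, HOURLY\n")
importlib.invalidate_caches()

# Scheduler-style reload: clear only the dag module from the cache.
del sys.modules["dag"]
error = None
try:
    import dag  # noqa: F811 -- re-runs dag.py, but cron stays stale
except ImportError as exc:
    error = str(exc)
print("reload failed:", error)
```

The re-import raises {{ImportError: cannot import name 'HOURLY'}} even though HOURLY now exists in cron.py on disk, matching the stack trace below.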
h3. Stack trace
{noformat}
[2019-11-27 00:02:21,801] {{scheduler_job.py:1288}} INFO - Executor reports
execution of
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger
execution_date=2019-11-26 23:00:00+00:00 exited with status success for
try_number 1
[2019-11-27 00:02:21,806] {{scheduler_job.py:1304}} ERROR - Executor reports
task instance <TaskInstance:
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger 2019-11-26
23:00:00+00:00 [queued]> finished (success) although the task says its queued.
Was the task killed externally?
[2019-11-27 00:02:21,806] {{dagbag.py:92}} INFO - Filling up the DagBag from
/usr/local/airflow/dags/dbt/dbt_analytics.py
[2019-11-27 00:02:21,808] {{dagbag.py:207}} ERROR - Failed to import:
/usr/local/airflow/dags/dbt/dbt_analytics.py
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line
204, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/airflow/dags/dbt/dbt_analytics.py", line 30, in <module>
from library.util.cron import HOURLY, DAILY
ImportError: cannot import name 'HOURLY' from 'library.util.cron'
(/usr/local/airflow/dags/library/util/cron.py)
[2019-11-27 00:02:21,814] {{dagbag.py:207}} ERROR - Failed to import:
/usr/local/airflow/dags/dbt/dbt_analytics.py{noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)