Charlie Briggs created AIRFLOW-6085:
---------------------------------------

             Summary: Scheduler fails to reload dags on error if imports have 
changed
                 Key: AIRFLOW-6085
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6085
             Project: Apache Airflow
          Issue Type: Bug
          Components: scheduler
    Affects Versions: 1.10.6
            Reporter: Charlie Briggs


We have observed an issue where the Airflow scheduler attempts to reload a DAG in 
order to handle a task failure, but the DAG's imported dependencies have changed 
on disk since the scheduler started.

 

It seems that this block of code:

[https://github.com/apache/airflow/blob/3ac5270550eb0453c5e3f9477ad607f79828bc78/airflow/jobs/scheduler_job.py#L1266-L1271]

attempts to reload the DAG file from disk. The loading code clears the module 
cache for the DAG file itself, and only for the DAG file itself. This means that 
if the DAG file now imports a name from an existing dependency module, and that 
name did not exist in the cached module when it was first loaded, the reload fails 
with an ImportError and the DAG is never retried or rescheduled.
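
For illustration, the same behaviour can be reproduced outside Airflow with plain Python module caching. The file names ({{util.py}}, {{dag.py}}, the {{repro_dags}} folder) and the {{load_dag_file}} helper below are made up for this sketch and are not Airflow code:
{code:python}
import importlib.util
import pathlib
import sys

workdir = pathlib.Path("repro_dags")
workdir.mkdir(exist_ok=True)
(workdir / "util.py").write_text('DAILY = "@daily"\n')
(workdir / "dag.py").write_text("from util import DAILY\n")
sys.path.insert(0, str(workdir))

def load_dag_file(path):
    # Re-execute only the top-level file, the way a loader that clears the cache
    # for the DAG file alone would: its dependencies stay cached in sys.modules.
    spec = importlib.util.spec_from_file_location("dag_module", str(path))
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

load_dag_file(workdir / "dag.py")   # first load succeeds and caches 'util'

# Simulate a deploy that adds HOURLY and a new import while the process is running.
(workdir / "util.py").write_text('DAILY = "@daily"\nHOURLY = "@hourly"\n')
(workdir / "dag.py").write_text("from util import DAILY, HOURLY\n")

load_dag_file(workdir / "dag.py")   # ImportError: cannot import name 'HOURLY'
{code}
The second call fails with {{ImportError: cannot import name 'HOURLY'}} even though the file on disk now defines it, because {{util}} is still served from {{sys.modules}}.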

The expectation would be that the DAG would fully reload from disk, including 
dependencies.
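
A possible direction (purely illustrative, not Airflow's actual implementation; the function name and {{dag_folder}} parameter are invented here) would be to also evict cached modules whose source lives under the DAGs folder before re-importing the DAG file:
{code:python}
import os
import sys

def clear_dag_folder_modules(dag_folder):
    """Drop cached modules whose source file lives under the DAGs folder."""
    dag_folder = os.path.abspath(dag_folder)
    for name, module in list(sys.modules.items()):
        filename = getattr(module, "__file__", None)
        if filename and os.path.abspath(filename).startswith(dag_folder + os.sep):
            del sys.modules[name]
{code}
Something along these lines would let changed helper modules such as {{library.util.cron}} be re-read from disk together with the DAG file itself.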

Note that in this case I'm not sure why the task instance failed initially, but 
that seems inconsequential to the error-handling failure described here.
h3. Imports Example

BEFORE

/dags/dag.py
{code:python}
from library.util.cron import DAILY

...
{code}
/dags/library/util/cron.py
{code:python}
DAILY = "@daily"
{code}
 

AFTER

/dags/dag.py
{code:python}
from library.util.cron import DAILY, HOURLY

...
{code}
/dags/library/util/cron.py
{code:python}
DAILY = "@daily"

HOURLY = "@hourly"
{code}
This will fail in the above scenario because /dags/library/util/cron.py is not 
reloaded, so the name {{HOURLY}} does not exist in the cached module.

 
h3. Stack trace

 
{noformat}
[2019-11-27 00:02:21,801] {{scheduler_job.py:1288}} INFO - Executor reports 
execution of 
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger 
execution_date=2019-11-26 23:00:00+00:00 exited with status success for 
try_number 1
[2019-11-27 00:02:21,806] {{scheduler_job.py:1304}} ERROR - Executor reports 
task instance <TaskInstance: 
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger 2019-11-26 
23:00:00+00:00 [queued]> finished (success) although the task says its queued. 
Was the task killed externally?
[2019-11-27 00:02:21,806] {{dagbag.py:92}} INFO - Filling up the DagBag from 
/usr/local/airflow/dags/dbt/dbt_analytics.py
[2019-11-27 00:02:21,808] {{dagbag.py:207}} ERROR - Failed to import: 
/usr/local/airflow/dags/dbt/dbt_analytics.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 
204, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/dbt/dbt_analytics.py", line 30, in <module>
    from library.util.cron import HOURLY, DAILY
ImportError: cannot import name 'HOURLY' from 'library.util.cron' 
(/usr/local/airflow/dags/library/util/cron.py)
[2019-11-27 00:02:21,814] {{dagbag.py:207}} ERROR - Failed to import: 
/usr/local/airflow/dags/dbt/dbt_analytics.py
{noformat}
 


