[ 
https://issues.apache.org/jira/browse/AIRFLOW-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Charlie Briggs updated AIRFLOW-6085:
------------------------------------
    Description: 
We have observed an issue when the Airflow scheduler attempts to load a DAG in
order to handle a task failure, and the DAG's imported dependencies have
changed on disk since the scheduler started.

 

It seems that this block of code:

[https://github.com/apache/airflow/blob/1.10.6/airflow/jobs/scheduler_job.py#L1266-L1271|https://github.com/apache/airflow/blob/3ac5270550eb0453c5e3f9477ad607f79828bc78/airflow/jobs/scheduler_job.py#L1266-L1271]

attempts to reload the DAG file from disk. The loading code clears the module
cache for the DAG file's own module, and only that module
([https://github.com/apache/airflow/blob/1.10.6/airflow/models/dagbag.py#L199-L200]).
This means that if the DAG file now imports names from an existing module, and
those names did not exist when that module was first loaded, the load fails and
the DAG is never retried or rescheduled.

The expectation would be that the DAG would fully reload from disk, including 
dependencies.
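One way to meet that expectation would be to also evict the DAG's cached dependencies before re-importing. A minimal sketch of the idea, assuming shared helper code lives under the dags folder ({{evict_dag_folder_modules}} is a hypothetical helper, not existing Airflow API):

```python
import sys

def evict_dag_folder_modules(dag_folder: str) -> None:
    """Drop every cached module whose source file lives under dag_folder,
    so the next import re-reads it from disk, dependencies included."""
    stale = [
        name
        for name, mod in list(sys.modules.items())
        if getattr(mod, "__file__", None) is not None
        and mod.__file__.startswith(dag_folder)
    ]
    for name in stale:
        del sys.modules[name]
```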

Note that in this issue I'm not sure why the task instance failed initially,
but that seems immaterial to the error-handling failure.
h3. Imports Example

BEFORE

/dags/dag.py
{code:python}
from library.util.cron import DAILY

...
{code}
/dags/library/util/cron.py
{code:python}
DAILY = "@daily"
{code}
 

AFTER

/dags/dag.py
{code:python}
from library.util.cron import DAILY, HOURLY

...
{code}
/dags/library/util/cron.py
{code:python}
DAILY = "@daily"

HOURLY = "@hourly"
{code}
This will fail in the above scenario because /dags/library/util/cron.py is not
reloaded, so the name {{HOURLY}} does not exist in the cached module.
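The stale-cache behaviour can be reproduced outside Airflow with plain {{sys.modules}}. The file contents mirror the example above; the temp-dir setup and module name are illustrative only:

```python
import pathlib
import sys
import tempfile

# Write a module that, like cron.py BEFORE, defines only DAILY.
pkg = tempfile.mkdtemp()
pathlib.Path(pkg, "cron.py").write_text('DAILY = "@daily"\n')
sys.path.insert(0, pkg)

import cron  # first load: the module object is cached in sys.modules
assert cron.DAILY == "@daily"

# The file changes on disk, gaining HOURLY (the AFTER state).
pathlib.Path(pkg, "cron.py").write_text('DAILY = "@daily"\nHOURLY = "@hourly"\n')

# Re-importing returns the stale cached module, so HOURLY is still missing --
# the same situation the scheduler hits when it reloads only the DAG file.
import cron
assert not hasattr(cron, "HOURLY")

# Evicting the cached entry forces a real re-read from disk; the name appears.
del sys.modules["cron"]
import cron
assert hasattr(cron, "HOURLY")
```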

h3. Stack trace

{noformat}
[2019-11-27 00:02:21,801] {{scheduler_job.py:1288}} INFO - Executor reports 
execution of 
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger 
execution_date=2019-11-26 23:00:00+00:00 exited with status success for 
try_number 1
[2019-11-27 00:02:21,806] {{scheduler_job.py:1304}} ERROR - Executor reports 
task instance <TaskInstance: 
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger 2019-11-26 
23:00:00+00:00 [queued]> finished (success) although the task says its queued. 
Was the task killed externally?
[2019-11-27 00:02:21,806] {{dagbag.py:92}} INFO - Filling up the DagBag from 
/usr/local/airflow/dags/dbt/dbt_analytics.py
[2019-11-27 00:02:21,808] {{dagbag.py:207}} ERROR - Failed to import: 
/usr/local/airflow/dags/dbt/dbt_analytics.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 
204, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/dbt/dbt_analytics.py", line 30, in <module>
    from library.util.cron import HOURLY, DAILY
ImportError: cannot import name 'HOURLY' from 'library.util.cron' 
(/usr/local/airflow/dags/library/util/cron.py)
[2019-11-27 00:02:21,814] {{dagbag.py:207}} ERROR - Failed to import: 
/usr/local/airflow/dags/dbt/dbt_analytics.py{noformat}
 


> Scheduler fails to reload dags on error if imports have changed
> ---------------------------------------------------------------
>
>                 Key: AIRFLOW-6085
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-6085
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: scheduler
>    Affects Versions: 1.10.6
>            Reporter: Charlie Briggs
>            Priority: Minor
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
