[
https://issues.apache.org/jira/browse/AIRFLOW-6085?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Charlie Briggs updated AIRFLOW-6085:
------------------------------------
Description:
We have observed an issue when the Airflow scheduler attempts to load a DAG in
order to handle a task failure, and the DAG's imported dependencies have
changed on disk since the scheduler started.
It seems that this block of code:
[https://github.com/apache/airflow/blob/1.10.6/airflow/jobs/scheduler_job.py#L1266-L1271|https://github.com/apache/airflow/blob/3ac5270550eb0453c5e3f9477ad607f79828bc78/airflow/jobs/scheduler_job.py#L1266-L1271]
attempts to reload the DAG file from disk. The loading code clears the module
cache entry for the DAG file itself
([https://github.com/apache/airflow/blob/1.10.6/airflow/models/dagbag.py#L199-L200])
and only that entry. This means that if the DAG file now imports names from an
existing module, and those names did not exist in that module at the time of
the initial load, the load fails and the DAG is never retried or rescheduled.
The expectation would be that the DAG would be fully reloaded from disk,
including its dependencies.
Note that I'm not sure why the task instance failed initially, but that seems
inconsequential to the error-handling failure described here.
h3. Imports Example
BEFORE
/dags/dag.py
{code:python}
from library.util.cron import DAILY
...
{code}
/dags/library/util/cron.py
{code:python}
DAILY = "@daily"
{code}
AFTER
/dags/dag.py
{code:python}
from library.util.cron import DAILY, HOURLY
...
{code}
/dags/library/util/cron.py
{code:python}
DAILY = "@daily"
HOURLY = "@hourly"
{code}
This will fail in the above scenario: /dags/library/util/cron.py is not
reloaded, so the cached module has no {{HOURLY}} to import.
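The caching behaviour is reproducible outside Airflow with a small standalone
sketch (all file and module names below are invented for illustration; the
loader mimics what dagbag.py does, evicting only the DAG file's own module):

{code:python}
import importlib.util
import os
import sys
import tempfile

# A "dag" module that imports from a helper module, both created on the fly.
workdir = tempfile.mkdtemp()
sys.path.insert(0, workdir)

helper_path = os.path.join(workdir, "cron_consts.py")
dag_path = os.path.join(workdir, "my_dag.py")

with open(helper_path, "w") as f:
    f.write('DAILY = "@daily"\n')
with open(dag_path, "w") as f:
    f.write("from cron_consts import DAILY\n")

def load_dag_file(path):
    # Mimic dagbag.py: evict only the DAG file's own module from the
    # cache, then re-execute it. Its dependencies stay in sys.modules.
    sys.modules.pop("my_dag", None)
    spec = importlib.util.spec_from_file_location("my_dag", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module

load_dag_file(dag_path)  # initial load also caches cron_consts

# Simulate a deploy: the helper gains HOURLY and the DAG now imports it.
with open(helper_path, "w") as f:
    f.write('DAILY = "@daily"\nHOURLY = "@hourly"\n')
with open(dag_path, "w") as f:
    f.write("from cron_consts import DAILY, HOURLY\n")

try:
    load_dag_file(dag_path)
except ImportError as exc:
    # The stale cron_consts cached at first load wins over the file on
    # disk, so the new name is missing.
    print("reload failed:", exc)
{code}
The second load fails with the same {{cannot import name 'HOURLY'}} error as
the scheduler log below, because the stale {{cron_consts}} entry in
{{sys.modules}} shadows the updated file on disk.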
h3. Stack trace
{noformat}
[2019-11-27 00:02:21,801] {{scheduler_job.py:1288}} INFO - Executor reports
execution of
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger
execution_date=2019-11-26 23:00:00+00:00 exited with status success for
try_number 1
[2019-11-27 00:02:21,806] {{scheduler_job.py:1304}} ERROR - Executor reports
task instance <TaskInstance:
dbt-run-analytics-incremental-v3.wait_for_nla_reporting_print-ledger 2019-11-26
23:00:00+00:00 [queued]> finished (success) although the task says its queued.
Was the task killed externally?
[2019-11-27 00:02:21,806] {{dagbag.py:92}} INFO - Filling up the DagBag from
/usr/local/airflow/dags/dbt/dbt_analytics.py
[2019-11-27 00:02:21,808] {{dagbag.py:207}} ERROR - Failed to import:
/usr/local/airflow/dags/dbt/dbt_analytics.py
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line
204, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 696, in _load
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 728, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/usr/local/airflow/dags/dbt/dbt_analytics.py", line 30, in <module>
from library.util.cron import HOURLY, DAILY
ImportError: cannot import name 'HOURLY' from 'library.util.cron'
(/usr/local/airflow/dags/library/util/cron.py)
[2019-11-27 00:02:21,814] {{dagbag.py:207}} ERROR - Failed to import:
/usr/local/airflow/dags/dbt/dbt_analytics.py{noformat}
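One conceivable fix direction, sketched here as a hypothetical helper (this is
not Airflow's actual API), would be to evict every cached module whose source
file lives under the DAGs folder before re-parsing, so dependencies are
re-read from disk along with the DAG file:

{code:python}
import os
import sys

def purge_modules_under(folder):
    """Hypothetical helper: drop from sys.modules every module whose
    source file lives under `folder`, so the next import of the DAG
    file re-reads its dependencies from disk as well."""
    prefix = os.path.abspath(folder) + os.sep
    for name, module in list(sys.modules.items()):
        source = getattr(module, "__file__", None)
        if source and os.path.abspath(source).startswith(prefix):
            del sys.modules[name]
{code}
In the dagbag.py code linked above, something like this would run in place of
the single-module eviction; a real fix would need to be careful about plugins
and modules shared between DAG files that other threads may still be using.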
> Scheduler fails to reload dags on error if imports have changed
> ---------------------------------------------------------------
>
> Key: AIRFLOW-6085
> URL: https://issues.apache.org/jira/browse/AIRFLOW-6085
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.10.6
> Reporter: Charlie Briggs
> Priority: Minor
--
This message was sent by Atlassian Jira
(v8.3.4#803005)