MatrixManAtYrService opened a new issue #21450:
URL: https://github.com/apache/airflow/issues/21450
### Apache Airflow version
2.2.3 (latest released)
### What happened
I had just kicked off a DAG and was periodically running
`airflow dags state ...` to see whether it had finished. At first the command
worked, but a later invocation failed with this error:
```
$ airflow dags state load_13 '2022-02-09T05:25:28+00:00'
[2022-02-09 05:26:56,493] {dagbag.py:500} INFO - Filling up the DagBag
from /usr/local/airflow/dags
queued
$ airflow dags state load_13 '2022-02-09T05:25:28+00:00'
[2022-02-09 05:27:29,096] {dagbag.py:500} INFO - Filling up the DagBag
from /usr/local/airflow/dags
[2022-02-09 05:27:59,084] {timeout.py:36} ERROR - Process timed out,
PID: 759
[2022-02-09 05:27:59,088] {dagbag.py:334} ERROR - Failed to import:
/usr/local/airflow/dags/many_tasks.py
Traceback (most recent call last):
File
"/usr/local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 331, in
_load_modules_from_file
loader.exec_module(new_module)
File "<frozen importlib._bootstrap_external>", line 850, in exec_module
File "<frozen importlib._bootstrap>", line 228, in
_call_with_frames_removed
File "/usr/local/airflow/dags/many_tasks.py", line 61, in <module>
globals()["dag_{:02d}".format(i)] = parameterized_load(i, step)
File "/usr/local/airflow/dags/many_tasks.py", line 50, in
parameterized_load
return load()
File "/usr/local/lib/python3.9/site-packages/airflow/models/dag.py",
line 2984, in factory
f(**f_kwargs)
File "/usr/local/airflow/dags/many_tasks.py", line 48, in load
[worker_factory(i) for i in range(1, size**2 + 1)]
File "/usr/local/airflow/dags/many_tasks.py", line 48, in <listcomp>
[worker_factory(i) for i in range(1, size**2 + 1)]
File "/usr/local/airflow/dags/many_tasks.py", line 37, in
worker_factory
return worker(num)
File
"/usr/local/lib/python3.9/site-packages/airflow/decorators/base.py", line 219,
in factory
op = decorated_operator_class(
File
"/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line
188, in apply_defaults
result = func(self, *args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/decorators/python.py", line 59,
in __init__
super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line
152, in apply_defaults
dag_params = copy.deepcopy(dag.params) or {}
File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
y = _reconstruct(x, memo, *rv)
File "/usr/local/lib/python3.9/copy.py", line 264, in _reconstruct
y = func(*args)
File "/usr/local/lib/python3.9/copy.py", line 263, in <genexpr>
args = (deepcopy(arg, memo) for arg in args)
File
"/usr/local/lib/python3.9/site-packages/airflow/utils/timeout.py", line 37, in
handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for
/usr/local/airflow/dags/many_tasks.py after 30.0s.
Please take a look at these docs to improve your DAG import time:
*
http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/best-practices.html#top-level-python-code
*
http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/best-practices.html#reducing-dag-complexity,
PID: 759
Traceback (most recent call last):
File "/usr/local/bin/airflow", line 8, in <module>
sys.exit(main())
File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py",
line 48, in main
args.func(args)
File
"/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in
command
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py",
line 92, in wrapper
return f(*args, **kwargs)
File
"/usr/local/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py",
line 241, in dag_state
dag = get_dag(args.subdir, args.dag_id)
File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py",
line 192, in get_dag
raise AirflowException(
airflow.exceptions.AirflowException: Dag 'load_13' could not be found;
either it does not exist or it failed to parse.
```
### What you expected to happen
If the DAG could be parsed in the first place, I expect that downstream
actions (like querying for status) would not fail because of a DAG-parsing
timeout.
Also, is parsing the DAG necessary at all for this action?
### How to reproduce
1. Start with the DAG shown here:
https://gist.github.com/MatrixManAtYrService/842266aac42390aadee75fe014cd372e
2. Increase "scale" until `airflow dags list` stops showing the load DAGs.
3. Decrease it by one and check that they show up again.
4. Trigger a DAG run.
5. Check its status periodically; eventually the status check will fail.
I initially discovered this using the `CeleryExecutor` and a much messier
DAG, but once I understood what I was looking for I was able to recreate it
using the DAG linked above and `astro dev start`.
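Step 5 above can be scripted rather than run by hand. The following is a minimal sketch, not part of the original reproduction: the helper names (`parse_state`, `run_airflow_state`, `poll_state`), the set of recognized states, and the choice to read the state from the last non-empty line of output are all assumptions based on the log shown earlier. A `None` entry in the result marks a check that did not yield a state, for example because the DAG failed to parse.

```python
import subprocess
import time
from typing import Callable, List, Optional

# Run states recognized by this sketch (an assumption for illustration).
KNOWN_STATES = {"queued", "running", "success", "failed"}


def parse_state(output: str) -> Optional[str]:
    """Return the last non-empty line of the CLI output if it is a known state."""
    lines = [line.strip() for line in output.splitlines() if line.strip()]
    if lines and lines[-1] in KNOWN_STATES:
        return lines[-1]
    return None


def run_airflow_state(dag_id: str, execution_date: str) -> str:
    """Run `airflow dags state` and return its stdout ('' if the command fails)."""
    result = subprocess.run(
        ["airflow", "dags", "state", dag_id, execution_date],
        capture_output=True,
        text=True,
    )
    return result.stdout if result.returncode == 0 else ""


def poll_state(
    dag_id: str,
    execution_date: str,
    run: Callable[[str, str], str] = run_airflow_state,
    attempts: int = 20,
    delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Optional[str]]:
    """Check the run state repeatedly and record what each check returned."""
    observed: List[Optional[str]] = []
    for attempt in range(attempts):
        state = parse_state(run(dag_id, execution_date))
        observed.append(state)
        if state in ("success", "failed"):
            break  # the run has finished, so stop polling
        if attempt < attempts - 1:
            sleep(delay)
    return observed
```

Calling `poll_state("load_13", "2022-02-09T05:25:28+00:00")` inside the Airflow container would then return a list such as `["queued", None, ...]` if a later check hits the import timeout.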
### Operating System
docker/debian
### Versions of Apache Airflow Providers
n/a
### Deployment
Astronomer
### Deployment details
```
FROM quay.io/astronomer/ap-airflow:2.2.3-onbuild
```
### Anything else
When I was running this via the CeleryExecutor (deployed via helm on a
single-node k8s cluster), I noticed similar dag-parsing timeouts showing up in
the worker logs. I failed to capture them because I didn't yet know what I was
looking for, but if they would be helpful I can recreate that scenario and post
them here.
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)