MatrixManAtYrService opened a new issue #21450:
URL: https://github.com/apache/airflow/issues/21450


   ### Apache Airflow version
   
   2.2.3 (latest released)
   
   ### What happened
   
   I had just kicked off a DAG and was periodically running `airflow dags 
state ...` to see whether it was done yet. At first it seemed to work, but later 
it failed with this error:
   
   ```
   $ airflow dags state load_13 '2022-02-09T05:25:28+00:00'
   
       [2022-02-09 05:26:56,493] {dagbag.py:500} INFO - Filling up the DagBag 
from /usr/local/airflow/dags
       queued
   
   $ airflow dags state load_13 '2022-02-09T05:25:28+00:00'
   
       [2022-02-09 05:27:29,096] {dagbag.py:500} INFO - Filling up the DagBag 
from /usr/local/airflow/dags
       [2022-02-09 05:27:59,084] {timeout.py:36} ERROR - Process timed out, 
PID: 759
       [2022-02-09 05:27:59,088] {dagbag.py:334} ERROR - Failed to import: 
/usr/local/airflow/dags/many_tasks.py
       Traceback (most recent call last):
         File 
"/usr/local/lib/python3.9/site-packages/airflow/models/dagbag.py", line 331, in 
_load_modules_from_file
           loader.exec_module(new_module)
         File "<frozen importlib._bootstrap_external>", line 850, in exec_module
         File "<frozen importlib._bootstrap>", line 228, in 
_call_with_frames_removed
         File "/usr/local/airflow/dags/many_tasks.py", line 61, in <module>
           globals()["dag_{:02d}".format(i)] = parameterized_load(i, step)
         File "/usr/local/airflow/dags/many_tasks.py", line 50, in 
parameterized_load
           return load()
         File "/usr/local/lib/python3.9/site-packages/airflow/models/dag.py", 
line 2984, in factory
           f(**f_kwargs)
         File "/usr/local/airflow/dags/many_tasks.py", line 48, in load
           [worker_factory(i) for i in range(1, size**2 + 1)]
         File "/usr/local/airflow/dags/many_tasks.py", line 48, in <listcomp>
           [worker_factory(i) for i in range(1, size**2 + 1)]
         File "/usr/local/airflow/dags/many_tasks.py", line 37, in 
worker_factory
           return worker(num)
         File 
"/usr/local/lib/python3.9/site-packages/airflow/decorators/base.py", line 219, 
in factory
           op = decorated_operator_class(
         File 
"/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 
188, in apply_defaults
           result = func(self, *args, **kwargs)
         File 
"/usr/local/lib/python3.9/site-packages/airflow/decorators/python.py", line 59, 
in __init__
           super().__init__(kwargs_to_upstream=kwargs_to_upstream, **kwargs)
         File 
"/usr/local/lib/python3.9/site-packages/airflow/models/baseoperator.py", line 
152, in apply_defaults
           dag_params = copy.deepcopy(dag.params) or {}
         File "/usr/local/lib/python3.9/copy.py", line 172, in deepcopy
           y = _reconstruct(x, memo, *rv)
         File "/usr/local/lib/python3.9/copy.py", line 264, in _reconstruct
           y = func(*args)
         File "/usr/local/lib/python3.9/copy.py", line 263, in <genexpr>
           args = (deepcopy(arg, memo) for arg in args)
         File 
"/usr/local/lib/python3.9/site-packages/airflow/utils/timeout.py", line 37, in 
handle_timeout
           raise AirflowTaskTimeout(self.error_message)
       airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for 
/usr/local/airflow/dags/many_tasks.py after 30.0s.
       Please take a look at these docs to improve your DAG import time:
       * 
http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/best-practices.html#top-level-python-code
       * 
http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com/docs/apache-airflow/latest/best-practices.html#reducing-dag-complexity,
 PID: 759
       Traceback (most recent call last):
         File "/usr/local/bin/airflow", line 8, in <module>
           sys.exit(main())
         File "/usr/local/lib/python3.9/site-packages/airflow/__main__.py", 
line 48, in main
           args.func(args)
         File 
"/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in 
command
           return func(*args, **kwargs)
         File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 92, in wrapper
           return f(*args, **kwargs)
         File 
"/usr/local/lib/python3.9/site-packages/airflow/cli/commands/dag_command.py", 
line 241, in dag_state
           dag = get_dag(args.subdir, args.dag_id)
         File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", 
line 192, in get_dag
           raise AirflowException(
       airflow.exceptions.AirflowException: Dag 'load_13' could not be found; 
either it does not exist or it failed to parse.
   ```
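   For context on the `Process timed out` line above: Airflow enforces the DagBag 
import budget with a SIGALRM-based context manager. The sketch below is a 
simplified approximation of what `airflow/utils/timeout.py` does (not the actual 
source; Unix only), just to show how an otherwise-unrelated CLI call can die 
mid-import:

```python
import signal
import time


class AirflowTaskTimeout(Exception):
    """Stand-in for airflow.exceptions.AirflowTaskTimeout."""


class timeout:
    """Simplified approximation of airflow.utils.timeout.timeout (Unix only)."""

    def __init__(self, seconds=1.0, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        raise AirflowTaskTimeout(self.error_message)

    def __enter__(self):
        # arrange for SIGALRM to fire if the block runs longer than `seconds`
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # cancel the pending alarm on the way out
        signal.setitimer(signal.ITIMER_REAL, 0)


caught = None
try:
    # simulate a DAG file whose import takes longer than the budget
    with timeout(0.1, "DagBag import timeout for many_tasks.py after 0.1s"):
        time.sleep(1)
except AirflowTaskTimeout as err:
    caught = err

print(caught)
```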
   
   ### What you expected to happen
   
   If the DAG could be parsed in the first place, I expect that downstream 
actions (like querying for status) would not fail due to a DAG parsing timeout.
   
   Also, is parsing the DAG even necessary for this action?
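   The run state itself lives in the metadata database's `dag_run` table, so in 
principle answering this query shouldn't require a parse at all. A rough sketch 
of that idea against a simplified stand-in schema (hypothetical; the real table 
has more columns):

```python
import sqlite3

# simplified stand-in for the Airflow metadata DB's dag_run table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, state TEXT)"
)
conn.execute(
    "INSERT INTO dag_run VALUES ('load_13', '2022-02-09T05:25:28+00:00', 'queued')"
)


def dag_state(dag_id, execution_date):
    # reads the run state straight from the DB -- no DAG file import involved
    row = conn.execute(
        "SELECT state FROM dag_run WHERE dag_id = ? AND execution_date = ?",
        (dag_id, execution_date),
    ).fetchone()
    return row[0] if row else None


print(dag_state("load_13", "2022-02-09T05:25:28+00:00"))
```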
   
   ### How to reproduce
   
   1. start with the dag shown here: 
https://gist.github.com/MatrixManAtYrService/842266aac42390aadee75fe014cd372e
   2. increase "scale" until `airflow dags list` stops showing the load dags
   3. decrease it by one and check that they show up again
   4. trigger a dag run
   5. check its status periodically; eventually the status check will fail
   
   I initially discovered this using the `CeleryExecutor` and a much messier 
DAG, but once I understood what I was looking for, I was able to recreate it 
using the DAG linked above and `astro dev start`.
   
   ### Operating System
   
   docker/debian
   
   ### Versions of Apache Airflow Providers
   
   n/a
   
   ### Deployment
   
   Astronomer
   
   ### Deployment details
   
   ```
   FROM quay.io/astronomer/ap-airflow:2.2.3-onbuild
   ```
   
   ### Anything else
   
   When I was running this via the CeleryExecutor (deployed via helm on a 
single-node k8s cluster), I noticed similar dag-parsing timeouts showing up in 
the worker logs.  I failed to capture them because I didn't yet know what I was 
looking for, but if they would be helpful I can recreate that scenario and post 
them here.
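
   For what it's worth, raising `core.dagbag_import_timeout` (a documented 
Airflow setting, default 30.0 seconds) does make the CLI call survive longer 
parses, though it only masks the symptom rather than fixing the need to parse:

```
[core]
dagbag_import_timeout = 120.0
```

   The same setting can be supplied via the `AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT` 
environment variable.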
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

