tanvn commented on issue #26623:
URL: https://github.com/apache/airflow/issues/26623#issuecomment-1300679762

   Hi, I upgraded from 2.1.4 to 2.4.2 and got a similar error to this.
   I have set the `AIRFLOW__CORE__DAGS_FOLDER` env var, but it does not seem to take effect (a quick way to check what the worker actually resolves is sketched after the log below).
   We are using KubernetesExecutor and helm chart 1.7.0 with Airflow 2.4.2.
   The log in the worker pod is as follows:
   ```
   [2022-11-02T07:24:45.316+0000] {dagbag.py:537} INFO - Filling up the DagBag from /opt/airflow/dags/prod_dynamic.py
   ....
   [2022-11-02T07:24:47.477+0000] {cli.py:225} WARNING - Dag 'daily_client_updater' not found in path /opt/airflow/dags/prod_dynamic.py; trying path /opt/airflow/dags/prod_dynamic.py
   [2022-11-02T07:24:47.478+0000] {dagbag.py:537} INFO - Filling up the DagBag from /opt/airflow/dags/prod_dynamic.py

   Traceback (most recent call last):
     File "/opt/app-root/bin/airflow", line 8, in <module>
       sys.exit(main())
     File "/opt/app-root/lib64/python3.8/site-packages/airflow/__main__.py", line 39, in main
       args.func(args)
     File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/cli_parser.py", line 52, in command
       return func(*args, **kwargs)
     File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/cli.py", line 103, in wrapper
       return f(*args, **kwargs)
     File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/commands/task_command.py", line 366, in task_run
       dag = get_dag(args.subdir, args.dag_id, include_examples=False)
     File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/cli.py", line 228, in get_dag
       raise AirflowException(
   airflow.exceptions.AirflowException: Dag 'daily_client_updater' could not be found; either it does not exist or it failed to parse.
   ```
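
   For reference, a minimal debugging sketch (assuming one can exec into the worker pod; the file path is just the one from the log above) to confirm which `dags_folder` the worker actually resolves and whether the dynamic file parses into the expected DAG ids:
   ```
   # Hypothetical debugging snippet, not from our codebase: check what the
   # worker resolves for dags_folder and what DagBag finds in the file.
   import os

   from airflow.configuration import conf
   from airflow.models import DagBag

   print("env AIRFLOW__CORE__DAGS_FOLDER :", os.environ.get("AIRFLOW__CORE__DAGS_FOLDER"))
   print("resolved [core] dags_folder    :", conf.get("core", "dags_folder"))

   bag = DagBag(dag_folder="/opt/airflow/dags/prod_dynamic.py", include_examples=False)
   print("import errors:", bag.import_errors)
   print("dag ids      :", sorted(bag.dag_ids))
   ```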
   
   Here `prod_dynamic.py` is the file where our DAGs are generated dynamically; it looks something like this:
   ```
   import hashlib
   import os

   from airflow import DAG

   def create_dag(dag_id, service_name, conf):
       dag = DAG(
           dag_id,
           description=conf["description"],
           schedule_interval=conf["schedule"],
           default_args=default_args,
           user_defined_filters=dict(
               hash=lambda str: hashlib.sha256(str.encode("utf-8")).hexdigest()
           ),
       )
       ...
   
       return dag
   
   path_base = os.getenv('DAG_CONF_PATH')
   for service_name in os.listdir(path_base):
       path_service = "{}/{}/conf".format(path_base, service_name)
   
       for dag_name in os.listdir(path_service):
           conf = {}
           ...
           # Create DAG
           dag_id = "{}__{}".format(service_name, conf["name"])
           globals()[dag_id] = create_dag(dag_id, service_name, conf)
   ```
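
   The generation above depends on the `DAG_CONF_PATH` env var and the mounted conf directories; as a hedged sketch (same names as above, the guard itself is not in our code), failing loudly when that path is missing would at least surface a mis-mounted volume as an import error instead of an empty DagBag:
   ```
   # Sketch only: guard the dynamic generation so a missing DAG_CONF_PATH or
   # conf directory raises at parse time instead of silently yielding no DAGs.
   import os

   path_base = os.getenv("DAG_CONF_PATH")
   if not path_base or not os.path.isdir(path_base):
       raise RuntimeError(f"DAG_CONF_PATH is not set or does not exist: {path_base!r}")
   ```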
   I have around 15 DAGs in my test environment, and this bug happens randomly on random DAGs.
   Often, after some retries, the exception no longer occurs on the newly created worker.
   Re-deploying the service (by running `helm upgrade`) also fixes the error.
   However, I find this behavior quite dangerous, as it makes task execution unstable (we have important tasks that must finish on time).
   
   Any help or information that you can provide will be greatly appreciated. 🙇 

