tanvn commented on issue #26623:
URL: https://github.com/apache/airflow/issues/26623#issuecomment-1300679762
Hi, I upgraded from 2.1.4 to 2.4.2 and got a similar error to this one.
I have set the `AIRFLOW__CORE__DAGS_FOLDER` env var, but it does not seem to help.
We are using the KubernetesExecutor and helm-chart 1.7.0 with Airflow 2.4.2.
The log from the worker pod is as follows:
```
[2022-11-02T07:24:45.316+0000] {dagbag.py:537} INFO - Filling up the DagBag from /opt/airflow/dags/prod_dynamic.py
....
[2022-11-02T07:24:47.477+0000] {cli.py:225} WARNING - Dag 'daily_client_updater' not found in path /opt/airflow/dags/prod_dynamic.py; trying path /opt/airflow/dags/prod_dynamic.py
[2022-11-02T07:24:47.478+0000] {dagbag.py:537} INFO - Filling up the DagBag from /opt/airflow/dags/prod_dynamic.py
Traceback (most recent call last):
  File "/opt/app-root/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/cli.py", line 103, in wrapper
    return f(*args, **kwargs)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/cli/commands/task_command.py", line 366, in task_run
    dag = get_dag(args.subdir, args.dag_id, include_examples=False)
  File "/opt/app-root/lib64/python3.8/site-packages/airflow/utils/cli.py", line 228, in get_dag
    raise AirflowException(
airflow.exceptions.AirflowException: Dag 'daily_client_updater' could not be found; either it does not exist or it failed to parse.
```
Here `prod_dynamic.py` is the file where our DAGs are generated dynamically; it looks something like this:
```
import hashlib
import os

from airflow import DAG

# (default_args is defined elsewhere in the real file)


def create_dag(dag_id, service_name, conf):
    dag = DAG(
        dag_id,
        description=conf["description"],
        schedule_interval=conf["schedule"],
        default_args=default_args,
        user_defined_filters=dict(
            hash=lambda str: hashlib.sha256(str.encode("utf-8")).hexdigest()
        ),
    )
    ...
    return dag


# Build one DAG per (service, conf) pair found under DAG_CONF_PATH.
path_base = os.getenv("DAG_CONF_PATH")
for service_name in os.listdir(path_base):
    path_service = "{}/{}/conf".format(path_base, service_name)
    for dag_name in os.listdir(path_service):
        conf = {}
        ...
        # Create the DAG and register it in the module namespace so Airflow picks it up.
        dag_id = "{}__{}".format(service_name, conf["name"])
        globals()[dag_id] = create_dag(dag_id, service_name, conf)
```
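In case it helps with debugging, here is a minimal sketch (illustrative only, this does not exist in our real file) of parse-time logging that could be appended to the bottom of `prod_dynamic.py` to record which dag_ids actually get registered each time the scheduler or a worker parses the file:
```
# Illustrative only: log the dag_ids this module registers on each parse,
# so the scheduler's and the worker's view of the file can be compared.
import logging

from airflow import DAG

log = logging.getLogger(__name__)

registered = sorted(
    name for name, obj in globals().items() if isinstance(obj, DAG)
)
log.info("prod_dynamic.py registered %d DAGs: %s", len(registered), registered)
```
When the "Dag ... could not be found" error shows up in a worker pod, this would at least show whether the expected dag_id was missing from that particular parse (for example if `DAG_CONF_PATH` or the conf directories resolved differently there) or whether it was registered and something else went wrong afterwards.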
I have around 15 DAGs in my test environment and this bug happens randomly, on random DAGs.
Often, after a few retries, the exception no longer occurs on the newly created worker.
Also, re-deploying the service (by running `helm upgrade`) fixes the error as well.
However, I find this kind of behavior quite dangerous, as it makes task execution unstable (we have important tasks that are required to finish on time).
Any help or information that you can provide will be greatly appreciated. 🙇