dlesco opened a new issue, #38515:
URL: https://github.com/apache/airflow/issues/38515
### Apache Airflow version
Other Airflow 2 version (please specify below)
### If "Other Airflow 2 version" selected, which one?
2.6.3
### What happened?
I was checking the airflow-scheduler logs, and I found that it is parsing
dagfiles on every loop, not only when the mtime of the dagfile changes.
I noticed this while looking into one dagfile that takes 12s to parse, since it
produces ten DAGs, each with 122 tasks. The DAGs are disabled in the
Airflow UI, have not run in over a week, and the mtime of the dagfile was a
week old. Since the DAGs were disabled, the processor was not doing things
like processing callbacks.
I examined the Airflow code, and I think I see why it is happening.
The `DagBag` class has this instance variable:
```python
self.file_last_changed: dict[str, datetime] = {}
```
The `collect_dags` method calls the `process_file` method, which compares
the file mtime to the saved datetime in this instance variable. So the `DagBag`
code is already written to skip parsing the dagfile if the mtime hasn't
changed.
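For reference, the early exit in `DagBag.process_file` looks roughly like this
(a paraphrased sketch, not the exact Airflow source):
```python
# Paraphrased sketch of the mtime guard in DagBag.process_file (not the exact
# Airflow source): parsing is skipped when the on-disk mtime matches the
# previously recorded one.
file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
if (
    only_if_updated
    and filepath in self.file_last_changed
    and file_last_changed_on_disk == self.file_last_changed[filepath]
):
    return []
```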
However, the problem is in `DagFileProcessor`. Its `process_file` method does
the following to get a reference to the DagBag for this dagfile:
```python
dagbag = DagFileProcessor._get_dagbag(file_path)
```
The implementation of this method is:
```python
@classmethod
def _get_dagbag(cls, file_path: str):
    try:
        return DagBag(file_path, include_examples=False)
    except Exception:
        cls.logger().exception("Failed at reloading the DAG file %s", file_path)
        Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
        raise
```
It then saves the DAGs to the database, but this `dagbag` reference is not saved
in memory anywhere. The next time the DagFileProcessor runs, it constructs a
brand-new, empty DagBag instance by calling the `_get_dagbag` classmethod.
### What you think should happen instead?
Instead, the DagFileProcessor should keep the dagbag in memory.
The `__init__` method should define a dictionary named `self._dagfile_dagbag`.
Then `process_file` could call the following method instead of the
`_get_dagbag` classmethod:
```python
def _get_dagfile_dagbag(self, file_path: str):
    try:
        if file_path in self._dagfile_dagbag:
            # Re-collect into the cached DagBag; its file_last_changed
            # bookkeeping lets it skip parsing when the mtime is unchanged.
            self._dagfile_dagbag[file_path].collect_dags(
                dag_folder=file_path,
                include_examples=False,
            )
        else:
            self._dagfile_dagbag[file_path] = DagBag(file_path, include_examples=False)
        return self._dagfile_dagbag[file_path]
    except Exception:
        self.log.exception("Failed at reloading the DAG file %s", file_path)
        Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
        raise
```
This will keep the parsed DagBag in memory and allow skipping the parse if
the mtime hasn't changed.
With multiple DagFileProcessors, each one will have to parse the file once to
get it cached in memory, but that is much better than parsing it every time.
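The supporting changes implied above are small; a rough sketch, using the names
from this proposal rather than anything already in the Airflow codebase:
```python
# Sketch of the supporting changes implied by the proposal above
# (names taken from this issue, not from the Airflow codebase).

# In DagFileProcessor.__init__:
self._dagfile_dagbag: dict[str, DagBag] = {}

# In DagFileProcessor.process_file, replacing the classmethod call:
dagbag = self._get_dagfile_dagbag(file_path)
```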
### How to reproduce
One can make a simple dagfile that just uses `EmptyOperator` for the DAG but
has top-level `time.sleep(5)` code.
Then check the airflow-scheduler logs to see whether every parse of the file
takes longer than 5 seconds, or whether many parses take less time.
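A minimal repro dagfile could look like this (the dag_id, dates, and task name
are arbitrary illustration values):
```python
# Hypothetical repro dagfile: the top-level sleep makes every real parse
# take at least 5 seconds, so skipped parses stand out in the logs.
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

time.sleep(5)  # top-level code, executed on every parse of this file

with DAG(
    dag_id="slow_parse_repro",  # arbitrary name for this example
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    EmptyOperator(task_id="noop")
```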
In the `DagBag.process_file` method, the mtime check and early exit happen
before it calls `_load_modules_from_file`.
### Operating System
PRETTY_NAME="Ubuntu 20.04.6 LTS"
### Versions of Apache Airflow Providers
Not relevant; this is a core issue.
### Deployment
Google Cloud Composer
### Deployment details
Composer version 2.4.6, Airflow 2.6.3
### Anything else?
_No response_
### Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)