dlesco opened a new issue, #38515:
URL: https://github.com/apache/airflow/issues/38515

   ### Apache Airflow version
   
   Other Airflow 2 version (please specify below)
   
   ### If "Other Airflow 2 version" selected, which one?
   
   2.6.3
   
   ### What happened?
   
   I was checking the airflow-scheduler logs, and I found that it is parsing 
dagfiles on every loop, not only when the mtime of the dagfile changes.
   
   I noticed this while looking into one dagfile that takes 12s to parse, since it produces ten DAGs, each with 122 tasks.  The DAGs are disabled in the Airflow UI, have not run in over a week, and the mtime of the dagfile was a week old.  Since the DAGs were disabled, the processor was not doing things like processing callbacks.
   
   I examined the Airflow code, and I think I see why it is happening.
   
   The `DagBag` class has an instance variable:
   ```python
           self.file_last_changed: dict[str, datetime] = {}
   ```
   
   The `collect_dags` method calls the `process_file` method, which compares the file mtime to the saved datetime in this instance variable.  So the DagBag code is already written to skip parsing a dagfile whose mtime hasn't changed.
   
   However, the problem is in DagFileProcessor.  The `process_file` method does 
the following to get a reference to the DagBag for this dagfile:
   
   ```python
   dagbag = DagFileProcessor._get_dagbag(file_path)
   ```
   
   The implementation of this method is:
   ```python
       @classmethod
       def _get_dagbag(cls, file_path: str):
           try:
               return DagBag(file_path, include_examples=False)
           except Exception:
               cls.logger().exception("Failed at reloading the DAG file %s", file_path)
               Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
               raise
   ```
   
   It then saves the dags to the db, but this `dagbag` reference is not saved in memory anywhere.  The next time the DagFileProcessor runs, it constructs a brand-new, empty DagBag instance by calling the `_get_dagbag` classmethod, so `file_last_changed` starts out empty and the mtime check can never short-circuit.
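
   As a simplified, standalone sketch of the failure mode (hypothetical names, not the actual Airflow classes): the mtime bookkeeping lives on the instance, so a fresh instance per loop always starts with an empty cache and reparses.
   ```python
import os


class FileParseCache:
    """Sketch of DagBag-style mtime bookkeeping (illustrative only)."""

    def __init__(self):
        self.file_last_changed: dict[str, float] = {}
        self.parse_count = 0

    def process_file(self, file_path: str) -> None:
        mtime = os.path.getmtime(file_path)
        if self.file_last_changed.get(file_path) == mtime:
            return  # mtime unchanged: skip the expensive parse
        self.parse_count += 1  # stand-in for the expensive module load
        self.file_last_changed[file_path] = mtime


def loop_with_fresh_instance(file_path: str) -> FileParseCache:
    # Mirrors the current behavior: a brand-new cache on every loop,
    # so the mtime comparison never matches and parsing always happens.
    cache = FileParseCache()
    cache.process_file(file_path)
    return cache
   ```
   A long-lived instance parses an unmodified file once; the per-loop fresh instance parses it every single time.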
   
   
   ### What you think should happen instead?
   
   Instead, the DagFileProcessor should save the dagbag in memory.
   
   The `__init__` method should define a dictionary named `self._dagfile_dagbag`.
   
   Then `process_file` could call the following method instead of the 
`_get_dagbag` classmethod:
   
   ```python
       def _get_dagfile_dagbag(self, file_path: str):
           try:
               if file_path in self._dagfile_dagbag:
                   self._dagfile_dagbag[file_path].collect_dags(
                       dag_folder=file_path,
                       include_examples=False,
                   )
               else:
                   self._dagfile_dagbag[file_path] = DagBag(
                       file_path,
                       include_examples=False,
                   )
               return self._dagfile_dagbag[file_path]
           except Exception:
               self.logger().exception("Failed at reloading the DAG file %s", file_path)
               Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
               raise
   ```
   
   This keeps the parsed DagBag in memory, allowing the parse to be skipped when the mtime hasn't changed.
   
   With multiple DagFileProcessors, each one will have to parse the file once to warm its own cache; but that is much better than reparsing on every loop.
   
   
   ### How to reproduce
   
   One can make a simple dagfile that just uses EmptyOperator for the DAG, but has top-level code of `time.sleep(5)`.
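
   Such a dagfile might look like this (an illustrative sketch; the `dag_id` and task name are arbitrary):
   ```python
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# Top-level code that makes every parse of this file measurably slow.
time.sleep(5)

with DAG(
    dag_id="slow_parse_repro",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    EmptyOperator(task_id="noop")
   ```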
   
   Then you can check the airflow-scheduler logs and see whether every parse of the file takes longer than 5 seconds, or whether parsing often takes less time.  In the `DagBag.process_file` method, the mtime check and early exit happen before it calls `_load_modules_from_file`.
   
   
   ### Operating System
   
   PRETTY_NAME="Ubuntu 20.04.6 LTS"
   
   ### Versions of Apache Airflow Providers
   
   Not relevant; this is a core issue.
   
   ### Deployment
   
   Google Cloud Composer
   
   ### Deployment details
   
   composer version 2.4.6, airflow 2.6.3
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

