blag commented on PR #27745:
URL: https://github.com/apache/airflow/pull/27745#issuecomment-1323102173

   @jedcunningham is correct. While this works with `airflow dags reserialize` 
(which is what I was using to exercise this code during development), it 
doesn't work with the DAG processors, since those spin off a separate process 
per `.py` file, and no single process has a global view of all the datasets 
referenced across all DAGs in all Python files/modules.
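   To make the problem concrete, here is a minimal sketch (the names are mine, 
not Airflow's API) of the global computation that dataset cleanup needs:
   
    ```python
    # Hypothetical sketch -- illustrative names, not Airflow's API.
    # Deciding which datasets are orphaned is a set difference that requires
    # knowing every dataset referenced by every DAG, across every file.

    def find_orphaned_datasets(
        datasets_in_db: set[str],
        dag_dataset_refs: dict[str, set[str]],
    ) -> set[str]:
        """Datasets no DAG references any more -- only computable globally."""
        referenced: set[str] = set()
        for refs in dag_dataset_refs.values():
            referenced |= refs
        return datasets_in_db - referenced
    ```
   
   `airflow dags reserialize` builds one `DagBag` over the whole DAG folder, 
so it could supply a complete `dag_dataset_refs`; a per-file processor cannot.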
   
   So I don't think this approach is going to work after all without a large 
amount of refactoring. 😕 Closing.
   
   <details><summary>Click to expand the code trace</summary>
   I've been using `airflow dags reserialize` to reload DAGs. That command is 
mapped in `airflow.cli.cli_parser` to 
`airflow.cli.commands.dag_command.dag_reserialize`.
   
   `airflow/cli/commands/dag_command.py`:
   ```python
   @provide_session
   @cli_utils.action_cli
   def dag_reserialize(args, session: Session = NEW_SESSION):
       """Serialize a DAG instance."""
       session.query(SerializedDagModel).delete(synchronize_session=False)
   
       if not args.clear_only:
           dagbag = DagBag(process_subdir(args.subdir))
           ...
   ```
   
   `airflow/models/dagbag.py`:
   ```python
   class DagBag(LoggingMixin):
       ...
   
       def __init__(
           self,
           dag_folder: str | pathlib.Path | None = None,
           ...
           collect_dags: bool = True,
       ):
           ...
           self.dags: dict[str, DAG] = {}
           ...
           if collect_dags:
               self.collect_dags(
                   dag_folder=dag_folder,
                   include_examples=include_examples,
                   safe_mode=safe_mode,
               )
           ...
       ...
   
       def collect_dags(
           self,
           dag_folder: str | pathlib.Path | None = None,
           only_if_updated: bool = True,
           include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
           safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
       ):
           ...
           # Ensure dag_folder is a str -- it may have been a pathlib.Path
           dag_folder = correct_maybe_zipped(str(dag_folder))
           for filepath in list_py_file_paths(
               dag_folder,
               safe_mode=safe_mode,
               include_examples=include_examples,
           ):
               try:
                   file_parse_start_dttm = timezone.utcnow()
                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)
                   ...
               ...
           ...
       ...
   
       def process_file(self, filepath, only_if_updated=True, safe_mode=True):
           ...
           if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
               mods = self._load_modules_from_file(filepath, safe_mode)
           else:
               mods = self._load_modules_from_zip(filepath, safe_mode)
   
        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)
           ...
       ...
   
       def _process_modules(self, filepath, mods, file_last_changed_on_disk):
           ...
           for (dag, mod) in top_level_dags:
               try:
                   dag.validate()
                   self.bag_dag(dag=dag, root_dag=dag)
               ...
           ...
       ...
   
       def bag_dag(self, dag, root_dag):
           ...
           self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)
   
       def _bag_dag(self, *, dag, root_dag, recursive):
           ...
           try:
               ...
               self.dags[dag.dag_id] = dag
           ...
       ...
   
   ```
   
   So at this point in `airflow dags reserialize`, all of the DAGs have been 
read and collected into `dagbag.dags`, and the next line writes them to the DB.
   
   `airflow/cli/commands/dag_command.py`:
   ```python
   @provide_session
   @cli_utils.action_cli
   def dag_reserialize(args, session: Session = NEW_SESSION):
       """Serialize a DAG instance."""
       session.query(SerializedDagModel).delete(synchronize_session=False)
   
       if not args.clear_only:
           ...
           dagbag.sync_to_db(session=session)
   ```
   
   `airflow/models/dagbag.py`:
   ```python
   class DagBag(LoggingMixin):
       ...
   
       @provide_session
        def sync_to_db(self, processor_subdir: str | None = None, session: Session = None):
           ...
           for attempt in run_with_db_retries(logger=self.log):
               with attempt:
                    ...
                   try:
                       ...
                       DAG.bulk_write_to_db(
                            self.dags.values(), processor_subdir=processor_subdir, session=session
                       )
   ```
   
   However, the DAG processor only processes files one at a time, not as a 
batch.
   
   `airflow/dag_processing/processor.py`:
   ```python
   class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
       ...
   
       def start(self) -> None:
           ...
           process = context.Process(
               target=type(self)._run_file_processor,
               ...
           )
           ...
           process.start()
           ...
   
       @staticmethod
       def _run_file_processor(
           result_channel: MultiprocessingConnection,
           parent_channel: MultiprocessingConnection,
           file_path: str,
           pickle_dags: bool,
           dag_ids: list[str] | None,
           thread_name: str,
           dag_directory: str,
           callback_requests: list[CallbackRequest],
       ) -> None:
           ...
           def _handle_dag_file_processing():
               ...
                dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
               result: tuple[int, int] = dag_file_processor.process_file(
                   file_path=file_path,
                   pickle_dags=pickle_dags,
                   callback_requests=callback_requests,
               )
               ...
   
           try:
               ...
                    # The following line ensures that stdout goes to the same destination as the logs. If stdout
                    # gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
                    # necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
                    with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
                       StreamLogWriter(log, logging.WARN)
                   ), Stats.timer() as timer:
                       _handle_dag_file_processing()
       ...
   ...
   
   class DagFileProcessor(LoggingMixin):
       ...
       @provide_session
       def process_file(
           self,
           file_path: str,
           callback_requests: list[CallbackRequest],
           pickle_dags: bool = False,
           session: Session = NEW_SESSION,
       ) -> tuple[int, int]:
           ...
           try:
               dagbag = DagBag(file_path, include_examples=False)
           ...
        dagbag.sync_to_db(processor_subdir=self._dag_directory, session=session)
   
   ```
   
   So, from within a single file processor, there is no way to figure out 
which datasets are still in use and which ones are not.
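   A sketch of why (again with hypothetical names): each processor subprocess 
builds a `DagBag` over a single file, so any "orphaned" set it computes 
over-reports datasets that other files still reference:
   
    ```python
    # Hypothetical sketch: a per-file processor only sees the datasets
    # referenced in the one file it parsed.

    def orphans_from_single_file(datasets_in_db: set[str], refs_in_this_file: set[str]) -> set[str]:
        return datasets_in_db - refs_in_this_file

    db = {"s3://a", "s3://b"}
    # Suppose dag_one.py references s3://a and dag_two.py references s3://b.
    # The processor handling dag_one.py flags s3://b as orphaned even though
    # dag_two.py still uses it.
    assert orphans_from_single_file(db, {"s3://a"}) == {"s3://b"}
    ```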
   </details>

