hkc-8010 commented on code in PR #66484:
URL: https://github.com/apache/airflow/pull/66484#discussion_r3221305762


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1008,26 +1013,38 @@ def handle_removed_files(self, known_files: dict[str, set[DagFileInfo]]):
         for v in known_files.values():
             files_set |= v
 
-        self.purge_removed_files_from_queue(present=files_set)
-        self.terminate_orphan_processes(present=files_set)
-        self.remove_orphaned_file_stats(present=files_set)
+        present_keys = {file.presence_key for file in files_set}
+        self._purge_removed_files_from_queue(present_keys=present_keys)

Review Comment:
   @ephraimbuddy addressed in `9990b66c23f`. `handle_removed_files()` now calls 
the public cleanup methods again with `present=files_set`, the private 
underscore variants were removed, and each public cleanup method does its own 
`presence_key` conversion internally. I also added a regression test to keep 
`handle_removed_files()` going through those public extension points.
   
   Validation:
   - `uv run --frozen --no-sync ruff format airflow-core/src/airflow/dag_processing/manager.py airflow-core/tests/unit/dag_processing/test_manager.py`
   - `uv run --frozen --no-sync ruff check airflow-core/src/airflow/dag_processing/manager.py airflow-core/tests/unit/dag_processing/test_manager.py`
   - `.venv/bin/pytest airflow-core/tests/unit/dag_processing/test_manager.py -q` (`106 passed, 1 warning`)
   - `breeze --answer yes testing core-tests --backend postgres --python 3.10 --db-reset -- airflow-core/tests/unit/dag_processing/test_manager.py -q` (`106 passed, 1 warning`)
   - `breeze --answer yes testing core-tests --backend postgres --python 3.10 --downgrade-pendulum --db-reset -- airflow-core/tests/unit/dag_processing/test_manager.py -q` (`106 passed, 1 warning`)
   - `breeze --answer yes testing core-tests --backend sqlite --python 3.10 --force-lowest-dependencies --db-reset -- airflow-core/tests/unit/dag_processing/test_manager.py -q` (`106 passed, 1 warning`)
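
For readers following the thread, the delegation pattern described in this comment can be sketched roughly as below. This is an illustration, not the code from `9990b66c23f`: `FileInfo`, `Manager`, `RecordingManager`, and the definition of `presence_key` as `(bundle_name, rel_path)` are assumptions made for the example. Only the names `handle_removed_files`, `purge_removed_files_from_queue`, `present`, and `presence_key` come from the diff and the comment.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for DagFileInfo; the fields are assumptions."""

    bundle_name: str
    rel_path: str
    bundle_version: Optional[str] = None

    @property
    def presence_key(self) -> tuple[str, str]:
        # Assumed definition: identity for "is this file still present?"
        # ignores the bundle version.
        return (self.bundle_name, self.rel_path)


class Manager:
    """Simplified manager that only tracks a file queue."""

    def __init__(self, queue: list[FileInfo]) -> None:
        self.queue = list(queue)

    def purge_removed_files_from_queue(self, present: set[FileInfo]) -> None:
        # The public method keeps its `present=` signature and derives
        # the presence keys internally.
        present_keys = {f.presence_key for f in present}
        self.queue = [f for f in self.queue if f.presence_key in present_keys]

    def handle_removed_files(self, known_files: dict[str, set[FileInfo]]) -> None:
        files_set: set[FileInfo] = set()
        for v in known_files.values():
            files_set |= v
        # Delegate to the public extension point rather than a private helper.
        self.purge_removed_files_from_queue(present=files_set)


class RecordingManager(Manager):
    """Subclass override that still sees every call with `present=`."""

    def __init__(self, queue: list[FileInfo]) -> None:
        super().__init__(queue)
        self.calls: list[set[FileInfo]] = []

    def purge_removed_files_from_queue(self, present: set[FileInfo]) -> None:
        self.calls.append(set(present))
        super().purge_removed_files_from_queue(present=present)
```

Because `handle_removed_files()` goes through the public method, a subclass override such as `RecordingManager.purge_removed_files_from_queue` keeps being invoked, which is presumably what the regression test mentioned above guards.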



##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -1335,7 +1371,8 @@ def prepare_file_queue(self, known_files: dict[str, set[DagFileInfo]]):
 
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
-        in_progress = set(self._processors)
+        in_progress_keys = {file.presence_key for file in self._processors}
+        file_stats_by_presence_key = {file.presence_key: stat for file, stat in self._file_stats.items()}

Review Comment:
   @ephraimbuddy addressed in `9990b66c23f`. `prepare_file_queue()` now uses 
the already-built `file_stats_by_presence_key` dict directly for the recently 
processed check, while leaving `processed_recently(now, file)` 
signature-compatible for subclass overrides and external callers.
   
   Validation is the same as above: ruff format/check, local targeted pytest, 
and all three Breeze variants passed with `106 passed, 1 warning`.
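
As a rough illustration of the lookup described in this comment, the sketch below builds a stats dict keyed by `presence_key` once and reuses it for the recently-processed check. It is not the PR's implementation: `FileInfo`, `Stat`, `files_processed_recently`, `min_interval`, and the `(bundle_name, rel_path)` key are assumptions chosen for the example.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass(frozen=True)
class FileInfo:
    """Hypothetical stand-in for DagFileInfo; the fields are assumptions."""

    bundle_name: str
    rel_path: str
    bundle_version: Optional[str] = None

    @property
    def presence_key(self) -> tuple[str, str]:
        # Assumed definition: version is not part of the key.
        return (self.bundle_name, self.rel_path)


@dataclass
class Stat:
    """Hypothetical per-file stat holding only the last finish time."""

    last_finish_time: Optional[datetime] = None


def files_processed_recently(
    candidates: list[FileInfo],
    file_stats: dict[FileInfo, Stat],
    now: datetime,
    min_interval: timedelta,
) -> set[FileInfo]:
    # Build the key -> stat mapping once, then reuse it for every candidate.
    stats_by_key = {f.presence_key: s for f, s in file_stats.items()}
    recent: set[FileInfo] = set()
    for f in candidates:
        stat = stats_by_key.get(f.presence_key)
        if stat is None or stat.last_finish_time is None:
            continue  # never processed, so not "recent"
        if now - stat.last_finish_time < min_interval:
            recent.add(f)
    return recent
```

In this sketch a candidate that differs from the stats entry only by `bundle_version` still matches, because the lookup goes through `presence_key` rather than the full `FileInfo` value.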



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
