This is an automated email from the ASF dual-hosted git repository.

ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 692728371f8 Fix dag processor callback cleanup for versioned bundle files (#66484)
692728371f8 is described below

commit 692728371f8f4f20913635553e0fc30334996ff1
Author: Hemkumar Chheda <[email protected]>
AuthorDate: Wed May 13 14:04:33 2026 +0530

    Fix dag processor callback cleanup for versioned bundle files (#66484)
    
    * Fix callback orphan cleanup for versioned bundle files
    
    closes: #66483
    
    * Remove newsfragment for callback orphan cleanup fix
    
    * Fix versioned dag file presence checks
    
    * Preserve public signatures for versioned dag file checks
    
    * Preserve manager cleanup extension points
---
 airflow-core/src/airflow/dag_processing/manager.py |  63 +++++--
 .../tests/unit/dag_processing/test_manager.py      | 181 +++++++++++++++++++++
 2 files changed, 229 insertions(+), 15 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 6e663da93a9..3d2767785ab 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -129,6 +129,11 @@ class DagFileInfo:
             raise ValueError("bundle_path not set")
         return self.bundle_path / self.rel_path
 
+    @property
+    def presence_key(self) -> tuple[str, Path]:
+        """Return the stable file identity used for presence checks."""
+        return self.bundle_name, self.rel_path
+
 
 def _config_int_factory(section: str, key: str):
     return functools.partial(conf.getint, section, key)
@@ -1036,20 +1041,22 @@ class DagFileProcessorManager(LoggingMixin):
 
     def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
         """Remove from queue any files no longer observed locally."""
-        self._file_queue = deque(x for x in self._file_queue if x in present)
+        present_keys = {file.presence_key for file in present}
+        self._file_queue = deque(x for x in self._file_queue if x.presence_key in present_keys)
         stats.gauge("dag_processing.file_path_queue_size", len(self._file_queue))
 
     def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
         """Remove the stats for any dag files that don't exist anymore."""
-        # todo: store stats by bundle also?
-        stats_to_remove = set(self._file_stats).difference(present)
+        present_keys = {file.presence_key for file in present}
+        stats_to_remove = {file for file in self._file_stats if file.presence_key not in present_keys}
         for file in stats_to_remove:
             del self._file_stats[file]
 
     def terminate_orphan_processes(self, present: set[DagFileInfo]):
         """Stop processors that are working on deleted files."""
+        present_keys = {file.presence_key for file in present}
         for file in list(self._processors.keys()):
-            if file not in present:
+            if file.presence_key not in present_keys:
                 processor = self._processors.pop(file, None)
                 if not processor:
                     continue
@@ -1283,11 +1290,14 @@ class DagFileProcessorManager(LoggingMixin):
         A "new" file is a file that has not been processed yet and is not currently being processed.
         """
         new_files = []
+        tracked_presence_keys = {file.presence_key for file in self._file_queue}
+        tracked_presence_keys.update(file.presence_key for file in self._file_stats)
+        tracked_presence_keys.update(file.presence_key for file in self._processors)
         for files in known_files.values():
             for file in files:
-                # todo: store stats by bundle also?
-                if file not in self._file_stats and file not in self._processors:
+                if file.presence_key not in tracked_presence_keys:
                     new_files.append(file)
+                    tracked_presence_keys.add(file.presence_key)
 
         if new_files:
             self.log.info("Adding %d new files to the front of the queue", len(new_files))
@@ -1312,6 +1322,7 @@ class DagFileProcessorManager(LoggingMixin):
             self._file_queue = deque(callback_files + sorted_regular_files)
 
     def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
+        file_stats_by_presence_key = {file.presence_key: stat for file, stat in self._file_stats.items()}
         files_with_mtime: dict[DagFileInfo, float] = {}
         changed_recently = set()
         for file in files:
@@ -1319,20 +1330,35 @@ class DagFileProcessorManager(LoggingMixin):
                 modified_timestamp = os.path.getmtime(file.absolute_path)
                 modified_datetime = datetime.fromtimestamp(modified_timestamp, tz=timezone.utc)
                 files_with_mtime[file] = modified_timestamp
-                last_time = self._file_stats[file].last_finish_time
+                stat = file_stats_by_presence_key.get(file.presence_key)
+                last_time = stat.last_finish_time if stat else None
                 if not last_time:
                     continue
                 if modified_datetime > last_time:
                     changed_recently.add(file)
             except FileNotFoundError:
                 self.log.warning("Skipping processing of missing file: %s", file)
-                self._file_stats.pop(file, None)
+                stats_to_remove = [
+                    tracked_file
+                    for tracked_file in self._file_stats
+                    if tracked_file.presence_key == file.presence_key
+                ]
+                for tracked_file in stats_to_remove:
+                    self._file_stats.pop(tracked_file, None)
                 continue
         file_infos = [info for info, ts in sorted(files_with_mtime.items(), key=itemgetter(1), reverse=True)]
         return file_infos, changed_recently
 
     def processed_recently(self, now, file):
-        last_time = self._file_stats[file].last_finish_time
+        stat = next(
+            (
+                stat
+                for tracked_file, stat in self._file_stats.items()
+                if tracked_file.presence_key == file.presence_key
+            ),
+            None,
+        )
+        last_time = stat.last_finish_time if stat else None
         if not last_time:
             return False
         elapsed_ss = (now - last_time).total_seconds()
@@ -1357,7 +1383,8 @@ class DagFileProcessorManager(LoggingMixin):
 
         # If the file path is already being processed, or if a file was
         # processed recently, wait until the next batch
-        in_progress = set(self._processors)
+        in_progress_keys = {file.presence_key for file in self._processors}
+        file_stats_by_presence_key = {file.presence_key: stat for file, stat in self._file_stats.items()}
         now = timezone.utcnow()
 
         # Sort the file paths by the parsing order mode
@@ -1367,7 +1394,9 @@ class DagFileProcessorManager(LoggingMixin):
         for bundle_files in known_files.values():
             for file in bundle_files:
                 files.append(file)
-                if self.processed_recently(now, file):
+                stat = file_stats_by_presence_key.get(file.presence_key)
+                last_time = stat.last_finish_time if stat else None
+                if last_time and (now - last_time).total_seconds() < self._file_process_interval:
                     recently_processed.add(file)
 
         changed_recently: set[DagFileInfo] = set()
@@ -1380,15 +1409,19 @@ class DagFileProcessorManager(LoggingMixin):
             # set of files. Since we set the seed, the sort order will remain same per host
             random.Random(get_hostname()).shuffle(files)
 
-        at_run_limit = [info for info, stat in self._file_stats.items() if stat.run_count == self.max_runs]
-        to_exclude = in_progress.union(at_run_limit)
+        at_run_limit_keys = {
+            presence_key
+            for presence_key, stat in file_stats_by_presence_key.items()
+            if stat.run_count == self.max_runs
+        }
+        to_exclude = in_progress_keys.union(at_run_limit_keys)
 
         # exclude recently processed unless changed recently
-        to_exclude |= recently_processed - changed_recently
+        to_exclude |= {file.presence_key for file in recently_processed - changed_recently}
 
         # Do not convert the following list to set as set does not preserve the order
         # and we need to maintain the order of files for `[dag_processor] file_parsing_sort_mode`
-        to_queue = [x for x in files if x not in to_exclude]
+        to_queue = [x for x in files if x.presence_key not in to_exclude]
 
         if self.log.isEnabledFor(logging.DEBUG):
             for path, processor in self._processors.items():
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 2296d1b428b..3c36ee066da 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -89,6 +89,15 @@ def _get_file_infos(files: list[str | Path]) -> list[DagFileInfo]:
     return [DagFileInfo(bundle_name="testing", bundle_path=TEST_DAGS_FOLDER, rel_path=Path(f)) for f in files]
 
 
+def _get_versioned_file_info(file: str | Path, bundle_version: str = "v1") -> DagFileInfo:
+    return DagFileInfo(
+        bundle_name="testing",
+        bundle_path=TEST_DAGS_FOLDER,
+        rel_path=Path(file),
+        bundle_version=bundle_version,
+    )
+
+
 def mock_get_mtime(file: Path):
     f = str(file)
     m = re.match(pattern=r".*ss=(.+?)\.\w+", string=f)
@@ -408,6 +417,91 @@ class TestDagFileProcessorManager:
         manager.handle_removed_files(known_files={bundle_name: {file}})
         assert manager._processors == {file: mock_processor}
 
+    def test_handle_removed_files_uses_public_extension_points(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        bundle_name = "testing"
+        file = DagFileInfo(bundle_name=bundle_name, rel_path=Path("abc.txt"), bundle_path=TEST_DAGS_FOLDER)
+
+        with (
+            mock.patch.object(manager, "purge_removed_files_from_queue") as purge_queue,
+            mock.patch.object(manager, "terminate_orphan_processes") as terminate_processors,
+            mock.patch.object(manager, "remove_orphaned_file_stats") as remove_stats,
+        ):
+            manager.handle_removed_files(known_files={bundle_name: {file}})
+
+        purge_queue.assert_called_once_with(present={file})
+        terminate_processors.assert_called_once_with(present={file})
+        remove_stats.assert_called_once_with(present={file})
+
+    def test_purge_removed_files_keeps_versioned_callback_file_when_unversioned_file_is_present(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("callbacks.py")
+        present_file = _get_file_infos(["callbacks.py"])[0]
+
+        manager._file_queue = deque([versioned_file])
+
+        manager.purge_removed_files_from_queue(present={present_file})
+
+        assert manager._file_queue == deque([versioned_file])
+
+    def test_purge_removed_files_drops_versioned_callback_file_when_truly_absent(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("callbacks.py")
+
+        manager._file_queue = deque([versioned_file])
+
+        manager.purge_removed_files_from_queue(present=set())
+
+        assert manager._file_queue == deque()
+
+    def test_terminate_orphan_processes_keeps_versioned_callback_processor_when_unversioned_file_is_present(
+        self,
+    ):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("callbacks.py")
+        present_file = _get_file_infos(["callbacks.py"])[0]
+        processor = MagicMock()
+
+        manager._processors[versioned_file] = processor
+
+        manager.terminate_orphan_processes(present={present_file})
+
+        assert manager._processors == {versioned_file: processor}
+        processor.kill.assert_not_called()
+
+    def test_terminate_orphan_processes_kills_processor_when_file_is_truly_absent(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("callbacks.py")
+        processor = MagicMock()
+
+        manager._processors[versioned_file] = processor
+
+        manager.terminate_orphan_processes(present=set())
+
+        assert manager._processors == {}
+        processor.kill.assert_called_once_with(signal.SIGKILL)
+
+    def test_remove_orphaned_file_stats_keeps_versioned_callback_stats_when_unversioned_file_is_present(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("callbacks.py")
+        present_file = _get_file_infos(["callbacks.py"])[0]
+
+        manager._file_stats[versioned_file] = DagFileStat()
+
+        manager.remove_orphaned_file_stats(present={present_file})
+
+        assert manager._file_stats == {versioned_file: DagFileStat()}
+
+    def test_remove_orphaned_file_stats_drops_versioned_callback_stats_when_truly_absent(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("callbacks.py")
+
+        manager._file_stats[versioned_file] = DagFileStat()
+
+        manager.remove_orphaned_file_stats(present=set())
+
+        assert manager._file_stats == {}
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
     def test_files_in_queue_sorted_alphabetically(self):
         """Test dag files are sorted alphabetically"""
@@ -540,6 +634,30 @@ class TestDagFileProcessorManager:
         # file_1 should remain (already in queue)
         assert list(manager._file_queue) == [file_2, file_1]
 
+    def test_add_new_files_to_queue_skips_versioned_files_already_represented(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        queued_versioned_file = _get_versioned_file_info("file_1.py")
+        processed_versioned_file = _get_versioned_file_info("file_3.py")
+        parsed_versioned_file = _get_versioned_file_info("file_4.py")
+        new_file = _get_file_infos(["file_2.py"])[0]
+
+        manager._file_queue = deque([queued_versioned_file])
+        manager._processors[processed_versioned_file] = MagicMock()
+        manager._file_stats[parsed_versioned_file] = DagFileStat(num_dags=1)
+
+        known_files = {
+            "testing": {
+                _get_file_infos(["file_1.py"])[0],
+                new_file,
+                _get_file_infos(["file_3.py"])[0],
+                _get_file_infos(["file_4.py"])[0],
+            }
+        }
+
+        manager._add_new_files_to_queue(known_files)
+
+        assert list(manager._file_queue) == [new_file, queued_versioned_file]
+
     @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
     @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
     def test_resort_file_queue_by_mtime(self):
@@ -666,6 +784,69 @@ class TestDagFileProcessorManager:
                 > (freezed_base_time - manager._file_stats[dag_file].last_finish_time).total_seconds()
             )
 
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+    def test_prepare_file_queue_skips_file_when_versioned_processor_is_in_progress(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("file_1.py")
+        known_file = _get_file_infos(["file_1.py"])[0]
+
+        manager._processors[versioned_file] = MagicMock()
+
+        manager.prepare_file_queue(known_files={"testing": {known_file}})
+
+        assert manager._file_queue == deque()
+
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+    def test_prepare_file_queue_skips_file_when_versioned_stat_is_at_run_limit(self):
+        manager = DagFileProcessorManager(max_runs=1)
+        versioned_file = _get_versioned_file_info("file_1.py")
+        known_file = _get_file_infos(["file_1.py"])[0]
+
+        manager._file_stats[versioned_file] = DagFileStat(run_count=1)
+
+        manager.prepare_file_queue(known_files={"testing": {known_file}})
+
+        assert manager._file_queue == deque()
+
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+    def test_prepare_file_queue_skips_recently_processed_file_with_versioned_stats(self):
+        manager = DagFileProcessorManager(max_runs=3)
+        versioned_file = _get_versioned_file_info("file_1.py")
+        known_file = _get_file_infos(["file_1.py"])[0]
+
+        manager._file_stats[versioned_file] = DagFileStat(
+            last_finish_time=timezone.utcnow() - timedelta(seconds=10),
+            run_count=1,
+        )
+
+        manager.prepare_file_queue(known_files={"testing": {known_file}})
+
+        assert manager._file_queue == deque()
+
+    @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
+    @mock.patch("airflow.utils.file.os.path.getmtime")
+    def test_recently_modified_file_uses_versioned_stats_without_creating_duplicate_entries(
+        self, mock_getmtime
+    ):
+        freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+        versioned_file = _get_versioned_file_info("file_1.py")
+        known_file = _get_file_infos(["file_1.py"])[0]
+        known_files = {"testing": {known_file}}
+        last_finish_time = freezed_base_time - timedelta(seconds=10)
+
+        manager = DagFileProcessorManager(max_runs=3)
+        manager._file_stats = {
+            versioned_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1),
+        }
+
+        with time_machine.travel(freezed_base_time):
+            mock_getmtime.side_effect = [(freezed_base_time - timedelta(seconds=5)).timestamp()]
+            manager.prepare_file_queue(known_files=known_files)
+
+        assert manager._file_queue == deque([known_file])
+        assert known_file not in manager._file_stats
+        assert versioned_file in manager._file_stats
+
     def test_file_paths_in_queue_sorted_by_priority(self):
         from airflow.models.dagbag import DagPriorityParsingRequest
 

Reply via email to