This is an automated email from the ASF dual-hosted git repository.
ephraimbuddy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 692728371f8 Fix dag processor callback cleanup for versioned bundle files (#66484)
692728371f8 is described below
commit 692728371f8f4f20913635553e0fc30334996ff1
Author: Hemkumar Chheda <[email protected]>
AuthorDate: Wed May 13 14:04:33 2026 +0530
Fix dag processor callback cleanup for versioned bundle files (#66484)
* Fix callback orphan cleanup for versioned bundle files
closes: #66483
* Remove newsfragment for callback orphan cleanup fix
* Fix versioned dag file presence checks
* Preserve public signatures for versioned dag file checks
* Preserve manager cleanup extension points
---
airflow-core/src/airflow/dag_processing/manager.py | 63 +++++--
.../tests/unit/dag_processing/test_manager.py | 181 +++++++++++++++++++++
2 files changed, 229 insertions(+), 15 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index 6e663da93a9..3d2767785ab 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -129,6 +129,11 @@ class DagFileInfo:
raise ValueError("bundle_path not set")
return self.bundle_path / self.rel_path
+ @property
+ def presence_key(self) -> tuple[str, Path]:
+ """Return the stable file identity used for presence checks."""
+ return self.bundle_name, self.rel_path
+
def _config_int_factory(section: str, key: str):
return functools.partial(conf.getint, section, key)
@@ -1036,20 +1041,22 @@ class DagFileProcessorManager(LoggingMixin):
def purge_removed_files_from_queue(self, present: set[DagFileInfo]):
"""Remove from queue any files no longer observed locally."""
- self._file_queue = deque(x for x in self._file_queue if x in present)
+ present_keys = {file.presence_key for file in present}
+ self._file_queue = deque(x for x in self._file_queue if x.presence_key
in present_keys)
stats.gauge("dag_processing.file_path_queue_size",
len(self._file_queue))
def remove_orphaned_file_stats(self, present: set[DagFileInfo]):
"""Remove the stats for any dag files that don't exist anymore."""
- # todo: store stats by bundle also?
- stats_to_remove = set(self._file_stats).difference(present)
+ present_keys = {file.presence_key for file in present}
+ stats_to_remove = {file for file in self._file_stats if
file.presence_key not in present_keys}
for file in stats_to_remove:
del self._file_stats[file]
def terminate_orphan_processes(self, present: set[DagFileInfo]):
"""Stop processors that are working on deleted files."""
+ present_keys = {file.presence_key for file in present}
for file in list(self._processors.keys()):
- if file not in present:
+ if file.presence_key not in present_keys:
processor = self._processors.pop(file, None)
if not processor:
continue
@@ -1283,11 +1290,14 @@ class DagFileProcessorManager(LoggingMixin):
A "new" file is a file that has not been processed yet and is not
currently being processed.
"""
new_files = []
+ tracked_presence_keys = {file.presence_key for file in
self._file_queue}
+ tracked_presence_keys.update(file.presence_key for file in
self._file_stats)
+ tracked_presence_keys.update(file.presence_key for file in
self._processors)
for files in known_files.values():
for file in files:
- # todo: store stats by bundle also?
- if file not in self._file_stats and file not in
self._processors:
+ if file.presence_key not in tracked_presence_keys:
new_files.append(file)
+ tracked_presence_keys.add(file.presence_key)
if new_files:
self.log.info("Adding %d new files to the front of the queue",
len(new_files))
@@ -1312,6 +1322,7 @@ class DagFileProcessorManager(LoggingMixin):
self._file_queue = deque(callback_files + sorted_regular_files)
def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
+ file_stats_by_presence_key = {file.presence_key: stat for file, stat
in self._file_stats.items()}
files_with_mtime: dict[DagFileInfo, float] = {}
changed_recently = set()
for file in files:
@@ -1319,20 +1330,35 @@ class DagFileProcessorManager(LoggingMixin):
modified_timestamp = os.path.getmtime(file.absolute_path)
modified_datetime = datetime.fromtimestamp(modified_timestamp,
tz=timezone.utc)
files_with_mtime[file] = modified_timestamp
- last_time = self._file_stats[file].last_finish_time
+ stat = file_stats_by_presence_key.get(file.presence_key)
+ last_time = stat.last_finish_time if stat else None
if not last_time:
continue
if modified_datetime > last_time:
changed_recently.add(file)
except FileNotFoundError:
self.log.warning("Skipping processing of missing file: %s",
file)
- self._file_stats.pop(file, None)
+ stats_to_remove = [
+ tracked_file
+ for tracked_file in self._file_stats
+ if tracked_file.presence_key == file.presence_key
+ ]
+ for tracked_file in stats_to_remove:
+ self._file_stats.pop(tracked_file, None)
continue
file_infos = [info for info, ts in sorted(files_with_mtime.items(),
key=itemgetter(1), reverse=True)]
return file_infos, changed_recently
def processed_recently(self, now, file):
- last_time = self._file_stats[file].last_finish_time
+ stat = next(
+ (
+ stat
+ for tracked_file, stat in self._file_stats.items()
+ if tracked_file.presence_key == file.presence_key
+ ),
+ None,
+ )
+ last_time = stat.last_finish_time if stat else None
if not last_time:
return False
elapsed_ss = (now - last_time).total_seconds()
@@ -1357,7 +1383,8 @@ class DagFileProcessorManager(LoggingMixin):
# If the file path is already being processed, or if a file was
# processed recently, wait until the next batch
- in_progress = set(self._processors)
+ in_progress_keys = {file.presence_key for file in self._processors}
+ file_stats_by_presence_key = {file.presence_key: stat for file, stat
in self._file_stats.items()}
now = timezone.utcnow()
# Sort the file paths by the parsing order mode
@@ -1367,7 +1394,9 @@ class DagFileProcessorManager(LoggingMixin):
for bundle_files in known_files.values():
for file in bundle_files:
files.append(file)
- if self.processed_recently(now, file):
+ stat = file_stats_by_presence_key.get(file.presence_key)
+ last_time = stat.last_finish_time if stat else None
+ if last_time and (now - last_time).total_seconds() <
self._file_process_interval:
recently_processed.add(file)
changed_recently: set[DagFileInfo] = set()
@@ -1380,15 +1409,19 @@ class DagFileProcessorManager(LoggingMixin):
# set of files. Since we set the seed, the sort order will remain
same per host
random.Random(get_hostname()).shuffle(files)
- at_run_limit = [info for info, stat in self._file_stats.items() if
stat.run_count == self.max_runs]
- to_exclude = in_progress.union(at_run_limit)
+ at_run_limit_keys = {
+ presence_key
+ for presence_key, stat in file_stats_by_presence_key.items()
+ if stat.run_count == self.max_runs
+ }
+ to_exclude = in_progress_keys.union(at_run_limit_keys)
# exclude recently processed unless changed recently
- to_exclude |= recently_processed - changed_recently
+ to_exclude |= {file.presence_key for file in recently_processed -
changed_recently}
# Do not convert the following list to set as set does not preserve
the order
# and we need to maintain the order of files for `[dag_processor]
file_parsing_sort_mode`
- to_queue = [x for x in files if x not in to_exclude]
+ to_queue = [x for x in files if x.presence_key not in to_exclude]
if self.log.isEnabledFor(logging.DEBUG):
for path, processor in self._processors.items():
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 2296d1b428b..3c36ee066da 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -89,6 +89,15 @@ def _get_file_infos(files: list[str | Path]) ->
list[DagFileInfo]:
return [DagFileInfo(bundle_name="testing", bundle_path=TEST_DAGS_FOLDER,
rel_path=Path(f)) for f in files]
+def _get_versioned_file_info(file: str | Path, bundle_version: str = "v1") ->
DagFileInfo:
+ return DagFileInfo(
+ bundle_name="testing",
+ bundle_path=TEST_DAGS_FOLDER,
+ rel_path=Path(file),
+ bundle_version=bundle_version,
+ )
+
+
def mock_get_mtime(file: Path):
f = str(file)
m = re.match(pattern=r".*ss=(.+?)\.\w+", string=f)
@@ -408,6 +417,91 @@ class TestDagFileProcessorManager:
manager.handle_removed_files(known_files={bundle_name: {file}})
assert manager._processors == {file: mock_processor}
+ def test_handle_removed_files_uses_public_extension_points(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle_name = "testing"
+ file = DagFileInfo(bundle_name=bundle_name, rel_path=Path("abc.txt"),
bundle_path=TEST_DAGS_FOLDER)
+
+ with (
+ mock.patch.object(manager, "purge_removed_files_from_queue") as
purge_queue,
+ mock.patch.object(manager, "terminate_orphan_processes") as
terminate_processors,
+ mock.patch.object(manager, "remove_orphaned_file_stats") as
remove_stats,
+ ):
+ manager.handle_removed_files(known_files={bundle_name: {file}})
+
+ purge_queue.assert_called_once_with(present={file})
+ terminate_processors.assert_called_once_with(present={file})
+ remove_stats.assert_called_once_with(present={file})
+
+ def
test_purge_removed_files_keeps_versioned_callback_file_when_unversioned_file_is_present(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("callbacks.py")
+ present_file = _get_file_infos(["callbacks.py"])[0]
+
+ manager._file_queue = deque([versioned_file])
+
+ manager.purge_removed_files_from_queue(present={present_file})
+
+ assert manager._file_queue == deque([versioned_file])
+
+ def
test_purge_removed_files_drops_versioned_callback_file_when_truly_absent(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("callbacks.py")
+
+ manager._file_queue = deque([versioned_file])
+
+ manager.purge_removed_files_from_queue(present=set())
+
+ assert manager._file_queue == deque()
+
+ def
test_terminate_orphan_processes_keeps_versioned_callback_processor_when_unversioned_file_is_present(
+ self,
+ ):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("callbacks.py")
+ present_file = _get_file_infos(["callbacks.py"])[0]
+ processor = MagicMock()
+
+ manager._processors[versioned_file] = processor
+
+ manager.terminate_orphan_processes(present={present_file})
+
+ assert manager._processors == {versioned_file: processor}
+ processor.kill.assert_not_called()
+
+ def
test_terminate_orphan_processes_kills_processor_when_file_is_truly_absent(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("callbacks.py")
+ processor = MagicMock()
+
+ manager._processors[versioned_file] = processor
+
+ manager.terminate_orphan_processes(present=set())
+
+ assert manager._processors == {}
+ processor.kill.assert_called_once_with(signal.SIGKILL)
+
+ def
test_remove_orphaned_file_stats_keeps_versioned_callback_stats_when_unversioned_file_is_present(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("callbacks.py")
+ present_file = _get_file_infos(["callbacks.py"])[0]
+
+ manager._file_stats[versioned_file] = DagFileStat()
+
+ manager.remove_orphaned_file_stats(present={present_file})
+
+ assert manager._file_stats == {versioned_file: DagFileStat()}
+
+ def
test_remove_orphaned_file_stats_drops_versioned_callback_stats_when_truly_absent(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("callbacks.py")
+
+ manager._file_stats[versioned_file] = DagFileStat()
+
+ manager.remove_orphaned_file_stats(present=set())
+
+ assert manager._file_stats == {}
+
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
def test_files_in_queue_sorted_alphabetically(self):
"""Test dag files are sorted alphabetically"""
@@ -540,6 +634,30 @@ class TestDagFileProcessorManager:
# file_1 should remain (already in queue)
assert list(manager._file_queue) == [file_2, file_1]
+ def
test_add_new_files_to_queue_skips_versioned_files_already_represented(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ queued_versioned_file = _get_versioned_file_info("file_1.py")
+ processed_versioned_file = _get_versioned_file_info("file_3.py")
+ parsed_versioned_file = _get_versioned_file_info("file_4.py")
+ new_file = _get_file_infos(["file_2.py"])[0]
+
+ manager._file_queue = deque([queued_versioned_file])
+ manager._processors[processed_versioned_file] = MagicMock()
+ manager._file_stats[parsed_versioned_file] = DagFileStat(num_dags=1)
+
+ known_files = {
+ "testing": {
+ _get_file_infos(["file_1.py"])[0],
+ new_file,
+ _get_file_infos(["file_3.py"])[0],
+ _get_file_infos(["file_4.py"])[0],
+ }
+ }
+
+ manager._add_new_files_to_queue(known_files)
+
+ assert list(manager._file_queue) == [new_file, queued_versioned_file]
+
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
def test_resort_file_queue_by_mtime(self):
@@ -666,6 +784,69 @@ class TestDagFileProcessorManager:
> (freezed_base_time -
manager._file_stats[dag_file].last_finish_time).total_seconds()
)
+ @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+ def
test_prepare_file_queue_skips_file_when_versioned_processor_is_in_progress(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("file_1.py")
+ known_file = _get_file_infos(["file_1.py"])[0]
+
+ manager._processors[versioned_file] = MagicMock()
+
+ manager.prepare_file_queue(known_files={"testing": {known_file}})
+
+ assert manager._file_queue == deque()
+
+ @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+ def
test_prepare_file_queue_skips_file_when_versioned_stat_is_at_run_limit(self):
+ manager = DagFileProcessorManager(max_runs=1)
+ versioned_file = _get_versioned_file_info("file_1.py")
+ known_file = _get_file_infos(["file_1.py"])[0]
+
+ manager._file_stats[versioned_file] = DagFileStat(run_count=1)
+
+ manager.prepare_file_queue(known_files={"testing": {known_file}})
+
+ assert manager._file_queue == deque()
+
+ @conf_vars({("dag_processor", "file_parsing_sort_mode"): "alphabetical"})
+ def
test_prepare_file_queue_skips_recently_processed_file_with_versioned_stats(self):
+ manager = DagFileProcessorManager(max_runs=3)
+ versioned_file = _get_versioned_file_info("file_1.py")
+ known_file = _get_file_infos(["file_1.py"])[0]
+
+ manager._file_stats[versioned_file] = DagFileStat(
+ last_finish_time=timezone.utcnow() - timedelta(seconds=10),
+ run_count=1,
+ )
+
+ manager.prepare_file_queue(known_files={"testing": {known_file}})
+
+ assert manager._file_queue == deque()
+
+ @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
+ @mock.patch("airflow.utils.file.os.path.getmtime")
+ def
test_recently_modified_file_uses_versioned_stats_without_creating_duplicate_entries(
+ self, mock_getmtime
+ ):
+ freezed_base_time = timezone.datetime(2020, 1, 5, 0, 0, 0)
+ versioned_file = _get_versioned_file_info("file_1.py")
+ known_file = _get_file_infos(["file_1.py"])[0]
+ known_files = {"testing": {known_file}}
+ last_finish_time = freezed_base_time - timedelta(seconds=10)
+
+ manager = DagFileProcessorManager(max_runs=3)
+ manager._file_stats = {
+ versioned_file: DagFileStat(1, 0, last_finish_time, 1.0, 1, 1),
+ }
+
+ with time_machine.travel(freezed_base_time):
+ mock_getmtime.side_effect = [(freezed_base_time -
timedelta(seconds=5)).timestamp()]
+ manager.prepare_file_queue(known_files=known_files)
+
+ assert manager._file_queue == deque([known_file])
+ assert known_file not in manager._file_stats
+ assert versioned_file in manager._file_stats
+
def test_file_paths_in_queue_sorted_by_priority(self):
from airflow.models.dagbag import DagPriorityParsingRequest