This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 08a5859832f [v3-1-test] Fix callback files losing priority during
queue resort (#61232) (#61243)
08a5859832f is described below
commit 08a5859832fc59b3413abba09c7c182f15001d48
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Fri Jan 30 09:39:35 2026 +0100
[v3-1-test] Fix callback files losing priority during queue resort (#61232)
(#61243)
When the DAG processor resorts its file queue by modification time
(e.g., after a bundle refresh), files with pending callbacks could
lose their position at the front of the queue. This could delay
callback execution (like DAG failure callbacks) if those files
happened to have older modification times.
The fix partitions the queue during resort: callback files stay at
the front in their original order, while only regular files are
sorted by mtime.
(cherry picked from commit f5e70fc34f1999e4800f6dcd9713bb56484d5002)
Co-authored-by: Jed Cunningham
<[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 17 ++++++++--
.../tests/unit/dag_processing/test_manager.py | 36 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index c1c267fe99c..65d41533e24 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -1009,8 +1009,21 @@ class DagFileProcessorManager(LoggingMixin):
def _resort_file_queue(self):
if self._file_parsing_sort_mode == "modified_time" and
self._file_queue:
- files, _ = self._sort_by_mtime(self._file_queue)
- self._file_queue = deque(files)
+ # Separate files with pending callbacks from regular files
+ # Callbacks should stay at the front regardless of mtime
+ callback_files = []
+ regular_files = []
+ for file in self._file_queue:
+ if file in self._callback_to_execute:
+ callback_files.append(file)
+ else:
+ regular_files.append(file)
+
+ # Sort only the regular files by mtime
+ sorted_regular_files, _ = self._sort_by_mtime(regular_files)
+
+ # Put callback files at the front, then sorted regular files
+ self._file_queue = deque(callback_files + sorted_regular_files)
def _sort_by_mtime(self, files: Iterable[DagFileInfo]):
files_with_mtime: dict[DagFileInfo, float] = {}
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index c06e96388e0..00de3148c96 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -429,6 +429,42 @@ class TestDagFileProcessorManager:
# Order should remain unchanged
assert list(manager._file_queue) == [file_b, file_a]
+ @conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
+ @mock.patch("airflow.utils.file.os.path.getmtime", new=mock_get_mtime)
+ def test_resort_file_queue_keeps_callbacks_at_front(self):
+ """
+ Check that files with pending callbacks stay at the front of the queue
+ regardless of their modification time, and preserve their relative
order.
+ """
+ files_with_mtime = [
+ ("callback_1.py", 50.0), # has callback, oldest mtime
+ ("callback_2.py", 300.0), # has callback, newest mtime
+ ("regular_1.py", 100.0), # no callback
+ ("regular_2.py", 200.0), # no callback
+ ]
+ filenames = encode_mtime_in_filename(files_with_mtime)
+ dag_files = _get_file_infos(filenames)
+ # dag_files[0] -> callback_1 (mtime 50)
+ # dag_files[1] -> callback_2 (mtime 300)
+ # dag_files[2] -> regular_1 (mtime 100)
+ # dag_files[3] -> regular_2 (mtime 200)
+
+ manager = DagFileProcessorManager(max_runs=1)
+
+ # Queue order: callback_1, callback_2, regular_1, regular_2
+ manager._file_queue = deque([dag_files[0], dag_files[1], dag_files[2],
dag_files[3]])
+
+ # Both callback files have pending callbacks
+ manager._callback_to_execute[dag_files[0]] = [MagicMock()]
+ manager._callback_to_execute[dag_files[1]] = [MagicMock()]
+
+ manager._resort_file_queue()
+
+ # Callback files should stay at front in original order (callback_1,
callback_2)
+ # despite callback_1 having the oldest mtime and callback_2 having the
newest
+ # Regular files should be sorted by mtime (newest first): regular_2
(200), regular_1 (100)
+ assert list(manager._file_queue) == [dag_files[0], dag_files[1],
dag_files[3], dag_files[2]]
+
@conf_vars({("dag_processor", "file_parsing_sort_mode"): "modified_time"})
@mock.patch("airflow.utils.file.os.path.getmtime")
def test_recently_modified_file_is_parsed_with_mtime_mode(self,
mock_getmtime):