This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 1b29dac0489 Fix dag processor crash by ignoring callbacks from other
bundles (#57192) (#57330)
1b29dac0489 is described below
commit 1b29dac04892be4f2af211d2198c32212f25a391
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Oct 27 11:02:16 2025 +0100
Fix dag processor crash by ignoring callbacks from other bundles (#57192)
(#57330)
When the dag processor is configured to run only specific bundles, it
previously attempted to process callback requests for bundles it does not
own, leading to a StopIteration in render_log_filename due to missing
bundle references.
Now, _fetch_callbacks only enqueues callbacks whose bundle_name matches
one of the manager’s active bundles, leaving others in the DB for the
correct processor to handle.
closes: #57081
(cherry picked from commit 3b97c94376af0e793451d03b7e775ea47bd324a8)
---
airflow-core/src/airflow/dag_processing/manager.py | 6 ++-
.../tests/unit/dag_processing/test_manager.py | 46 ++++++++++++++++++++++
2 files changed, 51 insertions(+), 1 deletion(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py
b/airflow-core/src/airflow/dag_processing/manager.py
index df4f84d0ce5..5c32d7daa6f 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -450,6 +450,7 @@ class DagFileProcessorManager(LoggingMixin):
callback_queue: list[CallbackRequest] = []
with prohibit_commit(session) as guard:
+ bundle_names = [bundle.name for bundle in self._dag_bundles]
query = select(DbCallbackRequest)
query =
query.order_by(DbCallbackRequest.priority_weight.desc()).limit(
self.max_callbacks_per_loop
@@ -457,8 +458,11 @@ class DagFileProcessorManager(LoggingMixin):
query = with_row_locks(query, of=DbCallbackRequest,
session=session, skip_locked=True)
callbacks = session.scalars(query)
for callback in callbacks:
+ req = callback.get_callback_request()
+ if req.bundle_name not in bundle_names:
+ continue
try:
- callback_queue.append(callback.get_callback_request())
+ callback_queue.append(req)
session.delete(callback)
except Exception as e:
self.log.warning("Error adding callback for execution: %s,
%s", callback, e)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 85bb362785d..91692e6d0fa 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -767,6 +767,7 @@ class TestDagFileProcessorManager:
with configure_testing_dag_bundle(dag_filepath):
manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
with create_session() as session:
callbacks = manager._fetch_callbacks(session=session)
@@ -810,6 +811,51 @@ class TestDagFileProcessorManager:
manager.run()
assert session.query(DbCallbackRequest).count() == 1
+ @conf_vars({("core", "load_examples"): "False"})
+ def test_fetch_callbacks_ignores_other_bundles(self,
configure_testing_dag_bundle):
+ """Ensure callbacks for bundles not owned by current dag processor
manager are ignored and not deleted."""
+
+ dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
+
+ # Create two callbacks: one for the active 'testing' bundle and one
for a different bundle
+ matching = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="testing",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="match",
+ )
+ non_matching = DagCallbackRequest(
+ dag_id="test_start_date_scheduling",
+ bundle_name="other-bundle",
+ bundle_version=None,
+ filepath="test_on_failure_callback_dag.py",
+ is_failure_callback=True,
+ run_id="no-match",
+ )
+
+ with create_session() as session:
+ session.add(DbCallbackRequest(callback=matching,
priority_weight=100))
+ session.add(DbCallbackRequest(callback=non_matching,
priority_weight=200))
+
+ with configure_testing_dag_bundle(dag_filepath):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager._dag_bundles =
list(DagBundlesManager().get_all_dag_bundles())
+
+ with create_session() as session:
+ callbacks = manager._fetch_callbacks(session=session)
+
+ # Only the matching callback should be returned
+ assert [c.run_id for c in callbacks] == ["match"]
+
+ # The non-matching callback should remain in the DB
+ remaining = session.query(DbCallbackRequest).all()
+ assert len(remaining) == 1
+ # Decode remaining request and verify it's for the other bundle
+ remaining_req = remaining[0].get_callback_request()
+ assert remaining_req.bundle_name == "other-bundle"
+
@mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
def test_callback_queue(self, mock_get_logger,
configure_testing_dag_bundle):
mock_logger = MagicMock()