This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 1b29dac0489 Fix dag processor crash by ignoring callbacks from other 
bundles (#57192) (#57330)
1b29dac0489 is described below

commit 1b29dac04892be4f2af211d2198c32212f25a391
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Mon Oct 27 11:02:16 2025 +0100

    Fix dag processor crash by ignoring callbacks from other bundles (#57192) 
(#57330)
    
    When the dag processor is made to run a specific bundle, it previously
    attempted to process callback requests for bundles it does not own,
    leading to a StopIteration in render_log_filename due to missing bundle
    references.
    
    Now, _fetch_callbacks only enqueues callbacks whose bundle_name matches
    one of the manager's active bundles, leaving the others in the DB for the
    correct processor to handle.
    
    closes: #57081
    (cherry picked from commit 3b97c94376af0e793451d03b7e775ea47bd324a8)
---
 airflow-core/src/airflow/dag_processing/manager.py |  6 ++-
 .../tests/unit/dag_processing/test_manager.py      | 46 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index df4f84d0ce5..5c32d7daa6f 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -450,6 +450,7 @@ class DagFileProcessorManager(LoggingMixin):
 
         callback_queue: list[CallbackRequest] = []
         with prohibit_commit(session) as guard:
+            bundle_names = [bundle.name for bundle in self._dag_bundles]
             query = select(DbCallbackRequest)
             query = 
query.order_by(DbCallbackRequest.priority_weight.desc()).limit(
                 self.max_callbacks_per_loop
@@ -457,8 +458,11 @@ class DagFileProcessorManager(LoggingMixin):
             query = with_row_locks(query, of=DbCallbackRequest, 
session=session, skip_locked=True)
             callbacks = session.scalars(query)
             for callback in callbacks:
+                req = callback.get_callback_request()
+                if req.bundle_name not in bundle_names:
+                    continue
                 try:
-                    callback_queue.append(callback.get_callback_request())
+                    callback_queue.append(req)
                     session.delete(callback)
                 except Exception as e:
                     self.log.warning("Error adding callback for execution: %s, 
%s", callback, e)
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 85bb362785d..91692e6d0fa 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -767,6 +767,7 @@ class TestDagFileProcessorManager:
 
         with configure_testing_dag_bundle(dag_filepath):
             manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
 
             with create_session() as session:
                 callbacks = manager._fetch_callbacks(session=session)
@@ -810,6 +811,51 @@ class TestDagFileProcessorManager:
                 manager.run()
                 assert session.query(DbCallbackRequest).count() == 1
 
+    @conf_vars({("core", "load_examples"): "False"})
+    def test_fetch_callbacks_ignores_other_bundles(self, 
configure_testing_dag_bundle):
+        """Ensure callbacks for bundles not owned by current dag processor 
manager are ignored and not deleted."""
+
+        dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
+
+        # Create two callbacks: one for the active 'testing' bundle and one 
for a different bundle
+        matching = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="testing",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="match",
+        )
+        non_matching = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="other-bundle",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="no-match",
+        )
+
+        with create_session() as session:
+            session.add(DbCallbackRequest(callback=matching, 
priority_weight=100))
+            session.add(DbCallbackRequest(callback=non_matching, 
priority_weight=200))
+
+        with configure_testing_dag_bundle(dag_filepath):
+            manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
+
+            with create_session() as session:
+                callbacks = manager._fetch_callbacks(session=session)
+
+                # Only the matching callback should be returned
+                assert [c.run_id for c in callbacks] == ["match"]
+
+                # The non-matching callback should remain in the DB
+                remaining = session.query(DbCallbackRequest).all()
+                assert len(remaining) == 1
+                # Decode remaining request and verify it's for the other bundle
+                remaining_req = remaining[0].get_callback_request()
+                assert remaining_req.bundle_name == "other-bundle"
+
     @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
     def test_callback_queue(self, mock_get_logger, 
configure_testing_dag_bundle):
         mock_logger = MagicMock()

Reply via email to