This is an automated email from the ASF dual-hosted git repository.

gopidesu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 447771d5f5b Fix callback starvation across DAG bundles (#63795)
447771d5f5b is described below

commit 447771d5f5b8e1ad3aa4b6464c4ef03af4c93485
Author: GPK <[email protected]>
AuthorDate: Wed Mar 18 12:32:32 2026 +0000

    Fix callback starvation across DAG bundles (#63795)
    
    * Fix callback starvation across DAG bundles
    
    * Fix script
    
    * rebase changes
    
    * Update airflow-core/src/airflow/dag_processing/manager.py
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
    
    * Remove newsfragment
    
    ---------
    
    Co-authored-by: Ephraim Anierobi <[email protected]>
---
 airflow-core/docs/migrations-ref.rst               |  4 +-
 airflow-core/src/airflow/dag_processing/manager.py |  3 +-
 ...0109_3_2_0_add_bundle_name_to_callback_table.py | 94 ++++++++++++++++++++++
 airflow-core/src/airflow/models/callback.py        |  6 +-
 airflow-core/src/airflow/utils/db.py               |  2 +-
 .../tests/unit/dag_processing/test_manager.py      | 55 +++++++++++++
 airflow-core/tests/unit/models/test_callback.py    | 23 +++++-
 7 files changed, 181 insertions(+), 6 deletions(-)

diff --git a/airflow-core/docs/migrations-ref.rst 
b/airflow-core/docs/migrations-ref.rst
index cde5b6b32a5..07d6f5afc16 100644
--- a/airflow-core/docs/migrations-ref.rst
+++ b/airflow-core/docs/migrations-ref.rst
@@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are 
executed via when you ru
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | Revision ID             | Revises ID       | Airflow Version   | Description 
                                                 |
 
+=========================+==================+===================+==============================================================+
-| ``888b59e02a5b`` (head) | ``6222ce48e289`` | ``3.2.0``         | Fix 
migration file ORM inconsistencies.                      |
+| ``1d6611b6ab7c`` (head) | ``888b59e02a5b`` | ``3.2.0``         | Add 
bundle_name to callback table.                           |
++-------------------------+------------------+-------------------+--------------------------------------------------------------+
+| ``888b59e02a5b``        | ``6222ce48e289`` | ``3.2.0``         | Fix 
migration file ORM inconsistencies.                      |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
 | ``6222ce48e289``        | ``134de42d3cb0`` | ``3.2.0``         | Add 
partition fields to DagModel.                            |
 
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
diff --git a/airflow-core/src/airflow/dag_processing/manager.py 
b/airflow-core/src/airflow/dag_processing/manager.py
index f8435659e1f..10b29c0560e 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -542,6 +542,7 @@ class DagFileProcessorManager(LoggingMixin):
             bundle_names = [bundle.name for bundle in self._dag_bundles]
             query: Select[tuple[DbCallbackRequest]] = with_row_locks(
                 select(DbCallbackRequest)
+                .where(DbCallbackRequest.bundle_name.in_(bundle_names))
                 .order_by(DbCallbackRequest.priority_weight.desc())
                 .limit(self.max_callbacks_per_loop),
                 of=DbCallbackRequest,
@@ -553,8 +554,6 @@ class DagFileProcessorManager(LoggingMixin):
             ]
             for callback in callbacks:
                 req = callback.get_callback_request()
-                if req.bundle_name not in bundle_names:
-                    continue
                 try:
                     callback_queue.append(req)
                     session.delete(callback)
diff --git 
a/airflow-core/src/airflow/migrations/versions/0109_3_2_0_add_bundle_name_to_callback_table.py
 
b/airflow-core/src/airflow/migrations/versions/0109_3_2_0_add_bundle_name_to_callback_table.py
new file mode 100644
index 00000000000..c2fbc2641bd
--- /dev/null
+++ 
b/airflow-core/src/airflow/migrations/versions/0109_3_2_0_add_bundle_name_to_callback_table.py
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Add bundle_name to callback table.
+
+Revision ID: 1d6611b6ab7c
+Revises: 888b59e02a5b
+Create Date: 2026-03-17 00:23:45.305588
+
+"""
+
+from __future__ import annotations
+
+import json
+
+import sqlalchemy as sa
+from alembic import context, op
+
+from airflow.migrations.db_types import StringID
+from airflow.utils.sqlalchemy import ExtendedJSON
+
+# revision identifiers, used by Alembic.
+revision = "1d6611b6ab7c"
+down_revision = "888b59e02a5b"
+branch_labels = None
+depends_on = None
+airflow_version = "3.2.0"
+
+
+def upgrade():
+    """Add bundle_name to callback rows and backfill dag-processor 
callbacks."""
+    with op.batch_alter_table("callback", schema=None) as batch_op:
+        batch_op.add_column(sa.Column("bundle_name", StringID(), 
nullable=True))
+
+    if context.is_offline_mode():
+        print(
+            """
+            --  WARNING: Unable to backfill callback.bundle_name values while 
in offline mode!
+            --  The bundle_name column will be added without migrating 
existing dag-processor callback rows.
+            --  Run this migration in online mode if you need existing pending 
callbacks backfilled.
+            """
+        )
+        return
+
+    conn = op.get_bind()
+    callback = sa.table(
+        "callback",
+        sa.column("id", sa.Uuid()),
+        sa.column("type", sa.String(length=20)),
+        sa.column("data", ExtendedJSON()),
+        sa.column("bundle_name", StringID()),
+    )
+
+    rows = conn.execute(
+        sa.select(callback.c.id, callback.c.data).where(callback.c.type == 
"dag_processor")
+    ).mappings()
+    for row in rows:
+        data = row["data"] or {}
+        if isinstance(data, str):
+            data = json.loads(data)
+
+        req_data = data.get("req_data")
+        if isinstance(req_data, str):
+            req_data = json.loads(req_data)
+        elif not isinstance(req_data, dict):
+            continue
+
+        bundle_name = req_data.get("bundle_name")
+        if bundle_name is None:
+            continue
+
+        conn.execute(callback.update().where(callback.c.id == 
row["id"]).values(bundle_name=bundle_name))
+
+
+def downgrade():
+    """Remove bundle_name from callback rows."""
+    with op.batch_alter_table("callback", schema=None) as batch_op:
+        batch_op.drop_column("bundle_name")
diff --git a/airflow-core/src/airflow/models/callback.py 
b/airflow-core/src/airflow/models/callback.py
index e2c46153a71..e08d58fa4da 100644
--- a/airflow-core/src/airflow/models/callback.py
+++ b/airflow-core/src/airflow/models/callback.py
@@ -34,6 +34,7 @@ from airflow._shared.timezones import timezone
 from airflow.executors.workloads import BaseWorkload
 from airflow.executors.workloads.callback import CallbackFetchMethod
 from airflow.models import Base
+from airflow.models.base import StringID
 from airflow.utils.sqlalchemy import ExtendedJSON, UtcDateTime
 from airflow.utils.state import CallbackState
 
@@ -101,7 +102,6 @@ class Callback(Base, BaseWorkload):
     """Base class for callbacks."""
 
     __tablename__ = "callback"
-
     id: Mapped[UUID] = mapped_column(Uuid(), primary_key=True, 
default=uuid6.uuid7)
 
     # This is used by SQLAlchemy to be able to deserialize DB rows to 
subclasses
@@ -117,6 +117,9 @@ class Callback(Base, BaseWorkload):
     # Used by subclasses to store information about how to run the callback
     data: Mapped[dict] = mapped_column(ExtendedJSON, nullable=False)
 
+    # Used to filter dag-processor callbacks by bundle name.
+    bundle_name: Mapped[str | None] = mapped_column(StringID(), nullable=True)
+
     # State of the Callback of type: CallbackState. Can be null for instances 
of DagProcessorCallback.
     state: Mapped[str | None] = mapped_column(String(10))
 
@@ -269,6 +272,7 @@ class DagProcessorCallback(Callback):
 
         self.fetch_method = CallbackFetchMethod.DAG_ATTRIBUTE
         self.state = None
+        self.bundle_name = callback.bundle_name
         self.data |= {"req_class": callback.__class__.__name__, "req_data": 
callback.to_json()}
 
     def get_callback_request(self) -> CallbackRequest:
diff --git a/airflow-core/src/airflow/utils/db.py 
b/airflow-core/src/airflow/utils/db.py
index dae74d44cc7..9bc0608611b 100644
--- a/airflow-core/src/airflow/utils/db.py
+++ b/airflow-core/src/airflow/utils/db.py
@@ -115,7 +115,7 @@ _REVISION_HEADS_MAP: dict[str, str] = {
     "3.0.3": "fe199e1abd77",
     "3.1.0": "cc92b33c6709",
     "3.1.8": "509b94a1042d",
-    "3.2.0": "888b59e02a5b",
+    "3.2.0": "1d6611b6ab7c",
 }
 
 # Prefix used to identify tables holding data moved during migration.
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py 
b/airflow-core/tests/unit/dag_processing/test_manager.py
index 00b0e50779c..40a8c4f0b66 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -1097,6 +1097,61 @@ class TestDagFileProcessorManager:
                 remaining_req = remaining[0].get_callback_request()
                 assert remaining_req.bundle_name == "other-bundle"
 
+    @conf_vars(
+        {
+            ("dag_processor", "max_callbacks_per_loop"): "2",
+            ("core", "load_examples"): "False",
+        }
+    )
+    def test_fetch_callbacks_filters_by_bundle_before_limit(self, 
configure_testing_dag_bundle):
+        dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
+
+        matching = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="testing",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="match",
+        )
+        non_matching_1 = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="other-bundle-a",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="no-match-1",
+        )
+        non_matching_2 = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="other-bundle-b",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="no-match-2",
+        )
+
+        with create_session() as session:
+            session.add(DbCallbackRequest(callback=non_matching_1, 
priority_weight=300))
+            session.add(DbCallbackRequest(callback=non_matching_2, 
priority_weight=200))
+            session.add(DbCallbackRequest(callback=matching, 
priority_weight=100))
+
+        with configure_testing_dag_bundle(dag_filepath):
+            manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
+
+            with create_session() as session:
+                callbacks = manager._fetch_callbacks(session=session)
+
+                assert [c.run_id for c in callbacks] == ["match"]
+
+                remaining = session.scalars(select(DbCallbackRequest)).all()
+                assert len(remaining) == 2
+                assert {callback.bundle_name for callback in remaining} == {
+                    "other-bundle-a",
+                    "other-bundle-b",
+                }
+
     @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
     def test_callback_queue(self, mock_get_logger, 
configure_testing_dag_bundle):
         mock_logger = MagicMock()
diff --git a/airflow-core/tests/unit/models/test_callback.py 
b/airflow-core/tests/unit/models/test_callback.py
index 20bbba29fc1..5d4940b77ad 100644
--- a/airflow-core/tests/unit/models/test_callback.py
+++ b/airflow-core/tests/unit/models/test_callback.py
@@ -21,11 +21,13 @@ from unittest.mock import patch
 import pytest
 from sqlalchemy import select
 
+from airflow.callbacks.callback_requests import DagCallbackRequest
 from airflow.models import Trigger
 from airflow.models.callback import (
     Callback,
     CallbackFetchMethod,
     CallbackState,
+    DagProcessorCallback,
     ExecutorCallback,
     TriggererCallback,
     _accepts_context,
@@ -210,7 +212,26 @@ class TestExecutorCallback:
         assert callback.state == CallbackState.QUEUED
 
 
-# Note: class DagProcessorCallback is tested in 
airflow-core/tests/unit/dag_processing/test_manager.py
+class TestDagProcessorCallback:
+    def test_polymorphic_serde(self, session):
+        callback_request = DagCallbackRequest(
+            dag_id="test_start_date_scheduling",
+            bundle_name="testing",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
+            is_failure_callback=True,
+            run_id="123",
+        )
+        callback = DagProcessorCallback(priority_weight=11, 
callback=callback_request)
+        session.add(callback)
+        session.commit()
+
+        retrieved = session.scalar(select(Callback).where(Callback.id == 
callback.id))
+        assert isinstance(retrieved, DagProcessorCallback)
+        assert retrieved.fetch_method == CallbackFetchMethod.DAG_ATTRIBUTE
+        assert retrieved.bundle_name == "testing"
+        assert retrieved.priority_weight == 11
+        assert retrieved.state is None
 
 
 class TestAcceptsContext:

Reply via email to