This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 8c3ce932327 AIP-66: Make DAG callbacks bundle aware (#45860)
8c3ce932327 is described below

commit 8c3ce9323270f21c63e37bce4c8ce11f1ed83fa2
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Tue Feb 4 02:44:07 2025 +0100

    AIP-66: Make DAG callbacks bundle aware (#45860)
    
    This involves using relative paths in the callbacks, resolving the full
    path and using it to queue the callback in the file processor process.
---
 airflow/callbacks/callback_requests.py             |   4 +-
 airflow/dag_processing/manager.py                  |  40 ++---
 airflow/dag_processing/processor.py                |  13 +-
 airflow/jobs/scheduler_job_runner.py               |  31 ++--
 airflow/models/dagrun.py                           |  12 +-
 airflow/triggers/base.py                           |   4 +-
 .../executors/test_celery_kubernetes_executor.py   |   4 +-
 .../executors/test_local_kubernetes_executor.py    |   4 +-
 tests/callbacks/test_callback_requests.py          |  12 +-
 tests/dag_processing/test_manager.py               | 198 ++++++++++++---------
 tests/dag_processing/test_processor.py             |  19 +-
 tests/jobs/test_scheduler_job.py                   |  74 +++++---
 tests/models/test_dagrun.py                        |  31 +++-
 tests_common/pytest_plugin.py                      |   1 -
 14 files changed, 275 insertions(+), 172 deletions(-)

diff --git a/airflow/callbacks/callback_requests.py 
b/airflow/callbacks/callback_requests.py
index c70835dedb5..b990b0df7ea 100644
--- a/airflow/callbacks/callback_requests.py
+++ b/airflow/callbacks/callback_requests.py
@@ -34,8 +34,10 @@ class BaseCallbackRequest(BaseModel):
     :param msg: Additional Message that can be used for logging
     """
 
-    full_filepath: str
+    filepath: str
     """File Path to use to run the callback"""
+    bundle_name: str
+    bundle_version: str | None
     msg: str | None = None
     """Additional Message that can be used for logging to determine 
failure/zombie"""
 
diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 987e09bfc66..815a45b0fd9 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -107,6 +107,7 @@ class DagFileInfo:
     rel_path: Path
     bundle_name: str
     bundle_path: Path | None = field(compare=False, default=None)
+    bundle_version: str | None = None
 
     @property
     def absolute_path(self) -> Path:
@@ -191,7 +192,7 @@ class DagFileProcessorManager:
     _parsing_start_time: float = attrs.field(init=False)
     _num_run: int = attrs.field(default=0, init=False)
 
-    _callback_to_execute: dict[str, list[CallbackRequest]] = attrs.field(
+    _callback_to_execute: dict[DagFileInfo, list[CallbackRequest]] = 
attrs.field(
         factory=lambda: defaultdict(list), init=False
     )
 
@@ -407,18 +408,21 @@ class DagFileProcessorManager:
 
     def _add_callback_to_queue(self, request: CallbackRequest):
         self.log.debug("Queuing %s CallbackRequest: %s", 
type(request).__name__, request)
-        self.log.warning("Callbacks are not implemented yet!")
-        # TODO: AIP-66 make callbacks bundle aware
-        return
-        self._callback_to_execute[request.full_filepath].append(request)
-        if request.full_filepath in self._file_queue:
-            # Remove file paths matching request.full_filepath from 
self._file_queue
-            # Since we are already going to use that filepath to run callback,
-            # there is no need to have same file path again in the queue
-            # todo (AIP-66): update re bundle and rel loc
-            self._file_queue = deque(f for f in self._file_queue if f != 
request.full_filepath)
-        # todo (AIP-66): update re bundle and rel loc
-        self._add_files_to_queue([request.full_filepath], True)
+        try:
+            bundle = DagBundlesManager().get_bundle(name=request.bundle_name, 
version=request.bundle_version)
+        except ValueError:
+            # Bundle no longer configured
+            self.log.error("Bundle %s no longer configured, skipping 
callback", request.bundle_name)
+            return None
+
+        file_info = DagFileInfo(
+            rel_path=Path(request.filepath),
+            bundle_path=bundle.path,
+            bundle_name=request.bundle_name,
+            bundle_version=request.bundle_version,
+        )
+        self._callback_to_execute[file_info].append(request)
+        self._add_files_to_queue([file_info], True)
         Stats.incr("dag_processing.other_callback_count")
 
     @classmethod
@@ -708,15 +712,8 @@ class DagFileProcessorManager:
         """
         self._files = files
 
-        # remove from queue any files no longer in the _files list
-        self._file_queue = deque(x for x in self._file_queue if x in files)
         Stats.gauge("dag_processing.file_path_queue_size", 
len(self._file_queue))
 
-        # TODO: AIP-66 make callbacks bundle aware
-        # callback_paths_to_del = [x for x in self._callback_to_execute if x 
not in new_file_paths]
-        # for path_to_del in callback_paths_to_del:
-        #     del self._callback_to_execute[path_to_del]
-
         # Stop processors that are working on deleted files
         filtered_processors = {}
         for file, processor in self._processors.items():
@@ -807,8 +804,7 @@ class DagFileProcessorManager:
     def _create_process(self, dag_file: DagFileInfo) -> 
DagFileProcessorProcess:
         id = uuid7()
 
-        # callback_to_execute_for_file = 
self._callback_to_execute.pop(file_path, [])
-        callback_to_execute_for_file: list[CallbackRequest] = []
+        callback_to_execute_for_file = self._callback_to_execute.pop(dag_file, 
[])
 
         return DagFileProcessorProcess.start(
             id=id,
diff --git a/airflow/dag_processing/processor.py 
b/airflow/dag_processing/processor.py
index 2a822e1bae9..e8f5d3978cc 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -67,10 +67,11 @@ def _parse_file_entrypoint():
     log = structlog.get_logger(logger_name="task")
 
     result = _parse_file(msg, log)
-    comms_decoder.send_request(log, result)
+    if result is not None:
+        comms_decoder.send_request(log, result)
 
 
-def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult:
+def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> 
DagFileParsingResult | None:
     # TODO: Set known_pool names on DagBag!
     bag = DagBag(
         dag_folder=msg.file,
@@ -79,6 +80,11 @@ def _parse_file(msg: DagFileParseRequest, log: 
FilteringBoundLogger) -> DagFileP
         safe_mode=True,
         load_op_links=False,
     )
+    if msg.callback_requests:
+        # If the request is for callback, we shouldn't serialize the DAGs
+        _execute_callbacks(bag, msg.callback_requests, log)
+        return None
+
     serialized_dags, serialization_import_errors = _serialize_dags(bag, log)
     bag.import_errors.update(serialization_import_errors)
     dags = [LazyDeserializedDAG(data=serdag) for serdag in serialized_dags]
@@ -89,9 +95,6 @@ def _parse_file(msg: DagFileParseRequest, log: 
FilteringBoundLogger) -> DagFileP
         # TODO: Make `bag.dag_warnings` not return SQLA model objects
         warnings=[],
     )
-
-    if msg.callback_requests:
-        _execute_callbacks(bag, msg.callback_requests, log)
     return result
 
 
diff --git a/airflow/jobs/scheduler_job_runner.py 
b/airflow/jobs/scheduler_job_runner.py
index 1e98dbf2aba..df1cb5ab529 100644
--- a/airflow/jobs/scheduler_job_runner.py
+++ b/airflow/jobs/scheduler_job_runner.py
@@ -35,7 +35,7 @@ from typing import TYPE_CHECKING, Any, Callable
 from deprecated import deprecated
 from sqlalchemy import and_, delete, exists, func, select, text, tuple_, update
 from sqlalchemy.exc import OperationalError
-from sqlalchemy.orm import lazyload, load_only, make_transient, selectinload
+from sqlalchemy.orm import joinedload, lazyload, load_only, make_transient, 
selectinload
 from sqlalchemy.sql import expression
 
 from airflow import settings
@@ -756,7 +756,12 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
 
         # Check state of finished tasks
         filter_for_tis = TI.filter_for_tis(tis_with_right_state)
-        query = 
select(TI).where(filter_for_tis).options(selectinload(TI.dag_model))
+        query = (
+            select(TI)
+            .where(filter_for_tis)
+            .options(selectinload(TI.dag_model))
+            .options(joinedload(TI.dag_version))
+        )
         # row lock this entire set of taskinstances to make sure the scheduler 
doesn't fail when we have
         # multi-schedulers
         tis_query: Query = with_row_locks(query, of=TI, session=session, 
skip_locked=True)
@@ -853,7 +858,9 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     # too, which would lead to double logging
                     cls.logger().error(msg)
                     request = TaskCallbackRequest(
-                        full_filepath=ti.dag_model.fileloc,
+                        filepath=ti.dag_model.relative_fileloc,
+                        bundle_name=ti.dag_version.bundle_name,
+                        bundle_version=ti.dag_version.bundle_version,
                         ti=ti,
                         msg=msg,
                     )
@@ -1627,9 +1634,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
                     dag_model.calculate_dagrun_date_fields(dag, 
dag.get_run_data_interval(dag_run))
 
                 callback_to_execute = DagCallbackRequest(
-                    full_filepath=dag.fileloc,
+                    filepath=dag_model.relative_fileloc,
                     dag_id=dag.dag_id,
                     run_id=dag_run.run_id,
+                    bundle_name=dag_model.bundle_name,
+                    bundle_version=dag_run.bundle_version,
                     is_failure_callback=True,
                     msg="timed_out",
                 )
@@ -1991,11 +2000,11 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             if zombies := self._find_zombies(session=session):
                 self._purge_zombies(zombies, session=session)
 
-    def _find_zombies(self, *, session: Session) -> list[tuple[TI, str]]:
+    def _find_zombies(self, *, session: Session) -> list[TI]:
         self.log.debug("Finding 'running' jobs without a recent heartbeat")
         limit_dttm = timezone.utcnow() - 
timedelta(seconds=self._zombie_threshold_secs)
-        zombies = session.execute(
-            select(TI, DM.fileloc)
+        zombies = session.scalars(
+            select(TI)
             .with_hint(TI, "USE INDEX (ti_state)", dialect_name="mysql")
             .join(DM, TI.dag_id == DM.dag_id)
             .where(
@@ -2008,11 +2017,13 @@ class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
             self.log.warning("Failing %s TIs without heartbeat after %s", 
len(zombies), limit_dttm)
         return zombies
 
-    def _purge_zombies(self, zombies: list[tuple[TI, str]], *, session: 
Session) -> None:
-        for ti, file_loc in zombies:
+    def _purge_zombies(self, zombies: list[TI], *, session: Session) -> None:
+        for ti in zombies:
             zombie_message_details = self._generate_zombie_message_details(ti)
             request = TaskCallbackRequest(
-                full_filepath=file_loc,
+                filepath=ti.dag_model.relative_fileloc,
+                bundle_name=ti.dag_version.bundle_name,
+                bundle_version=ti.dag_run.bundle_version,
                 ti=ti,
                 msg=str(zombie_message_details),
             )
diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 22757c972e9..7d46c29285f 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -922,9 +922,11 @@ class DagRun(Base, LoggingMixin):
                 dag.handle_callback(self, success=False, 
reason="task_failure", session=session)
             elif dag.has_on_failure_callback:
                 callback = DagCallbackRequest(
-                    full_filepath=dag.fileloc,
+                    filepath=self.dag_model.relative_fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
+                    bundle_name=self.dag_version.bundle_name,
+                    bundle_version=self.bundle_version,
                     is_failure_callback=True,
                     msg="task_failure",
                 )
@@ -949,9 +951,11 @@ class DagRun(Base, LoggingMixin):
                 dag.handle_callback(self, success=True, reason="success", 
session=session)
             elif dag.has_on_success_callback:
                 callback = DagCallbackRequest(
-                    full_filepath=dag.fileloc,
+                    filepath=self.dag_model.relative_fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
+                    bundle_name=self.dag_version.bundle_name,
+                    bundle_version=self.bundle_version,
                     is_failure_callback=False,
                     msg="success",
                 )
@@ -966,9 +970,11 @@ class DagRun(Base, LoggingMixin):
                 dag.handle_callback(self, success=False, 
reason="all_tasks_deadlocked", session=session)
             elif dag.has_on_failure_callback:
                 callback = DagCallbackRequest(
-                    full_filepath=dag.fileloc,
+                    filepath=self.dag_model.relative_fileloc,
                     dag_id=self.dag_id,
                     run_id=self.run_id,
+                    bundle_name=self.dag_version.bundle_name,
+                    bundle_version=self.bundle_version,
                     is_failure_callback=True,
                     msg="all_tasks_deadlocked",
                 )
diff --git a/airflow/triggers/base.py b/airflow/triggers/base.py
index 4e88465d533..d36dd40b6f8 100644
--- a/airflow/triggers/base.py
+++ b/airflow/triggers/base.py
@@ -217,9 +217,11 @@ class BaseTaskEndEvent(TriggerEvent):
         """Submit a callback request if the task state is SUCCESS or FAILED."""
         if self.task_instance_state in (TaskInstanceState.SUCCESS, 
TaskInstanceState.FAILED):
             request = TaskCallbackRequest(
-                full_filepath=task_instance.dag_model.fileloc,
+                filepath=task_instance.dag_model.relative_fileloc,
                 ti=task_instance,
                 task_callback_type=self.task_instance_state,
+                bundle_name=task_instance.dag_model.bundle_name,
+                bundle_version=task_instance.dag_run.bundle_version,
             )
             log.info("Sending callback: %s", request)
             try:
diff --git 
a/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
 
b/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
index 027793c29d5..267b0fa78fd 100644
--- 
a/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
+++ 
b/providers/celery/tests/provider_tests/celery/executors/test_celery_kubernetes_executor.py
@@ -261,7 +261,9 @@ class TestCeleryKubernetesExecutor:
         cel_k8s_exec.callback_sink = mock.MagicMock()
 
         if AIRFLOW_V_3_0_PLUS:
-            callback = DagCallbackRequest(full_filepath="fake", dag_id="fake", 
run_id="fake")
+            callback = DagCallbackRequest(
+                filepath="fake", dag_id="fake", run_id="fake", 
bundle_name="testing", bundle_version=None
+            )
         else:
             callback = CallbackRequest(full_filepath="fake")
         cel_k8s_exec.send_callback(callback)
diff --git 
a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py 
b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
index be4d8b93673..c712b2d4b5e 100644
--- 
a/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
+++ 
b/providers/tests/cncf/kubernetes/executors/test_local_kubernetes_executor.py
@@ -116,7 +116,9 @@ class TestLocalKubernetesExecutor:
         local_k8s_exec.callback_sink = mock.MagicMock()
 
         if AIRFLOW_V_3_0_PLUS:
-            callback = DagCallbackRequest(full_filepath="fake", dag_id="fake", 
run_id="fake")
+            callback = DagCallbackRequest(
+                filepath="fake", dag_id="fake", run_id="fake", 
bundle_name="fake", bundle_version=None
+            )
         else:
             callback = CallbackRequest(full_filepath="fake")
         local_k8s_exec.send_callback(callback)
diff --git a/tests/callbacks/test_callback_requests.py 
b/tests/callbacks/test_callback_requests.py
index 68362d36239..345af3b597c 100644
--- a/tests/callbacks/test_callback_requests.py
+++ b/tests/callbacks/test_callback_requests.py
@@ -43,10 +43,12 @@ class TestCallbackRequest:
             ),
             (
                 DagCallbackRequest(
-                    full_filepath="filepath",
+                    filepath="filepath",
                     dag_id="fake_dag",
                     run_id="fake_run",
                     is_failure_callback=False,
+                    bundle_name="testing",
+                    bundle_version=None,
                 ),
                 DagCallbackRequest,
             ),
@@ -66,8 +68,7 @@ class TestCallbackRequest:
             )
 
             input = TaskCallbackRequest(
-                full_filepath="filepath",
-                ti=ti,
+                filepath="filepath", ti=ti, bundle_name="testing", 
bundle_version=None
             )
         json_str = input.to_json()
         result = request_class.from_json(json_str)
@@ -79,10 +80,7 @@ class TestCallbackRequest:
         ti.end_date = timezone.utcnow()
         session.merge(ti)
         session.flush()
-        input = TaskCallbackRequest(
-            full_filepath="filepath",
-            ti=ti,
-        )
+        input = TaskCallbackRequest(filepath="filepath", ti=ti, 
bundle_name="testing", bundle_version=None)
         json_str = input.to_json()
         result = TaskCallbackRequest.from_json(json_str)
         assert input == result
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index ebc3536c83f..46d6fe5f50a 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -160,12 +160,14 @@ class TestDagFileProcessorManager:
 
         # TODO: AIP-66 no asserts?
 
-    def test_start_new_processes_with_same_filepath(self):
+    def test_start_new_processes_with_same_filepath(self, 
configure_testing_dag_bundle):
         """
         Test that when a processor already exist with a filepath, a new 
processor won't be created
         with that filepath. The filepath will just be removed from the list.
         """
-        manager = DagFileProcessorManager(max_runs=1)
+        with configure_testing_dag_bundle("/tmp"):
+            manager = DagFileProcessorManager(max_runs=1)
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
 
         file_1 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_1.py"), bundle_path=TEST_DAGS_FOLDER)
         file_2 = DagFileInfo(bundle_name="testing", 
rel_path=Path("file_2.py"), bundle_path=TEST_DAGS_FOLDER)
@@ -475,6 +477,7 @@ class TestDagFileProcessorManager:
             manager._kill_timed_out_processors()
         mock_kill.assert_not_called()
 
+    @pytest.mark.usefixtures("testing_dag_bundle")
     @pytest.mark.parametrize(
         ["callbacks", "path", "expected_buffer"],
         [
@@ -492,9 +495,11 @@ class TestDagFileProcessorManager:
             pytest.param(
                 [
                     DagCallbackRequest(
-                        full_filepath="/opt/airflow/dags/dag_callback_dag.py",
+                        filepath="dag_callback_dag.py",
                         dag_id="dag_id",
                         run_id="run_id",
+                        bundle_name="testing",
+                        bundle_version=None,
                         is_failure_callback=False,
                     )
                 ],
@@ -505,7 +510,9 @@ class TestDagFileProcessorManager:
                 b'"requests_fd":123,"callback_requests":'
                 b"["
                 b"{"
-                b'"full_filepath":"/opt/airflow/dags/dag_callback_dag.py",'
+                b'"filepath":"dag_callback_dag.py",'
+                b'"bundle_name":"testing",'
+                b'"bundle_version":null,'
                 b'"msg":null,'
                 b'"dag_id":"dag_id",'
                 b'"run_id":"run_id",'
@@ -650,18 +657,22 @@ class TestDagFileProcessorManager:
         assert dagbag.get_dag("test_dag2").get_is_active() is False
 
     @conf_vars({("core", "load_examples"): "False"})
-    def test_fetch_callbacks_from_database(self, tmp_path, 
configure_testing_dag_bundle):
+    def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):
         dag_filepath = TEST_DAG_FOLDER / "test_on_failure_callback_dag.py"
 
         callback1 = DagCallbackRequest(
             dag_id="test_start_date_scheduling",
-            full_filepath=str(dag_filepath),
+            bundle_name="testing",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
             is_failure_callback=True,
             run_id="123",
         )
         callback2 = DagCallbackRequest(
             dag_id="test_start_date_scheduling",
-            full_filepath=str(dag_filepath),
+            bundle_name="testing",
+            bundle_version=None,
+            filepath="test_on_failure_callback_dag.py",
             is_failure_callback=True,
             run_id="456",
         )
@@ -670,7 +681,7 @@ class TestDagFileProcessorManager:
             session.add(DbCallbackRequest(callback=callback1, 
priority_weight=11))
             session.add(DbCallbackRequest(callback=callback2, 
priority_weight=10))
 
-        with configure_testing_dag_bundle(tmp_path):
+        with configure_testing_dag_bundle(dag_filepath):
             manager = DagFileProcessorManager(max_runs=1)
 
             with create_session() as session:
@@ -691,13 +702,15 @@ class TestDagFileProcessorManager:
             for i in range(5):
                 callback = DagCallbackRequest(
                     dag_id="test_start_date_scheduling",
-                    full_filepath=str(dag_filepath),
+                    bundle_name="testing",
+                    bundle_version=None,
+                    filepath="test_on_failure_callback_dag.py",
                     is_failure_callback=True,
                     run_id=str(i),
                 )
                 session.add(DbCallbackRequest(callback=callback, 
priority_weight=i))
 
-        with configure_testing_dag_bundle(tmp_path):
+        with configure_testing_dag_bundle(dag_filepath):
             manager = DagFileProcessorManager(max_runs=1)
 
             with create_session() as session:
@@ -708,83 +721,102 @@ class TestDagFileProcessorManager:
                 manager.run()
                 assert session.query(DbCallbackRequest).count() == 1
 
-    @pytest.mark.skip("AIP-66: callbacks are not implemented yet")
-    def test_callback_queue(self, tmp_path):
-        """
-        This test has gotten a bit out of sync with the codebase.
-
-        I am just updating it to be consistent with the changes in DagFileInfo
-        """
-        # given
-        manager = DagFileProcessorManager(
-            max_runs=1,
-            processor_timeout=365 * 86_400,
-        )
-
-        dag1_path = DagFileInfo(
-            bundle_name="testing", rel_path=Path("green_eggs/ham/file1.py"), 
bundle_path=TEST_DAGS_FOLDER
-        )
-        dag1_req1 = DagCallbackRequest(
-            full_filepath=TEST_DAGS_FOLDER / "green_eggs/ham/file1.py",
-            dag_id="dag1",
-            run_id="run1",
-            is_failure_callback=False,
-            msg=None,
-        )
-        dag1_req2 = DagCallbackRequest(
-            full_filepath=TEST_DAGS_FOLDER / "green_eggs/ham/file1.py",
-            dag_id="dag1",
-            run_id="run1",
-            is_failure_callback=False,
-            msg=None,
-        )
-
-        dag2_path = DagFileInfo(
-            bundle_name="testing", rel_path=Path("green_eggs/ham/file2.py"), 
bundle_path=TEST_DAGS_FOLDER
-        )
-        dag2_req1 = DagCallbackRequest(
-            full_filepath=TEST_DAGS_FOLDER / "green_eggs/ham/file2.py",
-            dag_id="dag2",
-            run_id="run1",
-            is_failure_callback=False,
-            msg=None,
-        )
-
-        # when
-        manager._add_callback_to_queue(dag1_req1)
-        manager._add_callback_to_queue(dag2_req1)
-
-        # then - requests should be in manager's queue, with dag2 ahead of 
dag1 (because it was added last)
-        assert manager._file_queue == deque([dag2_path, dag1_path])
-        assert set(manager._callback_to_execute.keys()) == {
-            dag1_req1.full_filepath,
-            dag2_req1.full_filepath,
-        }
-        assert manager._callback_to_execute[dag2_req1.full_filepath] == 
[dag2_req1]
+    @mock.patch.object(DagFileProcessorManager, "_get_logger_for_dag_file")
+    def test_callback_queue(self, mock_logger, configure_testing_dag_bundle):
+        tmp_path = "/green_eggs/ham"
+        with configure_testing_dag_bundle(tmp_path):
+            # given
+            manager = DagFileProcessorManager(
+                max_runs=1,
+                processor_timeout=365 * 86_400,
+            )
+            manager._dag_bundles = 
list(DagBundlesManager().get_all_dag_bundles())
 
-        # update the queue, although the callback is registered
-        assert manager._file_queue == deque([dag2_req1.full_filepath, 
dag1_req1.full_filepath])
+            dag1_path = DagFileInfo(
+                bundle_name="testing", rel_path=Path("file1.py"), 
bundle_path=Path(tmp_path)
+            )
+            dag1_req1 = DagCallbackRequest(
+                filepath="file1.py",
+                dag_id="dag1",
+                run_id="run1",
+                is_failure_callback=False,
+                bundle_name="testing",
+                bundle_version=None,
+                msg=None,
+            )
+            dag1_req2 = DagCallbackRequest(
+                filepath="file1.py",
+                dag_id="dag1",
+                run_id="run1",
+                is_failure_callback=False,
+                bundle_name="testing",
+                bundle_version=None,
+                msg=None,
+            )
 
-        # when
-        manager._add_callback_to_queue(dag1_req2)
+            dag2_path = DagFileInfo(
+                bundle_name="testing", rel_path=Path("file2.py"), 
bundle_path=Path(tmp_path)
+            )
+            dag2_req1 = DagCallbackRequest(
+                filepath="file2.py",
+                dag_id="dag2",
+                run_id="run1",
+                bundle_name=dag2_path.bundle_name,
+                bundle_version=None,
+                is_failure_callback=False,
+                msg=None,
+            )
 
-        # then - non-sla callback should have brought dag1 to the fore
-        assert manager._file_queue == deque([dag1_req1.full_filepath, 
dag2_req1.full_filepath])
-        assert manager._callback_to_execute[dag1_req1.full_filepath] == [
-            dag1_req1,
-            dag1_req2,
-        ]
+            # when
+            manager._add_callback_to_queue(dag1_req1)
+            manager._add_callback_to_queue(dag2_req1)
+
+            # then - requests should be in manager's queue, with dag2 ahead of 
dag1 (because it was added last)
+            assert manager._file_queue == deque([dag2_path, dag1_path])
+            assert set(manager._callback_to_execute.keys()) == {
+                dag1_path,
+                dag2_path,
+            }
+            assert manager._callback_to_execute[dag2_path] == [dag2_req1]
+
+            # update the queue, although the callback is registered
+            assert manager._file_queue == deque([dag2_path, dag1_path])
+
+            # when
+            manager._add_callback_to_queue(dag1_req2)
+            # Since dag1_req2 is same as dag1_req1, we now have 2 items in 
file_path_queue
+            assert manager._file_queue == deque([dag2_path, dag1_path])
+            assert manager._callback_to_execute[dag1_path] == [
+                dag1_req1,
+                dag1_req2,
+            ]
 
-        with mock.patch.object(
-            DagFileProcessorProcess, "start", side_effect=lambda *args, 
**kwargs: self.mock_processor()
-        ) as start:
-            manager._start_new_processes()
-        # Callbacks passed to process ctor
-        start.assert_any_call(
-            id=mock.ANY, path=dag1_req1.full_filepath, callbacks=[dag1_req1, 
dag1_req2], selector=mock.ANY
-        )
-        # And removed from the queue
-        assert dag1_req1.full_filepath not in manager._callback_to_execute
+            with mock.patch.object(
+                DagFileProcessorProcess, "start", side_effect=lambda *args, 
**kwargs: self.mock_processor()
+            ) as start:
+                manager._start_new_processes()
+            # Callbacks passed to processor
+            assert start.call_args_list == [
+                mock.call(
+                    id=mock.ANY,
+                    path=Path(dag2_path.bundle_path, dag2_path.rel_path),
+                    bundle_path=dag2_path.bundle_path,
+                    callbacks=[dag2_req1],
+                    selector=mock.ANY,
+                    logger=mock_logger.return_value,
+                ),
+                mock.call(
+                    id=mock.ANY,
+                    path=Path(dag1_path.bundle_path, dag1_path.rel_path),
+                    bundle_path=dag1_path.bundle_path,
+                    callbacks=[dag1_req1, dag1_req2],
+                    selector=mock.ANY,
+                    logger=mock_logger.return_value,
+                ),
+            ]
+            # And removed from the queue
+            assert dag1_path not in manager._callback_to_execute
+            assert dag2_path not in manager._callback_to_execute
 
     def test_dag_with_assets(self, session, configure_testing_dag_bundle):
         """'Integration' test to ensure that the assets get parsed and stored 
correctly for parsed dags."""
diff --git a/tests/dag_processing/test_processor.py 
b/tests/dag_processing/test_processor.py
index 9844186be1d..a01d3edb2ff 100644
--- a/tests/dag_processing/test_processor.py
+++ b/tests/dag_processing/test_processor.py
@@ -75,7 +75,7 @@ def disable_load_example():
 class TestDagFileProcessor:
     def _process_file(
         self, file_path, callback_requests: list[CallbackRequest] | None = None
-    ) -> DagFileParsingResult:
+    ) -> DagFileParsingResult | None:
         return _parse_file(
             DagFileParseRequest(
                 file=file_path,
@@ -128,7 +128,7 @@ class TestDagFileProcessor:
             dagbag.import_errors["a.py"] = "Import error"
 
         resp = self._process_file("a.py")
-
+        assert resp is not None
         assert not resp.serialized_dags
         assert resp.import_errors is not None
         assert "a.py" in resp.import_errors
@@ -184,6 +184,7 @@ def disable_capturing():
     sys.stdin, sys.stdout, sys.stderr = old_in, old_out, old_err
 
 
[email protected]("testing_dag_bundle")
 @pytest.mark.usefixtures("disable_capturing")
 def test_parse_file_entrypoint_parses_dag_callbacks(spy_agency):
     r, w = socketpair()
@@ -193,7 +194,7 @@ def 
test_parse_file_entrypoint_parses_dag_callbacks(spy_agency):
     w.makefile("wb").write(
         
b'{"file":"/files/dags/wait.py","bundle_path":"/files/dags","requests_fd":'
         + str(w2.fileno()).encode("ascii")
-        + b',"callback_requests": [{"full_filepath": "/files/dags/wait.py", '
+        + b',"callback_requests": [{"filepath": "wait.py", "bundle_name": 
"testing", "bundle_version": null, '
         b'"msg": "task_failure", "dag_id": "wait_to_fail", "run_id": '
         b'"manual__2024-12-30T21:02:55.203691+00:00", '
         b'"is_failure_callback": true, "type": "DagCallbackRequest"}], "type": 
"DagFileParseRequest"}\n'
@@ -209,11 +210,13 @@ def 
test_parse_file_entrypoint_parses_dag_callbacks(spy_agency):
     assert msg.file == "/files/dags/wait.py"
     assert msg.callback_requests == [
         DagCallbackRequest(
-            full_filepath="/files/dags/wait.py",
+            filepath="wait.py",
             msg="task_failure",
             dag_id="wait_to_fail",
             run_id="manual__2024-12-30T21:02:55.203691+00:00",
             is_failure_callback=True,
+            bundle_name="testing",
+            bundle_version=None,
         )
     ]
 
@@ -236,10 +239,12 @@ def test_parse_file_with_dag_callbacks(spy_agency):
 
     requests = [
         DagCallbackRequest(
-            full_filepath="A",
+            filepath="A",
             msg="Message",
             dag_id="a",
             run_id="b",
+            bundle_name="testing",
+            bundle_version=None,
         )
     ]
     _parse_file(
@@ -270,9 +275,11 @@ def test_parse_file_with_task_callbacks(spy_agency):
 
     requests = [
         TaskCallbackRequest(
-            full_filepath="A",
+            filepath="A",
             msg="Message",
             ti=None,
+            bundle_name="testing",
+            bundle_version=None,
         )
     ]
     _parse_file(
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index faf8fa73284..6b0f5847ce7 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -152,6 +152,7 @@ def create_dagrun(session):
             state=state,
             start_date=start_date,
             triggered_by=DagRunTriggeredByType.TEST,
+            dag_version=DagVersion.get_latest_version(dag.dag_id),
         )
 
     return _create_dagrun
@@ -370,13 +371,17 @@ class TestSchedulerJob:
 
     @mock.patch("airflow.jobs.scheduler_job_runner.TaskCallbackRequest")
     @mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr")
-    def test_process_executor_events_with_callback(self, mock_stats_incr, 
mock_task_callback, dag_maker):
+    def test_process_executor_events_with_callback(
+        self, mock_stats_incr, mock_task_callback, dag_maker, session
+    ):
         dag_id = "test_process_executor_events_with_callback"
         task_id_1 = "dummy_task"
 
         with dag_maker(dag_id=dag_id, fileloc="/test_path1/") as dag:
-            task1 = EmptyOperator(task_id=task_id_1, 
on_failure_callback=lambda x: print("hi"))
-        ti1 = dag_maker.create_dagrun().get_task_instance(task1.task_id)
+            EmptyOperator(task_id=task_id_1, on_failure_callback=lambda x: 
print("hi"))
+        dagv = DagVersion.get_latest_version(dag.dag_id)
+        dr = dag_maker.create_dagrun(dag_version=dagv)
+        ti1 = dr.task_instances[0]
 
         mock_stats_incr.reset_mock()
 
@@ -386,8 +391,6 @@ class TestSchedulerJob:
         scheduler_job = Job(executor=executor)
         self.job_runner = SchedulerJobRunner(scheduler_job)
 
-        session = settings.Session()
-
         ti1.state = State.QUEUED
         session.merge(ti1)
         session.commit()
@@ -400,8 +403,10 @@ class TestSchedulerJob:
         # will be set to failed in dag parsing process
         assert ti1.state == State.QUEUED
         mock_task_callback.assert_called_once_with(
-            full_filepath=dag.fileloc,
+            filepath=dag.relative_fileloc,
             ti=mock.ANY,
+            bundle_name="dag_maker",
+            bundle_version=None,
             msg=f"Executor {executor} reported that the task instance "
             "<TaskInstance: 
test_process_executor_events_with_callback.dummy_task test [queued]> "
             "finished with state failed, but the task instance's state 
attribute is queued. "
@@ -2415,10 +2420,12 @@ class TestSchedulerJob:
         assert isinstance(orm_dag.next_dagrun_create_after, datetime.datetime)
 
         expected_callback = DagCallbackRequest(
-            full_filepath=dr.dag.fileloc,
+            filepath=dr.dag.relative_fileloc,
             dag_id=dr.dag_id,
             is_failure_callback=True,
             run_id=dr.run_id,
+            bundle_name=orm_dag.bundle_name,
+            bundle_version=orm_dag.bundle_version,
             msg="timed_out",
         )
 
@@ -2454,10 +2461,12 @@ class TestSchedulerJob:
         assert dr.state == State.FAILED
 
         expected_callback = DagCallbackRequest(
-            full_filepath=dr.dag.fileloc,
+            filepath=dr.dag.relative_fileloc,
             dag_id=dr.dag_id,
             is_failure_callback=True,
             run_id=dr.run_id,
+            bundle_name=dr.dag.get_bundle_name(),
+            bundle_version=dr.dag.get_bundle_version(),
             msg="timed_out",
         )
 
@@ -2528,11 +2537,13 @@ class TestSchedulerJob:
             self.job_runner._do_scheduling(session)
 
         expected_callback = DagCallbackRequest(
-            full_filepath=dag.fileloc,
+            filepath=dag.relative_fileloc,
             dag_id=dr.dag_id,
             is_failure_callback=bool(state == State.FAILED),
             run_id=dr.run_id,
             msg=expected_callback_msg,
+            bundle_name=dag.get_bundle_name(),
+            bundle_version=dag.get_bundle_version(),
         )
 
         # Verify dag failure callback request is sent to file processor
@@ -2605,11 +2616,13 @@ class TestSchedulerJob:
         )
 
         expected_callback = DagCallbackRequest(
-            full_filepath=dag.fileloc,
+            filepath=dag.relative_fileloc,
             dag_id=dr.dag_id,
             is_failure_callback=True,
             run_id=dr.run_id,
             msg="timed_out",
+            bundle_name=dag.get_bundle_name(),
+            bundle_version=dag.get_bundle_version(),
         )
 
         assert callback == expected_callback
@@ -3286,25 +3299,24 @@ class TestSchedulerJob:
         session.close()
 
     @pytest.mark.need_serialized_dag
-    def test_retry_still_in_executor(self, dag_maker):
+    def test_retry_still_in_executor(self, dag_maker, session):
         """
         Checks if the scheduler does not put a task in limbo, when a task is 
retried
         but is still present in the executor.
         """
         executor = MockExecutor(do_update=False)
 
-        with create_session() as session:
-            with dag_maker(
-                dag_id="test_retry_still_in_executor",
-                schedule="@once",
-                session=session,
-            ):
-                dag_task1 = BashOperator(
-                    task_id="test_retry_handling_op",
-                    bash_command="exit 1",
-                    retries=1,
-                )
-            dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag, 
None)
+        with dag_maker(
+            dag_id="test_retry_still_in_executor",
+            schedule="@once",
+            session=session,
+        ):
+            dag_task1 = BashOperator(
+                task_id="test_retry_handling_op",
+                bash_command="exit 1",
+                retries=1,
+            )
+        dag_maker.dag_model.calculate_dagrun_date_fields(dag_maker.dag, None)
 
         @provide_session
         def do_schedule(session):
@@ -5448,6 +5460,8 @@ class TestSchedulerJob:
         dagbag = DagBag(dagfile)
         dag = dagbag.get_dag("example_branch_operator")
         DAG.bulk_write_to_db("testing", None, [dag])
+        SerializedDagModel.write_dag(dag=dag, bundle_name="testing")
+        dag_v = DagVersion.get_latest_version(dag.dag_id)
         data_interval = dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
         dag_run = create_dagrun(
             dag,
@@ -5467,7 +5481,7 @@ class TestSchedulerJob:
 
             for task_id in tasks_to_setup:
                 task = dag.get_task(task_id=task_id)
-                ti = TaskInstance(task, run_id=dag_run.run_id, 
state=State.RUNNING)
+                ti = TaskInstance(task, run_id=dag_run.run_id, 
state=State.RUNNING, dag_version_id=dag_v.id)
 
                 ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
                 ti.queued_by_job_id = 999
@@ -5487,7 +5501,7 @@ class TestSchedulerJob:
         callback_requests = executor.callback_sink.send.call_args.args
         assert len(callback_requests) == 1
         callback_request = callback_requests[0]
-        assert callback_request.full_filepath == dag.fileloc
+        assert callback_request.filepath == dag.relative_fileloc
         assert callback_request.msg == 
str(self.job_runner._generate_zombie_message_details(ti))
         assert callback_request.is_failure_callback is True
         assert callback_request.ti.dag_id == ti.dag_id
@@ -5572,6 +5586,7 @@ class TestSchedulerJob:
             session.query(Job).delete()
             dag = dagbag.get_dag("test_example_bash_operator")
             DAG.bulk_write_to_db("testing", None, [dag])
+            SerializedDagModel.write_dag(dag=dag, bundle_name="testing")
             data_interval = 
dag.infer_automated_data_interval(DEFAULT_LOGICAL_DATE)
             dag_run = create_dagrun(
                 dag,
@@ -5581,8 +5596,9 @@ class TestSchedulerJob:
                 data_interval=data_interval,
             )
             task = dag.get_task(task_id="run_this_last")
-
-            ti = TaskInstance(task, run_id=dag_run.run_id, state=State.RUNNING)
+            ti = TaskInstance(
+                task, run_id=dag_run.run_id, state=State.RUNNING, 
dag_version_id=dag_run.dag_version_id
+            )
             ti.last_heartbeat_at = timezone.utcnow() - timedelta(minutes=6)
 
             # TODO: If there was an actual Relationship between TI and Job
@@ -5599,9 +5615,11 @@ class TestSchedulerJob:
 
         expected_failure_callback_requests = [
             TaskCallbackRequest(
-                full_filepath=dag.fileloc,
+                filepath=dag.relative_fileloc,
                 ti=ti,
                 msg=str(self.job_runner._generate_zombie_message_details(ti)),
+                bundle_name="testing",
+                bundle_version=dag_run.bundle_version,
             )
         ]
         callback_requests = 
scheduler_job.executor.callback_sink.send.call_args.args
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 12bccca74b8..a967f6b2555 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -33,7 +33,9 @@ from airflow.callbacks.callback_requests import 
DagCallbackRequest
 from airflow.decorators import setup, task, task_group, teardown
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG, DagModel
+from airflow.models.dag_version import DagVersion
 from airflow.models.dagrun import DagRun, DagRunNote
+from airflow.models.serialized_dag import SerializedDagModel
 from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, 
clear_task_instances
 from airflow.models.taskmap import TaskMap
 from airflow.models.taskreschedule import TaskReschedule
@@ -95,6 +97,7 @@ class TestDagRun:
         logical_date: datetime.datetime | None = None,
         is_backfill: bool = False,
         state: DagRunState = DagRunState.RUNNING,
+        dag_version: DagVersion | None = None,
         session: Session,
     ):
         now = timezone.utcnow()
@@ -118,6 +121,7 @@ class TestDagRun:
             start_date=now,
             state=state,
             external_trigger=False,
+            dag_version=dag_version or 
DagVersion.get_latest_version(dag.dag_id, session=session),
             triggered_by=DagRunTriggeredByType.TEST,
         )
 
@@ -457,6 +461,7 @@ class TestDagRun:
         def on_success_callable(context):
             assert context["dag_run"].dag_id == 
"test_dagrun_update_state_with_handle_callback_success"
 
+        relative_fileloc = 
"test_dagrun_update_state_with_handle_callback_success.py"
         dag = DAG(
             dag_id="test_dagrun_update_state_with_handle_callback_success",
             schedule=datetime.timedelta(days=1),
@@ -464,6 +469,10 @@ class TestDagRun:
             on_success_callback=on_success_callable,
         )
         DAG.bulk_write_to_db("testing", None, dags=[dag], session=session)
+        dm = DagModel.get_dagmodel(dag.dag_id, session=session)
+        dm.relative_fileloc = relative_fileloc
+        session.merge(dm)
+        session.commit()
 
         dag_task1 = EmptyOperator(task_id="test_state_succeeded1", dag=dag)
         dag_task2 = EmptyOperator(task_id="test_state_succeeded2", dag=dag)
@@ -476,18 +485,23 @@ class TestDagRun:
 
         # Scheduler uses Serialized DAG -- so use that instead of the Actual 
DAG
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        dag.relative_fileloc = relative_fileloc
+        SerializedDagModel.write_dag(dag, bundle_name="testing", 
session=session)
+        session.commit()
 
         dag_run = self.create_dag_run(dag=dag, 
task_states=initial_task_states, session=session)
+        dag_run.dag_model = dm
 
         _, callback = dag_run.update_state(execute_callbacks=False)
         assert dag_run.state == DagRunState.SUCCESS
         # Callbacks are not added until handle_callback = False is passed to 
dag_run.update_state()
-
         assert callback == DagCallbackRequest(
-            full_filepath=dag_run.dag.fileloc,
+            filepath=dag_run.dag.relative_fileloc,
             dag_id="test_dagrun_update_state_with_handle_callback_success",
             run_id=dag_run.run_id,
             is_failure_callback=False,
+            bundle_name="testing",
+            bundle_version=None,
             msg="success",
         )
 
@@ -495,6 +509,7 @@ class TestDagRun:
         def on_failure_callable(context):
             assert context["dag_run"].dag_id == 
"test_dagrun_update_state_with_handle_callback_failure"
 
+        relative_fileloc = 
"test_dagrun_update_state_with_handle_callback_failure.py"
         dag = DAG(
             dag_id="test_dagrun_update_state_with_handle_callback_failure",
             schedule=datetime.timedelta(days=1),
@@ -502,6 +517,10 @@ class TestDagRun:
             on_failure_callback=on_failure_callable,
         )
         DAG.bulk_write_to_db("testing", None, dags=[dag], session=session)
+        dm = DagModel.get_dagmodel(dag.dag_id, session=session)
+        dm.relative_fileloc = relative_fileloc
+        session.merge(dm)
+        session.commit()
 
         dag_task1 = EmptyOperator(task_id="test_state_succeeded1", dag=dag)
         dag_task2 = EmptyOperator(task_id="test_state_failed2", dag=dag)
@@ -514,19 +533,25 @@ class TestDagRun:
 
         # Scheduler uses Serialized DAG -- so use that instead of the Actual 
DAG
         dag = SerializedDAG.from_dict(SerializedDAG.to_dict(dag))
+        dag.relative_fileloc = relative_fileloc
+        SerializedDagModel.write_dag(dag, bundle_name="testing", 
session=session)
+        session.commit()
 
         dag_run = self.create_dag_run(dag=dag, 
task_states=initial_task_states, session=session)
+        dag_run.dag_model = dm
 
         _, callback = dag_run.update_state(execute_callbacks=False)
         assert dag_run.state == DagRunState.FAILED
         # Callbacks are not added until handle_callback = False is passed to 
dag_run.update_state()
 
         assert callback == DagCallbackRequest(
-            full_filepath=dag_run.dag.fileloc,
+            filepath=dag.relative_fileloc,
             dag_id="test_dagrun_update_state_with_handle_callback_failure",
             run_id=dag_run.run_id,
             is_failure_callback=True,
             msg="task_failure",
+            bundle_name="testing",
+            bundle_version=None,
         )
 
     def test_dagrun_set_state_end_date(self, dag_maker, session):
diff --git a/tests_common/pytest_plugin.py b/tests_common/pytest_plugin.py
index 66c271113a7..f777b3d6af8 100644
--- a/tests_common/pytest_plugin.py
+++ b/tests_common/pytest_plugin.py
@@ -937,7 +937,6 @@ def dag_maker(request) -> Generator[DagMaker, None, None]:
                 kwargs.pop("dag_version", None)
                 kwargs.pop("triggered_by", None)
                 kwargs["execution_date"] = logical_date
-
             self.dag_run = dag.create_dagrun(**kwargs)
             for ti in self.dag_run.task_instances:
                 ti.refresh_from_task(dag.get_task(ti.task_id))


Reply via email to