This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 3b41866a2ee dag_processing: initialize versioned bundles for callbacks (#52040) (#60734) (#61230)
3b41866a2ee is described below

commit 3b41866a2ee244a7e00332213bd0a8a5e8eaedfe
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Jan 30 07:49:06 2026 +0100

    dag_processing: initialize versioned bundles for callbacks (#52040) (#60734) (#61230)
    
    * dag_processing: initialize versioned bundles for callbacks (#52040)
    
    * Apply suggestion from @amoghrajesh
    
    
    
    * Apply suggestion from @amoghrajesh
    
    
    
    * Apply suggestion from @amoghrajesh
    
    
    
    * Add unit tests for callback handling and improve logging in Dag processing
    
    * fix CI static checks #2
    
    ---------
    
    
    (cherry picked from commit cac9c0ba51480c16464799b6a651e25209163d13)
    
    Co-authored-by: Aaron Chen <[email protected]>
    Co-authored-by: Amogh Desai <[email protected]>
---
 airflow-core/src/airflow/dag_processing/manager.py | 30 +++++++++++
 .../src/airflow/dag_processing/processor.py        | 17 ++++---
 .../tests/unit/dag_processing/test_manager.py      | 59 ++++++++++++++++++++++
 .../tests/unit/dag_processing/test_processor.py    | 29 +++++++++++
 4 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index d325ede7f05..c1c267fe99c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -49,6 +49,7 @@ from uuid6 import uuid7
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.dag_processing.collection import update_dag_parsing_results_in_db
 from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
@@ -175,6 +176,9 @@ class DagFileProcessorManager(LoggingMixin):
     parsing_cleanup_interval: float = attrs.field(
         factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
     )
+    stale_bundle_cleanup_interval: float = attrs.field(
+        factory=_config_int_factory("dag_processor", "stale_bundle_cleanup_interval")
+    )
     _file_process_interval: float = attrs.field(
         factory=_config_int_factory("dag_processor", "min_file_process_interval")
     )
@@ -183,6 +187,7 @@ class DagFileProcessorManager(LoggingMixin):
     )
 
     _last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False)
+    _last_stale_bundle_cleanup_time: float = attrs.field(default=0, init=False)
     print_stats_interval: float = attrs.field(
         factory=_config_int_factory("dag_processor", "print_stats_interval")
     )
@@ -302,6 +307,20 @@ class DagFileProcessorManager(LoggingMixin):
             self.deactivate_stale_dags(last_parsed=last_parsed)
             self._last_deactivate_stale_dags_time = time.monotonic()
 
+    def _cleanup_stale_bundle_versions(self):
+        if self.stale_bundle_cleanup_interval <= 0:
+            return
+        now = time.monotonic()
+        elapsed_time_since_cleanup = now - self._last_stale_bundle_cleanup_time
+        if elapsed_time_since_cleanup < self.stale_bundle_cleanup_interval:
+            return
+        try:
+            BundleUsageTrackingManager().remove_stale_bundle_versions()
+        except Exception:
+            self.log.exception("Error removing stale bundle versions")
+        finally:
+            self._last_stale_bundle_cleanup_time = now
+
     @provide_session
     def deactivate_stale_dags(
         self,
@@ -383,6 +402,7 @@ class DagFileProcessorManager(LoggingMixin):
             for callback in self._fetch_callbacks():
                 self._add_callback_to_queue(callback)
             self._scan_stale_dags()
+            self._cleanup_stale_bundle_versions()
             DagWarning.purge_inactive_dag_warnings()
 
             # Update number of loop iteration.
@@ -497,6 +517,16 @@ class DagFileProcessorManager(LoggingMixin):
             # Bundle no longer configured
             self.log.error("Bundle %s no longer configured, skipping callback", request.bundle_name)
             return None
+        if bundle.supports_versioning and request.bundle_version:
+            try:
+                bundle.initialize()
+            except Exception:
+                self.log.exception(
+                    "Error initializing bundle %s version %s for callback, skipping",
+                    request.bundle_name,
+                    request.bundle_version,
+                )
+                return None
 
         file_info = DagFileInfo(
             rel_path=Path(request.filepath),
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 49f997f9a1e..50982726e4b 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -34,6 +34,7 @@ from airflow.callbacks.callback_requests import (
     TaskCallbackRequest,
 )
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.base import BundleVersionLock
 from airflow.exceptions import TaskNotFound
 from airflow.models.dagbag import BundleDagBag, DagBag
 from airflow.sdk.execution_time.comms import (
@@ -279,12 +280,16 @@ def _execute_callbacks(
 ) -> None:
     for request in callback_requests:
         log.debug("Processing Callback Request", request=request.to_json())
-        if isinstance(request, TaskCallbackRequest):
-            _execute_task_callbacks(dagbag, request, log)
-        elif isinstance(request, DagCallbackRequest):
-            _execute_dag_callbacks(dagbag, request, log)
-        elif isinstance(request, EmailRequest):
-            _execute_email_callbacks(dagbag, request, log)
+        with BundleVersionLock(
+            bundle_name=request.bundle_name,
+            bundle_version=request.bundle_version,
+        ):
+            if isinstance(request, TaskCallbackRequest):
+                _execute_task_callbacks(dagbag, request, log)
+            elif isinstance(request, DagCallbackRequest):
+                _execute_dag_callbacks(dagbag, request, log)
+            elif isinstance(request, EmailRequest):
+                _execute_email_callbacks(dagbag, request, log)
 
 
 def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 922543add0c..c06e96388e0 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -588,6 +588,20 @@ class TestDagFileProcessorManager:
         # SerializedDagModel gives history about Dags
         assert serialized_dag_count == 1
 
+    @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
+    def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
+        manager = DagFileProcessorManager(max_runs=1)
+        manager.stale_bundle_cleanup_interval = 10
+
+        manager._last_stale_bundle_cleanup_time = time.monotonic() - 20
+        manager._cleanup_stale_bundle_versions()
+        mock_bundle_manager.return_value.remove_stale_bundle_versions.assert_called_once()
+
+        mock_bundle_manager.return_value.remove_stale_bundle_versions.reset_mock()
+        manager._last_stale_bundle_cleanup_time = time.monotonic()
+        manager._cleanup_stale_bundle_versions()
+        mock_bundle_manager.return_value.remove_stale_bundle_versions.assert_not_called()
+
     def test_kill_timed_out_processors_kill(self):
         manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
         # Set start_time to ensure timeout occurs: start_time = current_time - (timeout + 1) = always (timeout + 1) seconds
@@ -1036,6 +1050,51 @@ class TestDagFileProcessorManager:
             assert dag1_path not in manager._callback_to_execute
             assert dag2_path not in manager._callback_to_execute
 
+    @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+    def test_add_callback_initializes_versioned_bundle(self, mock_bundle_manager):
+        manager = DagFileProcessorManager(max_runs=1)
+        bundle = MagicMock()
+        bundle.supports_versioning = True
+        bundle.path = Path("/tmp/bundle")
+        mock_bundle_manager.return_value.get_bundle.return_value = bundle
+
+        request = DagCallbackRequest(
+            filepath="file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            bundle_name="testing",
+            bundle_version="some_commit_hash",
+            msg=None,
+        )
+
+        manager._add_callback_to_queue(request)
+
+        bundle.initialize.assert_called_once()
+
+    @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+    def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager):
+        manager = DagFileProcessorManager(max_runs=1)
+        bundle = MagicMock()
+        bundle.supports_versioning = True
+        bundle.initialize.side_effect = Exception("clone failed")
+        mock_bundle_manager.return_value.get_bundle.return_value = bundle
+
+        request = DagCallbackRequest(
+            filepath="file1.py",
+            dag_id="dag1",
+            run_id="run1",
+            is_failure_callback=False,
+            bundle_name="testing",
+            bundle_version="some_commit_hash",
+            msg=None,
+        )
+
+        manager._add_callback_to_queue(request)
+
+        bundle.initialize.assert_called_once()
+        assert len(manager._callback_to_execute) == 0
+
     def test_dag_with_assets(self, session, configure_testing_dag_bundle):
         """'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags."""
         test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py")
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 63e63a57643..c4027bb92bd 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -56,6 +56,7 @@ from airflow.dag_processing.processor import (
     DagFileProcessorProcess,
     ToDagProcessor,
     ToManager,
+    _execute_callbacks,
     _execute_dag_callbacks,
     _execute_email_callbacks,
     _execute_task_callbacks,
@@ -654,6 +655,34 @@ def test_import_error_updates_timestamps(session):
     assert stat.import_errors == 1
 
 
+class TestExecuteCallbacks:
+    def test_execute_callbacks_locks_bundle_version(self):
+        callbacks = [
+            DagCallbackRequest(
+                filepath="test.py",
+                dag_id="test_dag",
+                run_id="test_run",
+                bundle_name="testing",
+                bundle_version="some_commit_hash",
+                is_failure_callback=False,
+                msg=None,
+            )
+        ]
+        log = MagicMock(spec=FilteringBoundLogger)
+        dagbag = MagicMock(spec=DagBag)
+
+        with (
            patch("airflow.dag_processing.processor.BundleVersionLock") as mock_lock,
            patch("airflow.dag_processing.processor._execute_dag_callbacks") as mock_execute,
+        ):
+            _execute_callbacks(dagbag, callbacks, log)
+
+        mock_lock.assert_called_once_with(bundle_name="testing", bundle_version="some_commit_hash")
+        mock_lock.return_value.__enter__.assert_called_once()
+        mock_lock.return_value.__exit__.assert_called_once()
+        mock_execute.assert_called_once_with(dagbag, callbacks[0], log)
+
+
 class TestExecuteDagCallbacks:
     """Test the _execute_dag_callbacks function with context_from_server"""
 

Reply via email to