This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch backport-cac9c0b-v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit f54726a69c911c97164854f05baa07584e028210
Author: Aaron Chen <[email protected]>
AuthorDate: Thu Jan 29 12:14:15 2026 -0800

    dag_processing: initialize versioned bundles for callbacks (#52040) (#60734)

    * dag_processing: initialize versioned bundles for callbacks (#52040)

    * Apply suggestion from @amoghrajesh

    Co-authored-by: Amogh Desai <[email protected]>

    * Apply suggestion from @amoghrajesh

    Co-authored-by: Amogh Desai <[email protected]>

    * Apply suggestion from @amoghrajesh

    Co-authored-by: Amogh Desai <[email protected]>

    * Add unit tests for callback handling and improve logging in Dag processing

    * fix CI static checks #2

    ---------

    Co-authored-by: Amogh Desai <[email protected]>
    (cherry picked from commit cac9c0ba51480c16464799b6a651e25209163d13)
---
 airflow-core/src/airflow/dag_processing/manager.py | 30 +++++++++++
 .../src/airflow/dag_processing/processor.py        | 17 ++++---
 .../tests/unit/dag_processing/test_manager.py      | 59 ++++++++++++++++++++++
 .../tests/unit/dag_processing/test_processor.py    | 29 +++++++++++
 4 files changed, 129 insertions(+), 6 deletions(-)

diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index d325ede7f05..c1c267fe99c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -49,6 +49,7 @@ from uuid6 import uuid7
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
 from airflow.configuration import conf
+from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
 from airflow.dag_processing.bundles.manager import DagBundlesManager
 from airflow.dag_processing.collection import update_dag_parsing_results_in_db
 from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
@@ -175,6 +176,9 @@ class DagFileProcessorManager(LoggingMixin): parsing_cleanup_interval: float = attrs.field(
factory=_config_int_factory("scheduler", "parsing_cleanup_interval") ) + stale_bundle_cleanup_interval: float = attrs.field( + factory=_config_int_factory("dag_processor", "stale_bundle_cleanup_interval") + ) _file_process_interval: float = attrs.field( factory=_config_int_factory("dag_processor", "min_file_process_interval") ) @@ -183,6 +187,7 @@ class DagFileProcessorManager(LoggingMixin): ) _last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False) + _last_stale_bundle_cleanup_time: float = attrs.field(default=0, init=False) print_stats_interval: float = attrs.field( factory=_config_int_factory("dag_processor", "print_stats_interval") ) @@ -302,6 +307,20 @@ class DagFileProcessorManager(LoggingMixin): self.deactivate_stale_dags(last_parsed=last_parsed) self._last_deactivate_stale_dags_time = time.monotonic() + def _cleanup_stale_bundle_versions(self): + if self.stale_bundle_cleanup_interval <= 0: + return + now = time.monotonic() + elapsed_time_since_cleanup = now - self._last_stale_bundle_cleanup_time + if elapsed_time_since_cleanup < self.stale_bundle_cleanup_interval: + return + try: + BundleUsageTrackingManager().remove_stale_bundle_versions() + except Exception: + self.log.exception("Error removing stale bundle versions") + finally: + self._last_stale_bundle_cleanup_time = now + @provide_session def deactivate_stale_dags( self, @@ -383,6 +402,7 @@ class DagFileProcessorManager(LoggingMixin): for callback in self._fetch_callbacks(): self._add_callback_to_queue(callback) self._scan_stale_dags() + self._cleanup_stale_bundle_versions() DagWarning.purge_inactive_dag_warnings() # Update number of loop iteration. 
@@ -497,6 +517,16 @@ class DagFileProcessorManager(LoggingMixin): # Bundle no longer configured self.log.error("Bundle %s no longer configured, skipping callback", request.bundle_name) return None + if bundle.supports_versioning and request.bundle_version: + try: + bundle.initialize() + except Exception: + self.log.exception( + "Error initializing bundle %s version %s for callback, skipping", + request.bundle_name, + request.bundle_version, + ) + return None file_info = DagFileInfo( rel_path=Path(request.filepath), diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py index 49f997f9a1e..50982726e4b 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -34,6 +34,7 @@ from airflow.callbacks.callback_requests import ( TaskCallbackRequest, ) from airflow.configuration import conf +from airflow.dag_processing.bundles.base import BundleVersionLock from airflow.exceptions import TaskNotFound from airflow.models.dagbag import BundleDagBag, DagBag from airflow.sdk.execution_time.comms import ( @@ -279,12 +280,16 @@ def _execute_callbacks( ) -> None: for request in callback_requests: log.debug("Processing Callback Request", request=request.to_json()) - if isinstance(request, TaskCallbackRequest): - _execute_task_callbacks(dagbag, request, log) - elif isinstance(request, DagCallbackRequest): - _execute_dag_callbacks(dagbag, request, log) - elif isinstance(request, EmailRequest): - _execute_email_callbacks(dagbag, request, log) + with BundleVersionLock( + bundle_name=request.bundle_name, + bundle_version=request.bundle_version, + ): + if isinstance(request, TaskCallbackRequest): + _execute_task_callbacks(dagbag, request, log) + elif isinstance(request, DagCallbackRequest): + _execute_dag_callbacks(dagbag, request, log) + elif isinstance(request, EmailRequest): + _execute_email_callbacks(dagbag, request, log) def 
_execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None: diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py index 922543add0c..c06e96388e0 100644 --- a/airflow-core/tests/unit/dag_processing/test_manager.py +++ b/airflow-core/tests/unit/dag_processing/test_manager.py @@ -588,6 +588,20 @@ class TestDagFileProcessorManager: # SerializedDagModel gives history about Dags assert serialized_dag_count == 1 + @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager") + def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager): + manager = DagFileProcessorManager(max_runs=1) + manager.stale_bundle_cleanup_interval = 10 + + manager._last_stale_bundle_cleanup_time = time.monotonic() - 20 + manager._cleanup_stale_bundle_versions() + mock_bundle_manager.return_value.remove_stale_bundle_versions.assert_called_once() + + mock_bundle_manager.return_value.remove_stale_bundle_versions.reset_mock() + manager._last_stale_bundle_cleanup_time = time.monotonic() + manager._cleanup_stale_bundle_versions() + mock_bundle_manager.return_value.remove_stale_bundle_versions.assert_not_called() + def test_kill_timed_out_processors_kill(self): manager = DagFileProcessorManager(max_runs=1, processor_timeout=5) # Set start_time to ensure timeout occurs: start_time = current_time - (timeout + 1) = always (timeout + 1) seconds @@ -1036,6 +1050,51 @@ class TestDagFileProcessorManager: assert dag1_path not in manager._callback_to_execute assert dag2_path not in manager._callback_to_execute + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") + def test_add_callback_initializes_versioned_bundle(self, mock_bundle_manager): + manager = DagFileProcessorManager(max_runs=1) + bundle = MagicMock() + bundle.supports_versioning = True + bundle.path = Path("/tmp/bundle") + mock_bundle_manager.return_value.get_bundle.return_value = bundle + + request 
= DagCallbackRequest( + filepath="file1.py", + dag_id="dag1", + run_id="run1", + is_failure_callback=False, + bundle_name="testing", + bundle_version="some_commit_hash", + msg=None, + ) + + manager._add_callback_to_queue(request) + + bundle.initialize.assert_called_once() + + @mock.patch("airflow.dag_processing.manager.DagBundlesManager") + def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager): + manager = DagFileProcessorManager(max_runs=1) + bundle = MagicMock() + bundle.supports_versioning = True + bundle.initialize.side_effect = Exception("clone failed") + mock_bundle_manager.return_value.get_bundle.return_value = bundle + + request = DagCallbackRequest( + filepath="file1.py", + dag_id="dag1", + run_id="run1", + is_failure_callback=False, + bundle_name="testing", + bundle_version="some_commit_hash", + msg=None, + ) + + manager._add_callback_to_queue(request) + + bundle.initialize.assert_called_once() + assert len(manager._callback_to_execute) == 0 + def test_dag_with_assets(self, session, configure_testing_dag_bundle): """'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags.""" test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py") diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 63e63a57643..c4027bb92bd 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -56,6 +56,7 @@ from airflow.dag_processing.processor import ( DagFileProcessorProcess, ToDagProcessor, ToManager, + _execute_callbacks, _execute_dag_callbacks, _execute_email_callbacks, _execute_task_callbacks, @@ -654,6 +655,34 @@ def test_import_error_updates_timestamps(session): assert stat.import_errors == 1 +class TestExecuteCallbacks: + def test_execute_callbacks_locks_bundle_version(self): + callbacks = [ + DagCallbackRequest( + filepath="test.py", + dag_id="test_dag", + 
run_id="test_run", + bundle_name="testing", + bundle_version="some_commit_hash", + is_failure_callback=False, + msg=None, + ) + ] + log = MagicMock(spec=FilteringBoundLogger) + dagbag = MagicMock(spec=DagBag) + + with ( + patch("airflow.dag_processing.processor.BundleVersionLock") as mock_lock, + patch("airflow.dag_processing.processor._execute_dag_callbacks") as mock_execute, + ): + _execute_callbacks(dagbag, callbacks, log) + + mock_lock.assert_called_once_with(bundle_name="testing", bundle_version="some_commit_hash") + mock_lock.return_value.__enter__.assert_called_once() + mock_lock.return_value.__exit__.assert_called_once() + mock_execute.assert_called_once_with(dagbag, callbacks[0], log) + + class TestExecuteDagCallbacks: """Test the _execute_dag_callbacks function with context_from_server"""
