This is an automated email from the ASF dual-hosted git repository.
ephraimanierobi pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-1-test by this push:
new 3b41866a2ee dag_processing: initialize versioned bundles for callbacks (#52040) (#60734) (#61230)
3b41866a2ee is described below
commit 3b41866a2ee244a7e00332213bd0a8a5e8eaedfe
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Jan 30 07:49:06 2026 +0100
dag_processing: initialize versioned bundles for callbacks (#52040) (#60734) (#61230)
* dag_processing: initialize versioned bundles for callbacks (#52040)
* Apply suggestion from @amoghrajesh
* Apply suggestion from @amoghrajesh
* Apply suggestion from @amoghrajesh
* Add unit tests for callback handling and improve logging in Dag processing
* fix CI static checks #2
---------
(cherry picked from commit cac9c0ba51480c16464799b6a651e25209163d13)
Co-authored-by: Aaron Chen <[email protected]>
Co-authored-by: Amogh Desai <[email protected]>
---
airflow-core/src/airflow/dag_processing/manager.py | 30 +++++++++++
.../src/airflow/dag_processing/processor.py | 17 ++++---
.../tests/unit/dag_processing/test_manager.py | 59 ++++++++++++++++++++++
.../tests/unit/dag_processing/test_processor.py | 29 +++++++++++
4 files changed, 129 insertions(+), 6 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index d325ede7f05..c1c267fe99c 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -49,6 +49,7 @@ from uuid6 import uuid7
from airflow._shared.timezones import timezone
from airflow.api_fastapi.execution_api.app import InProcessExecutionAPI
from airflow.configuration import conf
+from airflow.dag_processing.bundles.base import BundleUsageTrackingManager
from airflow.dag_processing.bundles.manager import DagBundlesManager
from airflow.dag_processing.collection import update_dag_parsing_results_in_db
from airflow.dag_processing.processor import DagFileParsingResult, DagFileProcessorProcess
@@ -175,6 +176,9 @@ class DagFileProcessorManager(LoggingMixin):
parsing_cleanup_interval: float = attrs.field(
factory=_config_int_factory("scheduler", "parsing_cleanup_interval")
)
+ stale_bundle_cleanup_interval: float = attrs.field(
+ factory=_config_int_factory("dag_processor", "stale_bundle_cleanup_interval")
+ )
_file_process_interval: float = attrs.field(
factory=_config_int_factory("dag_processor", "min_file_process_interval")
)
@@ -183,6 +187,7 @@ class DagFileProcessorManager(LoggingMixin):
)
_last_deactivate_stale_dags_time: float = attrs.field(default=0, init=False)
+ _last_stale_bundle_cleanup_time: float = attrs.field(default=0, init=False)
print_stats_interval: float = attrs.field(
factory=_config_int_factory("dag_processor", "print_stats_interval")
)
@@ -302,6 +307,20 @@ class DagFileProcessorManager(LoggingMixin):
self.deactivate_stale_dags(last_parsed=last_parsed)
self._last_deactivate_stale_dags_time = time.monotonic()
+ def _cleanup_stale_bundle_versions(self):
+ if self.stale_bundle_cleanup_interval <= 0:
+ return
+ now = time.monotonic()
+ elapsed_time_since_cleanup = now - self._last_stale_bundle_cleanup_time
+ if elapsed_time_since_cleanup < self.stale_bundle_cleanup_interval:
+ return
+ try:
+ BundleUsageTrackingManager().remove_stale_bundle_versions()
+ except Exception:
+ self.log.exception("Error removing stale bundle versions")
+ finally:
+ self._last_stale_bundle_cleanup_time = now
+
@provide_session
def deactivate_stale_dags(
self,
@@ -383,6 +402,7 @@ class DagFileProcessorManager(LoggingMixin):
for callback in self._fetch_callbacks():
self._add_callback_to_queue(callback)
self._scan_stale_dags()
+ self._cleanup_stale_bundle_versions()
DagWarning.purge_inactive_dag_warnings()
# Update number of loop iteration.
@@ -497,6 +517,16 @@ class DagFileProcessorManager(LoggingMixin):
# Bundle no longer configured
self.log.error("Bundle %s no longer configured, skipping callback", request.bundle_name)
return None
+ if bundle.supports_versioning and request.bundle_version:
+ try:
+ bundle.initialize()
+ except Exception:
+ self.log.exception(
+ "Error initializing bundle %s version %s for callback, skipping",
+ request.bundle_name,
+ request.bundle_version,
+ )
+ return None
file_info = DagFileInfo(
rel_path=Path(request.filepath),
diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 49f997f9a1e..50982726e4b 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -34,6 +34,7 @@ from airflow.callbacks.callback_requests import (
TaskCallbackRequest,
)
from airflow.configuration import conf
+from airflow.dag_processing.bundles.base import BundleVersionLock
from airflow.exceptions import TaskNotFound
from airflow.models.dagbag import BundleDagBag, DagBag
from airflow.sdk.execution_time.comms import (
@@ -279,12 +280,16 @@ def _execute_callbacks(
) -> None:
for request in callback_requests:
log.debug("Processing Callback Request", request=request.to_json())
- if isinstance(request, TaskCallbackRequest):
- _execute_task_callbacks(dagbag, request, log)
- elif isinstance(request, DagCallbackRequest):
- _execute_dag_callbacks(dagbag, request, log)
- elif isinstance(request, EmailRequest):
- _execute_email_callbacks(dagbag, request, log)
+ with BundleVersionLock(
+ bundle_name=request.bundle_name,
+ bundle_version=request.bundle_version,
+ ):
+ if isinstance(request, TaskCallbackRequest):
+ _execute_task_callbacks(dagbag, request, log)
+ elif isinstance(request, DagCallbackRequest):
+ _execute_dag_callbacks(dagbag, request, log)
+ elif isinstance(request, EmailRequest):
+ _execute_email_callbacks(dagbag, request, log)
def _execute_dag_callbacks(dagbag: DagBag, request: DagCallbackRequest, log: FilteringBoundLogger) -> None:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 922543add0c..c06e96388e0 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -588,6 +588,20 @@ class TestDagFileProcessorManager:
# SerializedDagModel gives history about Dags
assert serialized_dag_count == 1
+ @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
+ def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.stale_bundle_cleanup_interval = 10
+
+ manager._last_stale_bundle_cleanup_time = time.monotonic() - 20
+ manager._cleanup_stale_bundle_versions()
+ mock_bundle_manager.return_value.remove_stale_bundle_versions.assert_called_once()
+
+ mock_bundle_manager.return_value.remove_stale_bundle_versions.reset_mock()
+ manager._last_stale_bundle_cleanup_time = time.monotonic()
+ manager._cleanup_stale_bundle_versions()
+ mock_bundle_manager.return_value.remove_stale_bundle_versions.assert_not_called()
+
def test_kill_timed_out_processors_kill(self):
manager = DagFileProcessorManager(max_runs=1, processor_timeout=5)
# Set start_time to ensure timeout occurs: start_time = current_time - (timeout + 1) = always (timeout + 1) seconds
@@ -1036,6 +1050,51 @@ class TestDagFileProcessorManager:
assert dag1_path not in manager._callback_to_execute
assert dag2_path not in manager._callback_to_execute
+ @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+ def test_add_callback_initializes_versioned_bundle(self, mock_bundle_manager):
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = MagicMock()
+ bundle.supports_versioning = True
+ bundle.path = Path("/tmp/bundle")
+ mock_bundle_manager.return_value.get_bundle.return_value = bundle
+
+ request = DagCallbackRequest(
+ filepath="file1.py",
+ dag_id="dag1",
+ run_id="run1",
+ is_failure_callback=False,
+ bundle_name="testing",
+ bundle_version="some_commit_hash",
+ msg=None,
+ )
+
+ manager._add_callback_to_queue(request)
+
+ bundle.initialize.assert_called_once()
+
+ @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+ def test_add_callback_skips_when_bundle_init_fails(self, mock_bundle_manager):
+ manager = DagFileProcessorManager(max_runs=1)
+ bundle = MagicMock()
+ bundle.supports_versioning = True
+ bundle.initialize.side_effect = Exception("clone failed")
+ mock_bundle_manager.return_value.get_bundle.return_value = bundle
+
+ request = DagCallbackRequest(
+ filepath="file1.py",
+ dag_id="dag1",
+ run_id="run1",
+ is_failure_callback=False,
+ bundle_name="testing",
+ bundle_version="some_commit_hash",
+ msg=None,
+ )
+
+ manager._add_callback_to_queue(request)
+
+ bundle.initialize.assert_called_once()
+ assert len(manager._callback_to_execute) == 0
+
def test_dag_with_assets(self, session, configure_testing_dag_bundle):
"""'Integration' test to ensure that the assets get parsed and stored correctly for parsed dags."""
test_dag_path = str(TEST_DAG_FOLDER / "test_assets.py")
diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 63e63a57643..c4027bb92bd 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -56,6 +56,7 @@ from airflow.dag_processing.processor import (
DagFileProcessorProcess,
ToDagProcessor,
ToManager,
+ _execute_callbacks,
_execute_dag_callbacks,
_execute_email_callbacks,
_execute_task_callbacks,
@@ -654,6 +655,34 @@ def test_import_error_updates_timestamps(session):
assert stat.import_errors == 1
+class TestExecuteCallbacks:
+ def test_execute_callbacks_locks_bundle_version(self):
+ callbacks = [
+ DagCallbackRequest(
+ filepath="test.py",
+ dag_id="test_dag",
+ run_id="test_run",
+ bundle_name="testing",
+ bundle_version="some_commit_hash",
+ is_failure_callback=False,
+ msg=None,
+ )
+ ]
+ log = MagicMock(spec=FilteringBoundLogger)
+ dagbag = MagicMock(spec=DagBag)
+
+ with (
+ patch("airflow.dag_processing.processor.BundleVersionLock") as mock_lock,
+ patch("airflow.dag_processing.processor._execute_dag_callbacks") as mock_execute,
+ ):
+ _execute_callbacks(dagbag, callbacks, log)
+
+ mock_lock.assert_called_once_with(bundle_name="testing", bundle_version="some_commit_hash")
+ mock_lock.return_value.__enter__.assert_called_once()
+ mock_lock.return_value.__exit__.assert_called_once()
+ mock_execute.assert_called_once_with(dagbag, callbacks[0], log)
+
+
class TestExecuteDagCallbacks:
"""Test the _execute_dag_callbacks function with context_from_server"""