amoghrajesh commented on code in PR #60734:
URL: https://github.com/apache/airflow/pull/60734#discussion_r2740251058


##########
airflow-core/src/airflow/dag_processing/manager.py:
##########
@@ -299,6 +304,20 @@ def _scan_stale_dags(self):
             self.deactivate_stale_dags(last_parsed=last_parsed)
             self._last_deactivate_stale_dags_time = time.monotonic()
 
+    def _cleanup_stale_bundle_versions(self):
+        if self.stale_bundle_cleanup_interval <= 0:
+            return
+        now = time.monotonic()
+        elapsed_time_since_cleanup = now - self._last_stale_bundle_cleanup_time
+        if elapsed_time_since_cleanup < self.stale_bundle_cleanup_interval:
+            return
+        try:
+            BundleUsageTrackingManager().remove_stale_bundle_versions()
+        except Exception:
+            self.log.exception("Error removing stale bundle versions")

Review Comment:
   nit: Do we also need to capture the exception message and include it in the log?
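For context on this nit, here is a minimal sketch of what an `exception(...)` call inside an `except` block records. It assumes the standard library `logging` module, which may not be the logger the manager actually uses; `remove_stale_bundle_versions` below is a hypothetical stand-in that always fails, and the error text is invented for illustration.

```python
import io
import logging


def remove_stale_bundle_versions():
    # Hypothetical stand-in for the real cleanup call; it always fails here.
    raise OSError("bundle directory is not readable")


# Capture log output in memory so it can be inspected afterwards.
stream = io.StringIO()
log = logging.getLogger("stale_bundle_cleanup_sketch")
log.addHandler(logging.StreamHandler(stream))
log.propagate = False

try:
    remove_stale_bundle_versions()
except Exception:
    # Logger.exception logs at ERROR level and attaches the active
    # exception, so the traceback and message are emitted as well.
    log.exception("Error removing stale bundle versions")

output = stream.getvalue()
print(output)
```

With the standard library logger, the captured output contains the log message, the traceback, and the exception text, so formatting the exception into the message by hand would likely be redundant.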



##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -593,6 +593,20 @@ def test_scan_stale_dags(self, session):
         # SerializedDagModel gives history about Dags
         assert serialized_dag_count == 1
 
+    @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
+    def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):

Review Comment:
   I don't think this test is needed, as we already cover what's important: the cleanup of a stale bundle.



##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -666,6 +667,32 @@ def test_import_error_updates_timestamps(session):
     assert stat.import_errors == 1
 
 
+class TestExecuteCallbacks:
+    def test_execute_callbacks_locks_bundle_version(self):
+        request = DagCallbackRequest(

Review Comment:
   ```suggestion
           callbacks = [DagCallbackRequest(
   ```



##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -666,6 +667,32 @@ def test_import_error_updates_timestamps(session):
     assert stat.import_errors == 1
 
 
+class TestExecuteCallbacks:
+    def test_execute_callbacks_locks_bundle_version(self):
+        request = DagCallbackRequest(
+            filepath="test.py",
+            dag_id="test_dag",
+            run_id="test_run",
+            bundle_name="testing",
+            bundle_version="some_commit_hash",
+            is_failure_callback=False,
+            msg=None,
+        )
+        log = structlog.get_logger()
+        dagbag = MagicMock()
+
+        with (
+            patch("airflow.dag_processing.processor.BundleVersionLock") as mock_lock,
+            patch("airflow.dag_processing.processor._execute_dag_callbacks") as mock_execute,
+        ):
+            _execute_callbacks(dagbag, [request], log)

Review Comment:
   ```suggestion
               _execute_callbacks(dagbag, callbacks, log)
   ```



##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -287,12 +288,16 @@ def _execute_callbacks(
 ) -> None:
     for request in callback_requests:
         log.debug("Processing Callback Request", request=request.to_json())
-        if isinstance(request, TaskCallbackRequest):
-            _execute_task_callbacks(dagbag, request, log)
-        elif isinstance(request, DagCallbackRequest):
-            _execute_dag_callbacks(dagbag, request, log)
-        elif isinstance(request, EmailRequest):
-            _execute_email_callbacks(dagbag, request, log)
+        with BundleVersionLock(
+            bundle_name=request.bundle_name,
+            bundle_version=request.bundle_version,
+        ):
+            if isinstance(request, TaskCallbackRequest):
+                _execute_task_callbacks(dagbag, request, log)
+            elif isinstance(request, DagCallbackRequest):
+                _execute_dag_callbacks(dagbag, request, log)
+            elif isinstance(request, EmailRequest):
+                _execute_email_callbacks(dagbag, request, log)

Review Comment:
   This is OK
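A minimal, self-contained sketch of the per-request locking pattern in this hunk, for readers who want to see the behaviour in isolation. Everything here is a hypothetical stand-in: `bundle_version_lock` is not the real `BundleVersionLock`, and `execute_callbacks` only mirrors the shape of the loop above, not Airflow's actual implementation.

```python
from contextlib import contextmanager
from types import SimpleNamespace

# Tracks which (bundle_name, bundle_version) pairs are currently locked.
held = []


@contextmanager
def bundle_version_lock(bundle_name, bundle_version):
    # Hypothetical stand-in for BundleVersionLock: records the lock on
    # entry and releases it on exit, even if the body raises.
    held.append((bundle_name, bundle_version))
    try:
        yield
    finally:
        held.remove((bundle_name, bundle_version))


def execute_callbacks(callback_requests, handler):
    # Each request is handled while its own bundle version is locked.
    for request in callback_requests:
        with bundle_version_lock(request.bundle_name, request.bundle_version):
            handler(request)


# Snapshot of the held locks at the moment each callback runs.
locked_during_call = []


def handler(request):
    locked_during_call.append(list(held))


callbacks = [SimpleNamespace(bundle_name="testing", bundle_version="some_commit_hash")]
execute_callbacks(callbacks, handler)
```

In this sketch the lock is held for exactly the duration of each callback and released afterwards, which is the property the new test checks through the mocked lock.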



##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -666,6 +667,32 @@ def test_import_error_updates_timestamps(session):
     assert stat.import_errors == 1
 
 
+class TestExecuteCallbacks:
+    def test_execute_callbacks_locks_bundle_version(self):
+        request = DagCallbackRequest(
+            filepath="test.py",
+            dag_id="test_dag",
+            run_id="test_run",
+            bundle_name="testing",
+            bundle_version="some_commit_hash",
+            is_failure_callback=False,
+            msg=None,
+        )

Review Comment:
   ```suggestion
           )]
   ```



##########
airflow-core/tests/unit/dag_processing/test_processor.py:
##########
@@ -666,6 +667,32 @@ def test_import_error_updates_timestamps(session):
     assert stat.import_errors == 1
 
 
+class TestExecuteCallbacks:
+    def test_execute_callbacks_locks_bundle_version(self):
+        request = DagCallbackRequest(
+            filepath="test.py",
+            dag_id="test_dag",
+            run_id="test_run",
+            bundle_name="testing",
+            bundle_version="some_commit_hash",
+            is_failure_callback=False,
+            msg=None,
+        )
+        log = structlog.get_logger()

Review Comment:
   ```suggestion
           log = mock.MagicMock()
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
