Copilot commented on code in PR #60734:
URL: https://github.com/apache/airflow/pull/60734#discussion_r2739600702
##########
airflow-core/src/airflow/dag_processing/processor.py:
##########
@@ -287,12 +288,16 @@ def _execute_callbacks(
) -> None:
for request in callback_requests:
log.debug("Processing Callback Request", request=request.to_json())
- if isinstance(request, TaskCallbackRequest):
- _execute_task_callbacks(dagbag, request, log)
- elif isinstance(request, DagCallbackRequest):
- _execute_dag_callbacks(dagbag, request, log)
- elif isinstance(request, EmailRequest):
- _execute_email_callbacks(dagbag, request, log)
+ with BundleVersionLock(
+ bundle_name=request.bundle_name,
+ bundle_version=request.bundle_version,
+ ):
Review Comment:
The `BundleVersionLock` context manager is applied to all callback requests,
but the lock is only necessary for versioned bundles. For non-versioned
bundles, `request.bundle_version` may be None, and acquiring this lock
unnecessarily could impact performance. Consider conditionally applying the
lock only when `request.bundle_version` is not None.
```suggestion
if request.bundle_version is not None:
with BundleVersionLock(
bundle_name=request.bundle_name,
bundle_version=request.bundle_version,
):
if isinstance(request, TaskCallbackRequest):
_execute_task_callbacks(dagbag, request, log)
elif isinstance(request, DagCallbackRequest):
_execute_dag_callbacks(dagbag, request, log)
elif isinstance(request, EmailRequest):
_execute_email_callbacks(dagbag, request, log)
else:
```
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -593,6 +593,20 @@ def test_scan_stale_dags(self, session):
# SerializedDagModel gives history about Dags
assert serialized_dag_count == 1
+ @mock.patch("airflow.dag_processing.manager.BundleUsageTrackingManager")
+ def test_cleanup_stale_bundle_versions_interval(self, mock_bundle_manager):
Review Comment:
The test verifies that `remove_stale_bundle_versions` is called based on the
cleanup interval, but it doesn't verify what happens when
`stale_bundle_cleanup_interval` is set to 0 or a negative value. According to
the implementation in `_cleanup_stale_bundle_versions`, the method should
return early without calling `remove_stale_bundle_versions` in such cases. Add
a test case to cover this behavior.
##########
airflow-core/tests/unit/dag_processing/test_manager.py:
##########
@@ -1138,6 +1152,28 @@ def test_callback_queue(self, mock_get_logger,
configure_testing_dag_bundle):
assert dag1_path not in manager._callback_to_execute
assert dag2_path not in manager._callback_to_execute
+ @mock.patch("airflow.dag_processing.manager.DagBundlesManager")
+ def test_add_callback_initializes_versioned_bundle(self,
mock_bundle_manager):
Review Comment:
This test verifies that `bundle.initialize()` is called for versioned
bundles, but it doesn't cover the exception handling path where initialization
fails. The implementation logs the exception and skips the callback. Add a test
case where `bundle.initialize()` raises an exception to verify that the error
is logged and the callback is not queued.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]