uranusjr commented on code in PR #58242:
URL: https://github.com/apache/airflow/pull/58242#discussion_r2521900735
##########
airflow-core/src/airflow/dag_processing/collection.py:
##########
@@ -285,21 +285,22 @@ def _update_import_errors(
):
from airflow.listeners.listener import get_listener_manager
- # We can remove anything from files parsed in this batch that doesn't have an error. We need to remove old
- # errors (i.e. from files that are removed) separately
-
- session.execute(
- delete(ParseImportError).where(
- tuple_(ParseImportError.bundle_name, ParseImportError.filename).in_(files_parsed)
- )
- )
-
- # the below query has to match (bundle_name, filename) tuple in that order since the
- # import_errors list is a dict with keys as (bundle_name, relative_fileloc)
+ # Check existing import errors BEFORE deleting, so we can determine if we should update or create
existing_import_error_files = set(
session.execute(select(ParseImportError.bundle_name, ParseImportError.filename))
)
- # Add the errors of the processed files
+
+ # Delete errors for files that were parsed but don't have errors in import_errors
+ # (i.e., files that were successfully parsed without errors)
+ files_to_clear = files_parsed - set(import_errors.keys())
Review Comment:
```suggestion
files_to_clear = files_parsed.difference(import_errors)
```
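Saves some performance: `set.difference` accepts any iterable, so passing the `import_errors` dict directly iterates its keys without first building a temporary set.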
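
To illustrate the suggestion, here is a minimal sketch with made-up values. The bundle and file names are hypothetical, and `files_parsed` is assumed to be a `set` of `(bundle_name, filename)` tuples, as the `-` operator in the diff implies. It only checks that the two spellings give the same result; it does not measure the speed difference.

```python
# Hypothetical stand-ins for the values inside _update_import_errors.
files_parsed = {("bundle-a", "ok.py"), ("bundle-a", "broken.py")}
import_errors = {("bundle-a", "broken.py"): "SyntaxError: invalid syntax"}

# Form in the PR: copies the dict keys into a temporary set, then subtracts.
via_operator = files_parsed - set(import_errors.keys())

# Suggested form: set.difference takes any iterable, and iterating a dict
# yields its keys, so no intermediate set is created.
via_method = files_parsed.difference(import_errors)

assert via_operator == via_method
```

Note that the operator form needs a set-like right-hand side (`files_parsed - import_errors` with a plain dict raises `TypeError`), which is why the method call is the one that can take the dict as-is.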