ashb commented on code in PR #44898:
URL: https://github.com/apache/airflow/pull/44898#discussion_r1883756309
##########
airflow/dag_processing/processor.py:
##########
@@ -417,113 +419,8 @@ def __init__(self, dag_directory: str, log:
logging.Logger):
super().__init__()
self._log = log
self._dag_directory = dag_directory
- self.dag_warnings: set[tuple[str, str]] = set()
self._last_num_of_db_queries = 0
- @staticmethod
- @provide_session
- def update_import_errors(
- file_last_changed: dict[str, datetime],
- import_errors: dict[str, str],
- processor_subdir: str | None,
- session: Session = NEW_SESSION,
- ) -> None:
- """
- Update any import errors to be displayed in the UI.
-
-    For the DAGs in the given DagBag, record any associated import errors
and clear
-    errors for files that no longer have them. These are usually displayed
through the
- Airflow UI so that users know that there are issues parsing DAGs.
- :param file_last_changed: Dictionary containing the last changed time
of the files
- :param import_errors: Dictionary containing the import errors
- :param session: session for ORM operations
- """
- files_without_error = file_last_changed - import_errors.keys()
-
- # Clear the errors of the processed files
- # that no longer have errors
- for dagbag_file in files_without_error:
- session.execute(
- delete(ParseImportError)
- .where(ParseImportError.filename.startswith(dagbag_file))
- .execution_options(synchronize_session="fetch")
- )
-
- # files that still have errors
- existing_import_error_files = [x.filename for x in
session.query(ParseImportError.filename).all()]
-
- # Add the errors of the processed files
- for filename, stacktrace in import_errors.items():
- if filename in existing_import_error_files:
-
session.query(ParseImportError).filter(ParseImportError.filename ==
filename).update(
- {"filename": filename, "timestamp": timezone.utcnow(),
"stacktrace": stacktrace},
- synchronize_session="fetch",
- )
- # sending notification when an existing dag import error occurs
- get_listener_manager().hook.on_existing_dag_import_error(
- filename=filename, stacktrace=stacktrace
- )
- else:
- session.add(
- ParseImportError(
- filename=filename,
- timestamp=timezone.utcnow(),
- stacktrace=stacktrace,
- processor_subdir=processor_subdir,
- )
- )
- # sending notification when a new dag import error occurs
-
get_listener_manager().hook.on_new_dag_import_error(filename=filename,
stacktrace=stacktrace)
- (
- session.query(DagModel)
- .filter(DagModel.fileloc == filename)
- .update({"has_import_errors": True},
synchronize_session="fetch")
- )
-
- session.commit()
- session.flush()
-
- @classmethod
- @provide_session
- def update_dag_warnings(cla, *, dagbag: DagBag, session: Session =
NEW_SESSION) -> None:
Review Comment:
Split: generating the warnings has moved to DagBag, and writing them has
moved to the collection step.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]