jedcunningham commented on code in PR #46443:
URL: https://github.com/apache/airflow/pull/46443#discussion_r1945574318
##########
airflow/dag_processing/manager.py:
##########
@@ -699,34 +687,51 @@ def _log_file_processing_stats(self, known_files):
self.log.info(log_str)
- def set_files(self, files: list[DagFileInfo]):
+ def handle_removed_files(self, known_files: dict[str, set[DagFileInfo]]):
"""
- Update the set of files to track in the dag processor.
+ Remove from data structures the files that are missing.
+
+ Also, terminate processes that may be running on those removed files.
- :param files: list of files
+ :param known_files: structure containing known files per-bundle
:return: None
"""
- self._files = files
+ files_set: set[DagFileInfo] = set()
+ """Set containing all observed files.
+
+ We're just converting this to a set for performance.
Review Comment:
```suggestion
    We're just converting this to a single set of DagFileInfos for performance.
```
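
For illustration, the flattening described here can be sketched roughly as below. This is a minimal sketch only, not code from the PR: the `DagFileInfo` shown is a simplified stand-in reduced to the two fields the diff uses (`bundle_name`, `rel_path`), and the `flatten` helper name is invented for the example.

```python
# Illustrative sketch only, not the PR's code. Flattening the per-bundle
# dict[str, set[DagFileInfo]] into one set makes membership tests during
# pruning a single O(1) hash lookup instead of a per-bundle scan.
from dataclasses import dataclass


@dataclass(frozen=True)
class DagFileInfo:  # simplified stand-in for Airflow's real class
    bundle_name: str
    rel_path: str


def flatten(known_files: dict[str, set[DagFileInfo]]) -> set[DagFileInfo]:
    files_set: set[DagFileInfo] = set()
    for bundle_files in known_files.values():
        files_set |= bundle_files  # set union per bundle
    return files_set


known = {
    "bundle_a": {DagFileInfo("bundle_a", "dags/a.py")},
    "bundle_b": {DagFileInfo("bundle_b", "dags/b.py")},
}
all_files = flatten(known)
assert DagFileInfo("bundle_a", "dags/a.py") in all_files  # O(1) check
```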
##########
airflow/dag_processing/manager.py:
##########
@@ -549,60 +547,48 @@ def find_zipped_dags(abs_path: os.PathLike) -> Iterator[str]:
except zipfile.BadZipFile:
self.log.exception("There was an error accessing ZIP file %s %s", abs_path)
- present: set[tuple[str, str]] = set()
- """
- Tuple containing bundle name and relative fileloc of the dag file.
-
- If the dag file is embedded in a zip file, the relative fileloc will be the
- zip file path (relative to bundle path) joined with the path to the dag file (relative
- to the zip file path).
- """
-
- for info in active_files:
+ rel_filelocs: list[str] = []
+ for info in present:
abs_path = str(info.absolute_path)
if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
- present.add((info.bundle_name, str(info.rel_path)))
+ rel_filelocs.append(str(info.rel_path))
else:
if TYPE_CHECKING:
assert info.bundle_path
for abs_sub_path in find_zipped_dags(abs_path=info.absolute_path):
rel_sub_path = Path(abs_sub_path).relative_to(info.bundle_path)
- present.add((info.bundle_name, str(rel_sub_path)))
+ rel_filelocs.append(str(rel_sub_path))
- DagModel.deactivate_deleted_dags(present)
+ DagModel.deactivate_deleted_dags(bundle_name=bundle_name, rel_filelocs=rel_filelocs)
- def _print_stat(self):
+ def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
"""Occasionally print out stats about how fast the files are getting
processed."""
if 0 < self.print_stats_interval < time.monotonic() - self.last_stat_print_time:
- if self._files:
- self._log_file_processing_stats(self._files)
+ if known_files:
+ self._log_file_processing_stats(known_files=known_files)
self.last_stat_print_time = time.monotonic()
@provide_session
- def clear_nonexistent_import_errors(self, session=NEW_SESSION):
+ def clear_orphaned_import_errors(
+ self, bundle_name: str, observed_filelocs: set[str], session: Session = NEW_SESSION
+ ):
"""
Clear import errors for files that no longer exist.
:param session: session for ORM operations
"""
self.log.debug("Removing old import errors")
try:
- query = delete(ParseImportError)
-
- if self._files:
- query = query.where(
- tuple_(ParseImportError.filename, ParseImportError.bundle_name).notin_(
- # todo AIP-66: ParseImportError should have rel fileloc + bundle name
- [(str(f.absolute_path), f.bundle_name) for f in self._files]
- ),
- )
-
- session.execute(query.execution_options(synchronize_session="fetch"))
- session.commit()
+ errors = session.scalars(
+ select(ParseImportError).where(ParseImportError.bundle_name == bundle_name)
Review Comment:
Just need to bring back the filename, right?
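
Taken literally, the fix the reviewer is asking for might look like the hedged sketch below: keep the new per-bundle scoping but bring the filename filter back, so only errors for files no longer observed are deleted. This is an assumption about the intended change, not the merged code; it presumes `ParseImportError` exposes the `bundle_name` and `filename` columns used in the diff, and that `session`, `bundle_name`, and `observed_filelocs` are bound as in the new signature above.

```python
# Hedged sketch of what the reviewer appears to suggest, not the final PR
# code: scope the delete to this bundle AND to filenames that are no longer
# observed, so import errors for still-present files survive.
from sqlalchemy import delete
from sqlalchemy.orm import Session

from airflow.models.errors import ParseImportError


def clear_orphaned_import_errors(
    session: Session, bundle_name: str, observed_filelocs: set[str]
) -> None:
    session.execute(
        delete(ParseImportError).where(
            ParseImportError.bundle_name == bundle_name,
            ParseImportError.filename.notin_(observed_filelocs),
        )
    )
    session.commit()
```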