jedcunningham commented on code in PR #46443:
URL: https://github.com/apache/airflow/pull/46443#discussion_r1945574318


##########
airflow/dag_processing/manager.py:
##########
@@ -699,34 +687,51 @@ def _log_file_processing_stats(self, known_files):
 
         self.log.info(log_str)
 
-    def set_files(self, files: list[DagFileInfo]):
+    def handle_removed_files(self, known_files: dict[str, set[DagFileInfo]]):
         """
-        Update the set of files to track in the dag processor.
+        Remove from data structures the files that are missing.
+
+        Also, terminate processes that may be running on those removed files.
 
-        :param files: list of files
+        :param known_files: structure containing known files per-bundle
         :return: None
         """
-        self._files = files
+        files_set: set[DagFileInfo] = set()
+        """Set containing all observed files.
+
+        We're just converting this to a set for performance.

Review Comment:
   ```suggestion
           We're just converting this to a single set of DagFileInfos for 
performance.
   ```



##########
airflow/dag_processing/manager.py:
##########
@@ -549,60 +547,48 @@ def find_zipped_dags(abs_path: os.PathLike) -> 
Iterator[str]:
             except zipfile.BadZipFile:
                 self.log.exception("There was an error accessing ZIP file %s 
%s", abs_path)
 
-        present: set[tuple[str, str]] = set()
-        """
-        Tuple containing bundle name and relative fileloc of the dag file.
-
-        If the dag file is embedded in a zip file, the relative fileloc will 
be the
-        zip file path (relative to bundle path) joined with the path to the 
dag file (relative
-        to the zip file path).
-        """
-
-        for info in active_files:
+        rel_filelocs: list[str] = []
+        for info in present:
             abs_path = str(info.absolute_path)
             if abs_path.endswith(".py") or not zipfile.is_zipfile(abs_path):
-                present.add((info.bundle_name, str(info.rel_path)))
+                rel_filelocs.append(str(info.rel_path))
             else:
                 if TYPE_CHECKING:
                     assert info.bundle_path
                 for abs_sub_path in 
find_zipped_dags(abs_path=info.absolute_path):
                     rel_sub_path = 
Path(abs_sub_path).relative_to(info.bundle_path)
-                    present.add((info.bundle_name, str(rel_sub_path)))
+                    rel_filelocs.append(str(rel_sub_path))
 
-        DagModel.deactivate_deleted_dags(present)
+        DagModel.deactivate_deleted_dags(bundle_name=bundle_name, 
rel_filelocs=rel_filelocs)
 
-    def _print_stat(self):
+    def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
         """Occasionally print out stats about how fast the files are getting 
processed."""
         if 0 < self.print_stats_interval < time.monotonic() - 
self.last_stat_print_time:
-            if self._files:
-                self._log_file_processing_stats(self._files)
+            if known_files:
+                self._log_file_processing_stats(known_files=known_files)
             self.last_stat_print_time = time.monotonic()
 
     @provide_session
-    def clear_nonexistent_import_errors(self, session=NEW_SESSION):
+    def clear_orphaned_import_errors(
+        self, bundle_name: str, observed_filelocs: set[str], session: Session 
= NEW_SESSION
+    ):
         """
         Clear import errors for files that no longer exist.
 
         :param session: session for ORM operations
         """
         self.log.debug("Removing old import errors")
         try:
-            query = delete(ParseImportError)
-
-            if self._files:
-                query = query.where(
-                    tuple_(ParseImportError.filename, 
ParseImportError.bundle_name).notin_(
-                        # todo AIP-66: ParseImportError should have rel 
fileloce + bundle name
-                        [(str(f.absolute_path), f.bundle_name) for f in 
self._files]
-                    ),
-                )
-
-            
session.execute(query.execution_options(synchronize_session="fetch"))
-            session.commit()
+            errors = session.scalars(
+                select(ParseImportError).where(ParseImportError.bundle_name == 
bundle_name)

Review Comment:
   Just need to bring back the filename filter in this query (the old code matched on `ParseImportError.filename` as well as the bundle name), right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to