This is an automated email from the ASF dual-hosted git repository.

ash pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c2916954582 Refactor stat helper methods on DagFileProcessorManager 
(#44818)
c2916954582 is described below

commit c29169545825fdc9486db0d6e2281b473518ff05
Author: Ash Berlin-Taylor <[email protected]>
AuthorDate: Tue Dec 10 17:29:07 2024 +0000

    Refactor stat helper methods on DagFileProcessorManager (#44818)
    
    This is some cleanup/preparatory work in order to swap the Dag processor 
over
    to use the TaskSDK, and this small change is done to make the future work
    easier.
    
    The main bulk of this change is to remove the `get_*` helper methods and 
make
    `_file_stats` a defaultdict instead, and then also swap the DagFileStat 
class
    from a NamedTuple (which can't have defaults) to an attrs-defined class 
which
    does.
    
    To make some of the places of use nicer/still one line, the type of
    last_duration was changed from a timedelta to a float, as this was what the
    `get_last_runtime` method did, and we don't need any of the capabilities of 
a
    timedelta object elsewhere.
---
 airflow/dag_processing/manager.py    | 134 ++++++++---------------------------
 tests/dag_processing/test_manager.py |  11 +--
 2 files changed, 37 insertions(+), 108 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index d0fbcf44d95..2df7890a48f 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -31,12 +31,13 @@ import sys
 import time
 import zipfile
 from collections import defaultdict, deque
-from collections.abc import Iterator
+from collections.abc import Iterator, MutableMapping
 from datetime import datetime, timedelta
 from importlib import import_module
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Callable, NamedTuple, cast
 
+import attrs
 from setproctitle import setproctitle
 from sqlalchemy import delete, select, update
 from tabulate import tabulate
@@ -81,15 +82,16 @@ class DagParsingStat(NamedTuple):
     all_files_processed: bool
 
 
-class DagFileStat(NamedTuple):
[email protected]
+class DagFileStat:
     """Information about single processing of one file."""
 
-    num_dags: int
-    import_errors: int
-    last_finish_time: datetime | None
-    last_duration: timedelta | None
-    run_count: int
-    last_num_of_db_queries: int
+    num_dags: int = 0
+    import_errors: int = 0
+    last_finish_time: datetime | None = None
+    last_duration: float | None = None
+    run_count: int = 0
+    last_num_of_db_queries: int = 0
 
 
 class DagParsingSignal(enum.Enum):
@@ -353,15 +355,6 @@ class DagFileProcessorManager(LoggingMixin):
     :param async_mode: whether to start the manager in async mode
     """
 
-    DEFAULT_FILE_STAT = DagFileStat(
-        num_dags=0,
-        import_errors=0,
-        last_finish_time=None,
-        last_duration=None,
-        run_count=0,
-        last_num_of_db_queries=0,
-    )
-
     def __init__(
         self,
         dag_directory: os.PathLike[str],
@@ -416,7 +409,7 @@ class DagFileProcessorManager(LoggingMixin):
         self._num_run = 0
 
         # Map from file path to stats about the file
-        self._file_stats: dict[str, DagFileStat] = {}
+        self._file_stats: MutableMapping[str, DagFileStat] = 
defaultdict(DagFileStat)
 
         # Last time that the DAG dir was traversed to look for files
         self.last_dag_dir_refresh_time = 
timezone.make_aware(datetime.fromtimestamp(0))
@@ -488,7 +481,7 @@ class DagFileProcessorManager(LoggingMixin):
         elapsed_time_since_refresh = (now - 
self.last_deactivate_stale_dags_time).total_seconds()
         if elapsed_time_since_refresh > self.parsing_cleanup_interval:
             last_parsed = {
-                fp: self.get_last_finish_time(fp) for fp in self.file_paths if 
self.get_last_finish_time(fp)
+                fp: stat.last_finish_time for fp, stat in 
self._file_stats.items() if stat.last_finish_time
             }
             DagFileProcessorManager.deactivate_stale_dags(
                 last_parsed=last_parsed,
@@ -501,7 +494,7 @@ class DagFileProcessorManager(LoggingMixin):
     @provide_session
     def deactivate_stale_dags(
         cls,
-        last_parsed: dict[str, datetime | None],
+        last_parsed: dict[str, datetime],
         dag_directory: str,
         stale_dag_threshold: int,
         session: Session = NEW_SESSION,
@@ -655,7 +648,9 @@ class DagFileProcessorManager(LoggingMixin):
                     span.add_event(name="print_stat")
                 self._print_stat()
 
-                all_files_processed = all(self.get_last_finish_time(x) is not 
None for x in self.file_paths)
+                all_files_processed = all(
+                    self._file_stats[x].last_finish_time is not None for x in 
self.file_paths
+                )
                 max_runs_reached = self.max_runs_reached()
 
                 try:
@@ -872,30 +867,27 @@ class DagFileProcessorManager(LoggingMixin):
         rows = []
         now = timezone.utcnow()
         for file_path in known_file_paths:
-            last_runtime = self.get_last_runtime(file_path)
-            num_dags = self.get_last_dag_count(file_path)
-            num_errors = self.get_last_error_count(file_path)
+            stat = self._file_stats[file_path]
             file_name = Path(file_path).stem
             processor_pid = self.get_pid(file_path)
             processor_start_time = self.get_start_time(file_path)
             runtime = (now - processor_start_time) if processor_start_time 
else None
-            last_run = self.get_last_finish_time(file_path)
+            last_run = stat.last_finish_time
             if last_run:
                 seconds_ago = (now - last_run).total_seconds()
                 
Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)
-            last_num_of_db_queries = self.get_last_num_of_db_queries(file_path)
-            Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}", 
last_num_of_db_queries)
+            Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}", 
stat.last_num_of_db_queries)
 
             rows.append(
                 (
                     file_path,
                     processor_pid,
                     runtime,
-                    num_dags,
-                    num_errors,
-                    last_runtime,
+                    stat.num_dags,
+                    stat.import_errors,
+                    stat.last_duration,
                     last_run,
-                    last_num_of_db_queries,
+                    stat.last_num_of_db_queries,
                 )
             )
 
@@ -955,58 +947,6 @@ class DagFileProcessorManager(LoggingMixin):
         """
         return [x.pid for x in self._processors.values()]
 
-    def get_last_runtime(self, file_path) -> float | None:
-        """
-        Retrieve the last processing time of a specific path.
-
-        :param file_path: the path to the file that was processed
-        :return: the runtime (in seconds) of the process of the last run, or
-            None if the file was never processed.
-        """
-        stat = self._file_stats.get(file_path)
-        return stat.last_duration.total_seconds() if stat and 
stat.last_duration else None
-
-    def get_last_dag_count(self, file_path) -> int | None:
-        """
-        Retrieve the total DAG count at a specific path.
-
-        :param file_path: the path to the file that was processed
-        :return: the number of dags loaded from that file, or None if the file 
was never processed.
-        """
-        stat = self._file_stats.get(file_path)
-        return stat.num_dags if stat else None
-
-    def get_last_error_count(self, file_path) -> int | None:
-        """
-        Retrieve the total number of errors from processing a specific path.
-
-        :param file_path: the path to the file that was processed
-        :return: the number of import errors from processing, or None if the 
file was never processed.
-        """
-        stat = self._file_stats.get(file_path)
-        return stat.import_errors if stat else None
-
-    def get_last_num_of_db_queries(self, file_path) -> int | None:
-        """
-        Retrieve the number of queries performed to the Airflow database 
during last parsing of the file.
-
-        :param file_path: the path to the file that was processed
-        :return: the number of queries performed to the Airflow database 
during last parsing of the file,
-            or None if the file was never processed.
-        """
-        stat = self._file_stats.get(file_path)
-        return stat.last_num_of_db_queries if stat else None
-
-    def get_last_finish_time(self, file_path) -> datetime | None:
-        """
-        Retrieve the last completion time for processing a specific path.
-
-        :param file_path: the path to the file that was processed
-        :return: the finish time of the process of the last run, or None if 
the file was never processed.
-        """
-        stat = self._file_stats.get(file_path)
-        return stat.last_finish_time if stat else None
-
     def get_start_time(self, file_path) -> datetime | None:
         """
         Retrieve the last start time for processing a specific path.
@@ -1019,15 +959,6 @@ class DagFileProcessorManager(LoggingMixin):
             return self._processors[file_path].start_time
         return None
 
-    def get_run_count(self, file_path) -> int:
-        """
-        Return the number of times the given file has been parsed.
-
-        :param file_path: the path to the file that's being processed.
-        """
-        stat = self._file_stats.get(file_path)
-        return stat.run_count if stat else 0
-
     def get_dag_directory(self) -> str:
         """Return the dag_director as a string."""
         if isinstance(self._dag_directory, Path):
@@ -1092,13 +1023,13 @@ class DagFileProcessorManager(LoggingMixin):
             num_dags = 0
             last_num_of_db_queries = 0
 
-        last_duration = last_finish_time - processor.start_time
+        last_duration = (last_finish_time - 
processor.start_time).total_seconds()
         stat = DagFileStat(
             num_dags=num_dags,
             import_errors=count_import_errors,
             last_finish_time=last_finish_time,
             last_duration=last_duration,
-            run_count=self.get_run_count(processor.file_path) + 1,
+            run_count=self._file_stats[processor.file_path].run_count + 1,
             last_num_of_db_queries=last_num_of_db_queries,
         )
         self._file_stats[processor.file_path] = stat
@@ -1110,7 +1041,7 @@ class DagFileProcessorManager(LoggingMixin):
         span.set_attributes(
             {
                 "file_path": processor.file_path,
-                "run_count": self.get_run_count(processor.file_path) + 1,
+                "run_count": stat.run_count,
             }
         )
 
@@ -1146,8 +1077,8 @@ class DagFileProcessorManager(LoggingMixin):
 
         span.end(end_time=datetime_to_nano(last_finish_time))
 
-        Stats.timing(f"dag_processing.last_duration.{file_name}", 
last_duration)
-        Stats.timing("dag_processing.last_duration", last_duration, 
tags={"file_name": file_name})
+        Stats.timing(f"dag_processing.last_duration.{file_name}", 
last_duration * 1000.0)
+        Stats.timing("dag_processing.last_duration", last_duration * 1000.0, 
tags={"file_name": file_name})
 
     def collect_results(self) -> None:
         """Collect the result from any finished DAG processors."""
@@ -1219,7 +1150,6 @@ class DagFileProcessorManager(LoggingMixin):
             if file_path not in self._file_stats:
                 # We found new file after refreshing dir. add to parsing queue 
at start
                 self.log.info("Adding new file %s to parsing queue", file_path)
-                self._file_stats[file_path] = 
DagFileProcessorManager.DEFAULT_FILE_STAT
                 self._file_path_queue.appendleft(file_path)
                 span = Trace.get_current_span()
                 if span.is_recording():
@@ -1266,7 +1196,7 @@ class DagFileProcessorManager(LoggingMixin):
             # from being added to file_path_queue
             # unless they were modified recently and parsing mode is 
"modified_time"
             # in which case we don't honor "self._file_process_interval" 
(min_file_process_interval)
-            last_finish_time = self.get_last_finish_time(file_path)
+            last_finish_time = self._file_stats[file_path].last_finish_time
             if (
                 last_finish_time is not None
                 and (now - last_finish_time).total_seconds() < 
self._file_process_interval
@@ -1316,8 +1246,6 @@ class DagFileProcessorManager(LoggingMixin):
                 "Queuing the following files for processing:\n\t%s", 
"\n\t".join(files_paths_to_queue)
             )
 
-        for file_path in files_paths_to_queue:
-            self._file_stats.setdefault(file_path, 
DagFileProcessorManager.DEFAULT_FILE_STAT)
         self._add_paths_to_queue(files_paths_to_queue, False)
         Stats.incr("dag_processing.file_path_queue_update_count")
 
@@ -1355,8 +1283,8 @@ class DagFileProcessorManager(LoggingMixin):
                     num_dags=0,
                     import_errors=1,
                     last_finish_time=now,
-                    last_duration=duration,
-                    run_count=self.get_run_count(file_path) + 1,
+                    last_duration=duration.total_seconds(),
+                    run_count=self._file_stats[processor.file_path].run_count 
+ 1,
                     last_num_of_db_queries=0,
                 )
                 self._file_stats[processor.file_path] = stat
diff --git a/tests/dag_processing/test_manager.py 
b/tests/dag_processing/test_manager.py
index 76652740fbc..3154f3a49df 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -264,7 +264,7 @@ class TestDagProcessorJobRunner:
         mock_processor.terminate.side_effect = None
 
         manager._processors["missing_file.txt"] = mock_processor
-        manager._file_stats["missing_file.txt"] = DagFileStat(0, 0, None, 
None, 0, 0)
+        manager._file_stats["missing_file.txt"] = DagFileStat()
 
         manager.set_file_paths(["abc.txt"])
         assert manager._processors == {}
@@ -533,7 +533,7 @@ class TestDagProcessorJobRunner:
             assert last_finish_time < file_1_new_mtime
             assert (
                 manager._file_process_interval
-                > (freezed_base_time - 
manager.get_last_finish_time("file_1.py")).total_seconds()
+                > (freezed_base_time - 
manager._file_stats["file_1.py"].last_finish_time).total_seconds()
             )
 
     @mock.patch("zipfile.is_zipfile", return_value=True)
@@ -937,17 +937,18 @@ class TestDagProcessorJobRunner:
         )
 
         self.run_processor_manager_one_loop(manager, parent_pipe)
-        last_runtime = manager.get_last_runtime(manager.file_paths[0])
+        last_runtime = manager._file_stats[manager.file_paths[0]].last_duration
+        assert last_runtime is not None
 
         child_pipe.close()
         parent_pipe.close()
 
         statsd_timing_mock.assert_has_calls(
             [
-                mock.call("dag_processing.last_duration.temp_dag", 
timedelta(seconds=last_runtime)),
+                mock.call("dag_processing.last_duration.temp_dag", 
last_runtime * 1000.0),
                 mock.call(
                     "dag_processing.last_duration",
-                    timedelta(seconds=last_runtime),
+                    last_runtime * 1000.0,
                     tags={"file_name": "temp_dag"},
                 ),
             ],

Reply via email to