kaxil commented on code in PR #60804:
URL: https://github.com/apache/airflow/pull/60804#discussion_r3067249655


##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
     from airflow.models.serialized_dag import SerializedDagModel
     from airflow.serialization.definitions.dag import SerializedDAG
 
+log = logging.getLogger(__name__)
+
 
 class DBDagBag:
     """
-    Internal class for retrieving and caching dags in the scheduler.
+    Internal class for retrieving dags from the database.
+
+    Optionally supports LRU+TTL caching when cache_size is provided.
+    The scheduler uses this without caching, while the API server can
+    enable caching via configuration.
 
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[str, SerializedDAG] = {}  # dag_version_id to dag
+    def __init__(
+        self,
+        load_op_links: bool = True,
+        cache_size: int | None = None,
+        cache_ttl: int | None = None,
+    ) -> None:
+        """
+        Initialize DBDagBag.
+
+        :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+        :param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction).
+        :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only).
+        """
         self.load_op_links = load_op_links
+        self._dags: MutableMapping[str, SerializedDAG] = {}
+        self._lock: RLock | None = None
+        self._use_cache = False
+
+        # Initialize bounded cache if cache_size is provided and > 0
+        if cache_size and cache_size > 0:
+            if cache_ttl and cache_ttl > 0:
+                self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+            else:
+                self._dags = LRUCache(maxsize=cache_size)
+            # Lock required: cachetools caches are NOT thread-safe
+            # (LRU reordering and TTL cleanup mutate internal linked lists)
+            self._lock = RLock()
+            self._use_cache = True
 
     def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+        """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
         serdag.load_op_links = self.load_op_links
-        if dag := serdag.dag:
+        dag = serdag.dag
+        if not dag:
+            return None
+        if self._lock:
+            with self._lock:
+                self._dags[serdag.dag_version_id] = dag
+                Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+        else:
             self._dags[serdag.dag_version_id] = dag
         return dag
 
     def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
-        if dag := self._dags.get(version_id):
+        # Check cache first
+        if self._lock:
+            with self._lock:
+                dag = self._dags.get(version_id)
+        else:
+            dag = self._dags.get(version_id)
+
+        if dag:
+            if self._use_cache:
+                Stats.incr("api_server.dag_bag.cache_hit")
             return dag
+
+        if self._use_cache:
+            Stats.incr("api_server.dag_bag.cache_miss")
+
         dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
         if not dag_version:
             return None
         if not (serdag := dag_version.serialized_dag):
             return None
+
+        # Double-checked locking: another thread may have cached it while we queried DB
+        if self._lock:
+            with self._lock:
+                if dag := self._dags.get(version_id):

Review Comment:
   Fixed -- now emits `cache_hit` in the double-checked locking path.



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
     from airflow.models.serialized_dag import SerializedDagModel
     from airflow.serialization.definitions.dag import SerializedDAG
 
+log = logging.getLogger(__name__)
+
 
 class DBDagBag:
     """
-    Internal class for retrieving and caching dags in the scheduler.
+    Internal class for retrieving dags from the database.
+
+    Optionally supports LRU+TTL caching when cache_size is provided.
+    The scheduler uses this without caching, while the API server can
+    enable caching via configuration.
 
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[str, SerializedDAG] = {}  # dag_version_id to dag
+    def __init__(
+        self,
+        load_op_links: bool = True,
+        cache_size: int | None = None,
+        cache_ttl: int | None = None,
+    ) -> None:
+        """
+        Initialize DBDagBag.
+
+        :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+        :param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction).
+        :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only).
+        """
         self.load_op_links = load_op_links
+        self._dags: MutableMapping[str, SerializedDAG] = {}
+        self._lock: RLock | None = None
+        self._use_cache = False
+
+        # Initialize bounded cache if cache_size is provided and > 0
+        if cache_size and cache_size > 0:
+            if cache_ttl and cache_ttl > 0:
+                self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+            else:
+                self._dags = LRUCache(maxsize=cache_size)
+            # Lock required: cachetools caches are NOT thread-safe
+            # (LRU reordering and TTL cleanup mutate internal linked lists)
+            self._lock = RLock()
+            self._use_cache = True
 
     def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+        """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
         serdag.load_op_links = self.load_op_links
-        if dag := serdag.dag:
+        dag = serdag.dag
+        if not dag:
+            return None
+        if self._lock:
+            with self._lock:
+                self._dags[serdag.dag_version_id] = dag
+                Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))

Review Comment:
   Fixed -- gauge captured inside lock, emitted outside with `rate=0.1`.



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
     from airflow.models.serialized_dag import SerializedDagModel
     from airflow.serialization.definitions.dag import SerializedDAG
 
+log = logging.getLogger(__name__)
+
 
 class DBDagBag:
     """
-    Internal class for retrieving and caching dags in the scheduler.
+    Internal class for retrieving dags from the database.
+
+    Optionally supports LRU+TTL caching when cache_size is provided.
+    The scheduler uses this without caching, while the API server can
+    enable caching via configuration.
 
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[str, SerializedDAG] = {}  # dag_version_id to dag
+    def __init__(
+        self,
+        load_op_links: bool = True,
+        cache_size: int | None = None,
+        cache_ttl: int | None = None,
+    ) -> None:
+        """
+        Initialize DBDagBag.
+
+        :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+        :param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction).
+        :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only).
+        """
         self.load_op_links = load_op_links
+        self._dags: MutableMapping[str, SerializedDAG] = {}
+        self._lock: RLock | None = None
+        self._use_cache = False
+
+        # Initialize bounded cache if cache_size is provided and > 0
+        if cache_size and cache_size > 0:
+            if cache_ttl and cache_ttl > 0:
+                self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+            else:
+                self._dags = LRUCache(maxsize=cache_size)
+            # Lock required: cachetools caches are NOT thread-safe
+            # (LRU reordering and TTL cleanup mutate internal linked lists)
+            self._lock = RLock()
+            self._use_cache = True
 
     def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+        """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
         serdag.load_op_links = self.load_op_links
-        if dag := serdag.dag:
+        dag = serdag.dag
+        if not dag:
+            return None
+        if self._lock:
+            with self._lock:
+                self._dags[serdag.dag_version_id] = dag
+                Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+        else:
             self._dags[serdag.dag_version_id] = dag
         return dag
 
     def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
-        if dag := self._dags.get(version_id):
+        # Check cache first
+        if self._lock:
+            with self._lock:
+                dag = self._dags.get(version_id)
+        else:
+            dag = self._dags.get(version_id)
+
+        if dag:
+            if self._use_cache:
+                Stats.incr("api_server.dag_bag.cache_hit")
             return dag
+
+        if self._use_cache:
+            Stats.incr("api_server.dag_bag.cache_miss")
+
         dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
         if not dag_version:
             return None
         if not (serdag := dag_version.serialized_dag):
             return None
+
+        # Double-checked locking: another thread may have cached it while we queried DB
+        if self._lock:
+            with self._lock:
+                if dag := self._dags.get(version_id):
+                    return dag
         return self._read_dag(serdag)
 
+    def clear_cache(self) -> int:
+        """
+        Clear all cached DAGs.
+
+        :return: Number of entries cleared from the cache.
+        """

Review Comment:
   Fixed -- `clear_cache()` now emits 
`Stats.gauge("api_server.dag_bag.cache_size", 0)`.



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
     from airflow.models.serialized_dag import SerializedDagModel
     from airflow.serialization.definitions.dag import SerializedDAG
 
+log = logging.getLogger(__name__)
+
 
 class DBDagBag:
     """
-    Internal class for retrieving and caching dags in the scheduler.
+    Internal class for retrieving dags from the database.
+
+    Optionally supports LRU+TTL caching when cache_size is provided.
+    The scheduler uses this without caching, while the API server can
+    enable caching via configuration.
 
     :meta private:
     """
 
-    def __init__(self, load_op_links: bool = True) -> None:
-        self._dags: dict[str, SerializedDAG] = {}  # dag_version_id to dag
+    def __init__(
+        self,
+        load_op_links: bool = True,
+        cache_size: int | None = None,
+        cache_ttl: int | None = None,
+    ) -> None:
+        """
+        Initialize DBDagBag.
+
+        :param load_op_links: Should the extra operator link be loaded when de-serializing the DAG?
+        :param cache_size: Size of LRU cache. If None or 0, uses unbounded dict (no eviction).
+        :param cache_ttl: Time-to-live for cache entries in seconds. If None or 0, no TTL (LRU only).
+        """
         self.load_op_links = load_op_links
+        self._dags: MutableMapping[str, SerializedDAG] = {}
+        self._lock: RLock | None = None
+        self._use_cache = False
+
+        # Initialize bounded cache if cache_size is provided and > 0
+        if cache_size and cache_size > 0:
+            if cache_ttl and cache_ttl > 0:
+                self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+            else:
+                self._dags = LRUCache(maxsize=cache_size)
+            # Lock required: cachetools caches are NOT thread-safe
+            # (LRU reordering and TTL cleanup mutate internal linked lists)
+            self._lock = RLock()
+            self._use_cache = True
 
     def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+        """Read and optionally cache a SerializedDAG from a SerializedDagModel."""
         serdag.load_op_links = self.load_op_links
-        if dag := serdag.dag:
+        dag = serdag.dag
+        if not dag:
+            return None
+        if self._lock:
+            with self._lock:
+                self._dags[serdag.dag_version_id] = dag
+                Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+        else:
             self._dags[serdag.dag_version_id] = dag
         return dag
 
     def _get_dag(self, version_id: str, session: Session) -> SerializedDAG | None:
-        if dag := self._dags.get(version_id):
+        # Check cache first
+        if self._lock:
+            with self._lock:
+                dag = self._dags.get(version_id)
+        else:
+            dag = self._dags.get(version_id)
+
+        if dag:
+            if self._use_cache:
+                Stats.incr("api_server.dag_bag.cache_hit")
             return dag
+
+        if self._use_cache:
+            Stats.incr("api_server.dag_bag.cache_miss")
+
         dag_version = session.get(DagVersion, version_id, options=[joinedload(DagVersion.serialized_dag)])
         if not dag_version:
             return None
         if not (serdag := dag_version.serialized_dag):
             return None
+
+        # Double-checked locking: another thread may have cached it while we queried DB
+        if self._lock:
+            with self._lock:
+                if dag := self._dags.get(version_id):
+                    return dag
         return self._read_dag(serdag)
 
+    def clear_cache(self) -> int:
+        """
+        Clear all cached DAGs.
+
+        :return: Number of entries cleared from the cache.
+        """
+        if self._lock:
+            with self._lock:
+                count = len(self._dags)
+                self._dags.clear()
+        else:
+            count = len(self._dags)
+            self._dags.clear()
+
+        if self._use_cache:
+            Stats.incr("api_server.dag_bag.cache_clear")
+        return count

Review Comment:
   Same as above -- addressed.



##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
     from airflow.models.serialized_dag import SerializedDagModel
     from airflow.serialization.definitions.dag import SerializedDAG
 
+log = logging.getLogger(__name__)

Review Comment:
   Removed.



##########
airflow-core/src/airflow/cli/commands/api_server_command.py:
##########
@@ -82,6 +82,9 @@ def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, prox
     # Control access log based on uvicorn log level - disable for ERROR and above
     access_log_enabled = uvicorn_log_level not in ("error", "critical", "fatal")
 
+    # Worker recycling: restart workers after N requests to prevent memory accumulation
+    worker_max_requests = conf.getint("api", "worker_max_requests", fallback=10000)

Review Comment:
   Dropped -- `api_server_command.py` changes removed entirely. Gunicorn 
(merged in #60940) handles worker recycling natively.



##########
airflow-core/src/airflow/cli/commands/api_server_command.py:
##########
@@ -95,6 +98,8 @@ def _run_api_server(args, apps: str, num_workers: int, worker_timeout: int, prox
         "log_level": uvicorn_log_level,
         "proxy_headers": proxy_headers,
     }
+    if worker_max_requests > 0:
+        uvicorn_kwargs["limit_max_requests"] = worker_max_requests

Review Comment:
   Same as above -- dropped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to