Copilot commented on code in PR #60804:
URL: https://github.com/apache/airflow/pull/60804#discussion_r3066474525
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when
de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, uses unbounded
dict (no eviction).
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None
or 0, no TTL (LRU only).
+ """
self.load_op_links = load_op_links
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+ self._lock: RLock | None = None
+ self._use_cache = False
+
+ # Initialize bounded cache if cache_size is provided and > 0
+ if cache_size and cache_size > 0:
+ if cache_ttl and cache_ttl > 0:
+ self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+ else:
+ self._dags = LRUCache(maxsize=cache_size)
+ # Lock required: cachetools caches are NOT thread-safe
+ # (LRU reordering and TTL cleanup mutate internal linked lists)
+ self._lock = RLock()
+ self._use_cache = True
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+ """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
+ dag = serdag.dag
+ if not dag:
+ return None
+ if self._lock:
+ with self._lock:
+ self._dags[serdag.dag_version_id] = dag
+ Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+ else:
self._dags[serdag.dag_version_id] = dag
return dag
def _get_dag(self, version_id: str, session: Session) -> SerializedDAG |
None:
- if dag := self._dags.get(version_id):
+ # Check cache first
+ if self._lock:
+ with self._lock:
+ dag = self._dags.get(version_id)
+ else:
+ dag = self._dags.get(version_id)
+
+ if dag:
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_hit")
return dag
+
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_miss")
+
dag_version = session.get(DagVersion, version_id,
options=[joinedload(DagVersion.serialized_dag)])
if not dag_version:
return None
if not (serdag := dag_version.serialized_dag):
return None
+
+ # Double-checked locking: another thread may have cached it while we
queried DB
+ if self._lock:
+ with self._lock:
+ if dag := self._dags.get(version_id):
+ return dag
return self._read_dag(serdag)
+ def clear_cache(self) -> int:
+ """
+ Clear all cached DAGs.
+
+ :return: Number of entries cleared from the cache.
+ """
Review Comment:
The `api_server.dag_bag.cache_size` gauge is only emitted on cache insert in
`_read_dag`, so after `clear_cache()` the gauge may remain stale (non-zero)
until the next insert. Consider also emitting
`Stats.gauge("api_server.dag_bag.cache_size", 0)` when `_use_cache` is
enabled (and potentially after clears inside the lock) so the gauge reflects
the cleared state.
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when
de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, uses unbounded
dict (no eviction).
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None
or 0, no TTL (LRU only).
+ """
self.load_op_links = load_op_links
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+ self._lock: RLock | None = None
+ self._use_cache = False
+
+ # Initialize bounded cache if cache_size is provided and > 0
+ if cache_size and cache_size > 0:
+ if cache_ttl and cache_ttl > 0:
+ self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+ else:
+ self._dags = LRUCache(maxsize=cache_size)
+ # Lock required: cachetools caches are NOT thread-safe
+ # (LRU reordering and TTL cleanup mutate internal linked lists)
+ self._lock = RLock()
+ self._use_cache = True
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+ """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
+ dag = serdag.dag
+ if not dag:
+ return None
+ if self._lock:
+ with self._lock:
+ self._dags[serdag.dag_version_id] = dag
+ Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
Review Comment:
The metrics call is executed while holding the cache lock. If
`Stats.gauge(...)` is slow or internally synchronized, it can increase lock
hold time and reduce throughput. Prefer updating the cache under the lock,
capturing the size into a local variable, then emitting `Stats.gauge(...)`
after releasing the lock.
```suggestion
cache_size = len(self._dags)
Stats.gauge("api_server.dag_bag.cache_size", cache_size)
```
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when
de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, uses unbounded
dict (no eviction).
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None
or 0, no TTL (LRU only).
+ """
self.load_op_links = load_op_links
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+ self._lock: RLock | None = None
+ self._use_cache = False
+
+ # Initialize bounded cache if cache_size is provided and > 0
+ if cache_size and cache_size > 0:
+ if cache_ttl and cache_ttl > 0:
+ self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+ else:
+ self._dags = LRUCache(maxsize=cache_size)
+ # Lock required: cachetools caches are NOT thread-safe
+ # (LRU reordering and TTL cleanup mutate internal linked lists)
+ self._lock = RLock()
+ self._use_cache = True
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+ """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
+ dag = serdag.dag
+ if not dag:
+ return None
+ if self._lock:
+ with self._lock:
+ self._dags[serdag.dag_version_id] = dag
+ Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+ else:
self._dags[serdag.dag_version_id] = dag
return dag
def _get_dag(self, version_id: str, session: Session) -> SerializedDAG |
None:
- if dag := self._dags.get(version_id):
+ # Check cache first
+ if self._lock:
+ with self._lock:
+ dag = self._dags.get(version_id)
+ else:
+ dag = self._dags.get(version_id)
+
+ if dag:
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_hit")
return dag
+
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_miss")
+
dag_version = session.get(DagVersion, version_id,
options=[joinedload(DagVersion.serialized_dag)])
if not dag_version:
return None
if not (serdag := dag_version.serialized_dag):
return None
+
+ # Double-checked locking: another thread may have cached it while we
queried DB
+ if self._lock:
+ with self._lock:
+ if dag := self._dags.get(version_id):
Review Comment:
The double-checked-locking fast-path returns a cached DAG without
incrementing the `api_server.dag_bag.cache_hit` metric, which will undercount
hits under concurrency. Consider incrementing the hit counter before `return
dag` in this branch (when `_use_cache` is true) to keep hit/miss metrics
accurate.
```suggestion
if dag := self._dags.get(version_id):
if self._use_cache:
Stats.incr("api_server.dag_bag.cache_hit")
```
##########
airflow-core/src/airflow/cli/commands/api_server_command.py:
##########
@@ -95,6 +98,8 @@ def _run_api_server(args, apps: str, num_workers: int,
worker_timeout: int, prox
"log_level": uvicorn_log_level,
"proxy_headers": proxy_headers,
}
+ if worker_max_requests > 0:
+ uvicorn_kwargs["limit_max_requests"] = worker_max_requests
Review Comment:
This PR introduces API server worker recycling (`worker_max_requests` +
Uvicorn `limit_max_requests`) in addition to the DAG cache changes, but the PR
title/description focuses on `DBDagBag` caching and does not mention worker
recycling. Either update the PR description to explicitly include this feature
(including rationale/metrics/compat notes if needed) or split it into a
separate PR to keep scope aligned.
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
+
class DBDagBag:
"""
- Internal class for retrieving and caching dags in the scheduler.
+ Internal class for retrieving dags from the database.
+
+ Optionally supports LRU+TTL caching when cache_size is provided.
+ The scheduler uses this without caching, while the API server can
+ enable caching via configuration.
:meta private:
"""
- def __init__(self, load_op_links: bool = True) -> None:
- self._dags: dict[str, SerializedDAG] = {} # dag_version_id to dag
+ def __init__(
+ self,
+ load_op_links: bool = True,
+ cache_size: int | None = None,
+ cache_ttl: int | None = None,
+ ) -> None:
+ """
+ Initialize DBDagBag.
+
+ :param load_op_links: Should the extra operator link be loaded when
de-serializing the DAG?
+ :param cache_size: Size of LRU cache. If None or 0, uses unbounded
dict (no eviction).
+ :param cache_ttl: Time-to-live for cache entries in seconds. If None
or 0, no TTL (LRU only).
+ """
self.load_op_links = load_op_links
+ self._dags: MutableMapping[str, SerializedDAG] = {}
+ self._lock: RLock | None = None
+ self._use_cache = False
+
+ # Initialize bounded cache if cache_size is provided and > 0
+ if cache_size and cache_size > 0:
+ if cache_ttl and cache_ttl > 0:
+ self._dags = TTLCache(maxsize=cache_size, ttl=cache_ttl)
+ else:
+ self._dags = LRUCache(maxsize=cache_size)
+ # Lock required: cachetools caches are NOT thread-safe
+ # (LRU reordering and TTL cleanup mutate internal linked lists)
+ self._lock = RLock()
+ self._use_cache = True
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
+ """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
serdag.load_op_links = self.load_op_links
- if dag := serdag.dag:
+ dag = serdag.dag
+ if not dag:
+ return None
+ if self._lock:
+ with self._lock:
+ self._dags[serdag.dag_version_id] = dag
+ Stats.gauge("api_server.dag_bag.cache_size", len(self._dags))
+ else:
self._dags[serdag.dag_version_id] = dag
return dag
def _get_dag(self, version_id: str, session: Session) -> SerializedDAG |
None:
- if dag := self._dags.get(version_id):
+ # Check cache first
+ if self._lock:
+ with self._lock:
+ dag = self._dags.get(version_id)
+ else:
+ dag = self._dags.get(version_id)
+
+ if dag:
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_hit")
return dag
+
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_miss")
+
dag_version = session.get(DagVersion, version_id,
options=[joinedload(DagVersion.serialized_dag)])
if not dag_version:
return None
if not (serdag := dag_version.serialized_dag):
return None
+
+ # Double-checked locking: another thread may have cached it while we
queried DB
+ if self._lock:
+ with self._lock:
+ if dag := self._dags.get(version_id):
+ return dag
return self._read_dag(serdag)
+ def clear_cache(self) -> int:
+ """
+ Clear all cached DAGs.
+
+ :return: Number of entries cleared from the cache.
+ """
+ if self._lock:
+ with self._lock:
+ count = len(self._dags)
+ self._dags.clear()
+ else:
+ count = len(self._dags)
+ self._dags.clear()
+
+ if self._use_cache:
+ Stats.incr("api_server.dag_bag.cache_clear")
+ return count
Review Comment:
The `api_server.dag_bag.cache_size` gauge is only emitted on cache insert in
`_read_dag`, so after `clear_cache()` the gauge may remain stale (non-zero)
until the next insert. Consider also emitting
`Stats.gauge("api_server.dag_bag.cache_size", 0)` when `_use_cache` is
enabled (and potentially after clears inside the lock) so the gauge reflects
the cleared state.
##########
airflow-core/src/airflow/cli/commands/api_server_command.py:
##########
@@ -82,6 +82,9 @@ def _run_api_server(args, apps: str, num_workers: int,
worker_timeout: int, prox
# Control access log based on uvicorn log level - disable for ERROR and
above
access_log_enabled = uvicorn_log_level not in ("error", "critical",
"fatal")
+ # Worker recycling: restart workers after N requests to prevent memory
accumulation
+ worker_max_requests = conf.getint("api", "worker_max_requests",
fallback=10000)
Review Comment:
This PR introduces API server worker recycling (`worker_max_requests` +
Uvicorn `limit_max_requests`) in addition to the DAG cache changes, but the PR
title/description focuses on `DBDagBag` caching and does not mention worker
recycling. Either update the PR description to explicitly include this feature
(including rationale/metrics/compat notes if needed) or split it into a
separate PR to keep scope aligned.
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -37,34 +42,110 @@
from airflow.models.serialized_dag import SerializedDagModel
from airflow.serialization.definitions.dag import SerializedDAG
+log = logging.getLogger(__name__)
Review Comment:
`log` is introduced here but is not used in the shown changes. If it’s not
used elsewhere in this module, consider removing it to avoid dead code;
otherwise, add a concrete usage (e.g., debug logging around cache
initialization) so the extra import/variable is justified.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]