Copilot commented on code in PR #68558:
URL: https://github.com/apache/airflow/pull/68558#discussion_r3411648601
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -84,27 +91,43 @@ def __init__(
self._lock: RLock | nullcontext = RLock() if self._use_cache else
nullcontext()
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
- """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
+ """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness
detection)."""
serdag.load_op_links = self.load_op_links
dag = serdag.dag
if not dag:
return None
with self._lock:
- self._dags[serdag.dag_version_id] = dag
+ self._dags[serdag.dag_version_id] = _CacheEntry(dag,
serdag.dag_hash)
cache_size = len(self._dags)
if self._use_cache:
stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
return dag
+ @staticmethod
+ def _current_dag_hash(version_id: UUID | str, session: Session) -> str |
None:
+ """Return the current ``dag_hash`` of the serialized DAG for
``version_id``, or None."""
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ return session.scalar(
+
select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_version_id ==
version_id)
+ )
Review Comment:
There is a new import inside `_current_dag_hash()`. Airflow’s code quality
guidelines generally require imports at the module top level (unless there’s a
known circular-import or lazy-load reason). Since `serialized_dag.py` doesn’t
appear to import `airflow.models.dagbag`, this import can likely be moved to
the module import section and reused here (and potentially in the other methods
that currently import `SerializedDagModel` locally).
##########
airflow-core/src/airflow/models/dagbag.py:
##########
@@ -84,27 +91,43 @@ def __init__(
self._lock: RLock | nullcontext = RLock() if self._use_cache else
nullcontext()
def _read_dag(self, serdag: SerializedDagModel) -> SerializedDAG | None:
- """Read and optionally cache a SerializedDAG from a
SerializedDagModel."""
+ """Read and cache a SerializedDAG (with its ``dag_hash`` for staleness
detection)."""
serdag.load_op_links = self.load_op_links
dag = serdag.dag
if not dag:
return None
with self._lock:
- self._dags[serdag.dag_version_id] = dag
+ self._dags[serdag.dag_version_id] = _CacheEntry(dag,
serdag.dag_hash)
cache_size = len(self._dags)
if self._use_cache:
stats.gauge("api_server.dag_bag.cache_size", cache_size, rate=0.1)
return dag
+ @staticmethod
+ def _current_dag_hash(version_id: UUID | str, session: Session) -> str |
None:
+ """Return the current ``dag_hash`` of the serialized DAG for
``version_id``, or None."""
+ from airflow.models.serialized_dag import SerializedDagModel
+
+ return session.scalar(
+
select(SerializedDagModel.dag_hash).where(SerializedDagModel.dag_version_id ==
version_id)
+ )
+
def _get_dag(self, version_id: UUID | str, session: Session) ->
SerializedDAG | None:
- # Check cache first
with self._lock:
- dag = self._dags.get(version_id)
-
- if dag:
- if self._use_cache:
- stats.incr("api_server.dag_bag.cache_hit")
- return dag
+ cached = self._dags.get(version_id)
+
+ if cached is not None:
+ # A version may have been updated in place (same dag_version_id,
new content + new
+ # dag_hash) by SerializedDagModel.write_dag, so validate the
cached copy against the
+ # current dag_hash before serving it. That validation is a
single-row lookup on the
+ # uniquely-indexed serialized_dag.dag_version_id column.
+ if self._current_dag_hash(version_id, session) == cached.dag_hash:
+ if self._use_cache:
+ stats.incr("api_server.dag_bag.cache_hit")
+ return cached.dag
Review Comment:
`_get_dag()` now performs a DB `scalar(select(dag_hash) ...)` on every cache
hit (even when the cached DAG is still valid). This changes a previously
0-query hot path into 1 query per cache-hit `get_dag()` call, which may
materially reduce the benefit of caching in both the scheduler’s long-lived
`scheduler_dag_bag` and the FastAPI singleton DagBag. Consider whether the hash
validation can be amortized (e.g., only revalidate after a short
TTL/“last_checked” interval, or gate it behind bounded caching) to avoid a DB
round-trip on every hit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]