kaxil commented on code in PR #60804:
URL: https://github.com/apache/airflow/pull/60804#discussion_r3067307163


##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -63,45 +67,239 @@ def test__read_dag_returns_none_when_no_dag(self):
         assert result is None
         assert "v1" not in self.db_dag_bag._dags
 
-    def test_get_serialized_dag_model(self):
-        """It should return the cached SerializedDagModel if already loaded."""
+    def test_get_dag_fetches_from_db_on_miss(self):
+        """It should query the DB and cache the result when not in cache."""
+        mock_dag = MagicMock(spec=SerializedDAG)
         mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = mock_dag
         mock_serdag.dag_version_id = "v1"
         mock_dag_version = MagicMock()
         mock_dag_version.serialized_dag = mock_serdag
         self.session.get.return_value = mock_dag_version
 
-        self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
-        assert result == mock_serdag
         self.session.get.assert_called_once()
+        assert result == mock_dag
 
-    def test_get_serialized_dag_model_returns_none_when_not_found(self):
+    def test_get_dag_returns_cached_on_hit(self):
+        """It should return cached DAG without querying DB."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        self.db_dag_bag._dags["v1"] = mock_dag
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        assert result == mock_dag
+        self.session.get.assert_not_called()
+
+    def test_get_dag_returns_none_when_not_found(self):
         """It should return None if version_id not found in DB."""
         self.session.get.return_value = None
 
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
         assert result is None
 
-    def test_get_dag_calls_get_dag_model_and__read_dag(self):
-        """It should call get_dag_model and then _read_dag."""
+
+class TestDBDagBagCache:
+    """Tests for DBDagBag optional caching behavior."""
+
+    def test_no_caching_by_default(self):
+        """Test that DBDagBag uses a simple dict without caching by default."""
+        dag_bag = DBDagBag()
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_lru_cache_enabled_with_cache_size(self):
+        """Test that LRU cache is enabled when cache_size is provided."""
+        dag_bag = DBDagBag(cache_size=10)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, LRUCache)
+
+    def test_ttl_cache_enabled_with_cache_size_and_ttl(self):
+        """Test that TTL cache is enabled when both cache_size and cache_ttl are provided."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, TTLCache)
+
+    def test_zero_cache_size_uses_unbounded_dict(self):
+        """Test that cache_size=0 uses unbounded dict (same as no caching)."""
+        dag_bag = DBDagBag(cache_size=0, cache_ttl=60)
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_clear_cache_with_caching(self):
+        """Test clear_cache() with caching enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        dag_bag._dags["version_2"] = mock_dag
+        assert len(dag_bag._dags) == 2
+
+        count = dag_bag.clear_cache()
+        assert count == 2
+        assert len(dag_bag._dags) == 0
+
+    def test_clear_cache_without_caching(self):
+        """Test clear_cache() without caching enabled."""
+        dag_bag = DBDagBag()
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        assert len(dag_bag._dags) == 1
+
+        count = dag_bag.clear_cache()
+        assert count == 1
+        assert len(dag_bag._dags) == 0
+
+    def test_ttl_cache_expiry(self):
+        """Test that cached DAGs expire after TTL."""
+        # TTLCache defaults to time.monotonic which time_machine cannot control.
+        # Use time.time as the timer so time_machine can advance it.
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=1)
+        dag_bag._dags = TTLCache(maxsize=10, ttl=1, timer=time.time)
+
+        with time_machine.travel("2025-01-01 00:00:00", tick=False):
+            dag_bag._dags["test_version_id"] = MagicMock()
+            assert "test_version_id" in dag_bag._dags

Review Comment:
   These tests exercise cache mechanics (TTL expiry, LRU eviction), not 
`SerializedDAG` interface correctness. Adding `spec` here would just add noise 
without catching real bugs. The mocks stored in the cache are only stand-ins 
used to verify eviction/expiry behavior.



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -63,45 +67,239 @@ def test__read_dag_returns_none_when_no_dag(self):
         assert result is None
         assert "v1" not in self.db_dag_bag._dags
 
-    def test_get_serialized_dag_model(self):
-        """It should return the cached SerializedDagModel if already loaded."""
+    def test_get_dag_fetches_from_db_on_miss(self):
+        """It should query the DB and cache the result when not in cache."""
+        mock_dag = MagicMock(spec=SerializedDAG)
         mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = mock_dag
         mock_serdag.dag_version_id = "v1"
         mock_dag_version = MagicMock()
         mock_dag_version.serialized_dag = mock_serdag
         self.session.get.return_value = mock_dag_version
 
-        self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
-        assert result == mock_serdag
         self.session.get.assert_called_once()
+        assert result == mock_dag
 
-    def test_get_serialized_dag_model_returns_none_when_not_found(self):
+    def test_get_dag_returns_cached_on_hit(self):
+        """It should return cached DAG without querying DB."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        self.db_dag_bag._dags["v1"] = mock_dag
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        assert result == mock_dag
+        self.session.get.assert_not_called()
+
+    def test_get_dag_returns_none_when_not_found(self):
         """It should return None if version_id not found in DB."""
         self.session.get.return_value = None
 
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
         assert result is None
 
-    def test_get_dag_calls_get_dag_model_and__read_dag(self):
-        """It should call get_dag_model and then _read_dag."""
+
+class TestDBDagBagCache:
+    """Tests for DBDagBag optional caching behavior."""
+
+    def test_no_caching_by_default(self):
+        """Test that DBDagBag uses a simple dict without caching by default."""
+        dag_bag = DBDagBag()
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_lru_cache_enabled_with_cache_size(self):
+        """Test that LRU cache is enabled when cache_size is provided."""
+        dag_bag = DBDagBag(cache_size=10)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, LRUCache)
+
+    def test_ttl_cache_enabled_with_cache_size_and_ttl(self):
+        """Test that TTL cache is enabled when both cache_size and cache_ttl are provided."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, TTLCache)
+
+    def test_zero_cache_size_uses_unbounded_dict(self):
+        """Test that cache_size=0 uses unbounded dict (same as no caching)."""
+        dag_bag = DBDagBag(cache_size=0, cache_ttl=60)
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_clear_cache_with_caching(self):
+        """Test clear_cache() with caching enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        dag_bag._dags["version_2"] = mock_dag
+        assert len(dag_bag._dags) == 2
+
+        count = dag_bag.clear_cache()
+        assert count == 2
+        assert len(dag_bag._dags) == 0
+
+    def test_clear_cache_without_caching(self):
+        """Test clear_cache() without caching enabled."""
+        dag_bag = DBDagBag()
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        assert len(dag_bag._dags) == 1
+
+        count = dag_bag.clear_cache()
+        assert count == 1
+        assert len(dag_bag._dags) == 0
+
+    def test_ttl_cache_expiry(self):
+        """Test that cached DAGs expire after TTL."""
+        # TTLCache defaults to time.monotonic which time_machine cannot control.
+        # Use time.time as the timer so time_machine can advance it.
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=1)
+        dag_bag._dags = TTLCache(maxsize=10, ttl=1, timer=time.time)
+
+        with time_machine.travel("2025-01-01 00:00:00", tick=False):
+            dag_bag._dags["test_version_id"] = MagicMock()
+            assert "test_version_id" in dag_bag._dags
+
+        # Jump ahead beyond TTL
+        with time_machine.travel("2025-01-01 00:00:02", tick=False):
+            assert dag_bag._dags.get("test_version_id") is None
+
+    def test_lru_eviction(self):
+        """Test that LRU eviction works when cache is full."""
+        dag_bag = DBDagBag(cache_size=2)
+
+        dag_bag._dags["version_1"] = MagicMock()
+        dag_bag._dags["version_2"] = MagicMock()
+        dag_bag._dags["version_3"] = MagicMock()

Review Comment:
   Same as above -- the LRU eviction test validates cache-size behavior, not 
the cached object's interface.



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -63,45 +67,239 @@ def test__read_dag_returns_none_when_no_dag(self):
         assert result is None
         assert "v1" not in self.db_dag_bag._dags
 
-    def test_get_serialized_dag_model(self):
-        """It should return the cached SerializedDagModel if already loaded."""
+    def test_get_dag_fetches_from_db_on_miss(self):
+        """It should query the DB and cache the result when not in cache."""
+        mock_dag = MagicMock(spec=SerializedDAG)
         mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = mock_dag
         mock_serdag.dag_version_id = "v1"
         mock_dag_version = MagicMock()
         mock_dag_version.serialized_dag = mock_serdag
         self.session.get.return_value = mock_dag_version
 
-        self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
-        assert result == mock_serdag
         self.session.get.assert_called_once()
+        assert result == mock_dag
 
-    def test_get_serialized_dag_model_returns_none_when_not_found(self):
+    def test_get_dag_returns_cached_on_hit(self):
+        """It should return cached DAG without querying DB."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        self.db_dag_bag._dags["v1"] = mock_dag
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        assert result == mock_dag
+        self.session.get.assert_not_called()
+
+    def test_get_dag_returns_none_when_not_found(self):
         """It should return None if version_id not found in DB."""
         self.session.get.return_value = None
 
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
         assert result is None
 
-    def test_get_dag_calls_get_dag_model_and__read_dag(self):
-        """It should call get_dag_model and then _read_dag."""
+
+class TestDBDagBagCache:
+    """Tests for DBDagBag optional caching behavior."""
+
+    def test_no_caching_by_default(self):
+        """Test that DBDagBag uses a simple dict without caching by default."""
+        dag_bag = DBDagBag()
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_lru_cache_enabled_with_cache_size(self):
+        """Test that LRU cache is enabled when cache_size is provided."""
+        dag_bag = DBDagBag(cache_size=10)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, LRUCache)
+
+    def test_ttl_cache_enabled_with_cache_size_and_ttl(self):
+        """Test that TTL cache is enabled when both cache_size and cache_ttl are provided."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, TTLCache)
+
+    def test_zero_cache_size_uses_unbounded_dict(self):
+        """Test that cache_size=0 uses unbounded dict (same as no caching)."""
+        dag_bag = DBDagBag(cache_size=0, cache_ttl=60)
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_clear_cache_with_caching(self):
+        """Test clear_cache() with caching enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        dag_bag._dags["version_2"] = mock_dag
+        assert len(dag_bag._dags) == 2
+
+        count = dag_bag.clear_cache()
+        assert count == 2
+        assert len(dag_bag._dags) == 0
+
+    def test_clear_cache_without_caching(self):
+        """Test clear_cache() without caching enabled."""
+        dag_bag = DBDagBag()
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        assert len(dag_bag._dags) == 1
+
+        count = dag_bag.clear_cache()
+        assert count == 1
+        assert len(dag_bag._dags) == 0
+
+    def test_ttl_cache_expiry(self):
+        """Test that cached DAGs expire after TTL."""
+        # TTLCache defaults to time.monotonic which time_machine cannot control.
+        # Use time.time as the timer so time_machine can advance it.
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=1)
+        dag_bag._dags = TTLCache(maxsize=10, ttl=1, timer=time.time)
+
+        with time_machine.travel("2025-01-01 00:00:00", tick=False):
+            dag_bag._dags["test_version_id"] = MagicMock()
+            assert "test_version_id" in dag_bag._dags
+
+        # Jump ahead beyond TTL
+        with time_machine.travel("2025-01-01 00:00:02", tick=False):
+            assert dag_bag._dags.get("test_version_id") is None
+
+    def test_lru_eviction(self):
+        """Test that LRU eviction works when cache is full."""
+        dag_bag = DBDagBag(cache_size=2)
+
+        dag_bag._dags["version_1"] = MagicMock()
+        dag_bag._dags["version_2"] = MagicMock()
+        dag_bag._dags["version_3"] = MagicMock()
+
+        # version_1 should be evicted (LRU)
+        assert dag_bag._dags.get("version_1") is None
+        assert dag_bag._dags.get("version_2") is not None
+        assert dag_bag._dags.get("version_3") is not None
+
+    def test_thread_safety_with_caching(self):
+        """Test concurrent access doesn't cause race conditions with caching enabled."""
+        dag_bag = DBDagBag(cache_size=100, cache_ttl=60)
+        errors = []
+        mock_session = MagicMock()
+
+        def make_dag_version(version_id):
+            serdag = MagicMock()
+            serdag.dag = MagicMock()
+            serdag.dag_version_id = version_id
+            return MagicMock(serialized_dag=serdag)
+

Review Comment:
   Acknowledged. The thread-safety test is exercising lock contention under 
concurrent access. Speccing the session and models would make the test more 
precise but wouldn't change what it validates (no corruption under 
concurrency). Can tighten in a follow-up.



##########
airflow-core/tests/unit/models/test_dagbag.py:
##########
@@ -63,45 +67,239 @@ def test__read_dag_returns_none_when_no_dag(self):
         assert result is None
         assert "v1" not in self.db_dag_bag._dags
 
-    def test_get_serialized_dag_model(self):
-        """It should return the cached SerializedDagModel if already loaded."""
+    def test_get_dag_fetches_from_db_on_miss(self):
+        """It should query the DB and cache the result when not in cache."""
+        mock_dag = MagicMock(spec=SerializedDAG)
         mock_serdag = MagicMock(spec=SerializedDagModel)
+        mock_serdag.dag = mock_dag
         mock_serdag.dag_version_id = "v1"
         mock_dag_version = MagicMock()
         mock_dag_version.serialized_dag = mock_serdag
         self.session.get.return_value = mock_dag_version
 
-        self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
-        assert result == mock_serdag
         self.session.get.assert_called_once()
+        assert result == mock_dag
 
-    def test_get_serialized_dag_model_returns_none_when_not_found(self):
+    def test_get_dag_returns_cached_on_hit(self):
+        """It should return cached DAG without querying DB."""
+        mock_dag = MagicMock(spec=SerializedDAG)
+        self.db_dag_bag._dags["v1"] = mock_dag
+
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
+
+        assert result == mock_dag
+        self.session.get.assert_not_called()
+
+    def test_get_dag_returns_none_when_not_found(self):
         """It should return None if version_id not found in DB."""
         self.session.get.return_value = None
 
-        result = self.db_dag_bag.get_serialized_dag_model("v1", session=self.session)
+        result = self.db_dag_bag.get_dag("v1", session=self.session)
 
         assert result is None
 
-    def test_get_dag_calls_get_dag_model_and__read_dag(self):
-        """It should call get_dag_model and then _read_dag."""
+
+class TestDBDagBagCache:
+    """Tests for DBDagBag optional caching behavior."""
+
+    def test_no_caching_by_default(self):
+        """Test that DBDagBag uses a simple dict without caching by default."""
+        dag_bag = DBDagBag()
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_lru_cache_enabled_with_cache_size(self):
+        """Test that LRU cache is enabled when cache_size is provided."""
+        dag_bag = DBDagBag(cache_size=10)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, LRUCache)
+
+    def test_ttl_cache_enabled_with_cache_size_and_ttl(self):
+        """Test that TTL cache is enabled when both cache_size and cache_ttl are provided."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        assert dag_bag._use_cache is True
+        assert isinstance(dag_bag._dags, TTLCache)
+
+    def test_zero_cache_size_uses_unbounded_dict(self):
+        """Test that cache_size=0 uses unbounded dict (same as no caching)."""
+        dag_bag = DBDagBag(cache_size=0, cache_ttl=60)
+        assert dag_bag._use_cache is False
+        assert isinstance(dag_bag._dags, dict)
+
+    def test_clear_cache_with_caching(self):
+        """Test clear_cache() with caching enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        dag_bag._dags["version_2"] = mock_dag
+        assert len(dag_bag._dags) == 2
+
+        count = dag_bag.clear_cache()
+        assert count == 2
+        assert len(dag_bag._dags) == 0
+
+    def test_clear_cache_without_caching(self):
+        """Test clear_cache() without caching enabled."""
+        dag_bag = DBDagBag()
+
+        mock_dag = MagicMock()
+        dag_bag._dags["version_1"] = mock_dag
+        assert len(dag_bag._dags) == 1
+
+        count = dag_bag.clear_cache()
+        assert count == 1
+        assert len(dag_bag._dags) == 0
+
+    def test_ttl_cache_expiry(self):
+        """Test that cached DAGs expire after TTL."""
+        # TTLCache defaults to time.monotonic which time_machine cannot control.
+        # Use time.time as the timer so time_machine can advance it.
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=1)
+        dag_bag._dags = TTLCache(maxsize=10, ttl=1, timer=time.time)
+
+        with time_machine.travel("2025-01-01 00:00:00", tick=False):
+            dag_bag._dags["test_version_id"] = MagicMock()
+            assert "test_version_id" in dag_bag._dags
+
+        # Jump ahead beyond TTL
+        with time_machine.travel("2025-01-01 00:00:02", tick=False):
+            assert dag_bag._dags.get("test_version_id") is None
+
+    def test_lru_eviction(self):
+        """Test that LRU eviction works when cache is full."""
+        dag_bag = DBDagBag(cache_size=2)
+
+        dag_bag._dags["version_1"] = MagicMock()
+        dag_bag._dags["version_2"] = MagicMock()
+        dag_bag._dags["version_3"] = MagicMock()
+
+        # version_1 should be evicted (LRU)
+        assert dag_bag._dags.get("version_1") is None
+        assert dag_bag._dags.get("version_2") is not None
+        assert dag_bag._dags.get("version_3") is not None
+
+    def test_thread_safety_with_caching(self):
+        """Test concurrent access doesn't cause race conditions with caching enabled."""
+        dag_bag = DBDagBag(cache_size=100, cache_ttl=60)
+        errors = []
+        mock_session = MagicMock()
+
+        def make_dag_version(version_id):
+            serdag = MagicMock()
+            serdag.dag = MagicMock()
+            serdag.dag_version_id = version_id
+            return MagicMock(serialized_dag=serdag)
+
+        def get_dag_version(model, version_id, options=None):
+            return make_dag_version(version_id)
+
+        mock_session.get.side_effect = get_dag_version
+
+        def access_cache(i):
+            try:
+                dag_bag._get_dag(f"version_{i % 5}", mock_session)
+            except Exception as e:
+                errors.append(e)
+
+        with ThreadPoolExecutor(max_workers=10) as executor:
+            futures = [executor.submit(access_cache, i) for i in range(100)]
+            for f in futures:
+                f.result()
+
+        assert not errors
+
+    def test_read_dag_stores_in_bounded_cache(self):
+        """Test that _read_dag stores DAG in bounded cache when cache_size > 0."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_sdm = MagicMock()
+        mock_sdm.dag = MagicMock()
+        mock_sdm.dag_version_id = "test_version"
+
+        result = dag_bag._read_dag(mock_sdm)
+
+        assert result == mock_sdm.dag
+        assert "test_version" in dag_bag._dags
+
+    def test_read_dag_stores_in_unbounded_dict(self):
+        """Test that _read_dag stores DAG in unbounded dict when no cache_size."""
+        dag_bag = DBDagBag()
+
+        mock_sdm = MagicMock()
+        mock_sdm.dag = MagicMock()
+        mock_sdm.dag_version_id = "test_version"
+
+        result = dag_bag._read_dag(mock_sdm)
+
+        assert result == mock_sdm.dag
+        assert "test_version" in dag_bag._dags
+
+    def test_iter_all_latest_version_dags_does_not_cache(self):
+        """Test that iter_all_latest_version_dags does not cache to prevent thrashing."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+
+        mock_session = MagicMock()
+        mock_sdm = MagicMock()
+        mock_sdm.dag = MagicMock()
+        mock_sdm.dag_version_id = "test_version"
+        mock_session.scalars.return_value = [mock_sdm]
+
+        list(dag_bag.iter_all_latest_version_dags(session=mock_session))
+
+        # Cache should be empty -- iter doesn't cache to prevent thrashing
+        assert len(dag_bag._dags) == 0
+
+    @patch("airflow.models.dagbag.Stats")
+    def test_cache_hit_metric_emitted(self, mock_stats):
+        """Test that cache hit metric is emitted when caching is enabled."""
+        dag_bag = DBDagBag(cache_size=10, cache_ttl=60)
+        mock_session = MagicMock()
+        dag_bag._dags["test_version"] = MagicMock()
+

Review Comment:
   These tests verify the `Stats.incr`/`Stats.gauge` calls, not DAG object 
behavior. The mock just needs to be truthy for the cache-hit path. Adding 
`spec` would be fine but doesn't add value here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to