This is an automated email from the ASF dual-hosted git repository.
kaxilnaik pushed a commit to branch v2-11-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-11-test by this push:
new c42577a3209 [v2-11-test] Don't resolve path for DAGs folder (#46877)
c42577a3209 is described below
commit c42577a32095a39de944186e5fa7f52d7ca4f16e
Author: Jed Cunningham <[email protected]>
AuthorDate: Thu May 8 05:13:08 2025 -0600
[v2-11-test] Don't resolve path for DAGs folder (#46877)
(cherry picked from commit b157d1f023aa7fa8121451f5bf1947de3758be97)
---
airflow/dag_processing/manager.py | 5 +--
tests/dag_processing/test_manager.py | 74 ++++++++++++++++++++++++++++++++++++
2 files changed, 75 insertions(+), 4 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index c03bc074d0a..99e66cb19ff 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -1089,10 +1089,7 @@ class DagFileProcessorManager(LoggingMixin):
def get_dag_directory(self) -> str:
"""Return the dag_director as a string."""
- if isinstance(self._dag_directory, Path):
- return str(self._dag_directory.resolve())
- else:
- return str(self._dag_directory)
+ return str(self._dag_directory)
def set_file_paths(self, new_file_paths):
"""
diff --git a/tests/dag_processing/test_manager.py
b/tests/dag_processing/test_manager.py
new file mode 100644
index 00000000000..822347abc51
--- /dev/null
+++ b/tests/dag_processing/test_manager.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import timedelta
+
+import pytest
+
+from airflow.configuration import conf
+from airflow.dag_processing.manager import DagFileProcessorManager
+from airflow.models.dag import DagModel
+from airflow.utils import timezone
+from airflow.utils.session import create_session
+from tests.test_utils.db import clear_db_dags
+
+
+class TestStaleDagCleanup:
+ """Test that stale DAGs get deactivated based on raw dag_directory path"""
+
+ @pytest.mark.db_test
+ def test_deactivate_stale_dags(self):
+ threshold = conf.getint("scheduler", "stale_dag_threshold")
+ now = timezone.utcnow()
+
+ stale_time = now - timedelta(seconds=threshold + 5)
+ fresh_time = now - timedelta(seconds=threshold - 5)
+
+ clear_db_dags()
+ with create_session() as session:
+ dm_stale = DagModel(
+ dag_id="dag_stale",
+ fileloc="/link/dag_stale.py",
+ is_active=True,
+ last_parsed_time=stale_time,
+ )
+ dm_fresh = DagModel(
+ dag_id="dag_fresh",
+ fileloc="/link/dag_fresh.py",
+ is_active=True,
+ last_parsed_time=fresh_time,
+ )
+ session.add_all([dm_stale, dm_fresh])
+ session.commit()
+
+ last_parsed = {
+ "/link/dag_stale.py": now,
+ "/link/dag_fresh.py": now,
+ }
+ DagFileProcessorManager.deactivate_stale_dags(
+ last_parsed=last_parsed,
+ dag_directory="/link",
+ stale_dag_threshold=threshold,
+ session=session,
+ )
+ session.commit()
+
+ ref1 = session.get(DagModel, "dag_stale")
+ ref2 = session.get(DagModel, "dag_fresh")
+ assert not ref1.is_active, "dag_stale should be deactivated as
stale"
+ assert ref2.is_active, "dag_fresh should remain active as fresh"