This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 10ae2b3bf56 Refactor DAG deactivation logic (#57233)
10ae2b3bf56 is described below
commit 10ae2b3bf5619d3387eacc1c1355282e16100e26
Author: Kalyan R <[email protected]>
AuthorDate: Sat Oct 25 22:39:37 2025 +0530
Refactor DAG deactivation logic (#57233)
---
airflow-core/src/airflow/dag_processing/manager.py | 7 +-
airflow-core/src/airflow/models/dag.py | 7 +-
.../tests/unit/dag_processing/test_manager.py | 91 ++++++++++++++++++++++
3 files changed, 102 insertions(+), 3 deletions(-)
diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py
index 84a99a480a9..52ba1e8246b 100644
--- a/airflow-core/src/airflow/dag_processing/manager.py
+++ b/airflow-core/src/airflow/dag_processing/manager.py
@@ -636,12 +636,15 @@ class DagFileProcessorManager(LoggingMixin):
rel_filelocs.append(str(rel_sub_path))
with create_session() as session:
- DagModel.deactivate_deleted_dags(
+ any_deactivated = DagModel.deactivate_deleted_dags(
bundle_name=bundle_name,
rel_filelocs=rel_filelocs,
session=session,
)
- remove_references_to_deleted_dags(session=session)
+ # Only run cleanup if we actually deactivated any DAGs
+ # This avoids unnecessary DELETE queries in the common case where no DAGs were deleted
+ if any_deactivated:
+ remove_references_to_deleted_dags(session=session)
def print_stats(self, known_files: dict[str, set[DagFileInfo]]):
"""Occasionally print out stats about how fast the files are getting processed."""
diff --git a/airflow-core/src/airflow/models/dag.py b/airflow-core/src/airflow/models/dag.py
index 7cceac5e9ee..41123e80caa 100644
--- a/airflow-core/src/airflow/models/dag.py
+++ b/airflow-core/src/airflow/models/dag.py
@@ -553,13 +553,14 @@ class DagModel(Base):
bundle_name: str,
rel_filelocs: list[str],
session: Session = NEW_SESSION,
- ) -> None:
+ ) -> bool:
"""
Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
:param bundle_name: bundle for filelocs
:param rel_filelocs: relative filelocs for bundle
:param session: ORM Session
+ :return: True if any DAGs were marked as stale, False otherwise
"""
log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
dag_models = session.scalars(
@@ -575,9 +576,13 @@ class DagModel(Base):
)
)
+ any_deactivated = False
for dm in dag_models:
if dm.relative_fileloc not in rel_filelocs:
dm.is_stale = True
+ any_deactivated = True
+
+ return any_deactivated
@classmethod
def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, datetime]]:
diff --git a/airflow-core/tests/unit/dag_processing/test_manager.py b/airflow-core/tests/unit/dag_processing/test_manager.py
index 205eaa4aaed..b7c3f23c6f4 100644
--- a/airflow-core/tests/unit/dag_processing/test_manager.py
+++ b/airflow-core/tests/unit/dag_processing/test_manager.py
@@ -743,6 +743,97 @@ class TestDagFileProcessorManager:
# and the DAG from test_dag2.py is deactivated
assert session.get(DagModel, "test_dag2").is_stale is True
+ @pytest.mark.parametrize(
+ "rel_filelocs, expected_return, expected_dag1_stale, expected_dag2_stale",
+ [
+ pytest.param(
+ ["test_dag1.py"], # Only dag1 present, dag2 deleted
+ True, # Should return True
+ False, # dag1 should not be stale
+ True, # dag2 should be stale
+ id="dags_deactivated",
+ ),
+ pytest.param(
+ ["test_dag1.py", "test_dag2.py"], # Both files present
+ False, # Should return False
+ False, # dag1 should not be stale
+ False, # dag2 should not be stale
+ id="no_dags_deactivated",
+ ),
+ ],
+ )
+ def test_deactivate_deleted_dags_return_value(
+ self, dag_maker, session, rel_filelocs, expected_return, expected_dag1_stale, expected_dag2_stale
+ ):
+ """Test that DagModel.deactivate_deleted_dags returns correct boolean value."""
+ with dag_maker("test_dag1") as dag1:
+ dag1.relative_fileloc = "test_dag1.py"
+ with dag_maker("test_dag2") as dag2:
+ dag2.relative_fileloc = "test_dag2.py"
+ dag_maker.sync_dagbag_to_db()
+
+ any_deactivated = DagModel.deactivate_deleted_dags(
+ bundle_name="dag_maker",
+ rel_filelocs=rel_filelocs,
+ session=session,
+ )
+
+ assert any_deactivated is expected_return
+ assert session.get(DagModel, "test_dag1").is_stale is expected_dag1_stale
+ assert session.get(DagModel, "test_dag2").is_stale is expected_dag2_stale
+
+ @pytest.mark.parametrize(
+ "active_files, should_call_cleanup",
+ [
+ pytest.param(
+ [
+ DagFileInfo(
+ bundle_name="dag_maker",
+ rel_path=Path("test_dag1.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ ),
+ # test_dag2.py is deleted
+ ],
+ True, # Should call cleanup
+ id="dags_deactivated",
+ ),
+ pytest.param(
+ [
+ DagFileInfo(
+ bundle_name="dag_maker",
+ rel_path=Path("test_dag1.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ ),
+ DagFileInfo(
+ bundle_name="dag_maker",
+ rel_path=Path("test_dag2.py"),
+ bundle_path=TEST_DAGS_FOLDER,
+ ),
+ ],
+ False, # Should NOT call cleanup
+ id="no_dags_deactivated",
+ ),
+ ],
+ )
+ @mock.patch("airflow.dag_processing.manager.remove_references_to_deleted_dags")
+ def test_manager_deactivate_deleted_dags_cleanup_behavior(
+ self, mock_remove_references, dag_maker, session, active_files, should_call_cleanup
+ ):
+ """Test that manager conditionally calls remove_references_to_deleted_dags based on whether DAGs were deactivated."""
+ with dag_maker("test_dag1") as dag1:
+ dag1.relative_fileloc = "test_dag1.py"
+ with dag_maker("test_dag2") as dag2:
+ dag2.relative_fileloc = "test_dag2.py"
+ dag_maker.sync_dagbag_to_db()
+
+ manager = DagFileProcessorManager(max_runs=1)
+ manager.deactivate_deleted_dags("dag_maker", active_files)
+
+ if should_call_cleanup:
+ mock_remove_references.assert_called_once()
+ else:
+ mock_remove_references.assert_not_called()
+
@conf_vars({("core", "load_examples"): "False"})
def test_fetch_callbacks_from_database(self, configure_testing_dag_bundle):
"""Test _fetch_callbacks returns callbacks ordered by priority_weight desc."""