This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 749e53def4 remove stale serialized dags (#22917)
749e53def4 is described below
commit 749e53def43055225a2e5d09596af7821d91b4ac
Author: Ping Zhang <[email protected]>
AuthorDate: Thu May 12 12:01:47 2022 -0700
remove stale serialized dags (#22917)
---
airflow/dag_processing/manager.py | 10 +++++++++-
tests/dag_processing/test_manager.py | 32 ++++++++++++++++++++++++++------
2 files changed, 35 insertions(+), 7 deletions(-)
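Background for the change: deactivating a DAG whose file has disappeared previously left its row in the serialized_dag table behind. The snippet below is illustrative only and is not part of the commit; it assumes the standard ORM models and the create_session helper, and simply lists the kind of stale serialized entries this commit starts cleaning up.

    # Illustrative only -- not from the commit. List serialized_dag rows whose
    # DAG has already been marked inactive in the dag table.
    from airflow.models.dag import DagModel
    from airflow.models.serialized_dag import SerializedDagModel
    from airflow.utils.session import create_session

    with create_session() as session:
        rows = (
            session.query(SerializedDagModel.dag_id)
            .join(DagModel, DagModel.dag_id == SerializedDagModel.dag_id)
            .filter(~DagModel.is_active)
            .all()
        )
        stale_dag_ids = [dag_id for (dag_id,) in rows]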
diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 4cb11cff82..f16a913725 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -481,7 +481,11 @@ class DagFileProcessorManager(LoggingMixin):
@provide_session
def _deactivate_stale_dags(self, session=None):
- """Detects DAGs which are no longer present in files and deactivate
them."""
+ """
+ Detects DAGs which are no longer present in files
+
+ Deactivate them and remove them in the serialized_dag table
+ """
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.deactivate_stale_dags_interval:
@@ -514,6 +518,10 @@ class DagFileProcessorManager(LoggingMixin):
if deactivated:
self.log.info("Deactivated %i DAGs which are no longer
present in file.", deactivated)
+ for dag_id in to_deactivate:
+ SerializedDagModel.remove_dag(dag_id)
+ self.log.info("Deleted DAG %s in serialized_dag table",
dag_id)
+
self.last_deactivate_stale_dags_time = timezone.utcnow()
def _run_parsing_loop(self):
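The new loop above calls SerializedDagModel.remove_dag for every DAG id that was just deactivated. As a rough sketch of the idea only (the helper name below is made up for illustration; the real method lives in airflow/models/serialized_dag.py), the removal amounts to a single delete against the serialized_dag table:

    # Sketch only -- approximates what SerializedDagModel.remove_dag does.
    from airflow.models.serialized_dag import SerializedDagModel
    from airflow.utils.session import provide_session

    @provide_session
    def remove_serialized_dag(dag_id: str, session=None) -> None:
        # Drop the serialized representation stored for this dag_id.
        session.query(SerializedDagModel).filter(
            SerializedDagModel.dag_id == dag_id
        ).delete(synchronize_session=False)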
diff --git a/tests/dag_processing/test_manager.py b/tests/dag_processing/test_manager.py
index 6a65116d51..ad613d877d 100644
--- a/tests/dag_processing/test_manager.py
+++ b/tests/dag_processing/test_manager.py
@@ -34,6 +34,7 @@ from unittest.mock import MagicMock, PropertyMock
import pytest
from freezegun import freeze_time
+from sqlalchemy import func
from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest, SlaCallbackRequest
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
@@ -476,6 +477,7 @@ class TestDagFileProcessorManager:
dag = dagbag.get_dag('test_example_bash_operator')
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()
+ SerializedDagModel.write_dag(dag)
# Add DAG to the file_parsing_stats
stat = DagFileStat(
@@ -488,18 +490,36 @@ class TestDagFileProcessorManager:
manager._file_paths = [test_dag_path]
manager._file_stats[test_dag_path] = stat
- active_dags = (
- session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
+ active_dag_count = (
+ session.query(func.count(DagModel.dag_id))
+ .filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
+ .scalar()
)
- assert len(active_dags) == 1
+ assert active_dag_count == 1
+
+ serialized_dag_count = (
+ session.query(func.count(SerializedDagModel.dag_id))
+ .filter(SerializedDagModel.fileloc == test_dag_path)
+ .scalar()
+ )
+ assert serialized_dag_count == 1
manager._file_stats[test_dag_path] = stat
manager._deactivate_stale_dags()
- active_dags = (
- session.query(DagModel).filter(DagModel.is_active, DagModel.fileloc == test_dag_path).all()
+
+ active_dag_count = (
+ session.query(func.count(DagModel.dag_id))
+ .filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
+ .scalar()
)
+ assert active_dag_count == 0
- assert len(active_dags) == 0
+ serialized_dag_count = (
+ session.query(func.count(SerializedDagModel.dag_id))
+ .filter(SerializedDagModel.fileloc == test_dag_path)
+ .scalar()
+ )
+ assert serialized_dag_count == 0
@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle",
new_callable=PropertyMock
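A note on the test changes above: the assertions now use server-side counts (func.count(...).scalar()) instead of materializing rows with .all(), and they check both the dag and the serialized_dag tables before and after _deactivate_stale_dags(). If a plain existence check is enough, SerializedDagModel.has_dag offers a shorter alternative; a minimal sketch, assuming the same DAG id as the test above:

    # Minimal alternative check (illustrative): has_dag reports whether a
    # serialized row exists for the given dag_id.
    from airflow.models.serialized_dag import SerializedDagModel

    assert not SerializedDagModel.has_dag('test_example_bash_operator')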