This is an automated email from the ASF dual-hosted git repository.
utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v2-10-test by this push:
new 996af78376 Fix: DAGs are not marked as stale if the dags folder changes
(#41433) (#41829)
996af78376 is described below
commit 996af78376e48dca34d469a9bcca569647ae17de
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Wed Aug 28 18:18:42 2024 +0530
Fix: DAGs are not marked as stale if the dags folder changes (#41433)
(#41829)
* Fix: DAGs are not marked as stale if the AIRFLOW__CORE__DAGS_FOLDER
changes
* Update airflow/dag_processing/manager.py
* Add testcase
* Add code comment
* Update code comment
* Update the logic for checking the current dag_directory
* Update testcases
* Remove unwanted code
* Uncomment code
* Add processor_subdir when creating processor_subdir
* Fix test_retry_still_in_executor test
* Remove config from test
* Update airflow/dag_processing/manager.py
Co-authored-by: Jed Cunningham
<[email protected]>
* Update if condition for readability
---------
Co-authored-by: Jed Cunningham
<[email protected]>
(cherry picked from commit 9f30a41874454696ae2b215b2d86cb9a62968006)
---
airflow/dag_processing/manager.py | 10 ++++-
tests/dag_processing/test_job_runner.py | 71 +++++++++++++++++++++++++++++++--
tests/jobs/test_scheduler_job.py | 1 +
3 files changed, 76 insertions(+), 6 deletions(-)
diff --git a/airflow/dag_processing/manager.py
b/airflow/dag_processing/manager.py
index c03bc074d0..819da5d7e1 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -526,14 +526,20 @@ class DagFileProcessorManager(LoggingMixin):
dags_parsed = session.execute(query)
for dag in dags_parsed:
+ # When the DAG processor runs as part of the scheduler, and the
user changes the DAGs folder,
+ # DAGs from the previous DAGs folder will be marked as stale. Note
that this change has no impact
+ # on standalone DAG processors.
+ dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc,
dag_directory]) != dag_directory
# The largest valid difference between a DagFileStat's
last_finished_time and a DAG's
# last_parsed_time is the processor_timeout. Longer than that
indicates that the DAG is
# no longer present in the file. We have a stale_dag_threshold
configured to prevent a
# significant delay in deactivation of stale dags when a large
timeout is configured
- if (
+ dag_removed_from_dag_folder_or_file = (
dag.fileloc in last_parsed
and (dag.last_parsed_time +
timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
- ):
+ )
+
+ if dag_not_in_current_dag_folder or
dag_removed_from_dag_folder_or_file:
cls.logger().info("DAG %s is missing and will be
deactivated.", dag.dag_id)
to_deactivate.add(dag.dag_id)
diff --git a/tests/dag_processing/test_job_runner.py
b/tests/dag_processing/test_job_runner.py
index 8112b7222a..b5d0b35580 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -26,6 +26,7 @@ import pathlib
import random
import socket
import sys
+import tempfile
import textwrap
import threading
import time
@@ -638,7 +639,7 @@ class TestDagProcessorJobRunner:
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory="directory",
+ dag_directory=str(TEST_DAG_FOLDER),
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
@@ -712,11 +713,11 @@ class TestDagProcessorJobRunner:
"""
Ensure only dags from current dag_directory are updated
"""
- dag_directory = "directory"
+ dag_directory = str(TEST_DAG_FOLDER)
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
- dag_directory=dag_directory,
+ dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
signal_conn=MagicMock(),
@@ -740,7 +741,7 @@ class TestDagProcessorJobRunner:
# Add stale DAG to the DB
other_dag = other_dagbag.get_dag("test_start_date_scheduling")
other_dag.last_parsed_time = timezone.utcnow()
- other_dag.sync_to_db(processor_subdir="other")
+ other_dag.sync_to_db(processor_subdir="/other")
# Add DAG to the file_parsing_stats
stat = DagFileStat(
@@ -762,6 +763,68 @@ class TestDagProcessorJobRunner:
active_dag_count =
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1
+ def test_scan_stale_dags_when_dag_folder_change(self):
+ """
+        Ensure dags from the old dag_folder are marked as stale when the dag processor
+        is running as part of the scheduler.
+ """
+
+ def get_dag_string(filename) -> str:
+ return open(TEST_DAG_FOLDER / filename).read()
+
+ with tempfile.TemporaryDirectory() as tmpdir:
+ old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+ old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home,
suffix=".py")
+
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+ old_dag_file.flush()
+ new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+ new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home,
suffix=".py")
+
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+ new_dag_file.flush()
+
+ manager = DagProcessorJobRunner(
+ job=Job(),
+ processor=DagFileProcessorManager(
+ dag_directory=new_dag_home,
+ max_runs=1,
+ processor_timeout=timedelta(minutes=10),
+ signal_conn=MagicMock(),
+ dag_ids=[],
+ pickle_dags=False,
+ async_mode=True,
+ ),
+ )
+
+ dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+ other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+
+ with create_session() as session:
+            # Add DAG from old DAG home to the DB
+ dag = dagbag.get_dag("test_example_bash_operator")
+ dag.fileloc = old_dag_file.name
+ dag.last_parsed_time = timezone.utcnow()
+ dag.sync_to_db(processor_subdir=old_dag_home)
+
+ # Add DAG from new DAG home to the DB
+ other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+ other_dag.fileloc = new_dag_file.name
+ other_dag.last_parsed_time = timezone.utcnow()
+ other_dag.sync_to_db(processor_subdir=new_dag_home)
+
+ manager.processor._file_paths = [new_dag_file]
+
+ active_dag_count = (
+
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+ )
+ assert active_dag_count == 2
+
+ manager.processor._scan_stale_dags()
+
+ active_dag_count = (
+
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+ )
+ assert active_dag_count == 1
+
@mock.patch(
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle",
new_callable=PropertyMock
)
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2e96728d5e..38bde8bf26 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3568,6 +3568,7 @@ class TestSchedulerJob:
dag_id="test_retry_still_in_executor",
schedule="@once",
session=session,
+ fileloc=os.devnull + "/test_retry_still_in_executor.py",
):
dag_task1 = BashOperator(
task_id="test_retry_handling_op",