This is an automated email from the ASF dual-hosted git repository.

utkarsharma pushed a commit to branch v2-10-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v2-10-test by this push:
     new 996af78376 Fix: DAGs are not marked as stale if the dags folder change 
(#41433) (#41829)
996af78376 is described below

commit 996af78376e48dca34d469a9bcca569647ae17de
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Wed Aug 28 18:18:42 2024 +0530

    Fix: DAGs are not marked as stale if the dags folder change (#41433) 
(#41829)
    
    * Fix: DAGs are not marked as stale if the AIRFLOW__CORE__DAGS_FOLDER 
changes
    
    * Update airflow/dag_processing/manager.py
    
    * Add testcase
    
    * Add code comment
    
    * Update code comment
    
    * Update the logic for checking the current dag_directory
    
    * Update testcases
    
    * Remove unwanted code
    
    * Uncomment code
    
    * Add processor_subdir when creating processor_subdir
    
    * Fix test_retry_still_in_executor test
    
    * Remove config from test
    
    * Update airflow/dag_processing/manager.py
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    
    * Update if condition for readability
    
    ---------
    
    Co-authored-by: Jed Cunningham 
<[email protected]>
    (cherry picked from commit 9f30a41874454696ae2b215b2d86cb9a62968006)
---
 airflow/dag_processing/manager.py       | 10 ++++-
 tests/dag_processing/test_job_runner.py | 71 +++++++++++++++++++++++++++++++--
 tests/jobs/test_scheduler_job.py        |  1 +
 3 files changed, 76 insertions(+), 6 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index c03bc074d0..819da5d7e1 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -526,14 +526,20 @@ class DagFileProcessorManager(LoggingMixin):
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
+            # When the DAG processor runs as part of the scheduler, and the 
user changes the DAGs folder,
+            # DAGs from the previous DAGs folder will be marked as stale. Note 
that this change has no impact
+            # on standalone DAG processors.
+            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, 
dag_directory]) != dag_directory
             # The largest valid difference between a DagFileStat's 
last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that 
indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold 
configured to prevent a
             # significant delay in deactivation of stale dags when a large 
timeout is configured
-            if (
+            dag_removed_from_dag_folder_or_file = (
                 dag.fileloc in last_parsed
                 and (dag.last_parsed_time + 
timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
-            ):
+            )
+
+            if dag_not_in_current_dag_folder or 
dag_removed_from_dag_folder_or_file:
                 cls.logger().info("DAG %s is missing and will be 
deactivated.", dag.dag_id)
                 to_deactivate.add(dag.dag_id)
 
diff --git a/tests/dag_processing/test_job_runner.py 
b/tests/dag_processing/test_job_runner.py
index 8112b7222a..b5d0b35580 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -26,6 +26,7 @@ import pathlib
 import random
 import socket
 import sys
+import tempfile
 import textwrap
 import threading
 import time
@@ -638,7 +639,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory="directory",
+                dag_directory=str(TEST_DAG_FOLDER),
                 max_runs=1,
                 processor_timeout=timedelta(minutes=10),
                 signal_conn=MagicMock(),
@@ -712,11 +713,11 @@ class TestDagProcessorJobRunner:
         """
         Ensure only dags from current dag_directory are updated
         """
-        dag_directory = "directory"
+        dag_directory = str(TEST_DAG_FOLDER)
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=dag_directory,
+                dag_directory=TEST_DAG_FOLDER,
                 max_runs=1,
                 processor_timeout=timedelta(minutes=10),
                 signal_conn=MagicMock(),
@@ -740,7 +741,7 @@ class TestDagProcessorJobRunner:
             # Add stale DAG to the DB
             other_dag = other_dagbag.get_dag("test_start_date_scheduling")
             other_dag.last_parsed_time = timezone.utcnow()
-            other_dag.sync_to_db(processor_subdir="other")
+            other_dag.sync_to_db(processor_subdir="/other")
 
             # Add DAG to the file_parsing_stats
             stat = DagFileStat(
@@ -762,6 +763,68 @@ class TestDagProcessorJobRunner:
             active_dag_count = 
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
 
+    def test_scan_stale_dags_when_dag_folder_change(self):
+        """
+        Ensure DAGs from the old dag_folder are marked as stale when the DAG processor
+        is running as part of the scheduler.
+        """
+
+        def get_dag_string(filename) -> str:
+            return open(TEST_DAG_FOLDER / filename).read()
+
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, 
suffix=".py")
+            
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
+            old_dag_file.flush()
+            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
+            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, 
suffix=".py")
+            
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
+            new_dag_file.flush()
+
+            manager = DagProcessorJobRunner(
+                job=Job(),
+                processor=DagFileProcessorManager(
+                    dag_directory=new_dag_home,
+                    max_runs=1,
+                    processor_timeout=timedelta(minutes=10),
+                    signal_conn=MagicMock(),
+                    dag_ids=[],
+                    pickle_dags=False,
+                    async_mode=True,
+                ),
+            )
+
+            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
+            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+
+            with create_session() as session:
+                # Add DAG from old DAG home to the DB
+                dag = dagbag.get_dag("test_example_bash_operator")
+                dag.fileloc = old_dag_file.name
+                dag.last_parsed_time = timezone.utcnow()
+                dag.sync_to_db(processor_subdir=old_dag_home)
+
+                # Add DAG from new DAG home to the DB
+                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
+                other_dag.fileloc = new_dag_file.name
+                other_dag.last_parsed_time = timezone.utcnow()
+                other_dag.sync_to_db(processor_subdir=new_dag_home)
+
+                manager.processor._file_paths = [new_dag_file]
+
+                active_dag_count = (
+                    
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 2
+
+                manager.processor._scan_stale_dags()
+
+                active_dag_count = (
+                    
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                )
+                assert active_dag_count == 1
+
     @mock.patch(
         
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", 
new_callable=PropertyMock
     )
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 2e96728d5e..38bde8bf26 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3568,6 +3568,7 @@ class TestSchedulerJob:
                 dag_id="test_retry_still_in_executor",
                 schedule="@once",
                 session=session,
+                fileloc=os.devnull + "/test_retry_still_in_executor.py",
             ):
                 dag_task1 = BashOperator(
                     task_id="test_retry_handling_op",

Reply via email to