This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch sync_v2_10_test
in repository https://gitbox.apache.org/repos/asf/airflow.git

commit c403e755b1cc19a3f305caa4619d762b4a21d529
Author: Ephraim Anierobi <[email protected]>
AuthorDate: Fri Sep 13 10:20:24 2024 -0700

    Revert "Fix: DAGs are not marked as stale if the dags folder change" 
(#41433) (#41829) (#41893) (#42220)
    
    * Revert "Skip test_scan_stale_dags_when_dag_folder_change in DB isolation 
mode (#41893)"
    
    This reverts commit 07af14ae75820a98b60ecffa2949ef7ad70bacab.
    
    * Revert "Fix: DAGs are not marked as stale if the dags folder change 
(#41433) (#41829)"
    
    This reverts commit 996af78376e48dca34d469a9bcca569647ae17de.
---
 airflow/dag_processing/manager.py       | 10 +----
 tests/dag_processing/test_job_runner.py | 72 ++-------------------------------
 tests/jobs/test_scheduler_job.py        |  1 -
 3 files changed, 6 insertions(+), 77 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 819da5d7e1..c03bc074d0 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -526,20 +526,14 @@ class DagFileProcessorManager(LoggingMixin):
         dags_parsed = session.execute(query)
 
         for dag in dags_parsed:
-            # When the DAG processor runs as part of the scheduler, and the 
user changes the DAGs folder,
-            # DAGs from the previous DAGs folder will be marked as stale. Note 
that this change has no impact
-            # on standalone DAG processors.
-            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, 
dag_directory]) != dag_directory
             # The largest valid difference between a DagFileStat's 
last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that 
indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold 
configured to prevent a
             # significant delay in deactivation of stale dags when a large 
timeout is configured
-            dag_removed_from_dag_folder_or_file = (
+            if (
                 dag.fileloc in last_parsed
                 and (dag.last_parsed_time + 
timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
-            )
-
-            if dag_not_in_current_dag_folder or 
dag_removed_from_dag_folder_or_file:
+            ):
                 cls.logger().info("DAG %s is missing and will be 
deactivated.", dag.dag_id)
                 to_deactivate.add(dag.dag_id)
 
diff --git a/tests/dag_processing/test_job_runner.py 
b/tests/dag_processing/test_job_runner.py
index 9b8437d77d..8112b7222a 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -26,7 +26,6 @@ import pathlib
 import random
 import socket
 import sys
-import tempfile
 import textwrap
 import threading
 import time
@@ -639,7 +638,7 @@ class TestDagProcessorJobRunner:
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=str(TEST_DAG_FOLDER),
+                dag_directory="directory",
                 max_runs=1,
                 processor_timeout=timedelta(minutes=10),
                 signal_conn=MagicMock(),
@@ -713,11 +712,11 @@ class TestDagProcessorJobRunner:
         """
         Ensure only dags from current dag_directory are updated
         """
-        dag_directory = str(TEST_DAG_FOLDER)
+        dag_directory = "directory"
         manager = DagProcessorJobRunner(
             job=Job(),
             processor=DagFileProcessorManager(
-                dag_directory=TEST_DAG_FOLDER,
+                dag_directory=dag_directory,
                 max_runs=1,
                 processor_timeout=timedelta(minutes=10),
                 signal_conn=MagicMock(),
@@ -741,7 +740,7 @@ class TestDagProcessorJobRunner:
             # Add stale DAG to the DB
             other_dag = other_dagbag.get_dag("test_start_date_scheduling")
             other_dag.last_parsed_time = timezone.utcnow()
-            other_dag.sync_to_db(processor_subdir="/other")
+            other_dag.sync_to_db(processor_subdir="other")
 
             # Add DAG to the file_parsing_stats
             stat = DagFileStat(
@@ -763,69 +762,6 @@ class TestDagProcessorJobRunner:
             active_dag_count = 
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
             assert active_dag_count == 1
 
-    @pytest.mark.skip_if_database_isolation_mode  # Test is broken in db 
isolation mode
-    def test_scan_stale_dags_when_dag_folder_change(self):
-        """
-        Ensure dags from old dag_folder is marked as stale when dag processor
-         is running as part of scheduler.
-        """
-
-        def get_dag_string(filename) -> str:
-            return open(TEST_DAG_FOLDER / filename).read()
-
-        with tempfile.TemporaryDirectory() as tmpdir:
-            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
-            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, 
suffix=".py")
-            
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
-            old_dag_file.flush()
-            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
-            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, 
suffix=".py")
-            
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
-            new_dag_file.flush()
-
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=new_dag_home,
-                    max_runs=1,
-                    processor_timeout=timedelta(minutes=10),
-                    signal_conn=MagicMock(),
-                    dag_ids=[],
-                    pickle_dags=False,
-                    async_mode=True,
-                ),
-            )
-
-            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
-            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
-
-            with create_session() as session:
-                # Add DAG from old dah home to the DB
-                dag = dagbag.get_dag("test_example_bash_operator")
-                dag.fileloc = old_dag_file.name
-                dag.last_parsed_time = timezone.utcnow()
-                dag.sync_to_db(processor_subdir=old_dag_home)
-
-                # Add DAG from new DAG home to the DB
-                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
-                other_dag.fileloc = new_dag_file.name
-                other_dag.last_parsed_time = timezone.utcnow()
-                other_dag.sync_to_db(processor_subdir=new_dag_home)
-
-                manager.processor._file_paths = [new_dag_file]
-
-                active_dag_count = (
-                    
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                )
-                assert active_dag_count == 2
-
-                manager.processor._scan_stale_dags()
-
-                active_dag_count = (
-                    
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                )
-                assert active_dag_count == 1
-
     @mock.patch(
         
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", 
new_callable=PropertyMock
     )
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index 38bde8bf26..2e96728d5e 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -3568,7 +3568,6 @@ class TestSchedulerJob:
                 dag_id="test_retry_still_in_executor",
                 schedule="@once",
                 session=session,
-                fileloc=os.devnull + "/test_retry_still_in_executor.py",
             ):
                 dag_task1 = BashOperator(
                     task_id="test_retry_handling_op",

Reply via email to