This is an automated email from the ASF dual-hosted git repository.

ephraimanierobi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 435e9687b0 Handle Example dags case when checking for missing files 
(#41856)
435e9687b0 is described below

commit 435e9687b0c56499bc29c21d3cada8ae9e0a8c53
Author: Utkarsh Sharma <[email protected]>
AuthorDate: Thu Aug 29 18:35:59 2024 +0530

    Handle Example dags case when checking for missing files (#41856)
    
    Earlier PR created to address the issue was not handling the case for the
Example Dags, due to which the example dags were marked as stale since they are
not present in the dag_directory. This PR handles that scenario and updates the
test case accordingly.
    
    related: #41432
---
 airflow/dag_processing/manager.py       | 11 ++--
 tests/dag_processing/test_job_runner.py | 89 ++++++++++++++++-----------------
 2 files changed, 52 insertions(+), 48 deletions(-)

diff --git a/airflow/dag_processing/manager.py 
b/airflow/dag_processing/manager.py
index 37b4323ac4..c442c23cfc 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -41,6 +41,7 @@ from sqlalchemy import delete, select, update
 from tabulate import tabulate
 
 import airflow.models
+from airflow import example_dags
 from airflow.api_internal.internal_api_call import internal_api_call
 from airflow.callbacks.callback_requests import CallbackRequest, 
SlaCallbackRequest
 from airflow.configuration import conf
@@ -69,6 +70,8 @@ from airflow.utils.retries import retry_db_transaction
 from airflow.utils.session import NEW_SESSION, provide_session
 from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
 
+example_dag_folder = next(iter(example_dags.__path__))
+
 if TYPE_CHECKING:
     from multiprocessing.connection import Connection as 
MultiprocessingConnection
 
@@ -527,9 +530,11 @@ class DagFileProcessorManager(LoggingMixin):
 
         for dag in dags_parsed:
             # When the DAG processor runs as part of the scheduler, and the 
user changes the DAGs folder,
-            # DAGs from the previous DAGs folder will be marked as stale. Note 
that this change has no impact
-            # on standalone DAG processors.
-            dag_not_in_current_dag_folder = os.path.commonpath([dag.fileloc, 
dag_directory]) != dag_directory
+            # DAGs from the previous DAGs folder will be marked as stale. We 
also need to handle example dags
+            # differently. Note that this change has no impact on standalone 
DAG processors.
+            dag_not_in_current_dag_folder = (
+                not os.path.commonpath([dag.fileloc, example_dag_folder]) == 
example_dag_folder
+            ) and (os.path.commonpath([dag.fileloc, dag_directory]) != 
dag_directory)
             # The largest valid difference between a DagFileStat's 
last_finished_time and a DAG's
             # last_parsed_time is the processor_timeout. Longer than that 
indicates that the DAG is
             # no longer present in the file. We have a stale_dag_threshold 
configured to prevent a
diff --git a/tests/dag_processing/test_job_runner.py 
b/tests/dag_processing/test_job_runner.py
index b5d0b35580..4f79436d14 100644
--- a/tests/dag_processing/test_job_runner.py
+++ b/tests/dag_processing/test_job_runner.py
@@ -772,58 +772,57 @@ class TestDagProcessorJobRunner:
         def get_dag_string(filename) -> str:
             return open(TEST_DAG_FOLDER / filename).read()
 
-        with tempfile.TemporaryDirectory() as tmpdir:
-            old_dag_home = tempfile.mkdtemp(dir=tmpdir)
-            old_dag_file = tempfile.NamedTemporaryFile(dir=old_dag_home, 
suffix=".py")
-            
old_dag_file.write(get_dag_string("test_example_bash_operator.py").encode())
-            old_dag_file.flush()
-            new_dag_home = tempfile.mkdtemp(dir=tmpdir)
-            new_dag_file = tempfile.NamedTemporaryFile(dir=new_dag_home, 
suffix=".py")
-            
new_dag_file.write(get_dag_string("test_scheduler_dags.py").encode())
-            new_dag_file.flush()
-
-            manager = DagProcessorJobRunner(
-                job=Job(),
-                processor=DagFileProcessorManager(
-                    dag_directory=new_dag_home,
-                    max_runs=1,
-                    processor_timeout=timedelta(minutes=10),
-                    signal_conn=MagicMock(),
-                    dag_ids=[],
-                    pickle_dags=False,
-                    async_mode=True,
-                ),
-            )
+        def add_dag_to_db(file_path, dag_id, processor_subdir):
+            dagbag = DagBag(file_path, read_dags_from_db=False)
+            dag = dagbag.get_dag(dag_id)
+            dag.fileloc = file_path
+            dag.last_parsed_time = timezone.utcnow()
+            dag.sync_to_db(processor_subdir=processor_subdir)
 
-            dagbag = DagBag(old_dag_file.name, read_dags_from_db=False)
-            other_dagbag = DagBag(new_dag_file.name, read_dags_from_db=False)
+        def create_dag_folder(dag_id):
+            dag_home = tempfile.mkdtemp(dir=tmpdir)
+            dag_file = tempfile.NamedTemporaryFile(dir=dag_home, suffix=".py")
+            dag_file.write(get_dag_string(dag_id).encode())
+            dag_file.flush()
+            return dag_home, dag_file
 
-            with create_session() as session:
-                # Add DAG from old dah home to the DB
-                dag = dagbag.get_dag("test_example_bash_operator")
-                dag.fileloc = old_dag_file.name
-                dag.last_parsed_time = timezone.utcnow()
-                dag.sync_to_db(processor_subdir=old_dag_home)
+        with tempfile.TemporaryDirectory() as tmpdir:
+            old_dag_home, old_dag_file = 
create_dag_folder("test_example_bash_operator.py")
+            new_dag_home, new_dag_file = 
create_dag_folder("test_scheduler_dags.py")
+            example_dag_home, example_dag_file = 
create_dag_folder("test_dag_warnings.py")
+
+            with 
mock.patch("airflow.dag_processing.manager.example_dag_folder", 
example_dag_home):
+                manager = DagProcessorJobRunner(
+                    job=Job(),
+                    processor=DagFileProcessorManager(
+                        dag_directory=new_dag_home,
+                        max_runs=1,
+                        processor_timeout=timedelta(minutes=10),
+                        signal_conn=MagicMock(),
+                        dag_ids=[],
+                        pickle_dags=False,
+                        async_mode=True,
+                    ),
+                )
 
-                # Add DAG from new DAG home to the DB
-                other_dag = other_dagbag.get_dag("test_start_date_scheduling")
-                other_dag.fileloc = new_dag_file.name
-                other_dag.last_parsed_time = timezone.utcnow()
-                other_dag.sync_to_db(processor_subdir=new_dag_home)
+                with create_session() as session:
+                    add_dag_to_db(old_dag_file.name, 
"test_example_bash_operator", old_dag_home)
+                    add_dag_to_db(new_dag_file.name, 
"test_start_date_scheduling", new_dag_home)
+                    add_dag_to_db(example_dag_file.name, "test_dag_warnings", 
example_dag_home)
 
-                manager.processor._file_paths = [new_dag_file]
+                    manager.processor._file_paths = [new_dag_file, 
example_dag_file]
 
-                active_dag_count = (
-                    
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                )
-                assert active_dag_count == 2
+                    active_dag_count = (
+                        
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                    )
+                    assert active_dag_count == 3
 
-                manager.processor._scan_stale_dags()
+                    manager.processor._scan_stale_dags()
 
-                active_dag_count = (
-                    
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
-                )
-                assert active_dag_count == 1
+                    active_dag_count = (
+                        
session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
+                    )
+                    assert active_dag_count == 2
 
     @mock.patch(
         
"airflow.dag_processing.processor.DagFileProcessorProcess.waitable_handle", 
new_callable=PropertyMock

Reply via email to